Technical Design: Agent-Assisted Sample Sheet Generation
Status: DRAFT
Context
Data contributors upload raw files to Synapse and annotate them via Curator. They need to generate workflow-ready sample sheets (e.g., nf-core/sarek) from that metadata to feed downstream processing pipelines. This design introduces three things: a new async job endpoint on CurationTaskController that executes any task whose execution details are executable; a worker that dispatches to type-specific sub-workers based on the concrete type of the task's TaskExecutionDetails; and a high-level description of the sample sheet generation supervisor agent (built on the Spring AI Bedrock AgentCore multi-agent architecture).
Scope
New async job endpoint on CurationTaskController
ComputeTaskExecutionRequest/ComputeTaskExecutionResponse schemas
New ExecutableTaskExecutionDetails interface extending TaskExecutionDetails
Worker that dispatches to sub-workers by executionDetails concrete type
Async job status propagation to task status
High-level description of the sample sheet generation supervisor agent + its tools/sub-agents
Out of scope: Sub-worker implementation details, Spring AI wiring, agent prompt engineering
1. Dispatch Mechanism: TaskExecutionDetails Concrete Type
Existing Model
TaskStatus
├── state: TaskState (NOT_STARTED, IN_PROGRESS, COMPLETED, CANCELED)
├── executionDetails: TaskExecutionDetails (polymorphic interface)
│   └── concreteType: string (discriminator)
├── lastUpdatedBy, lastUpdatedOn
└── etag
Today there is one concrete implementation: GridExecutionDetails (with activeSessionId).
New Interface: ExecutableTaskExecutionDetails
A marker/extension interface that signals "this task can be dispatched to a sub-worker for execution."
Path: lib/lib-auto-generated/.../curation/execution/ExecutableTaskExecutionDetails.json
{
  "description": "A TaskExecutionDetails that supports asynchronous execution by a sub-worker. The concreteType determines which sub-worker handles execution.",
  "type": "interface",
  "implements": [
    { "$ref": "org.sagebionetworks.repo.model.curation.TaskExecutionDetails" }
  ],
  "properties": {
    "asyncJobId": {
      "type": "string",
      "description": "The ID of the async job currently executing this task. Only the user who started the job can poll job status directly via this ID. Set by the system when execution starts; cleared on completion or failure."
    },
    "startedBy": {
      "type": "integer",
      "description": "The principal ID of the user who started the execution."
    },
    "startedOn": {
      "type": "string",
      "format": "date-time",
      "description": "When the execution was started."
    },
    "errorMessage": {
      "type": "string",
      "description": "If execution failed, the error description from the async job."
    },
    "errorDetails": {
      "type": "string",
      "description": "If execution failed, additional error details (e.g., stack trace summary)."
    }
  }
}
Concrete Implementation: SampleSheetGenerationExecutionDetails
Path: lib/lib-auto-generated/.../curation/execution/SampleSheetGenerationExecutionDetails.json
{
  "description": "Execution details for sample sheet generation via a multi-agent system.",
  "implements": [
    { "$ref": "org.sagebionetworks.repo.model.curation.execution.ExecutableTaskExecutionDetails" }
  ],
  "properties": {
    "inputFileViewId": {
      "type": "string",
      "description": "The synId of the EntityView containing source annotations."
    },
    "outputFolderId": {
      "type": "string",
      "description": "The synId of the folder where the output RecordSet will be written."
    },
    "targetSchemaId": {
      "type": "string",
      "description": "The $id of the JSON Schema defining the target sample sheet format."
    },
    "outputRecordSetId": {
      "type": "string",
      "description": "The synId of the generated RecordSet (set on success)."
    },
    "reviewTaskId": {
      "type": "integer",
      "description": "The ID of the review CurationTask created (set on success)."
    }
  }
}
Dispatch Pattern
The worker reads taskStatus.getExecutionDetails():
If it implements ExecutableTaskExecutionDetails → dispatch to sub-worker keyed by concrete class
If not → reject (task is not executable)
Adding a new executable task type = creating a new ExecutableTaskExecutionDetails implementation + registering a sub-worker for that class.
2. Async Job Status and Task Status Propagation
Key constraint: Only the user who started the async job can poll the job's status via GET /asynchronous/job/{jobId}. Other users with access to the task need visibility into execution progress and results.
Design: Task Status as the Shared View
The TaskStatus is the authoritative view of execution state for all users. The async job is an implementation detail — its state is propagated back to the task's status by the worker.
Async Job State | Task State | Execution Details Update |
|---|---|---|
PROCESSING (job started) | EXECUTING | |
COMPLETE (worker returns) | IN_REVIEW | Output fields populated (e.g., |
FAILED (worker throws) | NOT_STARTED | |
After the job completes, the task state is set to IN_REVIEW. The task changes to COMPLETED only after the user has reviewed and accepted the generated sample sheet. The execution task's final state is therefore tied to the state of the review task.
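The mapping in the table above can be sketched as a small pure function. This is only an illustration under stated assumptions: `JobState`, `SketchTaskState`, and `StatePropagationSketch` are hypothetical stand-ins that mirror the table's values, not existing Synapse types, and leaving a rejected review in IN_REVIEW is an assumption the design does not settle.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins mirroring the table; not existing Synapse enums.
enum JobState { PROCESSING, COMPLETE, FAILED }

enum SketchTaskState { NOT_STARTED, EXECUTING, IN_REVIEW, COMPLETED }

final class StatePropagationSketch {
    private static final Map<JobState, SketchTaskState> JOB_TO_TASK = new EnumMap<>(JobState.class);

    static {
        JOB_TO_TASK.put(JobState.PROCESSING, SketchTaskState.EXECUTING);
        JOB_TO_TASK.put(JobState.COMPLETE, SketchTaskState.IN_REVIEW);
        JOB_TO_TASK.put(JobState.FAILED, SketchTaskState.NOT_STARTED);
    }

    // Task state that the worker would write for a given async job state.
    static SketchTaskState toTaskState(JobState jobState) {
        return JOB_TO_TASK.get(jobState);
    }

    // Only an accepted review moves the task from IN_REVIEW to COMPLETED.
    // Assumption: any other outcome leaves the state unchanged.
    static SketchTaskState afterReview(SketchTaskState current, boolean accepted) {
        if (current == SketchTaskState.IN_REVIEW && accepted) {
            return SketchTaskState.COMPLETED;
        }
        return current;
    }
}
```

Keeping the mapping in one place makes it easy to check that every async job state has exactly one corresponding task state.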
Visibility Model
User | Can poll async job directly? | Can see task status? |
|---|---|---|
User who started execution | Yes (via | Yes (via |
Other users with task access | No | Yes (via |
This means:
The job starter can poll the async job for real-time progress (progress messages, percentage) while the job is running.
All users with access to the task can read TaskStatus to see whether execution is in progress, who started it, when it started, and, after completion, the results or error.
The asyncJobId is stored in execution details so the UI can offer the job starter a richer progress view while the job runs.
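As a rough illustration of the visibility rule, a client could decide which view to show from the two fields stored in the execution details. The class `ProgressViewSketch` and its `chooseView` method are hypothetical names introduced for this sketch only; the rule itself (only the starter may poll the job, and only while a job ID is present) is taken from the text above.

```java
import java.util.Objects;

final class ProgressViewSketch {
    enum View { JOB_PROGRESS, TASK_STATUS }

    // startedBy and asyncJobId are read from the task's execution details.
    static View chooseView(Long currentUserId, Long startedBy, String asyncJobId) {
        boolean jobRunning = asyncJobId != null;
        boolean isStarter = startedBy != null && Objects.equals(currentUserId, startedBy);
        // Everyone else, and the starter once the job ID is cleared, reads TaskStatus.
        return (jobRunning && isStarter) ? View.JOB_PROGRESS : View.TASK_STATUS;
    }
}
```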
Worker Responsibility: State Propagation
The ComputeTaskDispatcher is responsible for keeping task status in sync with job outcomes:
// Simplified dispatcher logic. The load/validate/lookup steps that resolve
// `task` (the CurationTask) and `subWorker` are omitted here.
public ComputeTaskExecutionResponse dispatch(UserInfo user, String jobId, Long taskId,
        AsyncJobProgressCallback callback) {
    TaskStatus status = curationTaskManager.getTaskStatus(user, taskId);
    ExecutableTaskExecutionDetails details = (ExecutableTaskExecutionDetails) status.getExecutionDetails();
    // 1. Mark task as in-progress with job reference
    details.setAsyncJobId(jobId);
    details.setStartedBy(user.getId());
    details.setStartedOn(new Date());
    details.setErrorMessage(null);
    details.setErrorDetails(null);
    status.setState(TaskState.IN_PROGRESS);
    curationTaskManager.updateTaskStatus(user, taskId, status);
    try {
        // 2. Dispatch to sub-worker
        subWorker.execute(user, jobId, task, details, callback);
        // 3. On success: mark completed, clear job reference
        details.setAsyncJobId(null);
        status.setState(TaskState.COMPLETED);
        curationTaskManager.updateTaskStatus(user, taskId, status);
        return buildResponse(taskId, details);
    } catch (Exception e) {
        // 4. On failure: revert to NOT_STARTED, capture error, clear job reference
        details.setAsyncJobId(null);
        details.setErrorMessage(e.getMessage());
        details.setErrorDetails(extractDetails(e));
        status.setState(TaskState.NOT_STARTED);
        curationTaskManager.updateTaskStatus(user, taskId, status);
        throw e; // Re-throw so AsyncJobRunnerAdapter marks the job FAILED
    }
}
Note: The asyncJobId is cleared on both success and failure. While the job is running, it serves as a reference for the starter to poll progress. Once the job finishes, all relevant information has been propagated to the task's execution details and the job ID is no longer needed.
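The invariant in the note can be exercised with a small in-memory sketch. `DetailsSketch` and `LifecycleSketch` are hypothetical stand-ins for the generated model classes and the dispatcher, and the state names follow the simplified dispatcher code above; persistence is left out. Clearing the job ID in a `finally` block is one possible way to guarantee it happens on both paths, not necessarily how the real dispatcher will do it.

```java
import java.util.Date;

// Hypothetical in-memory stand-in for ExecutableTaskExecutionDetails.
final class DetailsSketch {
    String asyncJobId;
    Long startedBy;
    Date startedOn;
    String errorMessage;
}

final class LifecycleSketch {
    enum State { NOT_STARTED, IN_PROGRESS, COMPLETED }

    State state = State.NOT_STARTED;
    final DetailsSketch details = new DetailsSketch();

    // Runs the work and propagates the outcome to the task state and details.
    void run(long userId, String jobId, Runnable work) {
        details.asyncJobId = jobId;
        details.startedBy = userId;
        details.startedOn = new Date();
        details.errorMessage = null;
        state = State.IN_PROGRESS;
        try {
            work.run();
            state = State.COMPLETED;
        } catch (RuntimeException e) {
            details.errorMessage = e.getMessage();
            state = State.NOT_STARTED;
            throw e; // the async job itself must still be marked FAILED
        } finally {
            details.asyncJobId = null; // cleared on both success and failure
        }
    }
}
```

A unit test for the real dispatcher could assert the same three outcomes: the final task state, the captured error message, and a null job ID.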
3. New Async Job: Execute Task
API Endpoints
Add to CurationTaskController:
POST /curationTask/{taskId}/execute/async/start
GET /curationTask/{taskId}/execute/async/get/{asyncToken}
The taskId path parameter is the only input. The system reads the task's executionDetails concrete type to determine execution behavior.
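From a client's point of view, the two endpoints form a start-then-poll pair. The sketch below only builds the two paths given above and shows a generic polling loop; `ExecuteTaskClientSketch` is a hypothetical name, and `fetch` is a caller-supplied placeholder for the real HTTP call that is assumed to return an empty `Optional` while the job is still running.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class ExecuteTaskClientSketch {
    static String startPath(long taskId) {
        return "/curationTask/" + taskId + "/execute/async/start";
    }

    static String getPath(long taskId, String asyncToken) {
        return "/curationTask/" + taskId + "/execute/async/get/" + asyncToken;
    }

    // Polls until fetch yields a result, attempts run out, or the thread is interrupted.
    static <T> Optional<T> poll(Supplier<Optional<T>> fetch, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> result = fetch.get();
            if (result.isPresent()) {
                return result;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return Optional.empty();
            }
        }
        return Optional.empty();
    }
}
```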
Request Schema
Path: lib/lib-auto-generated/.../curation/compute/ComputeTaskExecutionRequest.json
{
  "description": "Request to execute a curation task. The task must have ExecutableTaskExecutionDetails.",
  "implements": [
    { "$ref": "org.sagebionetworks.repo.model.asynch.AsynchronousRequestBody" }
  ],
  "properties": {
    "taskId": {
      "type": "integer",
      "description": "The ID of the CurationTask to execute."
    }
  }
}
Response Schema
Path: lib/lib-auto-generated/.../curation/compute/ComputeTaskExecutionResponse.json
{
  "description": "Response from a completed task execution.",
  "implements": [
    { "$ref": "org.sagebionetworks.repo.model.asynch.AsynchronousResponseBody" }
  ],
  "properties": {
    "taskId": {
      "type": "integer",
      "description": "The ID of the CurationTask that was executed."
    },
    "executionDetails": {
      "description": "The updated execution details after completion.",
      "$ref": "org.sagebionetworks.repo.model.curation.execution.ExecutableTaskExecutionDetails"
    }
  }
}
Registration in AsynchJobType
COMPUTE_TASK_EXECUTION(ComputeTaskExecutionRequest.class, ComputeTaskExecutionResponse.class),
Queue: {stack}-{instance}-COMPUTE_TASK_EXECUTION
4. Controller Endpoints
@RequiredScope({view, modify})
@ResponseStatus(HttpStatus.CREATED)
@RequestMapping(value = "/curationTask/{taskId}/execute/async/start", method = RequestMethod.POST)
public @ResponseBody AsyncJobId startTaskExecution(
        @RequestParam(value = AuthorizationConstants.USER_ID_PARAM) Long userId,
        @PathVariable Long taskId) {
    ComputeTaskExecutionRequest request = new ComputeTaskExecutionRequest();
    request.setTaskId(taskId);
    AsynchronousJobStatus job = serviceProvider.getAsynchronousJobServices()
            .startJob(userId, request);
    AsyncJobId asyncJobId = new AsyncJobId();
    asyncJobId.setToken(job.getJobId());
    return asyncJobId;
}

@RequiredScope({view})
@ResponseStatus(HttpStatus.OK)
@RequestMapping(value = "/curationTask/{taskId}/execute/async/get/{asyncToken}", method = RequestMethod.GET)
public @ResponseBody ComputeTaskExecutionResponse getTaskExecutionResult(
        @RequestParam(value = AuthorizationConstants.USER_ID_PARAM) Long userId,
        @PathVariable String asyncToken) {
    AsynchronousJobStatus status = serviceProvider.getAsynchronousJobServices()
            .getJobStatusAndThrow(userId, asyncToken);
    return (ComputeTaskExecutionResponse) status.getResponseBody();
}
5. Worker Architecture
ComputeTaskExecutionWorker
@Service
public class ComputeTaskExecutionWorker
        implements AsyncJobRunner<ComputeTaskExecutionRequest, ComputeTaskExecutionResponse> {

    private final ComputeTaskDispatcher dispatcher;

    // Constructor injection; needed because the dispatcher field is final
    public ComputeTaskExecutionWorker(ComputeTaskDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    @Override
    public Class<ComputeTaskExecutionRequest> getRequestType() {
        return ComputeTaskExecutionRequest.class;
    }

    @Override
    public Class<ComputeTaskExecutionResponse> getResponseType() {
        return ComputeTaskExecutionResponse.class;
    }

    @Override
    public ComputeTaskExecutionResponse run(String jobId, UserInfo user,
            ComputeTaskExecutionRequest request, AsyncJobProgressCallback callback) {
        // All task-specific logic lives in the dispatcher
        return dispatcher.dispatch(user, jobId, request.getTaskId(), callback);
    }
}
ComputeTaskDispatcher
public interface ComputeTaskDispatcher {
    ComputeTaskExecutionResponse dispatch(UserInfo user, String jobId, Long taskId,
            AsyncJobProgressCallback callback);
}
Implementation logic:
Load CurationTask and TaskStatus by ID
Validate taskStatus.executionDetails instanceof ExecutableTaskExecutionDetails
Validate user has UPDATE access on the task's project
Validate task state is NOT_STARTED (cannot re-execute an in-progress or completed task)
Transition task state to IN_PROGRESS; set asyncJobId, startedBy, startedOn; clear any prior errorMessage
Look up sub-worker by executionDetails.getClass()
Delegate to sub-worker: subWorker.execute(user, jobId, task, executionDetails, callback)
On success: transition task to COMPLETED, update output fields, clear asyncJobId, return response
On failure: revert task to NOT_STARTED, set errorMessage/errorDetails, clear asyncJobId, re-throw
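The three validation steps at the start of that sequence could look roughly like the sketch below. All types here are hypothetical stand-ins nested inside `ExecutionPreconditionsSketch` so the example compiles on its own; the exception types are assumptions as well, since the design does not say which exceptions the dispatcher should throw, and the access check is reduced to a boolean.

```java
final class ExecutionPreconditionsSketch {
    enum State { NOT_STARTED, IN_PROGRESS, COMPLETED, CANCELED }

    // Stand-ins for the model interfaces described in section 1.
    interface TaskExecutionDetails { }
    interface ExecutableTaskExecutionDetails extends TaskExecutionDetails { }

    // One non-executable and one executable example type.
    static final class GridDetails implements TaskExecutionDetails { }
    static final class SampleSheetDetails implements ExecutableTaskExecutionDetails { }

    // Checks run in the order listed above; the first failing check wins.
    static ExecutableTaskExecutionDetails validate(TaskExecutionDetails details, State state,
            boolean hasUpdateAccess) {
        if (!(details instanceof ExecutableTaskExecutionDetails)) {
            throw new IllegalArgumentException("Task is not executable");
        }
        if (!hasUpdateAccess) {
            throw new SecurityException("UPDATE access on the task's project is required");
        }
        if (state != State.NOT_STARTED) {
            throw new IllegalStateException("Task must be NOT_STARTED but was " + state);
        }
        return (ExecutableTaskExecutionDetails) details;
    }
}
```

Running all checks before the first status update means a rejected request never touches the task's state or execution details.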
ComputeTaskSubWorker Interface
public interface ComputeTaskSubWorker<T extends ExecutableTaskExecutionDetails> {
    Class<T> getExecutionDetailsType();
    void execute(UserInfo user, String jobId, CurationTask task, T executionDetails,
            AsyncJobProgressCallback callback);
}
Sub-workers are auto-discovered via Spring List<ComputeTaskSubWorker<?>> injection. The dispatcher builds a Map<Class<?>, ComputeTaskSubWorker<?>> keyed by getExecutionDetailsType().
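A minimal sketch of that registry, without Spring, is shown below. The `Sketch*` types and `SheetDetails`/`SheetWorker` are hypothetical and simplified (the sub-worker returns a string so the result is observable); the list passed to the constructor plays the role of the injected `List<ComputeTaskSubWorker<?>>`. Failing fast on duplicate registrations is an added assumption, not something the design specifies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface SketchDetails { }

interface SketchSubWorker<T extends SketchDetails> {
    Class<T> getExecutionDetailsType();
    String execute(T details);
}

// Example details type and its sub-worker.
final class SheetDetails implements SketchDetails { }

final class SheetWorker implements SketchSubWorker<SheetDetails> {
    public Class<SheetDetails> getExecutionDetailsType() { return SheetDetails.class; }
    public String execute(SheetDetails details) { return "sheet"; }
}

final class SubWorkerRegistrySketch {
    private final Map<Class<?>, SketchSubWorker<?>> byType = new HashMap<>();

    SubWorkerRegistrySketch(List<SketchSubWorker<?>> workers) {
        for (SketchSubWorker<?> worker : workers) {
            SketchSubWorker<?> previous = byType.put(worker.getExecutionDetailsType(), worker);
            if (previous != null) {
                throw new IllegalStateException(
                        "Duplicate sub-worker for " + worker.getExecutionDetailsType().getName());
            }
        }
    }

    // Looks up the sub-worker by the exact concrete class of the details.
    String dispatch(SketchDetails details) {
        SketchSubWorker<?> worker = byType.get(details.getClass());
        if (worker == null) {
            throw new IllegalArgumentException("No sub-worker for " + details.getClass().getName());
        }
        return executeTyped(worker, details);
    }

    // Captures the wildcard so the cast to the worker's own type is checked at runtime.
    private static <T extends SketchDetails> String executeTyped(SketchSubWorker<T> worker,
            SketchDetails details) {
        return worker.execute(worker.getExecutionDetailsType().cast(details));
    }
}
```

Because the lookup uses `details.getClass()`, a subclass of a registered details type would not match its parent's sub-worker; each concrete type needs its own registration.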
6. Worker Configuration
@Bean
public SimpleTriggerFactoryBean computeTaskExecutionTrigger(
        ConcurrentManager concurrentStackManager,
        ComputeTaskExecutionWorker worker) {
    String queueName = stackConfig.getQueueName("COMPUTE_TASK_EXECUTION");
    MessageDrivenRunner adapter = new AsyncJobRunnerAdapter<>(jobStatusManager, userManager, worker);
    return new WorkerTriggerBuilder()
            .withStack(ConcurrentWorkerStack.builder()
                    .withSemaphoreLockKey("computeTaskExecutionWorker")
                    .withSemaphoreMaxLockCount(5)
                    .withSemaphoreLockAndMessageVisibilityTimeoutSec(600)
                    .withMaxThreadsPerMachine(2)
                    .withSingleton(concurrentStackManager)
                    .withCanRunInReadOnly(false)
                    .withQueueName(queueName)
                    .withWorker(adapter)
                    .build())
            .withRepeatInterval(3000)
            .withStartDelay(2000)
            .build();
}
Register <ref bean="computeTaskExecutionTrigger"/> in main-scheduler-spb.xml.
7. State Machine
TaskStatus.state:
NOT_STARTED ─[POST /execute/async/start]─> IN_PROGRESS
                                                │
                                      ┌─────────┴─────────┐
                                      │                   │
                                  [success]           [failure]
                                      │                   │
                                      v                   v
                                  COMPLETED          NOT_STARTED
                                                 (errorMessage set)
ExecutableTaskExecutionDetails lifecycle:
┌──────────────────────────────────────────────────────────────────────┐
│ On start: asyncJobId=X, startedBy=user, startedOn=now, error=null │
│ On success: asyncJobId=null, output fields populated │
│ On failure: asyncJobId=null, errorMessage=msg, errorDetails=detail │
└──────────────────────────────────────────────────────────────────────┘
Async Job (visible only to starter):
PROCESSING ──> COMPLETE / FAILED
8. High-Level: Sample Sheet Generation Supervisor Agent
The sub-worker for SampleSheetGenerationExecutionDetails will use Spring AI Bedrock AgentCore to orchestrate a multi-agent system. Below is the conceptual architecture — implementation details are covered in a separate design.
Supervisor Agent
A coordinator agent that plans and orchestrates the sample sheet generation workflow. It:
Receives the task context (input file view, target schema, user instructions)
Breaks the work into steps and delegates to specialized sub-agents
Validates output before persisting
Handles errors and retries
Sub-Agents
Sub-Agent | Responsibility |
|---|---|
Data Retrieval Agent | Queries the input FileView to retrieve all file annotations. Returns structured annotation data. |
Schema Analysis Agent | Retrieves and interprets the target JSON Schema. Produces a mapping specification describing required fields, types, and constraints. |
ETL Code Generation Agent | Given source annotation structure + target schema spec, generates a Python/pandas transformation script. |
Code Execution Agent | Executes the generated ETL script in a Code Interpreter sandbox. Returns generated CSV content or error details. |
Validation Agent | Validates the generated sample sheet against the target schema. Reports completeness and correctness gaps. |
Tools Available to the System
Tool | Used By | Description |
|---|---|---|
QueryFileView | Data Retrieval Agent | Executes SQL against the input EntityView |
GetJsonSchema | Schema Analysis Agent | Retrieves a JSON Schema by $id |
CodeInterpreter | Code Execution Agent | Executes Python in a sandbox (built-in AgentCore capability) |
WriteRecordSet | Supervisor Agent | Persists final CSV as a Synapse RecordSet in the output folder |
CreateReviewTask | Supervisor Agent | Creates a RecordBasedMetadataTaskProperties curation task for human review |
Execution Flow
Supervisor
│
├─> Data Retrieval Agent ──[QueryFileView]──> annotation data
│
├─> Schema Analysis Agent ──[GetJsonSchema]──> mapping spec
│
├─> ETL Code Generation Agent ──(annotation data + mapping spec)──> Python script
│
├─> Code Execution Agent ──[CodeInterpreter]──> generated CSV
│
├─> Validation Agent ──(CSV + schema)──> validation report
│
├─> (if valid) ──[WriteRecordSet]──> RecordSet synId
│
└─> [CreateReviewTask]──> review task ID
Error Handling (High Level)
Code Execution failure → Supervisor retries with ETL Code Generation Agent (up to 3 attempts)
Validation failure → Supervisor asks ETL Code Generation Agent to fix identified issues
All retries exhausted → Supervisor reports error to dispatcher (task reverts to NOT_STARTED with errorMessage)
Transient infrastructure errors → RecoverableMessageException at worker level
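The generate-execute-retry loop described above can be sketched independently of any agent framework. Everything in this block is hypothetical: `EtlRetrySketch`, `Attempt`, and the two function parameters stand in for the ETL Code Generation Agent and the Code Execution Agent, and "up to 3 attempts" is read here as three attempts in total, which the design does not state explicitly.

```java
import java.util.function.Function;

final class EtlRetrySketch {
    static final int MAX_ATTEMPTS = 3;

    // Result of one execution attempt: CSV content on success, an error otherwise.
    static final class Attempt {
        final String csv;
        final String error;

        private Attempt(String csv, String error) {
            this.csv = csv;
            this.error = error;
        }

        static Attempt success(String csv) { return new Attempt(csv, null); }
        static Attempt failure(String error) { return new Attempt(null, error); }
        boolean succeeded() { return error == null; }
    }

    // generate: feedback from the previous failure (null on the first attempt) -> script
    // execute:  script -> Attempt
    static String generateWithRetries(Function<String, String> generate,
            Function<String, Attempt> execute) {
        String feedback = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            String script = generate.apply(feedback);
            Attempt result = execute.apply(script);
            if (result.succeeded()) {
                return result.csv;
            }
            feedback = result.error; // fed back into the next generation round
        }
        // Surfaces to the dispatcher, which reverts the task to NOT_STARTED.
        throw new IllegalStateException(
                "All " + MAX_ATTEMPTS + " attempts failed; last error: " + feedback);
    }
}
```

A validation failure could be folded into the same loop by having `execute` return a failed `Attempt` carrying the validation report as its error.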
9. Files to Create/Modify
New Files
File | Purpose |
|---|---|
| Async request schema |
| Async response schema |
| Interface for executable tasks |
| Concrete execution details |
| Dispatcher interface |
| Dispatch logic + state propagation |
| Sub-worker interface |
| Async job worker |
Modified Files
File | Change |
|---|---|
| Add execute async start/get endpoints |
| Add |
| Add |
| Register trigger |
External (out of codebase)
System | Change |
|---|---|
Synapse-Stack-Builder | Add |
AWS Bedrock AgentCore | Configure multi-agent system (separate design) |
10. Verification Plan
Unit tests: ComputeTaskDispatcher (dispatch logic, state transitions, error propagation), ComputeTaskExecutionWorker
Integration test: Create task with SampleSheetGenerationExecutionDetails → trigger execution → verify task state transitions, asyncJobId visibility, and error message propagation
Multi-user test: Verify non-starter user can read task status but cannot poll async job directly
Sub-worker tests: Deferred to sub-worker implementation design