Technical Design: Agent-Assisted Sample Sheet Generation

PLFM-9729

Status: DRAFT

Context

Data contributors upload raw files to Synapse and annotate them via Curator. They need to generate workflow-ready sample sheets (e.g., for nf-core/sarek) from that metadata to feed downstream processing pipelines. This design introduces three things: a new async job endpoint on CurationTaskController that executes any task whose execution details are executable; a worker that dispatches to type-specific sub-workers based on the concrete type of the task's TaskExecutionDetails; and a high-level description of the sample sheet generation supervisor agent (built on the Spring AI Bedrock AgentCore multi-agent architecture).

Scope

  1. New async job endpoint on CurationTaskController

  2. ComputeTaskExecutionRequest / ComputeTaskExecutionResponse schemas

  3. New ExecutableTaskExecutionDetails interface extending TaskExecutionDetails

  4. Worker that dispatches to sub-workers by executionDetails concrete type

  5. Async job status propagation to task status

  6. High-level description of the sample sheet generation supervisor agent + its tools/sub-agents

Out of scope: Sub-worker implementation details, Spring AI wiring, agent prompt engineering

1. Dispatch Mechanism: TaskExecutionDetails Concrete Type

Existing Model

TaskStatus
├── state: TaskState (NOT_STARTED, IN_PROGRESS, COMPLETED, CANCELED)
├── executionDetails: TaskExecutionDetails (polymorphic interface)
│   └── concreteType: string (discriminator)
├── lastUpdatedBy, lastUpdatedOn
└── etag

Today there is one concrete implementation: GridExecutionDetails (with activeSessionId).

New Interface: ExecutableTaskExecutionDetails

A marker/extension interface that signals "this task can be dispatched to a sub-worker for execution."

Path: lib/lib-auto-generated/.../curation/execution/ExecutableTaskExecutionDetails.json

{
    "description": "A TaskExecutionDetails that supports asynchronous execution by a sub-worker. The concreteType determines which sub-worker handles execution.",
    "type": "interface",
    "implements": [
        { "$ref": "org.sagebionetworks.repo.model.curation.TaskExecutionDetails" }
    ],
    "properties": {
        "asyncJobId": {
            "type": "string",
            "description": "The ID of the async job currently executing this task. Only the user who started the job can poll job status directly via this ID. Set by the system when execution starts; cleared on completion or failure."
        },
        "startedBy": {
            "type": "integer",
            "description": "The principal ID of the user who started the execution."
        },
        "startedOn": {
            "type": "string",
            "format": "date-time",
            "description": "When the execution was started."
        },
        "errorMessage": {
            "type": "string",
            "description": "If execution failed, the error description from the async job."
        },
        "errorDetails": {
            "type": "string",
            "description": "If execution failed, additional error details (e.g., stack trace summary)."
        }
    }
}

Concrete Implementation: SampleSheetGenerationExecutionDetails

Path: lib/lib-auto-generated/.../curation/execution/SampleSheetGenerationExecutionDetails.json

{
    "description": "Execution details for sample sheet generation via a multi-agent system.",
    "implements": [
        { "$ref": "org.sagebionetworks.repo.model.curation.execution.ExecutableTaskExecutionDetails" }
    ],
    "properties": {
        "inputFileViewId": {
            "type": "string",
            "description": "The synId of the EntityView containing source annotations."
        },
        "outputFolderId": {
            "type": "string",
            "description": "The synId of the folder where the output RecordSet will be written."
        },
        "targetSchemaId": {
            "type": "string",
            "description": "The $id of the JSON Schema defining the target sample sheet format."
        },
        "outputRecordSetId": {
            "type": "string",
            "description": "The synId of the generated RecordSet (set on success)."
        },
        "reviewTaskId": {
            "type": "integer",
            "description": "The ID of the review CurationTask created (set on success)."
        }
    }
}

Dispatch Pattern

The worker reads taskStatus.getExecutionDetails():

  • If it implements ExecutableTaskExecutionDetails → dispatch to sub-worker keyed by concrete class

  • If not → reject (task is not executable)

Adding a new executable task type = creating a new ExecutableTaskExecutionDetails implementation + registering a sub-worker for that class.

2. Async Job Status and Task Status Propagation

Key constraint: Only the user who started the async job can poll the job's status via GET /asynchronous/job/{jobId}. Other users with access to the task need visibility into execution progress and results.

Design: Task Status as the Shared View

The TaskStatus is the authoritative view of execution state for all users. The async job is an implementation detail — its state is propagated back to the task's status by the worker.

| Async Job State | Task State | Execution Details Update |
|---|---|---|
| PROCESSING (job started) | EXECUTING | asyncJobId set, startedBy set, startedOn set |
| COMPLETE (worker returns) | IN_REVIEW | Output fields populated (e.g., outputRecordSetId, reviewTaskId). asyncJobId cleared. |
| FAILED (worker throws) | NOT_STARTED | errorMessage and errorDetails populated from the exception. asyncJobId cleared. |

After the job completes, the task state is set to IN_REVIEW. The task changes to COMPLETED only after the user has reviewed and accepted the generated sample sheet. The execution task's final state is therefore tied to the state of the review task.

Visibility Model

| User | Can poll async job directly? | Can see task status? |
|---|---|---|
| User who started execution | Yes (via asyncJobId) | Yes (via GET /curationTask/{taskId}/status) |
| Other users with task access | No | Yes (via GET /curationTask/{taskId}/status) |

This means:

  • The job starter can poll the async job for real-time progress (progress messages, percentage) while the job is running.

  • All users with access to the task can read TaskStatus to see whether execution is in progress, who started it, when it started, and, after completion, the results or error.

  • The asyncJobId is stored in execution details so the UI can offer the job starter a richer progress view while the job runs.
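The visibility rules above suggest a simple client-side decision about where to read progress from. The following is a minimal sketch of that decision, not part of the proposed API: the class `ProgressSource` and its method are hypothetical names, and the inputs are assumed to be read from the task's execution details.

```java
/** Hypothetical client-side helper: picks the status source a UI should poll. */
final class ProgressSource {

    enum Source { ASYNC_JOB, TASK_STATUS }

    /**
     * @param currentUserId principal ID of the calling user
     * @param startedBy     startedBy from the execution details (null if never started)
     * @param asyncJobId    asyncJobId from the execution details (null when no job is running)
     */
    static Source choose(long currentUserId, Long startedBy, String asyncJobId) {
        boolean jobRunning = asyncJobId != null;
        boolean isStarter = startedBy != null && startedBy == currentUserId;
        // Only the job starter may poll the async job; everyone else reads TaskStatus.
        return (jobRunning && isStarter) ? Source.ASYNC_JOB : Source.TASK_STATUS;
    }
}
```

Because asyncJobId is cleared once the job finishes, the same check sends the starter back to TaskStatus after completion or failure.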

Worker Responsibility: State Propagation

The ComputeTaskDispatcher is responsible for keeping task status in sync with job outcomes:

// Simplified dispatcher logic.
// `task` (the CurationTask) and `subWorker` are resolved as described in the
// implementation logic in section 5 (steps 1 and 6); that code is omitted here.
public ComputeTaskExecutionResponse dispatch(UserInfo user, String jobId, Long taskId,
        AsyncJobProgressCallback callback) {
    TaskStatus status = curationTaskManager.getTaskStatus(user, taskId);
    ExecutableTaskExecutionDetails details =
            (ExecutableTaskExecutionDetails) status.getExecutionDetails();

    // 1. Mark task as in-progress with job reference
    details.setAsyncJobId(jobId);
    details.setStartedBy(user.getId());
    details.setStartedOn(new Date());
    details.setErrorMessage(null);
    details.setErrorDetails(null);
    status.setState(TaskState.IN_PROGRESS);
    curationTaskManager.updateTaskStatus(user, taskId, status);

    try {
        // 2. Dispatch to sub-worker
        subWorker.execute(user, jobId, task, details, callback);

        // 3. On success: mark completed, clear job reference
        details.setAsyncJobId(null);
        status.setState(TaskState.COMPLETED);
        curationTaskManager.updateTaskStatus(user, taskId, status);
        return buildResponse(taskId, details);
    } catch (Exception e) {
        // 4. On failure: revert to NOT_STARTED, capture error, clear job reference
        details.setAsyncJobId(null);
        details.setErrorMessage(e.getMessage());
        details.setErrorDetails(extractDetails(e));
        status.setState(TaskState.NOT_STARTED);
        curationTaskManager.updateTaskStatus(user, taskId, status);
        throw e; // Re-throw so AsyncJobRunnerAdapter marks the job FAILED
    }
}

Note: The asyncJobId is cleared on both success and failure. While the job is running, it serves as a reference for the starter to poll progress. Once the job finishes, all relevant information has been propagated to the task's execution details and the job ID is no longer needed.

3. New Async Job: Execute Task

API Endpoints

Add to CurationTaskController:

POST /curationTask/{taskId}/execute/async/start
GET  /curationTask/{taskId}/execute/async/get/{asyncToken}

The taskId path parameter is the only input. The system reads the task's executionDetails concrete type to determine execution behavior.
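As an illustration of how a client might address these two endpoints, here is a minimal sketch using the JDK's `java.net.http` request builder. The class `ExecuteTaskRequests` is a hypothetical name, the base URL passed in is a placeholder, and authentication headers are deliberately omitted; only the two paths come from this design.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side helper; base URL is a placeholder and auth is omitted. */
final class ExecuteTaskRequests {

    private final String baseUrl;

    ExecuteTaskRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** POST /curationTask/{taskId}/execute/async/start (taskId is the only input, so no body). */
    HttpRequest start(long taskId) {
        URI uri = URI.create(baseUrl + "/curationTask/" + taskId + "/execute/async/start");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** GET /curationTask/{taskId}/execute/async/get/{asyncToken} */
    HttpRequest get(long taskId, String asyncToken) {
        URI uri = URI.create(baseUrl + "/curationTask/" + taskId
                + "/execute/async/get/" + asyncToken);
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

The token passed to `get` would be the value returned in the AsyncJobId from the start call.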

Request Schema

Path: lib/lib-auto-generated/.../curation/compute/ComputeTaskExecutionRequest.json

{
    "description": "Request to execute a curation task. The task must have ExecutableTaskExecutionDetails.",
    "implements": [
        { "$ref": "org.sagebionetworks.repo.model.asynch.AsynchronousRequestBody" }
    ],
    "properties": {
        "taskId": {
            "type": "integer",
            "description": "The ID of the CurationTask to execute."
        }
    }
}

Response Schema

Path: lib/lib-auto-generated/.../curation/compute/ComputeTaskExecutionResponse.json

{
    "description": "Response from a completed task execution.",
    "implements": [
        { "$ref": "org.sagebionetworks.repo.model.asynch.AsynchronousResponseBody" }
    ],
    "properties": {
        "taskId": {
            "type": "integer",
            "description": "The ID of the CurationTask that was executed."
        },
        "executionDetails": {
            "description": "The updated execution details after completion.",
            "$ref": "org.sagebionetworks.repo.model.curation.execution.ExecutableTaskExecutionDetails"
        }
    }
}

Registration in AsynchJobType

COMPUTE_TASK_EXECUTION(ComputeTaskExecutionRequest.class, ComputeTaskExecutionResponse.class),

Queue: {stack}-{instance}-COMPUTE_TASK_EXECUTION

4. Controller Endpoints

@RequiredScope({view, modify})
@ResponseStatus(HttpStatus.CREATED)
@RequestMapping(value = "/curationTask/{taskId}/execute/async/start", method = RequestMethod.POST)
public @ResponseBody AsyncJobId startTaskExecution(
        @RequestParam(value = AuthorizationConstants.USER_ID_PARAM) Long userId,
        @PathVariable Long taskId) {
    // The path parameter is the only input; wrap it in the async request body.
    ComputeTaskExecutionRequest request = new ComputeTaskExecutionRequest();
    request.setTaskId(taskId);
    AsynchronousJobStatus job = serviceProvider.getAsynchronousJobServices()
            .startJob(userId, request);
    AsyncJobId asyncJobId = new AsyncJobId();
    asyncJobId.setToken(job.getJobId());
    return asyncJobId;
}

@RequiredScope({view})
@ResponseStatus(HttpStatus.OK)
@RequestMapping(value = "/curationTask/{taskId}/execute/async/get/{asyncToken}", method = RequestMethod.GET)
public @ResponseBody ComputeTaskExecutionResponse getTaskExecutionResult(
        @RequestParam(value = AuthorizationConstants.USER_ID_PARAM) Long userId,
        @PathVariable String asyncToken) {
    AsynchronousJobStatus status = serviceProvider.getAsynchronousJobServices()
            .getJobStatusAndThrow(userId, asyncToken);
    return (ComputeTaskExecutionResponse) status.getResponseBody();
}

5. Worker Architecture

ComputeTaskExecutionWorker

@Service
public class ComputeTaskExecutionWorker
        implements AsyncJobRunner<ComputeTaskExecutionRequest, ComputeTaskExecutionResponse> {

    private final ComputeTaskDispatcher dispatcher;

    // Constructor injection; required because the dispatcher field is final.
    @Autowired
    public ComputeTaskExecutionWorker(ComputeTaskDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    @Override
    public Class<ComputeTaskExecutionRequest> getRequestType() {
        return ComputeTaskExecutionRequest.class;
    }

    @Override
    public Class<ComputeTaskExecutionResponse> getResponseType() {
        return ComputeTaskExecutionResponse.class;
    }

    @Override
    public ComputeTaskExecutionResponse run(String jobId, UserInfo user,
            ComputeTaskExecutionRequest request, AsyncJobProgressCallback callback) {
        // All dispatch and state propagation logic lives in the dispatcher.
        return dispatcher.dispatch(user, jobId, request.getTaskId(), callback);
    }
}

ComputeTaskDispatcher

public interface ComputeTaskDispatcher {

    ComputeTaskExecutionResponse dispatch(UserInfo user, String jobId, Long taskId,
            AsyncJobProgressCallback callback);
}

Implementation logic:

  1. Load CurationTask and TaskStatus by ID

  2. Validate taskStatus.executionDetails instanceof ExecutableTaskExecutionDetails

  3. Validate user has UPDATE access on the task's project

  4. Validate task state is NOT_STARTED (cannot re-execute an in-progress or completed task)

  5. Transition task state to IN_PROGRESS; set asyncJobId, startedBy, startedOn; clear any prior errorMessage and errorDetails

  6. Look up sub-worker by executionDetails.getClass()

  7. Delegate to sub-worker: subWorker.execute(user, jobId, task, executionDetails, callback)

  8. On success: transition task to COMPLETED, update output fields, clear asyncJobId, return response

  9. On failure: revert task to NOT_STARTED, set errorMessage/errorDetails, clear asyncJobId, re-throw
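Steps 2 and 4 of the list above are pure precondition checks and can be sketched in isolation. The snippet below is a minimal illustration under stated assumptions: the nested interfaces and enum are stand-ins for the generated model classes, `DispatchPreconditions` is a hypothetical name, and the choice of exception types is an assumption. The access check in step 3 is not covered.

```java
/** Hypothetical precondition checks; nested types are stand-ins for the generated model. */
final class DispatchPreconditions {

    interface TaskExecutionDetails {}

    interface ExecutableTaskExecutionDetails extends TaskExecutionDetails {}

    enum TaskState { NOT_STARTED, IN_PROGRESS, COMPLETED, CANCELED }

    /**
     * Returns the details narrowed to the executable type, or throws if the task
     * cannot be executed (step 2) or is not in NOT_STARTED (step 4).
     */
    static ExecutableTaskExecutionDetails requireExecutable(
            TaskExecutionDetails details, TaskState state) {
        if (!(details instanceof ExecutableTaskExecutionDetails)) {
            throw new IllegalArgumentException("Task is not executable");
        }
        if (state != TaskState.NOT_STARTED) {
            throw new IllegalStateException("Task must be NOT_STARTED but was " + state);
        }
        return (ExecutableTaskExecutionDetails) details;
    }
}
```

Running these checks before any status update means a rejected request leaves the task's status untouched.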

ComputeTaskSubWorker Interface

public interface ComputeTaskSubWorker<T extends ExecutableTaskExecutionDetails> {

    Class<T> getExecutionDetailsType();

    void execute(UserInfo user, String jobId, CurationTask task, T executionDetails,
            AsyncJobProgressCallback callback);
}

Sub-workers are auto-discovered via Spring List<ComputeTaskSubWorker<?>> injection. The dispatcher builds a Map<Class<?>, ComputeTaskSubWorker<?>> keyed by getExecutionDetailsType().
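A minimal sketch of how such a class-keyed map could be built from the injected list is shown below. The types here are simplified stand-ins (no Spring, no `execute` method), `SubWorkerRegistry` and the sample classes are hypothetical names, and failing fast on duplicate registrations is an assumption rather than a stated requirement.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry; nested types are simplified stand-ins for the real interfaces. */
final class SubWorkerRegistry {

    interface ExecutableDetails {}

    interface SubWorker<T extends ExecutableDetails> {
        Class<T> getExecutionDetailsType();
    }

    static final class SampleSheetDetails implements ExecutableDetails {}

    static final class SampleSheetSubWorker implements SubWorker<SampleSheetDetails> {
        @Override
        public Class<SampleSheetDetails> getExecutionDetailsType() {
            return SampleSheetDetails.class;
        }
    }

    private final Map<Class<?>, SubWorker<?>> byType = new HashMap<>();

    /** In the real dispatcher this list would be supplied by Spring injection. */
    SubWorkerRegistry(List<? extends SubWorker<?>> subWorkers) {
        for (SubWorker<?> worker : subWorkers) {
            Class<?> type = worker.getExecutionDetailsType();
            // Assumption: two sub-workers for one type is a configuration error.
            if (byType.putIfAbsent(type, worker) != null) {
                throw new IllegalStateException("Duplicate sub-worker for " + type.getName());
            }
        }
    }

    /** Exact-class lookup, matching dispatch by executionDetails.getClass(). */
    SubWorker<?> lookup(ExecutableDetails details) {
        SubWorker<?> worker = byType.get(details.getClass());
        if (worker == null) {
            throw new IllegalArgumentException(
                    "No sub-worker registered for " + details.getClass().getName());
        }
        return worker;
    }
}
```

Note that an exact-class lookup does not match subclasses of a registered details type; if subclassing were ever needed, the lookup would have to walk the class hierarchy instead.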

6. Worker Configuration

@Bean
public SimpleTriggerFactoryBean computeTaskExecutionTrigger(
        ConcurrentManager concurrentStackManager,
        ComputeTaskExecutionWorker worker) {
    String queueName = stackConfig.getQueueName("COMPUTE_TASK_EXECUTION");
    MessageDrivenRunner adapter = new AsyncJobRunnerAdapter<>(jobStatusManager, userManager, worker);
    return new WorkerTriggerBuilder()
            .withStack(ConcurrentWorkerStack.builder()
                    .withSemaphoreLockKey("computeTaskExecutionWorker")
                    .withSemaphoreMaxLockCount(5)
                    .withSemaphoreLockAndMessageVisibilityTimeoutSec(600)
                    .withMaxThreadsPerMachine(2)
                    .withSingleton(concurrentStackManager)
                    .withCanRunInReadOnly(false)
                    .withQueueName(queueName)
                    .withWorker(adapter)
                    .build())
            .withRepeatInterval(3000)
            .withStartDelay(2000)
            .build();
}

Register <ref bean="computeTaskExecutionTrigger"/> in main-scheduler-spb.xml.

7. State Machine

TaskStatus.state:

  NOT_STARTED ─[POST /execute/async/start]─> IN_PROGRESS
                                        ┌─────────┴─────────┐
                                        │                   │
                                    [success]           [failure]
                                        │                   │
                                        v                   v
                                    COMPLETED          NOT_STARTED
                                                    (errorMessage set)

ExecutableTaskExecutionDetails lifecycle:

  ┌──────────────────────────────────────────────────────────────────────┐
  │ On start:   asyncJobId=X, startedBy=user, startedOn=now, error=null  │
  │ On success: asyncJobId=null, output fields populated                 │
  │ On failure: asyncJobId=null, errorMessage=msg, errorDetails=detail   │
  └──────────────────────────────────────────────────────────────────────┘

Async Job (visible only to starter):

  PROCESSING ──> COMPLETE / FAILED
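The task-state part of this diagram can be expressed as a small transition table, which is a convenient shape for unit tests. This is a sketch only: `TaskStateTransitions` is a hypothetical name, the nested enum is a stand-in for TaskState, and only the execution-related transitions drawn above are encoded (CANCELED is not covered by the diagram).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical transition table for the execution-related task states. */
final class TaskStateTransitions {

    enum State { NOT_STARTED, IN_PROGRESS, COMPLETED, CANCELED }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // Start of execution.
        ALLOWED.put(State.NOT_STARTED, EnumSet.of(State.IN_PROGRESS));
        // Success moves forward; failure reverts so the task can be retried.
        ALLOWED.put(State.IN_PROGRESS, EnumSet.of(State.COMPLETED, State.NOT_STARTED));
    }

    static boolean isAllowed(State from, State to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```

For example, `isAllowed(State.COMPLETED, State.IN_PROGRESS)` is false, which matches the rule that a completed task cannot be re-executed.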

8. High-Level: Sample Sheet Generation Supervisor Agent

The sub-worker for SampleSheetGenerationExecutionDetails will use Spring AI Bedrock AgentCore to orchestrate a multi-agent system. Below is the conceptual architecture — implementation details are covered in a separate design.

Supervisor Agent

A coordinator agent that plans and orchestrates the sample sheet generation workflow. It:

  • Receives the task context (input file view, target schema, user instructions)

  • Breaks the work into steps and delegates to specialized sub-agents

  • Validates output before persisting

  • Handles errors and retries

Sub-Agents

| Sub-Agent | Responsibility |
|---|---|
| Data Retrieval Agent | Queries the input FileView to retrieve all file annotations. Returns structured annotation data. |
| Schema Analysis Agent | Retrieves and interprets the target JSON Schema. Produces a mapping specification describing required fields, types, and constraints. |
| ETL Code Generation Agent | Given source annotation structure + target schema spec, generates a Python/pandas transformation script. |
| Code Execution Agent | Executes the generated ETL script in a Code Interpreter sandbox. Returns generated CSV content or error details. |
| Validation Agent | Validates the generated sample sheet against the target schema. Reports completeness and correctness gaps. |

Tools Available to the System

| Tool | Used By | Description |
|---|---|---|
| QueryFileView | Data Retrieval Agent | Executes SQL against the input EntityView |
| GetJsonSchema | Schema Analysis Agent | Retrieves a JSON Schema by $id |
| CodeInterpreter | Code Execution Agent | Executes Python in a sandbox (built-in AgentCore capability) |
| WriteRecordSet | Supervisor Agent | Persists final CSV as a Synapse RecordSet in the output folder |
| CreateReviewTask | Supervisor Agent | Creates a RecordBasedMetadataTaskProperties curation task for human review |

Execution Flow

Supervisor
├─> Data Retrieval Agent ──[QueryFileView]──> annotation data
├─> Schema Analysis Agent ──[GetJsonSchema]──> mapping spec
├─> ETL Code Generation Agent ──(annotation data + mapping spec)──> Python script
├─> Code Execution Agent ──[CodeInterpreter]──> generated CSV
├─> Validation Agent ──(CSV + schema)──> validation report
├─> (if valid) ──[WriteRecordSet]──> RecordSet synId
└─> [CreateReviewTask]──> review task ID

Error Handling (High Level)

  • Code Execution failure → Supervisor retries with ETL Code Generation Agent (up to 3 attempts)

  • Validation failure → Supervisor asks ETL Code Generation Agent to fix identified issues

  • All retries exhausted → Supervisor reports error to dispatcher (task reverts to NOT_STARTED with errorMessage)

  • Transient infrastructure errors → RecoverableMessageException at worker level
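The retry behavior for code execution failures can be sketched as a bounded loop that feeds the last error back into code generation. This is an illustration under stated assumptions, not the agent implementation: `EtlRetryLoop`, `RunResult`, and the two function parameters are hypothetical, "up to 3 attempts" is read here as three attempts in total, and transient infrastructure errors are not modeled.

```java
import java.util.function.Function;

/** Hypothetical sketch of the supervisor's generate/execute retry loop. */
final class EtlRetryLoop {

    static final int MAX_ATTEMPTS = 3; // assumption: three attempts in total

    /** Outcome of one sandbox run: CSV content on success, otherwise an error description. */
    static final class RunResult {
        final String csv;
        final String error;

        private RunResult(String csv, String error) {
            this.csv = csv;
            this.error = error;
        }

        static RunResult ok(String csv) { return new RunResult(csv, null); }

        static RunResult failed(String error) { return new RunResult(null, error); }
    }

    /**
     * @param generate produces a script given the previous error (null on the first attempt)
     * @param execute  runs a script in the sandbox
     * @return the generated CSV content
     */
    static String run(Function<String, String> generate, Function<String, RunResult> execute) {
        String lastError = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            String script = generate.apply(lastError);
            RunResult result = execute.apply(script);
            if (result.error == null) {
                return result.csv;
            }
            lastError = result.error; // fed back so the next script can address it
        }
        // Exhausted: the dispatcher would revert the task to NOT_STARTED with this message.
        throw new IllegalStateException(
                "ETL failed after " + MAX_ATTEMPTS + " attempts: " + lastError);
    }
}
```

The exception thrown on exhaustion corresponds to the "reports error to dispatcher" case, where the message ends up in errorMessage.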

9. Files to Create/Modify

New Files

| File | Purpose |
|---|---|
| .../schema/.../curation/compute/ComputeTaskExecutionRequest.json | Async request schema |
| .../schema/.../curation/compute/ComputeTaskExecutionResponse.json | Async response schema |
| .../schema/.../curation/execution/ExecutableTaskExecutionDetails.json | Interface for executable tasks |
| .../schema/.../curation/execution/SampleSheetGenerationExecutionDetails.json | Concrete execution details |
| .../manager/curation/compute/ComputeTaskDispatcher.java | Dispatcher interface |
| .../manager/curation/compute/ComputeTaskDispatcherImpl.java | Dispatch logic + state propagation |
| .../manager/curation/compute/ComputeTaskSubWorker.java | Sub-worker interface |
| .../worker/ComputeTaskExecutionWorker.java | Async job worker |

Modified Files

| File | Change |
|---|---|
| CurationTaskController.java | Add execute async start/get endpoints |
| AsynchJobType.java | Add COMPUTE_TASK_EXECUTION |
| AsyncJobWorkersConfig.java | Add computeTaskExecutionTrigger bean |
| main-scheduler-spb.xml | Register trigger |

External (out of codebase)

| System | Change |
|---|---|
| Synapse-Stack-Builder | Add COMPUTE_TASK_EXECUTION SQS queue |
| AWS Bedrock AgentCore | Configure multi-agent system (separate design) |

10. Verification Plan

  1. Unit tests: ComputeTaskDispatcher (dispatch logic, state transitions, error propagation), ComputeTaskExecutionWorker

  2. Integration test: Create task with SampleSheetGenerationExecutionDetails → trigger execution → verify task state transitions, asyncJobId visibility, and error message propagation

  3. Multi-user test: Verify non-starter user can read task status but cannot poll async job directly

  4. Sub-worker tests: Deferred to sub-worker implementation design