Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

(config) WorkerLauncher - This is a command-line runner that Spring Boot knows about. Spring Boot automatically calls the run() method on this when it's done loading the Spring context. This is what sets up and runs the PollSqsWorker (defined in bridge-base), which in turn calls the BridgeExporterSqsCallback when it gets a request. The WorkerLauncher currently does everything single-threaded, since Bridge-EX workers are already heavily multi-threaded and we never need to run multiple Export requests in parallel.

...

(record) BridgeExporterRecordProcessor - This is the main entry point into the Exporter. This is called once for each request, with the deserialized request. This calls the RecordIdSourceFactory to get the stream of requestsrecord IDs (which it then queries DDB to get the full record), the RecordFilterHelper to determine whether to include or exclude the record, and the WorkerManager to queue an asynchronous task for each record. This has a configurable loop delay to prevent browning out DDB and will log progress at configurable intervals.

(record) RecordFilterHelper - Helper function to determine if a record should be included or excluded. Criteria include: (a) sharing scope (b) study whitelist (see Redrives) (c) table whitelest (see Redrives) (d) skipping unconfigured studies. Note for sharing scope, this uses the most restrictive between the sharing scope at the time of the upload and the user's current sharing scope (with missing sharing scopes treated as "no sharing"). This is so that if a user uploads data, and then decides to withdraw, pause, or otherwise stop sharing before the data is exported, we respect their sharing scope. This has the side effect such that if we re-export data for whatever reason, any user who has since stopped sharing will not have their data exported.

(record) RecordIdSource - The Record ID Source represents a stream of record IDs. One is created for each request, and Record Processor iterates on this stream to get record IDs. This can either be a DDB query result or a record ID list from an S3 override file. Internally, the Record ID Source contains an Iterator from the backing stream and a Converter (a functional interface, used by lambdas) to extract the record ID from the elements in the backing stream and convert it to a String. The Record ID Source is itself an Iterator and implements both the Iterator and Iterable interfaces.

(record) RecordIdSourceFactory - This creates Record ID Sources. It reads the Bridge Exporter Request to determine if it needs a DDB query or an S3 override file and creates the corresponding Record ID Source.

(request) BridgeExporterSqsCallback - This callback connects the PollSqsWorker and the Record Processor. The PollSqsWorker (defined in bridge-base) calls this callback with the raw content of the SQS messages. This callback then deserializes the SQS message into a Bridge Exporter Request and calls the Record Processor.

Worker Manager

(worker) ExportSubtask - Corresponds to a single health data record in a Bridge-EX request. Contains references to the parent task, the schema, the raw DDB item, and the JSON representation of the health data (which might be different from the raw DDB item because of legacy surveys; see Legacy Hacks for details). These are immutable, so we reuse the same subtask if multiple handlers need to work on the same record.

(worker) ExportTask - The Worker Manager creates an ExportTask for each request. The Task references the request and stores the current task state, specifically TsvInfo for each study and each schema, the queue of outstanding asynchronous executions, and the set of studies we've seen so far. The Worker Manager and the Handlers refer to this object to track state. Also includes metrics and the task's temp dir on disk.

Note that this also includes the ExporterDate. This is different from the UploadDate because the UploadDate represents when the health data was uploaded to Bridge (generally yesterday) while the ExporterDate represents when the Exporter was run and when the data hit Synapse. The design decision was made this way since researcher's would want to operate on "today's data drop" (ExporterDate) while the Exporter needs to set a cutoff for exporting (all data up through the end of yesterday, that is, UploadDate).

(worker) ExportWorker - An asynchronous execution. The Worker Manager will create one for each record, and may make more than if the record needs, for example, both the Health Data handler and the AppVersion handler. This is executed in the Worker Manager's asynchronous thread pool and tracked in the ExportTask's asynchronous execution queue.

(worker) ExportWorkerManager - The main driver for the Exporter. This includes helper object instances and common helper functions to be used by handlers, as well as methods for task and handler management. ExportWorkerManager itself contains no state and offloads state to the ExportTask and its TsvInfo objects.

The Record Processor calls ExportWorkerManager for each health data record. ExportWorkerManager then creates a subtask and runs it against the AppVersion handler and the HealthData handler (and sometimes the IosSurvey handler; see Legacy Hacks), using ExportWorkers and the asynchronous thread pool to run these tasks asynchronously. We use a fixed thread pool with a configurable thread count to limit the number of concurrent asynchronous executions. Limiting the threads prevents resource starving, and excess tasks are queued up (with Java's built-in Executor handling queueing).

When the Record Processor is done iterating through all the records, it calls endOfStream(), signalling to ExportWorkerManager that there are no more subtasks. At this point, ExportWorkerManager blocks until all asynchronous executions in the current task's queue are complete (using the synchronous Future.get(); this can be thought of as the equivalent to fork-join). Once complete, it will signal to each HealthData and AppVersion handler to upload its payload to Synapse, then invoke the SynapseStatusTableHelper to update status tables.

(worker) TsvInfo

Handlers

(handler) AppVersionExportHandler

...