Overview

  1. Bridge-EX-Scheduler lives inside AWS Lambda, with a schedule configured to execute every day at 10am UTC (2am PST, 3am PDT).
    1. Scheduler pulls configs from DynamoDB table Exporter-Scheduler-Config, which contains the SQS queue URL, time zone, and an optional request JSON override.
    2. Scheduler generates the request, fills in yesterday's date, and writes the request to the SQS queue.
  2. Bridge-EX polls SQS, waiting for a request.
    1. When the request arrives, it pulls user health data records from DDB (along with study info, schemas, attachment metadata, and participant options (sharing options)), as well as attachment content from S3.
    2. Bridge-EX uploads attachments to Synapse as file handles and writes the record to a TSV (tab-separated values) file on disk.
    3. On completion, Bridge-EX uploads the TSVs to the Synapse tables, creating new tables as necessary.
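
The scheduler step above (fill in yesterday's date for the configured time zone, then build the request) can be sketched as follows. This is a minimal sketch and not the scheduler's actual code: the class name, the method names, and the JSON field name "date" are hypothetical, and the SQS write is omitted.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical sketch: compute "yesterday" for a configured time zone and build a request body.
class SchedulerDateSketch {
    // Returns yesterday's calendar date as seen in the given time zone.
    static LocalDate yesterday(Clock clock, ZoneId zone) {
        return LocalDate.now(clock.withZone(zone)).minusDays(1);
    }

    // Builds a minimal request body. The "date" field name is an assumption.
    static String buildRequest(LocalDate date) {
        return "{\"date\":\"" + date + "\"}";
    }

    public static void main(String[] args) {
        // Simulate the daily 10:00 UTC trigger with a fixed clock.
        Clock clock = Clock.fixed(Instant.parse("2016-03-02T10:00:00Z"), ZoneOffset.UTC);
        LocalDate date = yesterday(clock, ZoneId.of("America/Los_Angeles"));
        System.out.println(buildRequest(date)); // prints {"date":"2016-03-01"}
    }
}
```

Passing a Clock in, rather than calling the system clock directly, keeps the date logic testable around midnight and daylight-saving boundaries.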

Components

Bridge-EX components can be broken down into 3 major groups (not including Spring components and various helpers).

  1. Record Processor - This is the entry point into Bridge-EX. It polls SQS for a request. When a request comes in, it iterates through all health data records associated with that request, calling the Worker Manager with each record. When it's done iterating records, it signals "end of stream" to the Worker Manager.
  2. Worker Manager - This contains a thread pool, a collection of worker handlers, and a task queue, as well as various helper logic needed by the handlers. The Worker Manager is called for every record in a request, and queues a task onto each relevant handler. On end of stream, the Worker Manager signals to each handler to upload their TSVs to Synapse.
  3. Handlers - Various handlers that can be run asynchronously and in parallel. These include health data handlers, app version handlers, and legacy iOS survey handlers.

Spring Configs and Launcher

(config) AppInitializer - This is called by Spring Boot and used to initialize the Spring context and the app.

(config) SpringConfig - Annotation-based Spring config. Self-explanatory.

(config) WorkerLauncher - This is a command-line runner that Spring Boot knows about. Spring Boot automatically calls the run() method on this when it's done loading the Spring context. This is what sets up and runs the PollSqsWorker (defined in bridge-base), which in turn calls the BridgeExporterSqsCallback when it gets a request. The WorkerLauncher currently does everything single-threaded, since Bridge-EX workers are already heavily multi-threaded and we never need to run multiple Export requests in parallel.

Record Processor

(record) BridgeExporterRecordProcessor - This is the main entry point into the Exporter. It is called once for each request, with the deserialized request. It calls the RecordIdSourceFactory to get the stream of record IDs (and queries DDB for the full record behind each ID), the RecordFilterHelper to determine whether to include or exclude each record, and the WorkerManager to queue an asynchronous task for each record. It has a configurable loop delay to prevent browning out DDB and logs progress at configurable intervals.
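
The shape of this loop can be sketched as below. This is an illustration under stated assumptions, not the processor's real code: the class, method, and parameter names are hypothetical, the DDB lookup is omitted, and it assumes excluded records are simply skipped.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of a record loop with a filter, a loop delay, and progress logging.
class RecordLoopSketch {
    // Iterates record IDs, skips excluded ones, hands the rest to the worker,
    // and returns the number of records handed to the worker.
    static int processAll(Iterable<String> recordIds, Predicate<String> shouldExclude,
            Consumer<String> worker, long delayMillis, int progressInterval) {
        int seen = 0;
        int queued = 0;
        for (String recordId : recordIds) {
            seen++;
            if (!shouldExclude.test(recordId)) {
                worker.accept(recordId);
                queued++;
            }
            if (seen % progressInterval == 0) {
                System.out.println("Records seen so far: " + seen);
            }
            if (delayMillis > 0) {
                try {
                    // Throttle the loop so the backing store is not overwhelmed.
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while throttling", e);
                }
            }
        }
        return queued;
    }
}
```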

(record) RecordFilterHelper - Helper to determine if a record should be included or excluded. Criteria include: (a) sharing scope (b) study whitelist (see Redrives) (c) table whitelist (see Redrives) (d) skipping unconfigured studies. Note that for sharing scope, this uses the more restrictive of the sharing scope at the time of the upload and the user's current sharing scope (with missing sharing scopes treated as "no sharing"). This is so that if a user uploads data and then decides to withdraw, pause, or otherwise stop sharing before the data is exported, we respect their sharing scope. A side effect is that if we re-export data for whatever reason, any user who has since stopped sharing will not have their data exported.
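
The "most restrictive scope wins" rule can be sketched as follows. The enum values and method names here are assumptions for illustration; only the rule itself (take the more restrictive of the two scopes, and treat a missing scope as "no sharing") comes from the description above.

```java
// Hypothetical sharing scopes, declared from most restrictive to least restrictive.
enum SharingScopeSketch {
    NO_SHARING, SPONSORS_AND_PARTNERS, ALL_QUALIFIED_RESEARCHERS
}

class SharingFilterSketch {
    // Returns the more restrictive of the two scopes. A missing (null) scope counts as NO_SHARING.
    static SharingScopeSketch effectiveScope(SharingScopeSketch atUpload, SharingScopeSketch current) {
        SharingScopeSketch a = atUpload == null ? SharingScopeSketch.NO_SHARING : atUpload;
        SharingScopeSketch b = current == null ? SharingScopeSketch.NO_SHARING : current;
        // Enum comparison follows declaration order, so the smaller value is the more restrictive one.
        return a.compareTo(b) <= 0 ? a : b;
    }

    // A record is excluded on sharing grounds when the effective scope is NO_SHARING.
    static boolean shouldExcludeForSharing(SharingScopeSketch atUpload, SharingScopeSketch current) {
        return effectiveScope(atUpload, current) == SharingScopeSketch.NO_SHARING;
    }
}
```

For example, a record uploaded under ALL_QUALIFIED_RESEARCHERS by a user whose current scope is NO_SHARING is excluded, which matches the re-export side effect noted above.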

(record) RecordIdSource - The Record ID Source represents a stream of record IDs. One is created for each request, and Record Processor iterates on this stream to get record IDs. This can either be a DDB query result or a record ID list from an S3 override file. Internally, the Record ID Source contains an Iterator from the backing stream and a Converter (a functional interface, used by lambdas) to extract the record ID from the elements in the backing stream and convert it to a String. The Record ID Source is itself an Iterator and implements both the Iterator and Iterable interfaces.
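
A minimal sketch of this structure, assuming a backing stream of simple map-like items, is shown below. The class and interface names are hypothetical stand-ins, not the real RecordIdSource API; the point is the combination of a backing Iterator, a converter lambda, and a class that is both Iterator and Iterable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical functional interface: extracts a record ID from one element of the backing stream.
interface RecordIdConverterSketch<F> {
    String convert(F from);
}

// Hypothetical record ID stream that wraps any backing Iterable.
class RecordIdSourceSketch<T> implements Iterable<String>, Iterator<String> {
    private final Iterator<T> backingIterator;
    private final RecordIdConverterSketch<T> converter;

    RecordIdSourceSketch(Iterable<T> backing, RecordIdConverterSketch<T> converter) {
        this.backingIterator = backing.iterator();
        this.converter = converter;
    }

    @Override
    public boolean hasNext() {
        return backingIterator.hasNext();
    }

    @Override
    public String next() {
        return converter.convert(backingIterator.next());
    }

    // The source is its own iterator, so it can only be traversed once.
    @Override
    public Iterator<String> iterator() {
        return this;
    }

    public static void main(String[] args) {
        // Stand-in for a query result: each item is a map with an "id" attribute.
        List<Map<String, String>> items = Arrays.asList(
                Collections.singletonMap("id", "rec-1"),
                Collections.singletonMap("id", "rec-2"));
        RecordIdSourceSketch<Map<String, String>> source =
                new RecordIdSourceSketch<>(items, item -> item.get("id"));
        for (String recordId : source) {
            System.out.println(recordId);
        }
    }
}
```

With this shape, an override list of plain strings could use the identity lambda as its converter, while a query result supplies a lambda that reads the ID attribute.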

(record) RecordIdSourceFactory - This creates Record ID Sources. It reads the Bridge Exporter Request to determine if it needs a DDB query or an S3 override file and creates the corresponding Record ID Source.

(request) BridgeExporterSqsCallback - This callback connects the PollSqsWorker and the Record Processor. The PollSqsWorker (defined in bridge-base) calls this callback with the raw content of the SQS messages. This callback then deserializes the SQS message into a Bridge Exporter Request and calls the Record Processor.

Worker Manager

(worker) ExportSubtask - Corresponds to a single health data record in a Bridge-EX request. Contains references to the parent task, the schema, the raw DDB item, and the JSON representation of the health data (which might be different from the raw DDB item because of legacy surveys; see Legacy Hacks for details). These are immutable, so we reuse the same subtask if multiple handlers need to work on the same record.

(worker) ExportTask - The Worker Manager creates an ExportTask for each request. The Task references the request and stores the current task state, specifically TsvInfo for each study and each schema, the queue of outstanding asynchronous executions, and the set of studies we've seen so far. The Worker Manager and the Handlers refer to this object to track state. Also includes metrics and the task's temp dir on disk.

Note that this also includes the ExporterDate. This is different from the UploadDate because the UploadDate represents when the health data was uploaded to Bridge (generally yesterday) while the ExporterDate represents when the Exporter was run and when the data hit Synapse. The design decision was made this way since researchers would want to operate on "today's data drop" (ExporterDate) while the Exporter needs to set a cutoff for exporting (all data up through the end of yesterday, that is, UploadDate).

(worker) ExportWorker - An asynchronous execution. The Worker Manager will create one for each record, and may create more than one if the record needs, for example, both the Health Data handler and the AppVersion handler. This is executed in the Worker Manager's asynchronous thread pool and tracked in the ExportTask's asynchronous execution queue.

(worker) ExportWorkerManager - The main driver for the Exporter. This includes helper object instances and common helper functions to be used by handlers, as well as methods for task and handler management. ExportWorkerManager itself contains no state and offloads state to the ExportTask and its TsvInfo objects.

The Record Processor calls ExportWorkerManager for each health data record. ExportWorkerManager then creates a subtask and runs it against the AppVersion handler and the HealthData handler (and sometimes the IosSurvey handler; see Legacy Hacks), using ExportWorkers and the asynchronous thread pool to run these tasks asynchronously. We use a fixed thread pool with a configurable thread count to limit the number of concurrent asynchronous executions. Limiting the threads prevents resource starvation, and excess tasks are queued up (with Java's built-in Executor handling the queueing).

When the Record Processor is done iterating through all the records, it calls endOfStream(), signalling to ExportWorkerManager that there are no more subtasks. At this point, ExportWorkerManager blocks until all asynchronous executions in the current task's queue are complete (using the synchronous Future.get(); this can be thought of as the equivalent to fork-join). Once complete, it will signal to each HealthData and AppVersion handler to upload its payload to Synapse, then invoke the SynapseStatusTableHelper to update status tables.
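
The queue-then-join pattern described above can be sketched with a fixed thread pool and a queue of Futures. This is a simplified illustration with hypothetical names: the real ExportWorkerManager keeps its execution queue on the ExportTask and dispatches to several handlers, whereas this sketch keeps the queue locally, uses a single handler callback, and shuts the pool down at end of stream so the example is single-use.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of "queue asynchronous work per record, then block at end of stream".
class WorkerManagerSketch {
    private final ExecutorService executor;
    private final Queue<Future<?>> outstanding = new ArrayDeque<>();
    private final AtomicInteger completed = new AtomicInteger();

    WorkerManagerSketch(int threadCount) {
        // A fixed pool caps concurrency; excess tasks wait in the executor's internal queue.
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    // Called from a single thread, once per record.
    void addSubtask(String recordId, Consumer<String> handler) {
        outstanding.add(executor.submit(() -> {
            handler.accept(recordId);
            completed.incrementAndGet();
        }));
    }

    // Blocks until every queued execution has finished, then returns the completed count.
    int endOfStream() {
        try {
            Future<?> future;
            while ((future = outstanding.poll()) != null) {
                future.get(); // synchronous wait; the "join" half of fork-join
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Worker failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        WorkerManagerSketch manager = new WorkerManagerSketch(4);
        for (int i = 0; i < 10; i++) {
            manager.addSubtask("record-" + i, id -> { /* handler work would go here */ });
        }
        System.out.println("Completed: " + manager.endOfStream());
    }
}
```

Only after endOfStream() returns is it safe to upload the TSVs, since every row for the task has been written by then.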

(worker) TsvInfo - Every ExportTask contains a TsvInfo for each study AppVersion table and for each schema. The TsvInfo contains a reference to the TSV file on disk and a writer for that file. Given a mapping from column names to column values, it knows how to write that to the TSV. To do this, it keeps track of the column names; it also tracks line counts for metrics. The handlers will get the TsvInfo from the ExportTask and call writeRow() with the column value map.

It is theoretically possible for two asynchronous executions to call the same handler with the same TsvInfo on two different records. In this case, the writes are unordered with respect to each other and there is no guarantee that each row is written intact. To prevent this, we synchronize writeRow().
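
A minimal sketch of a synchronized row writer is shown below. The class name and constructor are hypothetical, and writing a missing column value as an empty cell is an assumption made for this example; the source only states that writeRow() takes a column value map and is synchronized.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a TSV writer that is safe to call from multiple worker threads.
class TsvInfoSketch {
    private final List<String> columnNames;
    private final PrintWriter writer;
    private int lineCount = 0;

    TsvInfoSketch(List<String> columnNames, Writer out) {
        this.columnNames = new ArrayList<>(columnNames);
        this.writer = new PrintWriter(out);
        // Header row, in the fixed column order.
        this.writer.print(String.join("\t", this.columnNames) + "\n");
    }

    // Synchronized so that each row is written as one uninterrupted line.
    synchronized void writeRow(Map<String, String> rowValueMap) {
        List<String> cells = new ArrayList<>();
        for (String columnName : columnNames) {
            String value = rowValueMap.get(columnName);
            cells.add(value == null ? "" : value); // assumption: missing values become empty cells
        }
        writer.print(String.join("\t", cells) + "\n");
        lineCount++;
    }

    synchronized int getLineCount() {
        return lineCount;
    }

    synchronized void flush() {
        writer.flush();
    }

    public static void main(String[] args) {
        StringWriter out = new StringWriter();
        TsvInfoSketch tsvInfo = new TsvInfoSketch(Arrays.asList("recordId", "value"), out);
        Map<String, String> row = new HashMap<>();
        row.put("recordId", "rec-1");
        row.put("value", "5");
        tsvInfo.writeRow(row);
        tsvInfo.flush();
        System.out.print(out);
    }
}
```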

Handlers

(handler) AppVersionExportHandler - This handler is for the poorly named AppVersion table. This table was originally meant to track AppVersions for each health data record. As the system evolved, this became the table used to track metadata for all records, used to compute stats and metrics, but the name stuck.

Every study has its own AppVersion handler. The Worker Manager calls this handler with a subtask, and the AppVersion handler will write the subtask's metadata to the AppVersion table, including a column for the original table, where the record's content lives. See the parent class SynapseExportHandler for more details.

(handler) ExportHandler - Abstract parent class for all handlers. The primary purpose of this class is so that the ExportWorker (an asynchronous execution) can have a reference to a handler without needing to know what kind of handler it is. It also contains references to the Worker Manager and various other getters and setters that concrete handlers need.

(handler) HealthDataExportHandler - Every schema has its own HealthData handler. The Worker Manager calls this handler with a subtask, and the HealthData handler will serialize the record's data into Synapse-compatible values (calling the SynapseHelper) and write those values to the Synapse table. This also includes logic to generate Synapse table column definitions based on a schema. See the parent class SynapseExportHandler for more details.

(handler) IosSurveyExportHandler - A special iOS Survey handler to support a legacy hack. See Legacy Hacks for more details.

(handler) SynapseExportHandler - This is the abstract parent class of both the HealthData handler and the AppVersion handler. It encapsulates logic to initialize the TsvInfo for a given task and schema, create the Synapse table if it doesn't already exist, generate column definitions and column values for common metadata columns, and upload the TSV to Synapse when the task is done.

Helpers

(dynamo) DynamoHelper - BridgeEX-specific DDB helper. Encapsulates querying, parsing, and caching for schemas, studies, and participant sharing scope. Also handles defaulting sharing scope to "no sharing" if the sharing scope could not be obtained from DDB.

(helper) ExportHelper - This helper contains some complex logic used for legacy hacks. See Legacy Hacks for more details.

(metrics) Metrics - Object for tracking metrics for a request; it lives inside the ExportTask. CounterMap allows you to associate a count with a string. (Example: parkinson-TappingActivity-v6.lineCount = 73) KeyValuesMap allows you to associate one or more values with a given key. (Example: uniqueAppVersions[parkinson] = ["version 1.0.5, build 12", "version 1.2, build 31", "version 1.3-pre9, build 40"]) SetCounterMap allows you to associate a count with a string, but each count is associated with a set of values, which the counter dedupes on. (Example: uniqueHealthCodes[parkinson] = 49) All metrics use sorted data structures, so Bridge-EX can log the metric values in alphabetical order, for ease of log viewing.

Note that KeyValuesMap and SetCounterMap are both backed by a multimap. We keep them separate to make it clear that KeyValuesMap is used for the values while SetCounterMap is used for the count. The difference is because (a) KeyValuesMap is expected to have a small set of values while SetCounterMap is expected to have a large set of values (b) SetCounterMap counts unique health codes, which we don't want to write to our logs.
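
The three metric types can be sketched as follows. This is an illustration only: the method names are hypothetical, and plain JDK TreeMap and TreeSet stand in for the multimap that backs the real implementation. It shows how a SetCounterMap can dedupe on values while exposing only the counts.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the three metric maps, using sorted JDK collections.
class MetricsSketch {
    private final Map<String, Integer> counterMap = new TreeMap<>();
    private final Map<String, Set<String>> keyValuesMap = new TreeMap<>();
    private final Map<String, Set<String>> setCounterMap = new TreeMap<>();

    // CounterMap: plain count per name. Returns the new count.
    int incrementCounter(String name) {
        return counterMap.merge(name, 1, Integer::sum);
    }

    // KeyValuesMap: small set of values per key; the values themselves are reported.
    void addKeyValuePair(String key, String value) {
        keyValuesMap.computeIfAbsent(key, k -> new TreeSet<>()).add(value);
    }

    // SetCounterMap: values are stored only to dedupe; duplicates do not raise the count.
    void incrementSetCounter(String name, String value) {
        setCounterMap.computeIfAbsent(name, k -> new TreeSet<>()).add(value);
    }

    Map<String, Set<String>> getKeyValuesMap() {
        return keyValuesMap;
    }

    // Exposes only the counts, so the underlying values (such as health codes) never reach the logs.
    Map<String, Integer> getSetCounts() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : setCounterMap.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().size());
        }
        return counts;
    }
}
```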

(metrics) MetricsHelper - Helper to encapsulate some metrics-related functionality. This is used only by the Record Processor and exists mainly to keep the Record Processor from becoming too complicated. Current responsibilities include capturing metrics common to all records and writing all metrics to the logs at the end of a request.

(synapse) SynapseHelper - Includes simple wrappers around Synapse Java Client calls that add Retry annotations, as well as shared complex logic.

serializeToSynapseType() is used to convert raw health data records from JSON values in DDB to values that can be used in Synapse tables. If a value can't be serialized to the given type, it will return null. It also includes logic for downloading attachments from S3 and uploading them to Synapse as file handles. This is currently only used by HealthData handler.

uploadFromS3ToSynapseFileHandle() is a helper method to download an attachment from S3 and upload it to Synapse as a file handle. This also uses the Bridge type to determine the correct MIME type for JSON and for CSVs, defaulting to application/octet-stream if the type is ambiguous. This is currently only used by SynapseHelper.serializeToSynapseType().

generateFilename() includes some clever logic to preserve file extensions (in case researchers actually care about file extensions), or insert a new file extension if the existing one doesn't make sense (for things like JSON or CSVs). This is currently only used by SynapseHelper.uploadFromS3ToSynapseFileHandle().

uploadTsvFileToTable() encapsulates multiple Synapse calls used to upload TSVs to a Synapse table, as well as a poll-and-wait loop to process the asynchronous call as a blocking call. This is used by SynapseExportHandler and its children.
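
The poll-and-wait idea, which turns an asynchronous job into a blocking call, can be sketched generically as below. This sketch deliberately does not call the Synapse Java Client: the Supplier is a hypothetical stand-in for "check the job status", and the class name, method name, and parameters are assumptions.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of a poll-and-wait loop around an asynchronous job.
class PollAndWaitSketch {
    // Calls checkJob until it yields a result, sleeping between attempts.
    // Throws if the job has not finished after maxTries attempts.
    static <T> T pollUntilReady(Supplier<Optional<T>> checkJob, int maxTries, long sleepMillis) {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            Optional<T> result = checkJob.get();
            if (result.isPresent()) {
                return result.get();
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
        throw new IllegalStateException("Job did not complete after " + maxTries + " tries");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated job that reports "not ready" twice before completing.
        String status = pollUntilReady(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("done"), 5, 10L);
        System.out.println(status + " after " + calls[0] + " checks");
    }
}
```

Bounding the number of tries keeps a stuck job from blocking the whole export indefinitely.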

createTableWithColumnsAndAcls() encapsulates logic to create a Synapse table with the given columns, principal ID (table owner), and data access team ID (permissions to view table). This is a common pattern found in all tables created by Bridge-EX. This is used by SynapseExportHandler and its children as well as the SynapseStatusTableHelper.

(synapse) SynapseStatusTableHelper

(synapse) SynapseTableIterator

(util) BridgeExporterUtil

Deployment

Troubleshooting

Redrives

Legacy Hacks

More Info

Bridge Data Pipeline

Bridge Upload Data Format

Synapse Export Documentation

https://github.com/Sage-Bionetworks/Bridge-Exporter

https://github.com/Sage-Bionetworks/Bridge-EX-Scheduler
