File Downloads Backfill

The purpose of this page is to document the process used to backfill the file download statistics. The new event collection system based on Kinesis Firehose was initially deployed in September 2019; in order to compute statistics for the months preceding September, we used a snapshot of the data warehouse (https://sagebionetworks.jira.com/wiki/spaces/DW/pages/796819457/Warehouse+Snapshot) to gather as much information as possible about the file downloads that happened in those months.

There were 2 main issues that needed to be addressed:

  1. How to get the file download records with the correct project information from the data we had

  2. How to map this information into Parquet files that are partitioned correctly by year, month and day

The approach taken was to create a data warehouse snapshot and build a table containing all the information needed to run an ETL job with AWS Glue that can easily transform the records into parquet files.

The following steps were taken in order to achieve this:

  1. Create a snapshot of the data warehouse

  2. Upgrade the RDS instance of the snapshot to MySQL 8 (needed for window functions)

  3. Create a table with a compact version of the node snapshot that contains only information about file and table entities; this table will contain all the transitions of nodes between projects, along with the minimum timestamp at which each node appeared in each project:

    CREATE TABLE NODE_SNAPSHOT_COMPACT
    SELECT ID, PROJECT_ID, MIN(TIMESTAMP) AS MIN_TIMESTAMP
    FROM NODE_SNAPSHOT
    WHERE NODE_TYPE IN ('file', 'table')
    GROUP BY ID, PROJECT_ID;
  4. Add an index on (ID, MIN_TIMESTAMP) to the NODE_SNAPSHOT_COMPACT table:
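
    For example (the index name is arbitrary):

    ALTER TABLE NODE_SNAPSHOT_COMPACT ADD INDEX IDX_NODE_PROJECT (ID, MIN_TIMESTAMP);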

  5. Create a table that contains the information from the FILE_DOWNLOAD_RECORD table, joining it with NODE_SNAPSHOT_COMPACT and removing duplicates:

    CREATE TABLE DOWNLOADS_BACKFILL_TEMP
    SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
    FROM (
      SELECT D.*,
             FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
             ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
             'prod' AS STACK,
             'backfill' AS `INSTANCE`
      FROM FILE_DOWNLOAD_RECORD D
      JOIN NODE_SNAPSHOT_COMPACT N ON D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
      WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity')
      WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
    ) AS T
    WHERE R_N = 1;
  6. Insert into the DOWNLOADS_BACKFILL_TEMP table the downloads from the FILE_HANDLE_DOWNLOAD_RECORD table in the same manner:

    INSERT INTO DOWNLOADS_BACKFILL_TEMP
    SELECT TIMESTAMP, USER_ID, REQUESTED_FILE_HANDLE_ID AS FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
    FROM (
      SELECT D.*,
             FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
             ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
             'prod' AS STACK,
             'backfill' AS `INSTANCE`
      FROM FILE_HANDLE_DOWNLOAD_RECORD D
      JOIN NODE_SNAPSHOT_COMPACT N ON D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
      WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity')
      WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.REQUESTED_FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
    ) AS T
    WHERE R_N = 1;
  7. We got most of the download information, with the project id added, from the bulk and batch download endpoints, but technically there are other ways to request a download, among them the GET /file/{id} endpoint. We can add this information as well (even though the number of these downloads is minimal compared to the previous ones).
    So, we create a temporary table to gather information about the /file/{id} endpoint:
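
    A sketch of such a query, assuming the requests can be selected from the PROCESSED_ACCESS_RECORD table of the warehouse (the table, column and temporary table names here are assumptions):

    CREATE TABLE FILE_GET_DOWNLOADS_TEMP
    SELECT SESSION_ID, TIMESTAMP
    FROM PROCESSED_ACCESS_RECORD
    WHERE NORMALIZED_METHOD_SIGNATURE = 'GET /file/#';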

  8. We can now add a primary key on (SESSION_ID, TIMESTAMP) of this temporary table:
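
    For example, continuing with the table name used in the previous sketch:

    ALTER TABLE FILE_GET_DOWNLOADS_TEMP ADD PRIMARY KEY (SESSION_ID, TIMESTAMP);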

  9. Since this table only contains the session id and timestamp, we can join it with the ACCESS_RECORD table to gather the full information for valid downloads (request status code 200):
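
    A sketch of such a join (the ACCESS_RECORD column names and the new table name are assumptions):

    CREATE TABLE FILE_GET_DOWNLOADS_ACCESS_TEMP
    SELECT A.TIMESTAMP, A.USER_ID, A.SESSION_ID, A.REQUEST_URL, A.QUERY_STRING
    FROM FILE_GET_DOWNLOADS_TEMP T
    JOIN ACCESS_RECORD A ON A.SESSION_ID = T.SESSION_ID AND A.TIMESTAMP = T.TIMESTAMP
    WHERE A.RESPONSE_STATUS = 200;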

    Note that this might take hours the first time it is run, despite the indexes, since the ACCESS_RECORD table is partitioned daily (which is technically comparable to running several joins against multiple tables, one per day)

  10. With some regex trickery we can extract information about the various ids we need:
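
    A sketch of this extraction, assuming the file handle id is the last path segment of the request URL and the association type and id are passed as query parameters (the column names and regular expressions are assumptions):

    ALTER TABLE FILE_GET_DOWNLOADS_ACCESS_TEMP
      ADD COLUMN FILE_HANDLE_ID VARCHAR(20),
      ADD COLUMN ASSOCIATION_OBJECT_TYPE VARCHAR(50),
      ADD COLUMN ASSOCIATION_OBJECT_ID VARCHAR(20);

    UPDATE FILE_GET_DOWNLOADS_ACCESS_TEMP
    SET FILE_HANDLE_ID = REGEXP_SUBSTR(REQUEST_URL, '[0-9]+$'),
        ASSOCIATION_OBJECT_TYPE = REGEXP_REPLACE(QUERY_STRING, '^.*fileAssociateType=([A-Za-z]+).*$', '$1'),
        ASSOCIATION_OBJECT_ID = REGEXP_REPLACE(QUERY_STRING, '^.*fileAssociateId=(syn)?([0-9]+).*$', '$2');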

  11. For better performance we can change the column types of FILE_HANDLE_ID and ASSOCIATION_OBJECT_ID to BIGINT and add an index on (TIMESTAMP, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID):
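
    For example (the index name is arbitrary, the table name follows the previous sketches):

    ALTER TABLE FILE_GET_DOWNLOADS_ACCESS_TEMP
      MODIFY COLUMN FILE_HANDLE_ID BIGINT,
      MODIFY COLUMN ASSOCIATION_OBJECT_ID BIGINT,
      ADD INDEX IDX_TIMESTAMP_ASSOCIATION (TIMESTAMP, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID);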

  12. We now delete all the records that are not FileEntity or TableEntity:
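
    For example, again with the table name from the sketches above:

    DELETE FROM FILE_GET_DOWNLOADS_ACCESS_TEMP
    WHERE ASSOCIATION_OBJECT_TYPE NOT IN ('FileEntity', 'TableEntity');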

  13. Similar to what we did before with the download records, we join with NODE_SNAPSHOT_COMPACT and add the information to our DOWNLOADS_BACKFILL_TEMP table:
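
    A sketch of this step, following the same pattern as the queries above (the source table name is the one assumed in the previous sketches):

    INSERT INTO DOWNLOADS_BACKFILL_TEMP
    SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
    FROM (
      SELECT D.*,
             FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
             ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
             'prod' AS STACK,
             'backfill' AS `INSTANCE`
      FROM FILE_GET_DOWNLOADS_ACCESS_TEMP D
      JOIN NODE_SNAPSHOT_COMPACT N ON D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
      WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
    ) AS T
    WHERE R_N = 1;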

  14. We can now create an index on the DOWNLOADS_BACKFILL_TEMP table on (TIMESTAMP, USER_ID, FILE_HANDLE_ID), so that we can easily remove duplicates into a final table:
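
    A sketch of this last step on the database side (the final table name and the epoch-milliseconds interpretation of TIMESTAMP are assumptions):

    ALTER TABLE DOWNLOADS_BACKFILL_TEMP ADD INDEX IDX_DEDUP (TIMESTAMP, USER_ID, FILE_HANDLE_ID);

    CREATE TABLE DOWNLOADS_BACKFILL
    SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID,
           MIN(PROJECT_ID) AS PROJECT_ID,
           MIN(ASSOCIATION_OBJECT_TYPE) AS ASSOCIATION_OBJECT_TYPE,
           CONCAT('syn', MIN(ASSOCIATION_OBJECT_ID)) AS ASSOCIATION_OBJECT_ID,
           MIN(STACK) AS STACK,
           MIN(`INSTANCE`) AS `INSTANCE`,
           YEAR(FROM_UNIXTIME(TIMESTAMP / 1000)) AS `YEAR`,
           MONTH(FROM_UNIXTIME(TIMESTAMP / 1000)) AS `MONTH`,
           DAY(FROM_UNIXTIME(TIMESTAMP / 1000)) AS `DAY`
    FROM DOWNLOADS_BACKFILL_TEMP
    GROUP BY TIMESTAMP, USER_ID, FILE_HANDLE_ID;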

    Notice that we added here the year, month and day: this will help us create the partitions in the ETL (it is not strictly needed in this table, but it costs little since we are already here). Also, we prefixed the associate id with 'syn' to conform to the data stored by the event collection system.

  15. We now have a table with all the downloads and the project id information. There are various ways to use this data for an ETL job: the table itself could be used as the source, but it is much faster to dump it into a CSV file in S3 and use that as the source of the ETL. An example of the query used to dump the table is sketched below (notice the TIMESTAMP filter, since we already had data collected with the new streaming system after the 9th of September):
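
    A sketch of the dump query (again assuming TIMESTAMP is stored as epoch milliseconds); the result set can then be exported to CSV with the MySQL client or any similar tool:

    SELECT * FROM DOWNLOADS_BACKFILL
    WHERE TIMESTAMP < UNIX_TIMESTAMP('2019-09-09') * 1000;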

  16. We can gzip this CSV file and upload it to S3

  17. Now we can move on to the ETL part and use Glue to perform the job: we create a crawler on the file uploaded in the previous step to extract the table information.

  18. Once the table is discovered, we modify its schema with the correct column names and data types (similar to the table used by Firehose for fileDownloads)

  19. Since the CSV file contains a header with the column names, we also modify the table properties, adding skip.header.line.count with value 1.

  20. We can verify that it works by running a query with Athena

  21. We now create an ETL job to transform the records in the gzipped CSV file into partitioned parquet files. When creating the spark script we need to make sure that:

    1. We pad the month and day, adding a map transformation (see the final script below).

    2. We partition the data correctly, adding the partition keys to the S3 connection properties of the final parquet write (see the final script below).

The final script might look like the following:
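
A minimal sketch of such a Glue job (the catalog database, table name and S3 output path are placeholders, and the column names assume the schema defined for the crawled table):

    import sys
    from awsglue.transforms import Map
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    # Source table discovered by the crawler (placeholder database and table names)
    datasource = glueContext.create_dynamic_frame.from_catalog(
        database = "downloads_backfill",
        table_name = "downloads_backfill_csv",
        transformation_ctx = "datasource")

    # Pad the month and day values to two digits (e.g. month=09, day=05) so that
    # the resulting S3 partitions line up with the ones written by the delivery stream
    def pad_partitions(record):
        record["month"] = str(record["month"]).zfill(2)
        record["day"] = str(record["day"]).zfill(2)
        return record

    mapped = Map.apply(frame = datasource, f = pad_partitions, transformation_ctx = "mapped")

    # Write the records as parquet files partitioned by year/month/day
    sink = glueContext.write_dynamic_frame.from_options(
        frame = mapped,
        connection_type = "s3",
        connection_options = {
            "path": "s3://<destination-bucket>/fileDownloads",
            "partitionKeys": ["year", "month", "day"]
        },
        format = "parquet",
        transformation_ctx = "sink")

    job.commit()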