Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Create a snapshot of the data warehouse

  2. Upgrade the RDS instance of the snapshot to MySQL 8 (needed for windows functions)

  3. Create a table with a compact version of the node snapshot that contains only information about file and table entities, this table will contain all the nodes transitions in projects along with the (min) timestamp:

    Code Block
    languagesql
    CREATE TABLE NODE_SNAPSHOT_COMPACT SELECT ID, PROJECT_ID, MIN(TIMESTAMP) AS MIN_TIMESTAMP FROM NODE_SNAPSHOT WHERE NODE_TYPE IN ('file', 'table') GROUP BY ID, PROJECT_ID;
  4. Add an index to the table NODE_SNAPSHOT_COMPACT ON the (ID, MIN_TIMESTAMP)

  5. Create a table that contains the information from the FILE_DOWNLOAD_RECORD, joining on the NODE_SNAPSHOT_COMPACT removing duplicates:

    Code Block
    languagesql
    CREARE TABLE DOWNLOADS_BACKFILL_TEMP SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
            FROM (
                SELECT D.*, FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID, 
                ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
                'prod' AS STACK,
                'backfill' AS `INSTANCE`
                FROM FILE_DOWNLOAD_RECORD D JOIN NODE_SNAPSHOT_COMPACT N
                    WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity') AND D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
                WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
            ) AS T WHERE R_N = 1;
  6. Add into the DOWNLOADS_BACKFILL_TEMP table the downloads from the FILE_HANDLE_DOWNLOAD_RECORD in the same manner:

    Code Block
    languagesql
    INSERT INTO DOWNLOADS_BACKFILL_TEMP SELECT TIMESTAMP, USER_ID, REQUESTED_FILE_HANDLE_ID AS FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
            FROM (
                SELECT D.*, FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID, 
                ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
                'prod' AS STACK,
                'backfill' AS `INSTANCE`
                FROM FILE_HANDLE_DOWNLOAD_RECORD D
                JOIN NODE_SNAPSHOT_COMPACT N
                WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity') AND D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
                WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.REQUESTED_FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
            ) AS T WHERE R_N = 1;
  7. We got most of the download information in with the project id added from the bulk and batch download endpoint, but technically there are other ways to request a download, among which the GET /file/{id} endpoint. We can add this information as well (even though the amount of downloads will be minimal compared to the previous one).
    So, we create a temporary table to gather info about the /file/{id} endpoint:

    Code Block
    languagesql
    CREATE TABLE FILE_DOWNLOADS_DIRECT SELECT SESSION_ID, TIMESTAMP FROM PROCESSED_ACCESS_RECORD WHERE NORMALIZED_METHOD_SIGNATURE = 'GET /file/#';
  8. We can now add a primary key on the SESSION_ID, TIMESTAMP

  9. We now have only information about the session and timestamp, we can join with the ACCESS_RECORD table to gather full info for valid downloads (request status code 200):

    Code Block
    languagesql
     CREATE TABLE DOWNLOADS_BACKFILL_DIRECT SELECT D.TIMESTAMP, USER_ID, REQUEST_URL AS FILE_HANDLE_ID, QUERY_STRING AS ASSOCIATION_OBJECT_TYPE, QUERY_STRING AS ASSOCIATION_OBJECT_ID, STACK, `INSTANCE` FROM FILE_DOWNLOADS_DIRECT D JOIN ACCESS_RECORD A 
        WHERE D.SESSION_ID = A.SESSION_ID AND D.TIMESTAMP = A.TIMESTAMP
        AND RESPONSE_STATUS = 200

    Note that this might take hours the first time is run despite indexes since the ACCESS_RECORD table is partitioned daily (which technically can be compared to several joins on multiple table, one per day)

  10. With some regex trickery we can extract information about the various ids we need:

    Code Block
    languagesql
    UPDATE DOWNLOADS_BACKFILL_DIRECT 
        FILE_HANDLE_ID = REGEXP_REPLACE(FILE_HANDLE_ID, '/file/v1/file/([1-9]+)', '$1'),
        ASSOCIATION_OBJECT_TYPE = REGEXP_REPLACE('fileAssociateType=FileEntity&fileAssociateId=syn17017899&redirect=false', '.*fileAssociateType=(FileEntity|TableEntity).*', '$1'),
        ASSOCIATION_OBJECT_ID = REGEXP_REPLACE('fileAssociateType=FileEntity&fileAssociateId=syn17017899', '.*fileAssociateId=((syn)?([0-9]+)).*', '$3');
  11. For better performance we can change the column types of FILE_HANDLE_ID AND ASSOCIATION_OBJECT_ID to BIGINT and add an index on TIMESTAMP, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID

  12. We now delete all the records that are not FileEntity or TableEntity:

    Code Block
    DELETE FROM DOWNLOADS_BACKFILL_DIRECT WHERE ASSOCIATION_OBJECT_TYPE NOT IN ('FileEntity', 'TableEntity');
  13. Similar to what we did before with the download records we join on the NODE_SNAPSHOT_COMPACT and add the information to our DOWNLOAD_BACKFILL_TEMP:

    Code Block
    languagesql
    INSERT INTO DOWNLOADS_BACKFILL_TEMP SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
            FROM (
                SELECT D.*, FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID, 
                ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N
                FROM DOWNLOADS_BACKFILL_DIRECT D
                JOIN NODE_SNAPSHOT_COMPACT N
                WHERE D.ASSOCIATION_OBJECT_ID = N.ID AND N.MIN_TIMESTAMP <= D.TIMESTAMP
                WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
            ) AS T WHERE R_N = 1;
  14. We can now create an index on the DOWNLOAD_BACKFILL_TEMP table on TIMESTAMP, USER_ID, FILE_HANDLE_ID so that we can easily remove duplicates into a final table:

    Code Block
    languagesql
    CREATE TABLE DOWNLOADS_BACKFILL SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, MAX(PROJECT_ID) AS PROJECT_ID, MAX(ASSOCIATION_OBJECT_TYPE) AS ASSOCIATE_TYPE, CONCAT('syn', MAX(ASSOCIATION_OBJECT_ID)) AS ASSOCIATE_ID, STACK, `INSTANCE`, YEAR(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS YEAR, MONTH(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS MONTH, DAY(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS DAY 
        FROM DOWNLOADS_BACKFILL_TEMP GROUP BY TIMESTAMP, USER_ID, FILE_HANDLE_ID;

    Notice that we added here the year, month and day. This will help us in the ETL to create partitions (not strictly needed to be done in the table, but does not cost much since we are already here). Also, we prefixed the associate_id with ‘syn’ to be conform to the data stored in the event collection.

  15. We now have a table with all the downloads an the project id information, there are various ways to use this data for an ETL job, we can use it as source but it’s much faster to actually dump it into a CSV and use that as the source of the ETL in S3, an example script to dump the table (Notice that I add a TIMESTAMP filter since we already had data collected with the new streaming system after the 9th of September):

    Code Block
    languagebash
     #!/bin/bash
    
        HOST=snapshot-instance.us-east-1.rds.amazonaws.com
        USER=snapshotuser
        PASSWORD=password
        BATCH_SIZE=5000000
        MAX_RECORDS=20000000
        OFFSET=0
        FILE_OUT=downloads.csv
    
        echo "Dumping $MAX_RECORDS records..."
    
        if [ $MAX_RECORDS -lt $BATCH_SIZE ]; then
            let BATCH_SIZE=MAX_RECORDS
        fi
    
        while [  $OFFSET -lt $MAX_RECORDS ]; do
            if [ $OFFSET -eq 0 ]; then
                COLUMN_NAMES="--column-names"
            fi
            echo "Processing records from $OFFSET to $((OFFSET+BATCH_SIZE))..."
            mysql -p$PASSWORD -u $USER -h $HOST --database warehouse --compress -e "SELECT * FROM DOWNLOADS_BACKFILL WHERE TIMESTAMP < UNIX_TIMESTAMP('2019-09-09') * 1000 ORDER BY TIMESTAMP DESC LIMIT $BATCH_SIZE OFFSET $OFFSET" $COLUMN_NAMES --batch | sed  's/\t/,/g' >> $FILE_OUT
            echo "Processing records from $OFFSET to $((OFFSET+BATCH_SIZE))...DONE"
            let OFFSET=OFFSET+BATCH_SIZE
        done
  16. We can gzip this csv file and upload it to S3

  17. Now we can go to the ETL part and use glue to perform the job, we create a crawler on the previous file to extract the table information.

  18. Once the table is discovered, we modify its schema with the column names and the correct data types (similar to the table used by firehose for fileDownloads)

  19. The CSV file contains the header with the column names, now modify the table properties, adding skip.header.line.count with value 1.

  20. We can verify that it works running a query with athena

  21. We now create an ETL Job to transform the records in the gzipped CSV file to partitioned parquet files, when creating the spark script we need to make sure that:

    1. We pad the month and day, adding a map transformation:

      Code Block
      languagepy
      def MapRecord(rec):
          rec["month"] = str(rec["month"]).zfill(2)
          rec["day"] = str(rec["day"]).zfill(2)
          return rec
      
      mapped_datasource =  Map.apply(frame = datasource0, f = MapRecord, transformation_ctx = "mapped_datasource")
    2. We partition the data correctly, adding in the final S3 connection property for the parquet transformation:

      Code Block
      languagepy
      datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://dev.log.sagebase.org/fileDownloads/backfill/records", "partitionKeys": ["year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink4")
  22. Run the ETL job :)

...