...
Create a snapshot of the data warehouse
Upgrade the RDS instance of the snapshot to MySQL 8 (needed for window functions)
Create a table with a compact version of the node snapshot that contains only information about file and table entities. This table will contain all the node transitions between projects along with the earliest (min) timestamp:
```sql
CREATE TABLE NODE_SNAPSHOT_COMPACT
SELECT ID, PROJECT_ID, MIN(TIMESTAMP) AS MIN_TIMESTAMP
FROM NODE_SNAPSHOT
WHERE NODE_TYPE IN ('file', 'table')
GROUP BY ID, PROJECT_ID;
```
Add an index to the NODE_SNAPSHOT_COMPACT table on (ID, MIN_TIMESTAMP).
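A minimal sketch of the statement (the index name is an arbitrary choice):

```sql
-- Supports the join on ID and the ordering by MIN_TIMESTAMP used in the queries below
ALTER TABLE NODE_SNAPSHOT_COMPACT ADD INDEX IDX_ID_MIN_TIMESTAMP (ID, MIN_TIMESTAMP);
```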
Create a table that contains the information from the FILE_DOWNLOAD_RECORD, joining on the NODE_SNAPSHOT_COMPACT and removing duplicates:
```sql
CREATE TABLE DOWNLOADS_BACKFILL_TEMP
SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
FROM (
    SELECT D.*,
        FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
        ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
        'prod' AS STACK,
        'backfill' AS `INSTANCE`
    FROM FILE_DOWNLOAD_RECORD D JOIN NODE_SNAPSHOT_COMPACT N
    WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity')
        AND D.ASSOCIATION_OBJECT_ID = N.ID
        AND N.MIN_TIMESTAMP <= D.TIMESTAMP
    WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
) AS T
WHERE R_N = 1;
```
Insert into the DOWNLOADS_BACKFILL_TEMP table the downloads from the FILE_HANDLE_DOWNLOAD_RECORD in the same manner:
```sql
INSERT INTO DOWNLOADS_BACKFILL_TEMP
SELECT TIMESTAMP, USER_ID, REQUESTED_FILE_HANDLE_ID AS FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
FROM (
    SELECT D.*,
        FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
        ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N,
        'prod' AS STACK,
        'backfill' AS `INSTANCE`
    FROM FILE_HANDLE_DOWNLOAD_RECORD D JOIN NODE_SNAPSHOT_COMPACT N
    WHERE D.ASSOCIATION_OBJECT_TYPE IN ('FileEntity', 'TableEntity')
        AND D.ASSOCIATION_OBJECT_ID = N.ID
        AND N.MIN_TIMESTAMP <= D.TIMESTAMP
    WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.REQUESTED_FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
) AS T
WHERE R_N = 1;
```
This gives us most of the download information with the project id added, coming from the bulk and batch download endpoints. Technically there are other ways to request a download, among which the GET /file/{id} endpoint; we can add this information as well (even though the amount of downloads is minimal compared to the previous ones).
So, we create a temporary table to gather info about the /file/{id} endpoint:

```sql
CREATE TABLE FILE_DOWNLOADS_DIRECT
SELECT SESSION_ID, TIMESTAMP
FROM PROCESSED_ACCESS_RECORD
WHERE NORMALIZED_METHOD_SIGNATURE = 'GET /file/#';
```
We can now add a primary key on (SESSION_ID, TIMESTAMP).
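A minimal sketch of the statement:

```sql
-- (SESSION_ID, TIMESTAMP) is also the key used to join against ACCESS_RECORD below
ALTER TABLE FILE_DOWNLOADS_DIRECT ADD PRIMARY KEY (SESSION_ID, TIMESTAMP);
```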
We now have only information about the session and timestamp; we can join with the ACCESS_RECORD table to gather the full information for valid downloads (response status code 200):
```sql
CREATE TABLE DOWNLOADS_BACKFILL_DIRECT
SELECT D.TIMESTAMP, USER_ID,
    REQUEST_URL AS FILE_HANDLE_ID,
    QUERY_STRING AS ASSOCIATION_OBJECT_TYPE,
    QUERY_STRING AS ASSOCIATION_OBJECT_ID,
    STACK, `INSTANCE`
FROM FILE_DOWNLOADS_DIRECT D JOIN ACCESS_RECORD A
WHERE D.SESSION_ID = A.SESSION_ID
    AND D.TIMESTAMP = A.TIMESTAMP
    AND RESPONSE_STATUS = 200;
```
Note that this might take hours the first time it is run, despite the indexes, since the ACCESS_RECORD table is partitioned daily (which can technically be compared to several joins on multiple tables, one per day).
With some regex trickery we can extract information about the various ids we need:
```sql
-- At this point ASSOCIATION_OBJECT_TYPE and ASSOCIATION_OBJECT_ID still contain the raw query string,
-- while FILE_HANDLE_ID contains the request URL, so we extract the values in place
UPDATE DOWNLOADS_BACKFILL_DIRECT SET
    FILE_HANDLE_ID = REGEXP_REPLACE(FILE_HANDLE_ID, '/file/v1/file/([1-9]+)', '$1'),
    ASSOCIATION_OBJECT_TYPE = REGEXP_REPLACE(ASSOCIATION_OBJECT_TYPE, '.*fileAssociateType=(FileEntity|TableEntity).*', '$1'),
    ASSOCIATION_OBJECT_ID = REGEXP_REPLACE(ASSOCIATION_OBJECT_ID, '.*fileAssociateId=((syn)?([0-9]+)).*', '$3');
```
For better performance we can change the column types of FILE_HANDLE_ID and ASSOCIATION_OBJECT_ID to BIGINT and add an index on (TIMESTAMP, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID).
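A minimal sketch of the statement (the index name is an arbitrary choice), assuming the regex extraction above left only numeric values in the two columns:

```sql
ALTER TABLE DOWNLOADS_BACKFILL_DIRECT
    MODIFY COLUMN FILE_HANDLE_ID BIGINT,
    MODIFY COLUMN ASSOCIATION_OBJECT_ID BIGINT,
    ADD INDEX IDX_DIRECT (TIMESTAMP, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID);
```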
We now delete all the records that are not FileEntity or TableEntity:
```sql
DELETE FROM DOWNLOADS_BACKFILL_DIRECT
WHERE ASSOCIATION_OBJECT_TYPE NOT IN ('FileEntity', 'TableEntity');
```
Similarly to what we did before with the download records, we join on the NODE_SNAPSHOT_COMPACT and add the information to our DOWNLOADS_BACKFILL_TEMP:
```sql
INSERT INTO DOWNLOADS_BACKFILL_TEMP
SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID, PROJECT_ID, ASSOCIATION_OBJECT_TYPE, ASSOCIATION_OBJECT_ID, STACK, `INSTANCE`
FROM (
    SELECT D.*,
        FIRST_VALUE(N.PROJECT_ID) OVER (W ORDER BY MIN_TIMESTAMP DESC) AS PROJECT_ID,
        ROW_NUMBER() OVER (W ORDER BY MIN_TIMESTAMP DESC) AS R_N
    FROM DOWNLOADS_BACKFILL_DIRECT D JOIN NODE_SNAPSHOT_COMPACT N
    WHERE D.ASSOCIATION_OBJECT_ID = N.ID
        AND N.MIN_TIMESTAMP <= D.TIMESTAMP
    WINDOW W AS (PARTITION BY D.TIMESTAMP, D.USER_ID, D.FILE_HANDLE_ID, D.ASSOCIATION_OBJECT_ID, D.ASSOCIATION_OBJECT_TYPE)
) AS T
WHERE R_N = 1;
```
We can now create an index on the DOWNLOADS_BACKFILL_TEMP table on (TIMESTAMP, USER_ID, FILE_HANDLE_ID) so that we can easily remove duplicates into a final table.
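A minimal sketch of the index statement (the index name is an arbitrary choice):

```sql
-- Matches the GROUP BY used for the deduplication below
ALTER TABLE DOWNLOADS_BACKFILL_TEMP ADD INDEX IDX_DEDUP (TIMESTAMP, USER_ID, FILE_HANDLE_ID);
```

With the index in place we can collapse the duplicates into the final table: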
```sql
CREATE TABLE DOWNLOADS_BACKFILL
SELECT TIMESTAMP, USER_ID, FILE_HANDLE_ID,
    MAX(PROJECT_ID) AS PROJECT_ID,
    MAX(ASSOCIATION_OBJECT_TYPE) AS ASSOCIATE_TYPE,
    CONCAT('syn', MAX(ASSOCIATION_OBJECT_ID)) AS ASSOCIATE_ID,
    STACK, `INSTANCE`,
    YEAR(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS YEAR,
    MONTH(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS MONTH,
    DAY(FROM_UNIXTIME(MAX(TIMESTAMP)/1000)) AS DAY
FROM DOWNLOADS_BACKFILL_TEMP
GROUP BY TIMESTAMP, USER_ID, FILE_HANDLE_ID;
```
Notice that we added here the year, month and day. This will help us in the ETL to create partitions (not strictly needed in this table, but it does not cost much since we are already here). Also, we prefixed the associate_id with 'syn' to conform to the data stored in the event collection.
We now have a table with all the downloads and the project id information. There are various ways to use this data for an ETL job: we could use the table directly as a source, but it is much faster to dump it into a CSV in S3 and use that as the source of the ETL. An example script to dump the table follows (notice that I add a TIMESTAMP filter since we already had data collected with the new streaming system after the 9th of September):
```bash
#!/bin/bash

HOST=snapshot-instance.us-east-1.rds.amazonaws.com
USER=snapshotuser
PASSWORD=password
BATCH_SIZE=5000000
MAX_RECORDS=20000000
OFFSET=0
FILE_OUT=downloads.csv

echo "Dumping $MAX_RECORDS records..."

if [ $MAX_RECORDS -lt $BATCH_SIZE ]; then
  let BATCH_SIZE=MAX_RECORDS
fi

while [ $OFFSET -lt $MAX_RECORDS ]; do
  if [ $OFFSET -eq 0 ]; then
    COLUMN_NAMES="--column-names"
  else
    COLUMN_NAMES="--skip-column-names"
  fi
  echo "Processing records from $OFFSET to $((OFFSET+BATCH_SIZE))..."
  mysql -p$PASSWORD -u $USER -h $HOST --database warehouse --compress -e "SELECT * FROM DOWNLOADS_BACKFILL WHERE TIMESTAMP < UNIX_TIMESTAMP('2019-09-09') * 1000 ORDER BY TIMESTAMP DESC LIMIT $BATCH_SIZE OFFSET $OFFSET" $COLUMN_NAMES --batch | sed 's/\t/,/g' >> $FILE_OUT
  echo "Processing records from $OFFSET to $((OFFSET+BATCH_SIZE))...DONE"
  let OFFSET=OFFSET+BATCH_SIZE
done
```
We can gzip this CSV file and upload it to S3.
Now we can move to the ETL part and use Glue to perform the job: we create a crawler on the previous file to extract the table information.
Once the table is discovered, we modify its schema with the column names and the correct data types (similar to the table used by Firehose for fileDownloads).
Since the CSV file contains a header with the column names, we also modify the table properties, adding skip.header.line.count with value 1.
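This can be done from the Glue console; equivalently, a sketch of the Athena DDL (the table name downloads_backfill is an assumption based on how the crawler was configured):

```sql
-- Skip the CSV header row when the table is queried
ALTER TABLE downloads_backfill SET TBLPROPERTIES ('skip.header.line.count' = '1');
```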
We can verify that it works by running a query with Athena.
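For example, a quick sanity check (again, the table name is an assumption):

```sql
-- The count and the timestamp range should roughly match the dumped CSV
SELECT COUNT(*) AS records, MIN("timestamp") AS min_ts, MAX("timestamp") AS max_ts
FROM downloads_backfill;
```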
We now create an ETL job to transform the records in the gzipped CSV file to partitioned parquet files. When creating the Spark script we need to make sure that:
We pad the month and day, adding a map transformation:
```py
def MapRecord(rec):
    rec["month"] = str(rec["month"]).zfill(2)
    rec["day"] = str(rec["day"]).zfill(2)
    return rec

mapped_datasource = Map.apply(frame = datasource0, f = MapRecord, transformation_ctx = "mapped_datasource")
```
We partition the data correctly, adding the partition keys to the S3 connection options of the final parquet transformation:
```py
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = dropnullfields3,
    connection_type = "s3",
    connection_options = {"path": "s3://dev.log.sagebase.org/fileDownloads/backfill/records", "partitionKeys": ["year", "month", "day"]},
    format = "parquet",
    transformation_ctx = "datasink4")
```
...