Approach to Process Access Records and Make Them Queryable
Introduction
In phase one we already send access records to S3 as JSON through a Kinesis Firehose delivery stream. The delivery stream partitions the data by the time it was received in S3, in year/month/day format (e.g. s3://dev.log.sagebase.org/accessRecord/<year>/<month>/<day>/). We now want to process this data and make the processed data queryable. The data grows over time, so we need to structure this big data in a way that lets us query and process it in adequate time.
What we actually need is an ETL job that extracts the raw data from S3, processes it, and loads the processed data back into S3. From S3 we can then query the data with Athena.
AWS Glue ETL Job
Extract the raw data from the source
Process the data
Load the processed data to the destination
Raw data is the data we receive directly from the source, which in our case is the Synapse repository project. Synapse sends access records to S3 through the Kinesis Firehose delivery stream.
For each access record we need to modify the requestURL field according to business logic; processing the data means taking the raw records and rewriting the requestURL field of each one.
We want to keep the processed data at a separate S3 location and execute Athena queries on it. The data should be stored in Parquet format; Parquet is a columnar file format and is fast to read.
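As a rough illustration of the end goal, a processed access record table could be queried from Python through the Athena API roughly as below. The database name, table name, and results location are placeholders, not resources that exist yet.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Start an Athena query against the (hypothetical) processed access record table.
response = athena.start_query_execution(
    QueryString="""
        SELECT requesturl, COUNT(*) AS calls
        FROM processed_access_record
        WHERE responsestatus = 200
        GROUP BY requesturl
        ORDER BY calls DESC
        LIMIT 20
    """,
    QueryExecutionContext={"Database": "access_record_db"},  # placeholder database name
    ResultConfiguration={
        # Placeholder bucket/prefix for Athena query results.
        "OutputLocation": "s3://dev.log.sagebase.org/athena-query-results/"
    },
)
print(response["QueryExecutionId"])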
The ETL job script would look like the one below:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": ["s3://dev.log.sagebase.org/accessRecord/"],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("payload.date", "string", "date", "string"),
        ("payload.requestURL", "string", "requesturl", "string"),
        ("payload.xforwardedFor", "string", "xforwardedfor", "string"),
        ("payload.oauthClientId", "null", "oauthclientid", "string"),
        ("payload.success", "boolean", "success", "boolean"),
        ("payload.sessionId", "string", "sessionid", "string"),
        ("payload.queryString", "null", "querystring", "string"),
        ("payload.method", "string", "method", "string"),
        ("payload.basicAuthUsername", "null", "basicauthusername", "string"),
        ("payload.instance", "string", "instance", "string"),
        ("payload.stack", "string", "stack", "string"),
        ("payload.host", "string", "host", "string"),
        ("payload.elapseMS", "int", "elapsems", "int"),
        ("payload.threadId", "int", "threadid", "int"),
        ("payload.userId", "int", "userid", "int"),
        ("payload.authenticationMethod", "null", "authenticationmethod", "string"),
        ("payload.returnObjectId", "null", "returnobjectid", "string"),
        ("payload.origin", "null", "origin", "string"),
        ("payload.via", "null", "via", "string"),
        ("payload.vmId", "string", "vmid", "string"),
        ("payload.userAgent", "string", "useragent", "string"),
        ("payload.responseStatus", "int", "responsestatus", "int"),
        ("timestamp", "bigint", "timestamp", "bigint"),
    ],
    transformation_ctx="ApplyMapping_node2",
)


# Business logic: replace the raw requesturl with its normalized form.
# Note that ApplyMapping renamed payload.requestURL to requesturl, so the
# transform must reference the new field name.
def transform(dynamicRecord):
    tempVar = dynamicRecord["requesturl"].lower()
    if tempVar.startswith("/entity/md5"):
        tempVar = "/entity/md5/#"
    elif tempVar.startswith("/evaluation/name"):
        tempVar = "/evaluation/name/#"
    elif tempVar.startswith("/entity/alias"):
        tempVar = "/entity/alias/#"
    else:
        tempVar = "/#"
    dynamicRecord["requesturl"] = tempVar
    return dynamicRecord


mapped_record = ApplyMapping_node2.map(f=transform)
mapped_record.printSchema()

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=mapped_record,
    connection_type="s3",
    format="parquet",
    connection_options={
        "path": "s3://dev.log.sagebase.org/processedAccessRecord/",
        "compression": "gzip",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)

job.commit()
Challenges
1. Since we deploy a new stack every week, the ETL job should not reprocess data that has already been processed.
2. How will we maintain versioning of the Glue job script, i.e. define the infrastructure that takes the script and pushes it to S3 (the deployment process of the script)?
3. How should the job be triggered, e.g. on demand or on a schedule?
4. How should duplicate data be handled?
Proposed solution:
The ETL job will be static, not recreated in every release.
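One Glue mechanism that could cover the "do not reprocess old data" challenge is job bookmarks: since the script above passes transformation_ctx to its source and sink, enabling bookmarks on the job would let Glue track which S3 objects have already been read. A minimal sketch of creating such a static job with boto3, assuming a hypothetical job name, IAM role, and script location:

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Create the (static) ETL job once; name, role, and script path are placeholders.
glue.create_job(
    Name="process-access-record",                       # hypothetical job name
    Role="arn:aws:iam::123456789012:role/GlueEtlRole",  # hypothetical IAM role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://dev.log.sagebase.org/scripts/process_access_record.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Job bookmarks make Glue skip S3 objects it has already processed,
        # based on the transformation_ctx values in the script.
        "--job-bookmark-option": "job-bookmark-enable",
    },
    GlueVersion="3.0",
)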
Create a GitHub project for our ETL job (Scala/Python). Set up a build for the GitHub project (GitHub Actions) that deploys the artifact to S3. Add a new static template to the stack builder that sets up the ETL job using the deployed S3 artifact (set up the timer etc.). We also need to decide how to include libraries in the Glue job script. The build should include testing (see the test sketch below).
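As a rough idea of what such a test could look like, assuming the transform function from the script above is moved into an importable module (hypothetically named process_access_record here), a pytest sketch might be:

# test_process_access_record.py -- sketch only; module and test names are placeholders.
from process_access_record import transform


def test_md5_url_is_normalized():
    record = {"requesturl": "/entity/md5/abc123"}
    assert transform(record)["requesturl"] == "/entity/md5/#"


def test_unknown_url_falls_back_to_wildcard():
    record = {"requesturl": "/repo/v1/something/else"}
    assert transform(record)["requesturl"] == "/#"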
The trigger should be a configurable parameter in the stack builder; initially the job should run every hour. (In the future we can look for a way to be notified when new data is available for processing.)
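A minimal sketch of an hourly scheduled trigger for the job, again with boto3 and the same hypothetical job name as above:

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Scheduled trigger that starts the ETL job at the top of every hour.
glue.create_trigger(
    Name="process-access-record-hourly",            # hypothetical trigger name
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",                   # every hour, on the hour
    Actions=[{"JobName": "process-access-record"}],
    StartOnCreation=True,
)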
One way to avoid duplicate data is to use Glue tables for both the source and the destination of the job, and then use a left join to identify the duplicates, as shown below. (For now we are not handling duplicates; we can look into it in the future if it becomes a problem.)
# Snippet: de-duplication via a left join between the staging (source) and target Glue tables.
# `gc` is the GlueContext and args["TempDir"] is the temporary directory of the job.
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col

stagingdatasource = gc.create_dynamic_frame.from_catalog(
    database="stagingdatabase",
    table_name="staging_source_table",
    transformation_ctx="stagingdatasource")

targetdatasource = gc.create_dynamic_frame.from_catalog(
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table",
    transformation_ctx="targetdatasource")

columnmapping = ApplyMapping.apply(
    frame=stagingdatasource,
    mappings=[("description", "string", "description", "string"),
              ("id", "int", "id", "int")],
    transformation_ctx="columnmapping")

ta = columnmapping.toDF().alias('ta')
tb = targetdatasource.toDF().alias('tb')

# Keep only the staging rows whose id is not already present in the target table.
left_join = ta\
    .join(tb, ta.id == tb.id, how='left')\
    .filter(col('tb.id').isNull())\
    .select('ta.*')

# Inspect left join
# left_join.show()

finaldf = DynamicFrame.fromDF(left_join, gc, "nested")

gc.write_dynamic_frame.from_catalog(
    frame=finaldf,
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table")
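In effect this is a left anti join: only staging rows whose key is absent from the target table get written, so re-running the job over overlapping input does not insert the same record twice. The column names in the snippet (id, description) are illustrative; for access records the join key would be whatever uniquely identifies a record (e.g. sessionId plus timestamp).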
Processed data destination
Processed data should be stored in S3, partitioned on the access record timestamp, e.g.
dev.log.sagebase.org/processedAccessRecord/year/month/day.
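A sketch of how the job above could derive those partition columns from the record timestamp before writing, assuming the timestamp column holds epoch milliseconds:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, from_unixtime, year, month, dayofmonth

# Derive year/month/day partition columns from the access record timestamp
# (assumed to be epoch milliseconds).
df = mapped_record.toDF() \
    .withColumn("record_date", from_unixtime((col("timestamp") / 1000).cast("long"))) \
    .withColumn("year", year(col("record_date"))) \
    .withColumn("month", month(col("record_date"))) \
    .withColumn("day", dayofmonth(col("record_date"))) \
    .drop("record_date")

partitioned_record = DynamicFrame.fromDF(df, glueContext, "partitioned_record")

glueContext.write_dynamic_frame.from_options(
    frame=partitioned_record,
    connection_type="s3",
    format="parquet",
    connection_options={
        "path": "s3://dev.log.sagebase.org/processedAccessRecord/",
        "partitionKeys": ["year", "month", "day"],
    },
    transformation_ctx="write_partitioned",
)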
Integrate with Redash
We need to integrate the processed access records with the Redash board that we use in our stack review meeting for audit purposes.
Alternative to access record processing:
While sending an access record from the Synapse repository to Kinesis Firehose, we could store both the raw request URL and the processed URL, or only the processed URL, and Firehose could use dynamic partitioning on the access record timestamp to store the data in S3. The stored data would then already be processed; we could create a table over the source S3 location and query it with Athena. (This is not adequate because we might want to process the data further, and we need the raw data in JSON so it remains human-readable, as well as processed data in Parquet format to query faster.)