...

Raw data is the data we receive directly from the source, which in our case is the Synapse repository project. Synapse sends access records to S3 through a Kinesis Data Firehose delivery stream.

For each access record we need to modify the requestURL field and calculate new fields such as client, client_version, etc. from the existing data according to business logic. Processing the data therefore means taking each raw record, normalizing its requestURL, and calculating the new fields.
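
As an illustration only, here is a minimal sketch of how client and client_version could be derived from the userAgent field; the patterns and function name are assumptions, not the actual business rules:

Code Block
languagepy
import re

# Hypothetical patterns; the real business rules live in the ETL job itself.
CLIENT_PATTERN = re.compile(r"(synapseclient|synapser|synapse-java-client)/(\S+)")


def get_client_info(user_agent):
    """Return (client, client_version) parsed from a userAgent value, or (None, None)."""
    if not user_agent:
        return None, None
    match = CLIENT_PATTERN.search(user_agent.lower())
    if match:
        return match.group(1), match.group(2)
    return None, None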

We want to keep the processed data in a Glue table with its own S3 storage location, partitioned by the timestamp of the access record as year, month and day, and to run Athena queries against it. The data should be stored in Parquet format, a column-oriented file format that is fast to read.
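
Once the processed table is in the Glue Data Catalog, Athena can prune on the year/month/day partitions. A sketch of running such a query with boto3; the database, table and results-bucket names are placeholders:

Code Block
languagepy
import boto3

athena = boto3.client("athena")

# Placeholder database/table/bucket names; the point is the partition filter.
response = athena.start_query_execution(
    QueryString=(
        "SELECT requesturl, count(*) AS hits "
        "FROM processed_access_record "
        "WHERE year = '2022' AND month = '08' AND day = '01' "
        "GROUP BY requesturl"
    ),
    QueryExecutionContext={"Database": "synapse_access_logs"},
    ResultConfiguration={"OutputLocation": "s3://example-athena-results/"},
)
print(response["QueryExecutionId"])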

The ETL job script would look like the one below:

Code Block
languagepy
"""
This script executed by a Glue job. The job take the access record data from S3 and process it.
 Processed data stored in S3 in a parquet file partitioned by timestamp of record as  year / month / day.
"""

import sys
import datetime
import re
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import argsDynamicFrame
= getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext
# Get access record from source and create dynamic frame for futher processing
def get_dynamic_frame(connection_type, file_format, source_path, glue_context):
    dynamic_frame = glue_context.create_dynamic_frame.from_options(
        format_options={"multiline": True},
    connection_type="s3",     format="json",
    connection_options={
        "paths": ["s3://dev.log.sagebase.org/accessRecord/"]connection_type=connection_type,
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(format=file_format,
        connection_options={
     frame=S3bucket_node1,     mappings=[         ("payload.date", "string", "date", "string"),"paths": [source_path],
          ("payload.requestURL", "string", "requesturlrecurse", "string"): True,
        ("payload.xforwardedFor", "string", "xforwardedfor", "string"),         ("payload.oauthClientId", "null"groupFiles": "inPartition",
"oauthclientid", "string"),         ("payload.success", "boolean", "successgroupSize",: "boolean1048576"),
        ("payload.sessionId", "string", "sessionid", "string")},
        ("payload.queryString", "null", "querystring", "string"),
        ("payload.method", "string", "method", "string"),transformation_ctx="dynamic_frame")
    return dynamic_frame


def apply_mapping(dynamic_frame):
    mapped_dynamic_frame =   ("payload.basicAuthUsername", "null", "basicauthusername", "string"),ApplyMapping.apply(
        ("payload.instance"frame=dynamic_frame,
"string", "instance", "string"),      mappings=[
  ("payload.stack", "string", "stack", "string"),
        ("payload.hostsessionId", "string", "hostSESSION_ID", "string"),

       ("payload.elapseMS", "int", "elapsems", "int"),  .....
      ("payload.threadId", "int", "threadid" ],
"int"),         ("payload.userId", "int", "userid", "int"),
        ("payload.authenticationMethod", "null", "authenticationmethod", "string"),
        ("payload.returnObjectId", "null", "returnobjectid", "string"),
        ("payload.origin", "null", "origin", "string"),
        ("payload.via", "null", "via", "string"),
        ("payload.vmId", "string", "vmid", "string"),transformation_ctx="mapped_dynamic_frame")
    return mapped_dynamic_frame


# process the access record
def transform(dynamic_record):
    #transformation
    return dynamic_record


def main():
    # Get args and setup environment
    args = getResolvedOptions(sys.argv,
           ("payload.userAgent", "string", "useragent", "string"),         ("payload.responseStatus", "int", "responsestatus", "int"),         ("timestamp["JOB_NAME", "bigintS3_SOURCE_PATH", "timestampDATABASE_NAME", "bigintTABLE_NAME"]),
    ],
    transformation_ctx="ApplyMapping_node2",
)

def transform(dynamicRecord):sc = SparkContext()
    tempVarglue_context = dynamicRecord["requestURL"].lower()GlueContext(sc)
    spark if= tempVar.startswith("/entity/md5"):glue_context.spark_session
        tempVarjob = "/entity/md5/#"
    elif tempVar.startswith("/evaluation/name"):Job(glue_context)
        tempVar = "/evaluation/name/#"
    elif tempVar.startswith("/entity/alias"):job.init(args["JOB_NAME"], args)

       tempVar dynamic_frame = "/entity/alias/#"
    else:
        tempVar = "/#"
        
    dynamicRecord["requestURL"] = tempVar
    print("Schema for mapped_medicare DynamicFrame:" + tempVarget_dynamic_frame("s3", "json", args["S3_SOURCE_PATH"], glue_context)
    mapped_dynamic_frame = apply_mapping(dynamic_frame)
    return dynamicRecord
    transformed_dynamic_frame = mapped_record = ApplyMapping_node2dynamic_frame.map(f = transform)
mapped_record.printSchema()
    
    # Script generated for node S3 bucket
S3bucket_node3 = glueContext  Write the processed access records to destination
    write_dynamic_frame = glue_context.write_dynamic_frame.from_optionscatalog(
        frame=mappedtransformed_dynamic_recordframe,
        connection_typedatabase=args["s3DATABASE_NAME"],

   format="praquet",     connectiontable_options={name=args["TABLE_NAME"],
        additional_options={"pathpartitionKeys": "s3://dev.log.sagebase.org/processedAccessRecord/["year",         "compression": "gzip""month", "day"]},
        "partitionKeys": [],transformation_ctx="write_dynamic_frame")

    },job.commit()


if  transformation_ctx="S3bucket_node3",
__name__ == "__main__":
    main()
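
Note that the destination table is partitioned on year, month and day, so those columns have to be present on each record before the write. A minimal sketch of deriving them in a per-record function, assuming the timestamp field is epoch milliseconds:

Code Block
languagepy
import datetime


# Sketch: derive partition columns from the record timestamp (assumed to be epoch milliseconds)
def add_partition_columns(dynamic_record):
    record_date = datetime.datetime.utcfromtimestamp(dynamic_record["timestamp"] / 1000)
    dynamic_record["year"] = record_date.strftime("%Y")
    dynamic_record["month"] = record_date.strftime("%m")
    dynamic_record["day"] = record_date.strftime("%d")
    return dynamic_record

This could be applied with another .map(f=add_partition_columns) call, or folded into the existing transform function.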

Challenges

  1. Since we deploy a new stack every week, the ETL job should not reprocess old, already-processed data (see the job-bookmark sketch below).

...
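
One built-in option for the reprocessing concern is Glue job bookmarks: when bookmarks are enabled, each transformation_ctx in the script keeps track of which files have already been read, so a rerun only picks up new data. A sketch of enabling them when the job is defined through the Glue API; the job name, role and script location are placeholders, and the same default argument can be set from CloudFormation:

Code Block
languagepy
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="process-access-records",  # placeholder job name
    Role="arn:aws:iam::123456789012:role/GlueEtlRole",  # placeholder role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://example-bucket/scripts/process_access_records.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Job bookmarks make reruns skip data that was already processed
        "--job-bookmark-option": "job-bookmark-enable",
    },
    GlueVersion="3.0",
)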

  1. The ETL job will be static, not created anew in every release.

  2. Create a GitHub project (Synapse-ETL-Jobs) for our ETL job (Scala/Python) and set up a build for it that deploys the artifact to S3 (GitHub Actions). Add a GitHub Action that creates a tag/release, zipping and versioning the source code on every merge to the develop branch, essentially using GitHub as the artifactory. In the Synapse-Stack-Builder project, create a new workflow that first downloads the Python script from the latest Synapse-ETL-Jobs tag and uploads it to S3, from where the Glue job picks it up for processing. Then create a stack with a CloudFormation template that sets up the ETL job from the deployed S3 artifact (schedule/timer etc.), along with the Glue table, trigger and other required resources. We still need to work out how to include libraries in our Glue job script. The build should include testing.

  3. The schedule should be a configurable parameter in the stack builder; the job should get triggered every hour (a sketch of such a trigger is included at the end of this section). In the future we can find another way to be notified that new data is available for processing.

  4. One way to avoid duplicate data is to use Glue tables for both the source and the destination of the job and then use a left join to identify duplicates, as below (for now we are not handling duplicates; we can look into it in the future if it becomes a problem).

...
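
For reference, an hourly scheduled trigger for the job could look like the following through the Glue API; the job and trigger names are placeholders, and in practice this would live in the CloudFormation template built by Synapse-Stack-Builder:

Code Block
languagepy
import boto3

glue = boto3.client("glue")

glue.create_trigger(
    Name="process-access-records-hourly",  # placeholder trigger name
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",  # top of every hour
    Actions=[{"JobName": "process-access-records"}],  # placeholder job name
    StartOnCreation=True,
)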