...

In phase one, we already send access records to S3 in JSON format via a Kinesis Firehose delivery stream. The delivery stream partitions the data by the time it was received in S3, in the format year, month and day. We now want to process this data and make the processed data queryable. The data grows with time, so we need to structure this big data in such a way that we can query and process it in adequate time.
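
For illustration, the raw objects land under a time-based prefix. Below is a minimal sketch of listing one day of raw records with boto3; the bucket name, prefix, and the example date partition are assumptions based on the locations used later in this document:

import boto3

# Assumed layout written by the Firehose delivery stream (year/month/day partitions):
#   s3://dev.log.sagebase.org/accessRecord/<year>/<month>/<day>/<delivery-file>
s3 = boto3.client("s3")
response = s3.list_objects_v2(
    Bucket="dev.log.sagebase.org",
    Prefix="accessRecord/2023/01/01/",  # hypothetical date partition
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])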

Create a Glue table of raw data

What we actually need is an ETL job which can extract the raw data from S3, process it, and load the processed data back into S3. From S3 we can query the data with Athena.

AWS Glue ETL Job

  1. Extract the raw data from the source

  2. Process the data

  3. Load the processed data to the destination

Raw data means the data received directly from the source, which in our case is the Synapse repository project. Synapse sends access records to S3 with a Kinesis Firehose delivery stream.

All the resources are created with a CloudFormation template in the Synapse stack builder project and are stack specific. Since we know our schema, we create the Glue table of raw data from the stack builder project. The table should look as below:

Column Name | Data Type
----------- | ---------
stack       | string
instance    | string
timestamp   | bigint
payload     | struct<>
year        | string
month       | string
day         | string
Year, month and day are partition columns. The Kinesis delivery stream partitions the data based on when it was received.
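
The actual table is defined in the stack-specific CloudFormation template; the sketch below only illustrates an equivalent definition of this raw-data table using boto3. The database name, table name, S3 location, and the subset of payload struct fields shown are assumptions:

import boto3

glue = boto3.client("glue")
glue.create_table(
    DatabaseName="firehoselogs",  # hypothetical stack-specific database
    TableInput={
        "Name": "accessrecords",  # hypothetical table name
        "TableType": "EXTERNAL_TABLE",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "stack", "Type": "string"},
                {"Name": "instance", "Type": "string"},
                {"Name": "timestamp", "Type": "bigint"},
                # Only a subset of the payload fields is shown here for brevity.
                {"Name": "payload", "Type": "struct<sessionId:string,userId:int,method:string,requestURL:string>"},
            ],
            "Location": "s3://dev.log.sagebase.org/accessRecord/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {"SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"},
        },
        # Partition columns created by the delivery stream.
        "PartitionKeys": [
            {"Name": "year", "Type": "string"},
            {"Name": "month", "Type": "string"},
            {"Name": "day", "Type": "string"},
        ],
    },
)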


Create an ETL job

Once the raw data is available we want to process it further with an ETL (Extract, Transform and Load) job.

For access records, the focus is the payload field, which contains all the information needed for further processing. The main task of this ETL job is to create a separate table that has all the fields of the payload as columns, and to transform the original value of the requesturl field for each record according to business logic.

We want to keep the processed data in a separate S3 location and execute Athena queries on it (an example query is sketched after the ETL script below). The data should be stored in Parquet format; Parquet is a column-based file format that is fast to read.

The ETL job script would look as below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": ["s3://dev.log.sagebase.org/accessRecord/"],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("payload.methoddate", "string", "methoddate", "string"),
        ("payload.requestURL", "string", "requestURLrequesturl", "string"),
        ("payload.userAgentxforwardedFor", "string", "userAgentxforwardedfor", "string"),
        ("payload.hostoauthClientId", "stringnull", "hostoauthclientid", "string"),
        ("payload.originsuccess", "stringboolean", "originsuccess", "stringboolean"),
        ("payload.viasessionId", "string", "viasessionid", "string"),
        ("payload.threadIdqueryString", "intnull", "threadIdquerystring", "intstring"),
        ("payload.elapseMSmethod", "intstring", "elapseMSmethod", "intstring"),
        ("payload.successbasicAuthUsername", "booleannull", "successbasicauthusername", "booleanstring"),
        ("payload.stackinstance", "string", "stackinstance", "string"),
        ("payload.instancestack", "string", "instancestack", "string"),
        ("payload.datehost", "string", "datehost", "string"),
        ("payload.vmIdelapseMS", "stringint", "vmIdelapsems", "stringint"),
        ("payload.returnObjectIdthreadId", "stringint", "returnObjectIdthreadid", "stringint"),
        ("payload.queryStringuserId", "stringint", "queryStringuserid", "stringint"),
        ("payload.responseStatusauthenticationMethod", "intnull", "responseStatusauthenticationmethod", "intstring"),
        ("payload.oauthClientIdreturnObjectId", "stringnull", "oauthClientIdreturnobjectid", "string"),
        ("payload.basicAuthUsernameorigin", "stringnull", "basicAuthUsernameorigin", "string"),
        (
            "payload.authenticationMethodvia",
            "string"null",             "authenticationMethod",
            "string",
        "via", "string"),
        ("payload.xforwardedForvmId", "string", "xforwardedForvmid", "string"),
        ("yearpayload.userAgent", "string", "yearuseragent", "string"),
        ("monthpayload.responseStatus", "stringint", "monthresponsestatus", "stringint"),
        ("daytimestamp", "stringbigint", "daytimestamp", "stringbigint"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

def transform(dynamicRecord):
    # Collapse requesturl values that embed identifiers into generic patterns.
    tempVar = dynamicRecord["requesturl"].lower()
    if tempVar.startswith("/entity/md5"):
        tempVar = "/entity/md5/#"
    elif tempVar.startswith("/evaluation/name"):
        tempVar = "/evaluation/name/#"
    elif tempVar.startswith("/entity/alias"):
        tempVar = "/entity/alias/#"
    else:
        tempVar = "/#"

    dynamicRecord["requesturl"] = tempVar
    return dynamicRecord

mapped_record = ApplyMapping_node2.map(f=transform)
mapped_record.printSchema()
    
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=mapped_record,
    connection_type="s3",
    format="jsonpraquet",
    connection_options={
        "path": "s3://dev.log.sagebase.org/processedAccessRecord/",
        "compression": "gzip",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)
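
Once the processed records are written, they can be queried with Athena. Below is a minimal sketch using boto3, assuming a table named processed_access_record has been created (or crawled) over the processed-data location; the database, table, and query result location are assumptions:

import boto3

athena = boto3.client("athena")
athena.start_query_execution(
    QueryString="""
        SELECT requesturl, COUNT(*) AS calls
        FROM processed_access_record  -- hypothetical table over processedAccessRecord/
        GROUP BY requesturl
        ORDER BY calls DESC
    """,
    QueryExecutionContext={"Database": "firehoselogs"},  # hypothetical database
    ResultConfiguration={"OutputLocation": "s3://dev.log.sagebase.org/athena-results/"},  # hypothetical
)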

...


Challenges

  1. As we deploy a new stack every week, the ETL job should not reprocess old or already processed data (one possible approach is sketched after this list).

  2. How will we maintain versioning of the Glue job script, i.e. define the infrastructure that takes the script and pushes it to S3 (the deployment process for the script)?

  3. How to trigger the job, e.g. on demand or on a schedule.

  4. How to handle duplicate data.
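
For the first challenge, one possible (not yet decided) approach is AWS Glue job bookmarks, which track already-processed input using the transformation_ctx values and the job.init call the script already has, together with a job.commit() at the end of the script. A sketch of enabling bookmarks when creating the job with boto3; the job name, IAM role, and script location are assumptions:

import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="process-access-records",                      # hypothetical job name
    Role="arn:aws:iam::123456789012:role/GlueEtlRole",  # hypothetical role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://dev.log.sagebase.org/scripts/process_access_record.py",  # hypothetical
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Skip input already processed by earlier runs.
        "--job-bookmark-option": "job-bookmark-enable",
    },
    GlueVersion="3.0",
)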

...