
In phase one we already send access records to S3 in JSON format through a Kinesis Firehose delivery stream. The delivery stream partitions the data by the time it was received in S3, in the format year/month/day. We now want to process this data and make the processed data queryable. The data grows over time, so we need to structure this big data in such a way that we can query and process it in adequate time.
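
For illustration only (the bucket and prefix names below are placeholders, not taken from this document), the delivery stream writes the raw JSON objects under time-based keys such as:

Code Block
s3://<raw-access-record-bucket>/accessRecord/2023/01/15/<delivery-stream-file>.json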

Create a Glue table of raw data

What we actually need is an ETL job which can extract the raw data from S3, process it, and load the processed data back into S3. From S3 we can query the data with Athena.

AWS Glue ETL Job

  1. Extract the raw data from the source

  2. Process the data

  3. Load the processed data to the destination

Raw data means the data which we receive directly from the source, in our case the Synapse repository project. Synapse sends access records to S3 with a Kinesis Firehose delivery stream.

All the resources are created with a CloudFormation template in the Synapse-Stack-Builder project, and they are stack specific. Since we know our schema, we create the Glue table of raw data with the stack builder project. The table should look like the following:

Column Name | Data Type
stack       | string
instance    | string
timestamp   | bigint
payload     | struct<>
year        | string
month       | string
day         | string

Year, month and day are partition columns. The Kinesis delivery stream partitions the data based on when the data was received.
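
The actual table and database are declared in the stack builder CloudFormation template; the boto3 sketch below only illustrates the shape of an equivalent table definition (database, table and bucket names are placeholders, and the choice of JSON SerDe is an assumption):

Code Block
languagepy
import boto3

glue = boto3.client("glue")

# Illustrative only: the real raw-data table is created by the stack builder
# CloudFormation template. All names below are placeholders.
glue.create_table(
    DatabaseName="firehoselogs",
    TableInput={
        "Name": "accessrecords",
        "TableType": "EXTERNAL_TABLE",
        "PartitionKeys": [
            {"Name": "year", "Type": "string"},
            {"Name": "month", "Type": "string"},
            {"Name": "day", "Type": "string"},
        ],
        "StorageDescriptor": {
            "Columns": [
                {"Name": "stack", "Type": "string"},
                {"Name": "instance", "Type": "string"},
                {"Name": "timestamp", "Type": "bigint"},
                # the real definition lists the payload struct fields here
                {"Name": "payload", "Type": "struct<>"},
            ],
            "Location": "s3://<raw-access-record-bucket>/accessRecord/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                # assumption: JSON SerDe, since the raw records are JSON
                "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
            },
        },
    },
)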

Usage:
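
As an illustration of how the raw table can be used (the query, database name and result location below are assumptions, not taken from this document), the data can be queried through Athena, for example with boto3:

Code Block
languagepy
import boto3

athena = boto3.client("athena")

# Illustrative Athena query against the raw access record table;
# database, table and output location are placeholders.
response = athena.start_query_execution(
    QueryString=(
        "SELECT stack, instance, timestamp, payload "
        "FROM accessrecords "
        "WHERE year = '2023' AND month = '01' AND day = '15' "  # prune by partition
        "LIMIT 10"
    ),
    QueryExecutionContext={"Database": "firehoselogs"},
    ResultConfiguration={"OutputLocation": "s3://<athena-results-bucket>/"},
)
print(response["QueryExecutionId"])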

Create an ETL job

Once the raw data is available, we want to process it further with an ETL (extract, transform, load) job.

For access records the focus is the payload field, which has all the information needed for further processing. As the main task of this ETL job we want to create a separate table having all the fields of the payload as columns, and we want to transform the original value of the request URL field.

For each access record we need to calculate new fields such as client, client_version etc. from the existing data according to business logic; processing the data means taking the raw data and computing these new fields.
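
The business logic itself is not defined in this document; purely as an illustration, assuming the client and client_version can be derived from the userAgent field of the payload, the per-record calculation could look roughly like this:

Code Block
languagepy
import re

# Hypothetical helper: derive client and client_version from the userAgent
# value of an access record. The pattern below is an illustrative assumption,
# not the actual business logic.
CLIENT_PATTERN = re.compile(r"(synapseclient|synapser)/([0-9.]+)", re.IGNORECASE)


def add_client_fields(record):
    user_agent = record.get("userAgent") or ""
    match = CLIENT_PATTERN.search(user_agent)
    if match:
        record["client"] = match.group(1).lower()
        record["client_version"] = match.group(2)
    else:
        record["client"] = "UNKNOWN"
        record["client_version"] = None
    return record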

Now we want to keep the processed data in a Glue table with a separate S3 storage location, partitioned on the basis of the access record timestamp as year, month and day, and we want to execute Athena queries on it. The data should be stored in Parquet format; Parquet is a column-based file format and is fast to read.

The ETL job script would look like the following:

Code Block
languagepy
"""
This script is executed by a Glue job. The job takes the access record data from S3 and processes it.
Processed data is stored in S3 in Parquet files partitioned by the timestamp of the record as year / month / day.
"""

import sys
import datetime
import re
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame


# Get access records from the source and create a dynamic frame for further processing
def get_dynamic_frame(connection_type, file_format, source_path, glue_context):
    dynamic_frame = glue_context.create_dynamic_frame.from_options(
        connection_type=connection_type,
        format=file_format,
        format_options={"multiline": True},
        connection_options={
            "paths": [source_path],
            "recurse": True,
            "groupFiles": "inPartition",
            "groupSize": "1048576",
        },
        transformation_ctx="dynamic_frame")
    return dynamic_frame


# Map the nested payload fields to top-level columns
def apply_mapping(dynamic_frame):
    mapped_dynamic_frame = ApplyMapping.apply(
        frame=dynamic_frame,
        mappings=[
            ("payload.sessionId", "string", "SESSION_ID", "string"),
            # ..... remaining payload fields are mapped the same way
        ],
        transformation_ctx="mapped_dynamic_frame")
    return mapped_dynamic_frame


# Process the access record
def transform(dynamic_record):
    # transformation / business logic goes here
    return dynamic_record


def main():
    # Get args and set up the environment
    args = getResolvedOptions(sys.argv,
                              ["JOB_NAME", "S3_SOURCE_PATH", "DATABASE_NAME", "TABLE_NAME"])
    sc = SparkContext()
    glue_context = GlueContext(sc)
    spark = glue_context.spark_session
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    dynamic_frame = get_dynamic_frame("s3", "json", args["S3_SOURCE_PATH"], glue_context)
    mapped_dynamic_frame = apply_mapping(dynamic_frame)
    transformed_dynamic_frame = mapped_dynamic_frame.map(f=transform)

    # Write the processed access records to the destination
    write_dynamic_frame = glue_context.write_dynamic_frame.from_catalog(
        frame=transformed_dynamic_frame,
        database=args["DATABASE_NAME"],
        table_name=args["TABLE_NAME"],
        additional_options={"partitionKeys": ["year", "month", "day"]},
        transformation_ctx="write_dynamic_frame")

    job.commit()


if __name__ == "__main__":
    main()
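
The same parameters that the script reads through getResolvedOptions could be supplied when running the job on demand; the sketch below is illustrative only (job name, database, table and path are placeholders), since in practice the job and its trigger are created by the stack builder resources:

Code Block
languagepy
import boto3

glue = boto3.client("glue")

# Illustrative on-demand run of the ETL job; all names and paths are placeholders.
run = glue.start_job_run(
    JobName="process-access-record-job",
    Arguments={
        "--S3_SOURCE_PATH": "s3://<raw-access-record-bucket>/accessRecord/",
        "--DATABASE_NAME": "processedlogs",
        "--TABLE_NAME": "processedaccessrecord",
    },
)
print(run["JobRunId"])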

Challenges

  1. As we deploy a new stack every week, the ETL job should not reprocess old or already-processed data.

  2. How will we maintain versioning of the Glue job script, i.e. define the infrastructure that takes the script and pushes it to S3 (the deployment process of the script)?

  3. How to trigger the job, e.g. on demand or on a schedule.

  4. How to handle duplicate data.

Proposed solution:

  1. The ETL job will be static, not created in every release.

  2. Create a GitHub project (Synapse-ETL-Jobs) for our ETL jobs. Add a GitHub action to create a tag/release, which will zip the source code and version it with every merge into the develop branch, basically using GitHub as an artifactory. In the Synapse-Stack-Builder project create a new workflow which first downloads the Python script from the latest Synapse-ETL-Jobs tag and uploads it to S3, from where the Glue job can take it for processing. Then create a stack with a CloudFormation template that sets up the Glue job, Glue table, trigger and required resources. The build should include testing.

  3. The schedule should be a configurable parameter in the stack builder, and the job should get triggered every hour; see the scheduling sketch after the duplicate-handling example below. (In future we can find another way to be notified that new data is available for processing.)

  4. One way to avoid duplicate data is to use Glue tables as both the source and the destination of the job and then use a left join to identify the duplicates, as below (for now we are not considering duplicates; in future we can look into it if it becomes a problem):

Code Block
stagingdatasource = gc.create_dynamic_frame.from_catalog(
    database="stagingdatabase",
    table_name="staging_source_table",
    transformation_ctx="stagingdatasource")

targetdatasource = gc.create_dynamic_frame.from_catalog(
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table",
    transformation_ctx="targetdatasource")

columnmapping = ApplyMapping.apply(
    frame=stagingdatasource,
    mappings=[("description", "string", "description", "string"), ("id", "int", "id", "int")],
    transformation_ctx="columnmapping")

ta = columnmapping.toDF().alias('ta')
tb = targetdatasource.toDF().alias('tb')

left_join = ta\
    .join(tb, ta.value == tb.value, how='left')\
    .filter(col('tb.value').isNull())\
    .select('ta.*')

# Inspect the left join
# left_join.show()

finaldf = DynamicFrame.fromDF(left_join, gc, "nested")

gc.write_dynamic_frame.from_catalog(
    frame=finaldf,
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table")
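
For the hourly trigger mentioned in point 3 of the proposed solution, the trigger itself would be declared in the stack builder CloudFormation template; purely to illustrate the intent, an equivalent Glue trigger could be defined with boto3 like this (names are placeholders):

Code Block
languagepy
import boto3

glue = boto3.client("glue")

# Illustrative hourly schedule for the ETL job; in practice this trigger is
# created by the stack builder CloudFormation template. Names are placeholders.
glue.create_trigger(
    Name="process-access-record-hourly-trigger",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",  # run at minute 0 of every hour
    Actions=[{"JobName": "process-access-record-job"}],
    StartOnCreation=True,
)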

The source of the data will be our Glue table created from the raw data. The ETL job converts every record into a dynamic frame; we first apply the mapping to the columns, then transform the request URL value to a new value, and store the processed data back into S3.
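
As a sketch of the request URL transformation described above (the set of URL patterns shown is illustrative, not exhaustive), the per-record transform could normalize concrete request URLs to a small set of prefixes:

Code Block
languagepy
# Sketch of the request URL normalization applied to each record; the URL
# patterns below are examples and not the complete set.
def transform(dynamic_record):
    request_url = dynamic_record["requestURL"].lower()
    if request_url.startswith("/entity/md5"):
        request_url = "/entity/md5/#"
    elif request_url.startswith("/evaluation/name"):
        request_url = "/evaluation/name/#"
    elif request_url.startswith("/entity/alias"):
        request_url = "/entity/alias/#"
    else:
        request_url = "/#"
    dynamic_record["requestURL"] = request_url
    return dynamic_record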


Processed data destination

We can choose S3 or a Glue table as the destination. The processed data should be stored in S3, partitioned on the access record timestamp, e.g.

dev.log.sagebase.org/processedAccessRecord/year/month/day.

Integrate with Redash

We need to integrate the processed access records with the Redash board, which we use in our stack review meetings for audit purposes.

Alternative to access record processing:

While sending access records from the Synapse repository to Kinesis Firehose, we could store both the request URL and the processed URL, or only the processed URL, and Kinesis could use dynamic partitioning on the timestamp of the access record to store the data in S3. The stored data would then already be processed; we would create a table from the source S3 location and Athena could query the data. (This is not adequate because we might want to process the data further: we need the raw data in JSON format so it is available in a readable form, as well as the processed data in Parquet format to query faster.)