
Approach to process access records and make them queryable

Create a Glue table for the raw data

Raw data is the data we receive directly from the source, in our case the Synapse repository project. Synapse sends access records to S3 via a Kinesis Firehose delivery stream.

All the resources are created with a CloudFormation template in the Synapse builder project, and they are stack specific. We create a Glue table from the stack builder project since we know our schema. The table should look like this:

Column Name | Data Type
stack | string
instance | string
timestamp | bigint
payload | struct<sessionId:string,timestamp:bigint,userId:int,method:string,requestURL:string,userAgent:string,host:string,origin:string,via:string,threadId:int,elapseMS:int,success:boolean,stack:string,instance:string,date:string,vmId:string,returnObjectId:string,queryString:string,responseStatus:int,oauthClientId:string,basicAuthUsername:string,authenticationMethod:string,xforwardedFor:string>
year | string
month | string
day | string

Year, month, and day are the partition columns. The data is partitioned in S3 based on when it was received.
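For illustration only, the same table definition could be expressed as a boto3 call (a sketch; in practice the table is created by the CloudFormation template in the stack builder project, and the S3 location and JSON SerDe shown here are assumptions):

import boto3

glue = boto3.client("glue")

# Sketch of the raw access record table; the database/table names match the
# ETL job below, while the S3 location and SerDe are assumptions.
glue.create_table(
    DatabaseName="devssokhalfirehoselogs",
    TableInput={
        "Name": "accessrecords",
        "TableType": "EXTERNAL_TABLE",
        "PartitionKeys": [
            {"Name": "year", "Type": "string"},
            {"Name": "month", "Type": "string"},
            {"Name": "day", "Type": "string"},
        ],
        "StorageDescriptor": {
            "Columns": [
                {"Name": "stack", "Type": "string"},
                {"Name": "instance", "Type": "string"},
                {"Name": "timestamp", "Type": "bigint"},
                {
                    "Name": "payload",
                    "Type": "struct<sessionId:string,timestamp:bigint,userId:int,"
                    "method:string,requestURL:string,userAgent:string,host:string,"
                    "origin:string,via:string,threadId:int,elapseMS:int,"
                    "success:boolean,stack:string,instance:string,date:string,"
                    "vmId:string,returnObjectId:string,queryString:string,"
                    "responseStatus:int,oauthClientId:string,"
                    "basicAuthUsername:string,authenticationMethod:string,"
                    "xforwardedFor:string>",
                },
            ],
            # Hypothetical raw data location
            "Location": "s3://dev.log.sagebase.org/accessRecord/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
            },
        },
    },
)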

We could instead use the record timestamp as the partition; for that we would have to enable a dynamic partitioning scheme.

Usage: at any point in time we can query the original raw data, for example if further processing fails or some information is needed from the original records. The data is available for querying from Athena.
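For example, the raw table could be queried through the Athena API (a sketch, assuming the database and table names used by the ETL job below; the result location and the year/month values are placeholders):

import boto3

athena = boto3.client("athena")

# Count raw access records per day for one month of data.
response = athena.start_query_execution(
    QueryString=(
        "SELECT year, month, day, count(*) AS records "
        "FROM accessrecords "
        "WHERE year = '2022' AND month = '08' "
        "GROUP BY year, month, day "
        "ORDER BY day"
    ),
    QueryExecutionContext={"Database": "devssokhalfirehoselogs"},
    ResultConfiguration={
        # Hypothetical bucket/prefix for Athena query results.
        "OutputLocation": "s3://dev.log.sagebase.org/athena-query-results/"
    },
)
print(response["QueryExecutionId"])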

Create an ETL job

Once the raw data is available, we want to process it further with an ETL (extract, transform, load) job.

The focus for access records is the payload field, which contains all the information needed for further processing. The main task of this ETL job is to create a separate table with all the fields of the payload as columns and to transform the original value of the requestURL field. Our ETL job script would look like the following:

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Standard Glue job initialization
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the raw access records from the Glue Data Catalog table
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="devssokhalfirehoselogs",
    table_name="accessrecords",
    transformation_ctx="DataCatalogtable_node1",
)


# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=DataCatalogtable_node1,
    mappings=[
        ("payload.sessionId", "string", "sessionId", "string"),
        ("payload.timestamp", "long", "timestamp", "long"),
        ("payload.userId", "int", "userId", "int"),
        ("payload.method", "string", "method", "string"),
        ("payload.requestURL", "string", "requestURL", "string"),
        ("payload.userAgent", "string", "userAgent", "string"),
        ("payload.host", "string", "host", "string"),
        ("payload.origin", "string", "origin", "string"),
        ("payload.via", "string", "via", "string"),
        ("payload.threadId", "int", "threadId", "int"),
        ("payload.elapseMS", "int", "elapseMS", "int"),
        ("payload.success", "boolean", "success", "boolean"),
        ("payload.stack", "string", "stack", "string"),
        ("payload.instance", "string", "instance", "string"),
        ("payload.date", "string", "date", "string"),
        ("payload.vmId", "string", "vmId", "string"),
        ("payload.returnObjectId", "string", "returnObjectId", "string"),
        ("payload.queryString", "string", "queryString", "string"),
        ("payload.responseStatus", "int", "responseStatus", "int"),
        ("payload.oauthClientId", "string", "oauthClientId", "string"),
        ("payload.basicAuthUsername", "string", "basicAuthUsername", "string"),
        (
            "payload.authenticationMethod",
            "string",
            "authenticationMethod",
            "string",
        ),
        ("payload.xforwardedFor", "string", "xforwardedFor", "string"),
        ("year", "string", "year", "string"),
        ("month", "string", "month", "string"),
        ("day", "string", "day", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Normalize the requestURL so that URLs containing identifiers collapse to a
# small set of templates.
def transform(dynamicRecord):
    tempVar = dynamicRecord["requestURL"].lower()
    if tempVar.startswith("/entity/md5"):
        tempVar = "/entity/md5/#"
    elif tempVar.startswith("/evaluation/name"):
        tempVar = "/evaluation/name/#"
    elif tempVar.startswith("/entity/alias"):
        tempVar = "/entity/alias/#"
    else:
        tempVar = "/#"

    dynamicRecord["requestURL"] = tempVar
    print("Transformed requestURL: " + tempVar)
    return dynamicRecord

mapped_record = ApplyMapping_node2.map(f=transform)
mapped_record.printSchema()
    
# Write the processed access records back to S3 as compressed JSON
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=mapped_record,
    connection_type="s3",
    format="json",
    connection_options={
        "path": "s3://dev.log.sagebase.org/processedAccessRecord/",
        "compression": "gzip",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)

# Commit so that job bookmarks can track what has already been processed
job.commit()

The source of the data is the Glue table created from the raw data. The ETL job converts every record into a dynamic frame; we first apply the column mapping, then transform the requestURL value, and finally store the processed data back into S3.

Challenges

  1. Since we deploy a new stack every week, the ETL job should not reprocess old or already-processed data (see the sketch after this list).

  2. How do we maintain versioning of the Glue job script, i.e. what infrastructure takes the script and pushes it to S3 (the deployment process for the script)?

  3. How should the job be triggered: on demand, on a schedule, or in sequence with a crawler?

  4. What should the partitioning scheme of the processed records be?
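One possible way to address challenge 1 is to enable Glue job bookmarks, so a run only picks up data that has not been processed yet (a sketch; the job name is hypothetical, and the script must call job.commit() as shown above for the bookmark to advance):

import boto3

glue = boto3.client("glue")

# Start the ETL job with job bookmarks enabled so previously processed
# files are skipped on subsequent runs.
glue.start_job_run(
    JobName="process-access-records",  # hypothetical job name
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)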

Processed data destination

We can choose either S3 or a Glue table as the destination.
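If we choose a Glue table, the write step of the ETL script above could target the Data Catalog instead of a plain S3 path (a sketch continuing from that script; the target table name is hypothetical and would need to exist in the catalog):

# Write the processed records to a catalog table instead of a raw S3 path
ProcessedTable_node = glueContext.write_dynamic_frame.from_catalog(
    frame=mapped_record,
    database="devssokhalfirehoselogs",
    table_name="processed_access_records",  # hypothetical target table
    transformation_ctx="ProcessedTable_node",
)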

Integrate with Redash

We need to integrate the processed access records with the Redash board, which we use in our stack review meetings for audit purposes.
