...

  1. Since we deploy a new stack every week, the ETL job should not reprocess data that has already been processed.

  2. How will we maintain versioning of the Glue job script, i.e. what infrastructure takes the script and pushes it to S3 (the deployment process for the script)?

  3. How should the job be triggered, e.g. on demand or on a schedule?

  4. How should duplicate data be handled?

...

Proposed solution:

  1. In every stack, Kinesis Firehose should deliver data to an S3 path that includes the release number, e.g. dev.log.sagebase.org/accessRecord/releaseNumber/year/month/day. The ETL job should then use dev.log.sagebase.org/accessRecord/releaseNumber/ as its source so that it only processes data from that release, which avoids reprocessing. Processed data should be stored under a general path without the release number, e.g. dev.log.sagebase.org/processedAccessRecord/year/month/day. (See the Firehose prefix sketch after this list.)

  2. Open for discussion.

  3. Open for discussion. Suggestion: since we currently use the processed access records for audit purposes, we could schedule the job to run every hour (see the scheduled-trigger sketch after this list).

  4. One way to avoid duplicate data is to use Glue catalog tables as both the source and the destination of the job, and then use a left join to identify duplicates, as below:

Code Block
# Imports assumed for a Glue job script; `gc` is the GlueContext and
# `args` comes from getResolvedOptions (both created in the job's setup code).
from awsglue.transforms import ApplyMapping
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col

# Source: newly ingested (staging) records from the Glue Data Catalog.
stagingdatasource = gc.create_dynamic_frame.from_catalog(
    database="stagingdatabase",
    table_name="staging_source_table",
    transformation_ctx="stagingdatasource")

# Destination: records that have already been processed.
targetdatasource = gc.create_dynamic_frame.from_catalog(
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table",
    transformation_ctx="targetdatasource")

# Map the staging columns onto the target schema.
columnmapping = ApplyMapping.apply(
    frame=stagingdatasource,
    mappings=[("description", "string", "description", "string"), ("id", "int", "id", "int")],
    transformation_ctx="columnmapping")

ta = columnmapping.toDF().alias('ta')
tb = targetdatasource.toDF().alias('tb')

# Left join on the key column (assumed here to be `id`) and keep only the
# rows that have no match in the target, i.e. the non-duplicate records.
left_join = ta\
    .join(tb, ta.id == tb.id, how='left')\
    .filter(col('tb.id').isNull())\
    .select('ta.*')

# Inspect left join
# left_join.show()

finaldf = DynamicFrame.fromDF(left_join, gc, "nested")

# Write only the new records to the target table.
gc.write_dynamic_frame.from_catalog(
    frame=finaldf,
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table")

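For item 3 above, a sketch of an hourly schedule using a Glue scheduled trigger; the job and trigger names are assumptions.

Code Block
import boto3

glue = boto3.client("glue")

# Hypothetical job/trigger names; the cron expression fires at the top of every hour.
glue.create_trigger(
    Name="process-access-record-hourly",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",
    Actions=[{"JobName": "process-access-record"}],
    StartOnCreation=True,
)
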
Processed data destination

Processed data should be stored in S3, partitioned on the access record timestamp, e.g.

dev.log.sagebase.org/processedAccessRecord/year/month/day.
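
A sketch of how the Glue job could write its output with this partitioning, assuming the processed DynamicFrame (finaldf, as in the job above) carries year, month, and day columns derived from the access record timestamp; the Parquet format is an assumption.

Code Block
# Assumes finaldf has year/month/day columns derived from the access record timestamp.
gc.write_dynamic_frame.from_options(
    frame=finaldf,
    connection_type="s3",
    connection_options={
        "path": "s3://dev.log.sagebase.org/processedAccessRecord/",
        "partitionKeys": ["year", "month", "day"],
    },
    format="parquet",
)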

Integrate with Redash

We need to integrate the processed access records with the Redash board, which we use in our stack review meeting for audit purposes.

Alternative to access record processing:

While sending access records from the Synapse repository to Kinesis Firehose, we could store both the request URL and the processed URL, or only the processed URL. Firehose would then use dynamic partitioning on the access record timestamp to store the data in S3, so the stored data is already processed. A table can be created over the S3 source and Athena can query the data.
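
A sketch of the Athena side of this alternative, assuming the records land as JSON under the accessRecord path, that a "logs" database already exists in the Glue Data Catalog, and that the column names and result location below are placeholders for the real access record schema.

Code Block
import boto3

athena = boto3.client("athena")

# Hypothetical table name and columns; the real schema follows the access record format.
ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS logs.access_record (
    sessionId string,
    userId bigint,
    requestUrl string,
    processedUrl string,
    `timestamp` bigint
)
PARTITIONED BY (year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://dev.log.sagebase.org/accessRecord/'
"""

athena.start_query_execution(
    QueryString=ddl,
    ResultConfiguration={"OutputLocation": "s3://dev.log.sagebase.org/athena-results/"},
)

New partitions would still need to be registered before they are queryable, e.g. via partition projection or MSCK REPAIR TABLE.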