...

4. How to handle duplicate data.

Proposed solution:

  1. In every stack, Kinesis Firehose should send data to an S3 path that carries the release info, e.g. dev.log.sagebase.org/accessRecord/releaseNumber/year/month/day, and the ETL job should use dev.log.sagebase.org/accessRecord/releaseNumber/ as its source so it only processes the data of that release. This way we avoid reprocessing data. Processed data should be stored in a general path without the release number, e.g. dev.log.sagebase.org/processedAccessRecord/year/month/day (see the job sketch after the code block below).

  2. Open for discussion.

  3. Open for discussion. Suggestion: since we currently use the processed access records for audit purposes, we can schedule the job to run every hour. The ETL job will be static, not created in every release.

  4. Create a GitHub project for our ETL job (Scala/Python). Set up a build for the GitHub project that deploys the artifact to S3 (GitHub Actions). Add a new static template to the stack builder that sets up a Glue job using the deployed S3 artifact (set up the timer, etc.). Open question: how do we include libraries in our Glue job (script)? The build should include testing.

  5. It should be a configurable parameter in the stack builder, and the job should be triggered every hour (see the trigger sketch after the code block below). In the future we can find another mechanism that notifies us when new data is available for processing.

  6. One way to avoid duplicate data is to use Glue tables for both the source and the destination of the job, then use a left join to identify duplicates, as shown below. (For now we are not handling duplicates; we can look into it in the future if it becomes a problem.)

Code Block
import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col

args = getResolvedOptions(sys.argv, ["TempDir"])
gc = GlueContext(SparkContext.getOrCreate())

# Newly staged records from the current run.
stagingdatasource = gc.create_dynamic_frame.from_catalog(
    database="stagingdatabase",
    table_name="staging_source_table",
    transformation_ctx="stagingdatasource")

# Records already present in the destination table.
targetdatasource = gc.create_dynamic_frame.from_catalog(
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table",
    transformation_ctx="targetdatasource")

columnmapping = ApplyMapping.apply(
    frame=stagingdatasource,
    mappings=[("description", "string", "description", "string"), ("id", "int", "id", "int")],
    transformation_ctx="columnmapping")

ta = columnmapping.toDF().alias('ta')
tb = targetdatasource.toDF().alias('tb')

# Left join on id; rows with no match in the target are the new,
# non-duplicate records.
left_join = ta\
    .join(tb, ta.id == tb.id, how='left')\
    .filter(col('tb.id').isNull())\
    .select('ta.*')

# Inspect left join
# left_join.show()

finaldf = DynamicFrame.fromDF(left_join, gc, "nested")

# Write only the non-duplicate records to the target table.
gc.write_dynamic_frame.from_catalog(
    frame=finaldf,
    database="targetdatabase",
    redshift_tmp_dir=args["TempDir"],
    table_name="target_table")

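For item 5, one possible way to wire up the hourly schedule is a Glue scheduled trigger. The sketch below uses boto3 with placeholder trigger and job names; in practice the stack builder template would declare the equivalent resource.

Code Block
import boto3

glue = boto3.client("glue")

# "processAccessRecordHourly" and "processAccessRecord" are placeholder names;
# the real names would come from the stack builder configuration.
glue.create_trigger(
    Name="processAccessRecordHourly",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",  # top of every hour
    Actions=[{"JobName": "processAccessRecord"}],
    StartOnCreation=True)
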
...

While sending access records from the Synapse repository to Kinesis Firehose, we can store both the request URL and the processed URL, or only the processed URL. Kinesis should use dynamic partitioning on the access record timestamp to store the data in S3. The stored data is then already processed; we can create a table over the source S3 data and query it with Athena. (Not adequate, because we might want to process the data further: we need the raw data in JSON format so it stays available in a readable form, as well as processed data in Parquet format for faster queries.)
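
A rough sketch of the Firehose setup described above, assuming boto3. The stream, bucket, and role names are placeholders, and the JQ expression that derives year/month/day from an epoch-millisecond "timestamp" field is illustrative, not a decided format.

Code Block
import boto3

firehose = boto3.client("firehose")

# Stream, bucket, and role names below are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName="accessRecordStream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehoseDeliveryRole",
        "BucketARN": "arn:aws:s3:::dev.log.sagebase.org",
        # partitionKeyFromQuery values come from the MetadataExtraction
        # processor below, yielding accessRecord/<year>/<month>/<day>/ keys.
        "Prefix": "accessRecord/"
                  "!{partitionKeyFromQuery:year}/"
                  "!{partitionKeyFromQuery:month}/"
                  "!{partitionKeyFromQuery:day}/",
        "ErrorOutputPrefix": "accessRecordError/",
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 300},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [{
                "Type": "MetadataExtraction",
                "Parameters": [
                    # Assumes the record has an epoch-millisecond "timestamp" field.
                    {"ParameterName": "MetadataExtractionQuery",
                     "ParameterValue": (
                         '{year: (.timestamp / 1000 | floor | gmtime | strftime("%Y")),'
                         ' month: (.timestamp / 1000 | floor | gmtime | strftime("%m")),'
                         ' day: (.timestamp / 1000 | floor | gmtime | strftime("%d"))}')},
                    {"ParameterName": "JsonParsingEngine",
                     "ParameterValue": "JQ-1.6"},
                ]}]}})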