...
The final script might look like the following:
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Zero-pad the month/day partition values so the S3 partition paths
# are written consistently (e.g. "month=03" rather than "month=3").
def MapRecord(rec):
    rec["month"] = str(rec["month"]).zfill(2)
    rec["day"] = str(rec["day"]).zfill(2)
    return rec

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "devbackfill", table_name = "filedownloadsbackfill", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "devbackfill", table_name = "filedownloadsbackfill", transformation_ctx = "datasource0")

## @type: Map
## @args: [frame = "datasource0", f = "MapRecord"]
## @return: mapped_datasource
## @inputs: [frame = datasource0]
mapped_datasource = Map.apply(frame = datasource0, f = MapRecord, transformation_ctx = "mapped_datasource")

## @type: ApplyMapping
## @args: [mappings = [("userid", "long", "userid", "long"), ("timestamp", "long", "timestamp", "timestamp"), ("projectid", "long", "projectid", "long"), ("filehandleid", "long", "filehandleid", "string"), ("associatetype", "string", "associatetype", "string"), ("associateid", "string", "associateid", "string"), ("stack", "string", "stack", "string"), ("instance", "string", "instance", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = mapped_datasource]
applymapping1 = ApplyMapping.apply(frame = mapped_datasource, mappings = [("userid", "long", "userid", "long"), ("timestamp", "long", "timestamp", "timestamp"), ("projectid", "long", "projectid", "long"), ("filehandleid", "long", "filehandleid", "string"), ("associatetype", "string", "associatetype", "string"), ("associateid", "string", "associateid", "string"), ("stack", "string", "stack", "string"), ("instance", "string", "instance", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://dev.log.sagebase.org/fileDownloads/records", "partitionKeys": ["year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://dev.log.sagebase.org/fileDownloads/records", "partitionKeys": ["year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()
```
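A quick aside on the `MapRecord` step: `zfill(2)` left-pads the `month` and `day` values with zeros, so the S3 partition keys come out uniform width (e.g. `month=03` alongside `month=10` instead of `month=3`):

```python
# What zfill(2) does to the partition values before they are written:
for value in (3, 10):
    print(str(value).zfill(2))  # prints "03", then "10"
```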
Run the ETL job :)
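If you would rather kick the job off programmatically than from the Glue console, a minimal boto3 sketch looks like the following. Note that `"FileDownloadsBackfill"` is a hypothetical job name; substitute whatever name the job was saved under.

```python
import boto3

glue = boto3.client("glue")

# Start the job, then check the state of the run.
# "FileDownloadsBackfill" is a placeholder job name.
run = glue.start_job_run(JobName="FileDownloadsBackfill")
status = glue.get_job_run(JobName="FileDownloadsBackfill", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])  # e.g. RUNNING, SUCCEEDED, FAILED
```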