AWS Glue Job - Writing into single Parquet file

Emre Alparslan

I'm collecting JSON-formatted data in an S3 bucket with partitions.

Examples:

 s3://bucket/app-events/year=2019/month=9/day=30/0001.json
 s3://bucket/app-events/year=2019/month=9/day=30/0002.json
 s3://bucket/app-events/year=2019/month=9/day=30/0003.json

A crawler runs over s3://bucket/app-events/ and creates a table.

I want to transform these JSON files into a single Parquet file, but my job creates a new Parquet file for each JSON file.

Here is my job script in Python:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

print("-------------- Execute Script --------------\n")

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

print("-------------- JOB_NAME: " + args['JOB_NAME'] + " --------------\n")

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("-------------- Execute Finding Sources --------------\n")

## @type: DataSource
## @args: [database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0")

# datasource0 = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://bucket/app-events"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json")
# Print out the count of found datasources
print("-------------- Sources Found: " + str(datasource0.count()) + "--------------\n")

## @type: ApplyMapping
## @args: [mapping = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", "string"), ("`data.entity_choice`", "string", 
"`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", 
"string"), ("`data.entity_choice`", "string", "`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed/singles"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

How can I achieve that?

Harsh Bafna

AWS Glue is built on Apache Spark, which partitions data across multiple executors to achieve high throughput. When writing to a file-based sink such as Amazon S3, Glue emits one output file per partition. To control the number of partitions in a DynamicFrame, convert it to a DataFrame, repartition it with Spark, and convert it back:

# Convert to a DataFrame and reduce it to a single partition,
# so the sink writes a single output file
partitioned_dataframe = datasource0.toDF().repartition(1)

# Convert back to a DynamicFrame for further processing
# (requires: from awsglue.dynamicframe import DynamicFrame)
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")
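
Applied to the job above, the sink step then writes the repartitioned frame instead of the original. A sketch, reusing the names and S3 path from the question:

```python
# Write the single-partition frame back to S3; with one partition,
# this produces one Parquet part file instead of one per input JSON.
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = partitioned_dynamicframe,
    connection_type = "s3",
    connection_options = {"path": "s3://bucket/app-events-processed/singles"},
    format = "parquet",
    transformation_ctx = "datasink4")
```

Note that `coalesce(1)` can be used instead of `repartition(1)` to avoid a full shuffle, but either way all data must fit through a single executor, so this only suits modest data volumes.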

Reference: AWS Glue: How to do things

