使用 Spark 对 Parquet 数据集进行排序并将排序结果存储为 S3 中的多个文件

奎里翁

s3://my-bucket/events/date=X/我有一个存储在多个零件文件中的镶木地板数据集:

  • part000.snappy.parquet
  • part001.snappy.parquet
  • part002.snappy.parquet
  • ...

数据集中的事件有一个timestamp列,即 ISO 8601 中的字符串。数据集中的事件是完全未排序的。

使用 spark,我想对数据集进行排序并将其存储回 S3,这样:

  • 在每个中partXXX.snappy.parquet,事件按时间戳排序
  • 具有较低 XXX 的部分文件具有较低的时间戳,即 part000 中事件的时间戳 <= 部分 001 中事件的时间戳 <= 部分 002 中事件的时间戳,...

详细信息: - 每个部分文件有 200MB - 1GB - 最终保存的文件可以包含任意数量的事件,只要我能以某种方式控制它们的大小。我想保留小于 1GB 的部分文件。

在 Spark 中这样做容易吗?如何实现这一点?/

奎里翁

以下工作:

target_path = "s3://..."
events = spark.read.parquet("s3://my-bucket/events/date=X/")
events = events.sort("timestamp", ascending=True)
num_files = ceil(float(events.count()) / EVENTS_PER_FILE)
events.coalesce(num_files).write.parquet(
            target_path,
            mode="overwrite")  # note: overwrite deletes old files

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

尝试使用本地Spark从S3读取和写入Parquet文件

如何在EMR上使用Spark在Hive Metastore中注册S3 Parquet文件

在S3中存储时正确的Parquet文件大小?

使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

Hive与Parquet文件上的Spark数据集

S3上的Spark Dataset Parquet分区创建临时文件夹

从Scala读取Parquet文件而不使用Spark

如何使用Spark Session列出S3存储桶中的文件?

使用登台S3A提交器将Parquet写入AWS S3时,Apache Spark中出现UnsatisfiedLinkError

如何在Spark数据框中从AWS S3读取多个文件?

为什么 Spark 应用程序使用多个 csv 文件将 DataFrame 保存到 S3 存储桶

在不使用Spark的Scala中创建Parquet文件

您可以使用Spark SQL / Hive / Presto直接从Parquet / S3复制到Redshift吗?

Apache Spark写入S3未能从临时文件夹中移动Parquet文件

使用 AWS Glue 将 AWS Redshift 转换为 S3 Parquet 文件

使用多个S3帐户运行EMR Spark

使用Spectrify以Parquet格式将数据从Redshift卸载到S3

如何使用纯Java(包括日期和小数类型)生成Parquet文件并将其上传到S3 [Windows](无HDFS)

Spark:使用Scala从S3读取CSV文件

如何使用AWS CLI删除S3存储桶中的多个文件

Firehose JSON-> S3 Parquet-> ETL Spark,错误:无法推断Parquet的架构

如何使用S3中的数据框访问多个json文件

无法使用Spark结构化流在Parquet File中写入数据

从 S3 上的多个目录创建 spark 数据帧

从子目录递归读取文件,并使用s3或本地文件系统中的spark

使用Spark读取Minio存储桶中的多个文件

使用 Glue Spark Scala 脚本检查 S3 中是否存在文件?

在Spark 2.0中读取本地Parquet文件

如何使用java在amazon s3中存储文件