有谁知道在Glue作业中将源文件名添加为列的方法吗?
我们创建了一个流程,在其中爬行了S3中的一些文件以创建模式。然后,我们编写了一个作业,将文件转换为新格式,并将这些文件作为CSV写回到另一个S3存储桶,以供我们的管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向输出文件中添加一个包含原始文件名的新列。
我浏览了AWS文档和aws-glue-libs源,但没有发现任何问题。理想情况下,将有某种方法可以从awsglue.job
包中获取元数据(我们使用的是python风格)。
我仍在学习Glue,因此如果我使用了错误的术语,我们深表歉意。我也用spark标签对其进行了标记,因为我相信这就是Glue在幕后使用的东西。
您可以在您的etl工作中使用spark做到这一点:
var df = glueContext.getCatalogSource(
database = database,
tableName = table,
transformationContext = s"source-$database.$table"
).getDynamicFrame()
.toDF()
.withColumn("input_file_name", input_file_name())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> args("DST_S3_PATH")
)),
transformationContext = "",
format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
请记住,它仅适用于getCatalogSource()API,不适用于create_dynamic_frame_from_options()
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句