I am trying to flatten two JSON files (let's call them JSON1 & JSON2). Below is a sample of what they look like.
In one file a given column's data type can be a struct, while in the other file it can be a string. The end goal is to flatten both files and merge the data into a single CSV file. How can I do this in Spark using Python?
JSON1:
{
"result": [
{
"promoted_by": "",
"parent": "",
"number": "310346",
"closed_by": {
"link": "https://abcdev.service-now.com/api/now/table/sys_user/e4b0dd",
"value": "e4b0dd"
}
}
]
}
root
|-- result: struct (nullable = true)
| |-- closed_by: struct (nullable = true)
| | |-- link: string (nullable = true)
| | |-- value: string (nullable = true)
| |-- number: string (nullable = true)
| |-- parent: string (nullable = true)
| |-- promoted_by: string (nullable = true)
JSON2:
{
"result": [
{
"promoted_by": "",
"parent": {
"link": "https://abcdev.service-now.com/api/now/table/sys_user/ab00f1",
"value": "ab00f1"
},
"number": "310348",
"closed_by": ""
}
]
}
root
|-- result: struct (nullable = true)
| |-- closed_by: string (nullable = true)
| |-- number: string (nullable = true)
| |-- parent: struct (nullable = true)
| | |-- link: string (nullable = true)
| | |-- value: string (nullable = true)
| |-- promoted_by: string (nullable = true)
Just read the two JSON files into the same DataFrame; the schemas are merged automatically by Spark. The columns closed_by and parent will both end up with type struct:
df = spark.read.json("dbfs:/mnt/{json1.json,json2.json}", multiLine=True)
df.printSchema()
#root
# |-- result: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- closed_by: struct (nullable = true)
# | | | |-- link: string (nullable = true)
# | | | |-- value: string (nullable = true)
# | | |-- number: string (nullable = true)
# | | |-- parent: struct (nullable = true)
# | | | |-- link: string (nullable = true)
# | | | |-- value: string (nullable = true)
# | | |-- promoted_by: string (nullable = true)
To flatten the result, use explode followed by a star expansion of the structs:
from pyspark.sql import functions as F
df1 = df.select(F.explode("result").alias("results")).select("results.*") \
.select(
F.col("number"),
F.col("closed_by.value").alias("closed_by_value"),
F.col("closed_by.link").alias("closed_by_link"),
F.col("parent.value").alias("parent_value"),
F.col("parent.link").alias("parent_link"),
F.col("promoted_by")
)
df1.printSchema()
#root
# |-- number: string (nullable = true)
# |-- closed_by_value: string (nullable = true)
# |-- closed_by_link: string (nullable = true)
# |-- parent_value: string (nullable = true)
# |-- parent_link: string (nullable = true)
# |-- promoted_by: string (nullable = true)