我有一个pyspark数据框,其中的列包含stringtype的StructField,该字符串具有动态长度的列表。
df.schema: StructType(List(StructField(id,StringType,true),StructField(recs,StringType,true)))
|id | recs |
|ABC|[66, [["AB", 10]]]
|XYZ|[66, [["XY", 10], ["YZ", 20]]]
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]
我正在尝试将列表整理成这样
|id | like_id
|ABC|AB|
|XYZ|XY|
|XYZ|YZ|
|DEF|DE|
|DEF|EF|
|DEF|FG|
我尝试了什么:
我尝试使用数组表达式,因为recs是预期的StringType,这使我出错
我能够在熊猫中使用json加载和itertools来处理此问题,但我需要在火花中进行此处理,因为数据帧很大,约为3000万,结果是10倍。
df["recs"].apply(
lambda x: [rec_id[0] for rec_id in json.loads(x)[1:][0]]
)
for i, row in df.iterrows():
....
IIUC,您可以使用pattern将String in列拆分,找到第二个元素,然后使用from_json检索数组数组:recs
, (?=\[\[)|\]$
from pyspark.sql import functions as F
df1 = df.withColumn('recs1', F.split('recs', ', (?=\[\[)|\]$')[1]) \
.withColumn('recs2', F.from_json('recs1', 'array<array<string>>'))
其中:拆分模式, (?=\[\[)|\]$
包含两个子模式:
, (?=\[\[)
\]$
结果:
df1.show(truncate=False)
+---+------------------------------------------+------------------------------------+------------------------------+
|id |recs |recs1 |recs2 |
+---+------------------------------------------+------------------------------------+------------------------------+
|ABC|[66, [["AB", 10]]] |[["AB", 10]] |[[AB, 10]] |
|XYZ|[66, [["XY", 10], ["YZ", 20]]] |[["XY", 10], ["YZ", 20]] |[[XY, 10], [YZ, 20]] |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|[["DE", 10], ["EF", 20], ["FG", 30]]|[[DE, 10], [EF, 20], [FG, 30]]|
+---+------------------------------------------+------------------------------------+------------------------------+
然后,您可以使用explode
以获得所需的结果:
df1.selectExpr("id", "explode_outer(recs2) as recs") \
.selectExpr("id", "recs[0] as like_id") \
.show()
+---+-------+
| id|like_id|
+---+-------+
|ABC| AB|
|XYZ| XY|
|XYZ| YZ|
|DEF| DE|
|DEF| EF|
|DEF| FG|
+---+-------+
简而言之,我们可以将以上代码编写为以下代码:
df_new = df.selectExpr("id", r"explode_outer(from_json(split(recs, ', (?=\\[\\[)|\\]$')[1], 'array<array<string>>')) as recs") \
.selectExpr("id", "recs[0] as like_id")
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句