解压缩元组列表 - PySpark

我在这里问了相反的问题Create a tuple out of two columns - PySpark我现在要做的是将位于数据框列中的元组列表解压缩到每行两个不同的列表中。所以基于下面的数据框,v_tuple 列回到 v1 和 v2。

+---------------+---------------+--------------------+
|             v1|             v2|             v_tuple|
+---------------+---------------+--------------------+
|[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]|[(2.0,9.0), (1.0,...|
|[4.0, 8.0, 9.0]|[1.0, 1.0, 2.0]|[(4.0,1.0), (8.0,...|
+---------------+---------------+--------------------+

根据我之前的专栏,我尝试了以下操作但没有成功:

unzip_ = udf(
    lambda l: list(zip(*l)),
    ArrayType(ArrayType("_1", DoubleType()), ArrayType("_2", DoubleType())))

我正在使用 pyspark 1.6

马弗

你可以爆炸你的数组,然后再次将它分组:

首先让我们创建我们的数据框:

df = spark.createDataFrame(
    sc.parallelize([
            [[2.0, 1.0, 9.0], [9.0, 7.0, 2.0], [(2.0,9.0), (1.0,7.), (9.,2.)]],
            [[4.0, 8.0, 9.0], [1.0, 1.0, 2.0], [(4.0,1.0), (8.0,1.), (9., 2.)]]
        ]), 
    ["v1", "v2", "v_tuple"] 
)

让我们添加一个行 id 来唯一标识它:

import pyspark.sql.functions as psf
df = df.withColumn("id", psf.monotonically_increasing_id())

现在,我们可以分解列“v_tuple”并从元组的两个元素创建两列:

df = df.withColumn("v_tuple", psf.explode("v_tuple")).select(
    "id", 
    psf.col("v_tuple._1").alias("v1"), 
    psf.col("v_tuple._2").alias("v2")
)

    +-----------+---+---+
    |         id| v1| v2|
    +-----------+---+---+
    |42949672960|2.0|9.0|
    |42949672960|1.0|7.0|
    |42949672960|9.0|2.0|
    |94489280512|4.0|1.0|
    |94489280512|8.0|1.0|
    |94489280512|9.0|2.0|
    +-----------+---+---+

最后,我们可以再次将其分组:

df = df.groupBy("id").agg(
    psf.collect_list("v1").alias("v1"),
    psf.collect_list("v2").alias("v2")
)

    +-----------+---------------+---------------+
    |         id|             v1|             v2|
    +-----------+---------------+---------------+
    |42949672960|[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]|
    |94489280512|[4.0, 8.0, 9.0]|[1.0, 1.0, 2.0]|
    +-----------+---------------+---------------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章