我有一个带有列id
和的Spark Dataframe hashes
,其中列hashes
包含一个Seq
length的整数值n
。例:
+----+--------------------+
+ id| hashes|
+----+--------------------+
|0 | [1, 2, 3, 4, 5]|
|1 | [1, 5, 3, 7, 9]|
|2 | [9, 3, 6, 8, 0]|
+-------------------------+
我想获得一个数据帧,其中所有与之hashes
匹配的行至少在一个位置匹配。更正式地说,我想和一个额外的列数据帧matches
,对于每一行r
包含Seq
的id
行S其中hashes[r][i] == hashes[k][i]
有k
是任何其他行在执法机关的一个值i
。
对于我的示例数据,结果将是:
+---+---------------+-------+
|id |hashes |matches|
+---+---------------+-------+
|0 |[1, 2, 3, 4, 5]|[1] |
|1 |[1, 5, 3, 7, 9]|[0] |
|2 |[9, 3, 6, 8, 0]|[] |
+---+---------------+-------+
在Spark 3中,以下代码在行之间比较数组,仅保留两个数组在同一位置共享至少一个元素的行。df
是您的输入数据框:
df.join(
df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
)
.groupBy("id")
.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matched"))
.withColumn("matched", filter(col("matched"), x => x.notEqual(col("id"))))
首先,我们执行自动交叉联接,并根据您在两个哈希数组上相同位置的至少一个元素的条件进行过滤。
为了建立条件,我们压缩两个哈希数组,一个哈希数组来自第一个数据帧,一个哈希数组用于第二个连接的数据帧,也就是第一个已重命名列的数据帧。通过压缩,我们得到的数组,{"hashes":x, "hashes2":y}
接下来我们只需检查该数组中是否存在where元素x = y
。完整的条件如下:
exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
然后,我们将按列汇总id
以收集所有id2
保留的行,这意味着符合您条件的行
为了保持“哈希”列,对于具有相同“ id”的两行,“哈希”列相等,我们为每个“ id”获得“哈希”的第一个匹配项。然后,我们使用collect_list收集所有“ id2” :
.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
最后,我们从“匹配”列中过滤出当前行的ID
.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
如果需要按顺序排列“ id”,则可以添加一个orderBy
子句:
.orderBy("id")
对于df
包含以下值的数据框:
+---+---------------+
|id |hashes |
+---+---------------+
|0 |[1, 2, 3, 4, 5]|
|1 |[1, 5, 3, 7, 9]|
|2 |[9, 3, 6, 8, 0]|
+---+---------------+
您将获得以下输出:
+---+---------------+-------+
|id |hashes |matches|
+---+---------------+-------+
|0 |[1, 2, 3, 4, 5]|[1] |
|1 |[1, 5, 3, 7, 9]|[0] |
|2 |[9, 3, 6, 8, 0]|[] |
+---+---------------+-------+
联接是笛卡尔积,非常昂贵。尽管条件过滤器会过滤结果,但它可能导致大型数据集上的大量计算/混洗,并且性能可能非常差。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句