比较两个数据帧Pyspark

学习者

我正在尝试比较具有相同列数的两个数据帧,即在两个数据帧中ID为关键列的ID为4的列

df1 = spark.read.csv("/path/to/data1.csv")
df2 = spark.read.csv("/path/to/data2.csv")

现在我想将新列追加到DF2,即column_names,这是与df1值不同的列的列表

df2.withColumn("column_names",udf())

DF1

+------+---------+--------+------+
|   id | |name  | sal  | Address |
+------+---------+--------+------+
|     1|  ABC   | 5000 | US      |
|     2|  DEF   | 4000 | UK      |
|     3|  GHI   | 3000 | JPN     |
|     4|  JKL   | 4500 | CHN     |
+------+---------+--------+------+

DF2:

+------+---------+--------+------+
|   id | |name  | sal  | Address |
+------+---------+--------+------+
|     1|  ABC   | 5000 | US      |
|     2|  DEF   | 4000 | CAN     |
|     3|  GHI   | 3500 | JPN     |
|     4|  JKL_M | 4800 | CHN     |
+------+---------+--------+------+

现在我要DF3

DF3:

+------+---------+--------+------+--------------+
|   id | |name  | sal  | Address | column_names |
+------+---------+--------+------+--------------+
|     1|  ABC   | 5000 | US      |  []          |
|     2|  DEF   | 4000 | CAN     |  [address]   |
|     3|  GHI   | 3500 | JPN     |  [sal]       |
|     4|  JKL_M | 4800 | CHN     |  [name,sal]  |
+------+---------+--------+------+--------------+

我看到了这样的问题:如何比较两个数据帧和在scala中不同的打印列尝试过,但是结果不同。

我正在考虑通过将UDF函数从每个数据帧传递到udf并逐列比较并返回列列表来使用UDF函数。但是,为此,两个数据帧应按排序顺序,以便将相同的id行发送到udf。此处的分类操作成本很高。有什么办法吗?

复活

假设我们可以使用id来连接这两个数据集,那么我认为不需要UDF。仅通过使用内部连接,arrayarray_remove函数即可解决此问题

首先让我们创建两个数据集:

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

首先,我们在两个数据集之间进行内部联接,然后df1[col] != df2[col]为除以外的每一列生成条件id当列不相等时,我们返回列名,否则返回一个空字符串。条件列表将包含一个数组的项目,最后我们从中删除空项目:

from pyspark.sql.functions import col, array, when, array_remove

# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

select_expr =[
                col("id"), 
                *[df2[c] for c in df2.columns if c != 'id'], 
                array_remove(array(*conditions_), "").alias("column_names")
]

df1.join(df2, "id").select(*select_expr).show()

# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# |  1|  ABC|5000|     US|          []|
# |  3|  GHI|3500|    JPN|       [sal]|
# |  2|  DEF|4000|    CAN|   [Address]|
# |  4|JKL_M|4800|    CHN| [name, sal]|
# +---+-----+----+-------+------------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章