我有两个PySpark DataFrames(NO Pandas):
df1 =
+----------+--------------+-----------+---------+
|pk |num_id |num_pk |qty_users|
+----------+--------------+-----------+---------+
| 63479840| 12556940| 298620| 13|
| 63480030| 12557110| 298620| 9|
| 63835520| 12627890| 299750| 8|
df2 =
+----------+--------------+-----------+----------+
|pk2 |num_id2 |num_pk2 |qty_users2|
+----------+--------------+-----------+----------+
| 63479800| 11156940| 298620| 10 |
| 63480030| 12557110| 298620| 1 |
| 63835520| 12627890| 299750| 2 |
我想加入两个DataFrame以获得一个DataFrame df
:
+----------+--------------+-----------+---------+
|pk |num_id |num_pk |total |
+----------+--------------+-----------+---------+
| 63479840| 12556940| 298620| 13|
| 63479800| 11156940| 298620| 10|
| 63480030| 12557110| 298620| 10|
| 63835520| 12627890| 299750| 10|
合并的唯一条件是我要对in和qty_users
中具有相同值的那些行的值求和。就像我在上面的示例中所示。< pk, num_id, num_pk >
df1
df2
我该怎么做?
更新:
这是我所做的:
newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer')
newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))
但是它给了我9列而不是4列。如何解决这个问题?
外部联接将返回两个表中的所有列。此外,我们必须在qty_users中填充null值,因为sum也将返回null。
最后,我们可以使用Coalsece函数进行选择
from pyspark.sql import functions as F
newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer').fillna(0,subset=["qty_users","qty_users2"])
newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))
newdf.select(*[F.coalesce(c1,c2).alias(c1) for c1,c2 in zip(df1.columns,df2.columns)][:-1]+['total']).show()
+--------+--------+------+-----+
| pk| num_id|num_pk|total|
+--------+--------+------+-----+
|63479840|12556940|298620| 13|
|63480030|12557110|298620| 10|
|63835520|12627890|299750| 10|
|63479800|11156940|298620| 10|
+--------+--------+------+-----+
希望这可以帮助。!
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句