我有两个数据框,每个数据框都有一个日期列。IE:
+-----------+
| DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+
+----------+
| DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+
我需要计算DT_DATE
给定参考日期和每个日期之间有多少个DEADLINES
日期。
例如:使用2021-03-31
作为参考日期应该给出以下结果集。
+-----------+------------+
| DEADLINES| dt_count|
+-----------+------------+
| 2023-07-15| 7|
| 2018-08-10| 0|
| 2022-03-28| 7|
| 2021-06-22| 4|
| 2021-12-18| 7|
| 2021-10-11| 5|
| 2021-11-13| 7|
+-----------+------------+
我设法让它在最后期限数据帧的每一行中进行迭代,但是对于更大的数据集,性能变得非常差。
有没有人有更好的解决方案?
编辑:那是我目前的解决方案:
def count_days(deadlines_df, dates_df, ref_date):
for row in deadlines_df.collect():
qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
yield row.DEADLINES, qtt
new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])
两个数据帧都可以用不同的权重组合,窗口函数的范围从开始到当前使用的行(Scala):
val deadlines = Seq(
("2023-07-15"),
("2018-08-10"),
("2022-03-28"),
("2021-06-22"),
("2021-12-18"),
("2021-10-11"),
("2021-11-13")
).toDF("DEADLINES")
val dates = Seq(
("2021-04-02"),
("2021-04-21"),
("2021-05-01"),
("2021-06-03"),
("2021-09-07"),
("2021-10-12"),
("2021-11-02")
).toDF("DT_DATE")
val referenceDate = "2021-03-31"
val united = deadlines.withColumn("weight", lit(0))
.unionAll(
dates
.where($"DT_DATE" >= referenceDate)
.withColumn("weight", lit(1))
)
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = united
.withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
.where($"weight" === lit(0))
.drop("weight")
输出:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-03-28|7 |
|2023-07-15|7 |
+----------+--------+
注意:计算将在一个分区中执行,Spark 显示这样的警告:WARN Logging - No Partition Defined for Window operation!将所有数据移动到单个分区,这会导致严重的性能下降。
还有其他可能的解决方案,按范围连接两个数据帧,这导致笛卡尔连接。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句