我有一个包含以下列的数据集(df):
ID Date Next Date Datediff in days
1 2017-10-25 null null
1 2016-12-13 2017-10-25 316
1 2016-11-23 2016-12-13 20
1 2016-11-14 2016-11-23 9
1 2015-07-07 2016-11-14 496
2 2015-05-15 null null
2 2013-11-29 2015-05-15 532
2 2013-11-16 2013-11-29 13
2 2009-02-06 2013-11-16 1744
2 2006-06-21 2009-02-06 961
2 2002-06-25 2006-06-21 1457
在这种情况下,“下一个日期”仅在日期差大于30天时才有效。现在,我想用有效的下一个日期替换下一个日期条目,并创建一个“ New Next Date”列,如下所示:
ID Date Next Date Datediff in days New Next Date
1 2017-10-25 null null null
1 2016-12-13 2017-10-25 316 2017-10-25
1 2016-11-23 2016-12-13 20 2017-10-25
1 2016-11-14 2016-11-23 9 2017-10-25
1 2015-07-07 2016-11-14 496 2016-11-14
2 2015-05-15 null null null
2 2013-11-29 2015-05-15 532 2015-05-15
2 2013-11-16 2013-11-29 13 2015-05-15
2 2009-02-06 2013-11-16 1744 2013-11-16
2 2006-06-21 2009-02-06 961 2009-02-06
2 2002-06-25 2006-06-21 1457 2006-06-21
我考虑过要使用lag函数来创建一个辅助列来替换小于30天的日期。但是对于客户ID 1,可能会发生两个无效日期连续的情况。然后这不起作用。
您可以使用lag函数在lagDate列中添加,否则使用带datediff的else语句来比较差异:
import org.apache.spark.sql.functions.{when, col, lag, datediff, lag, date_add}
val windowId = Window.partitionBy(col("ID")).orderBy("Date")
val newDf = df.withColumn("dateLimit", date_add(col("Date"), 30))
.withColumn("lagDate1", lag(col("Date"), -1).over(windowId))
.withColumn("lagDate2", lag(col("Date"), -2).over(windowId))
.withColumn("lagDate",
when(datediff(col("lagDate1"), col("dateLimit")) >=0, col("lagDate1")).otherwise(col("lagDate2"))
)
.withColumn("correctNextDate",
when(datediff(col("NextDate"), col("Date")) > 30 || col("NextDate").isNull,
col("NextDate")).
otherwise(col("lagDate"))
)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句