Note: for the aggregation, each of my groups can contain at most 5-10K rows, so efficient code is highly desirable.
My data:
val df1 = sc.parallelize(Seq(
("user2", "iphone", "2017-12-23 16:58:08", "Success"),
("user2", "iphone", "2017-12-23 16:58:12", "Success"),
("user2", "iphone", "2017-12-23 16:58:20", "Success"),
("user2", "iphone", "2017-12-23 16:58:25", "Success"),
("user2", "iphone", "2017-12-23 16:58:35", "Success"),
("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-23 16:58:08|Success|
| user2|iphone|2017-12-23 16:58:12|Success|
| user2|iphone|2017-12-23 16:58:20|Success|
| user2|iphone|2017-12-23 16:58:25|Success|
| user2|iphone|2017-12-23 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
What I want
A grouping by (username, device) for the latest time an event occurred.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Edge case in the desired output:
Since, as mentioned, the lookup has to stay within a specific time window: in the input dataset below, the last row carries the latest timestamp, December 23rd. Now, if I ask for a time window of 1 day back and want the last attempt within it, the "previous_attempt_at" column should be null, because there were no events on December 22nd. It all depends on the input timestamp range.
//Initial Data
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-20 16:58:08|Success|
| user2|iphone|2017-12-20 16:58:12|Success|
| user2|iphone|2017-12-20 16:58:20|Success|
| user2|iphone|2017-12-20 16:58:25|Success|
| user2|iphone|2017-12-20 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
// Desired Output
A grouping by (username,device) for the latest time an event occurred.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success| null|
+--------+------+-------------------+-------+-------------------+
What I have:
// Window over each (username, device) group, ordered by epoch seconds,
// looking back at most 1 hour (3600 s) and excluding the current row.
val w = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-3600, -1)

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:08|Success| null|
| user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
| user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
| user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
| user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Notes: my code computes the window for every row in a given user's group. That is very inefficient on large data, and it also does not return just the latest attempt; I don't need any of the rows except the last one.
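For reference, a minimal sketch of one way to keep only the latest row per (username, device) on top of the window approach above, using row_number; the constant oneDaySeconds and the name wLatest are my own additions, and this still evaluates the window frame for every row, so it only addresses the "last row only" part, not the efficiency concern:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed constant: widen the 1-hour range above to the 1-day window
// described in the question (86400 seconds).
val oneDaySeconds = 24 * 60 * 60L

val wRange = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-oneDaySeconds, -1)

// Rank rows newest-first inside each group so only the latest attempt survives.
val wLatest = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").desc)

val latestOnly = df1
  .withColumn("previous_attempt_at", last("attempt_at").over(wRange))
  .withColumn("rn", row_number().over(wLatest))
  .filter(col("rn") === 1)
  .drop("rn")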
All you need is an additional groupBy and aggregation, but before that you need the collect_list function to accumulate the previous dates, a udf function that checks whether the previous attempt falls within the time limit, and to pack the three columns ("attempt_at", "stat", "previous_attempt_at") into a struct so the last row can be selected, as below. (Spark compares structs field by field, so taking max of a struct whose first field is attempt_at returns the whole row with the latest timestamp.)
import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
// UDF: given the current timestamp and the cumulative list of timestamps
// collected so far, return the most recent previous attempt that falls
// within the time limit (at most 1 day earlier), or null if there is none.
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String]) => {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  // timestamps.init drops the current row's own timestamp (the last element).
  val diffDates = timestamps.init.filter(x =>
    LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if (diffDates.nonEmpty) diffDates.last else null
})
import org.apache.spark.sql.expressions._

// Cumulative window: for each row, collect_list gathers all attempts up to
// and including the current one.
val w = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1
  .withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  .withColumn("struct", struct(col("attempt_at").cast("timestamp").as("attempt_at"), col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))
This should give you the following for the latter example:
+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at |stat |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2 |iphone|2017-12-23 16:58:45.0|Success|null |
+--------+------+---------------------+-------+-------------------+
and the following for the former input data:
+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at |stat |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2 |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+
You can change the time logic by replacing ChronoUnit.DAYS in the udf function with ChronoUnit.HOURS, and so on.
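For instance, a sketch of the same udf with the limit expressed in hours instead of days (the name durationHoursUdf is hypothetical; everything else mirrors the function above):

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._

// Variant of the UDF above: keep only previous attempts at most 1 hour old.
def durationHoursUdf = udf((actualtimestamp: String, timestamps: Seq[String]) => {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  val diffDates = timestamps.init.filter(x =>
    LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.HOURS) <= 1)
  if (diffDates.nonEmpty) diffDates.last else null
})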