按键分组并使用 Spark/Scala 有效地找到在特定时间窗口中发生的事件的前一个时间戳

anonymous_guy

注意:对于聚合,我的分组最多可以包含每组 5-10K 行。所以一个高效的代码是非常可取的。

我的资料

val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

我想要什么
事件发生的最近时间按(用户名、设备)分组。

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

所需输出中的异常:
现在因为我提到它必须在特定的时间窗口中,例如在下面的输入数据集中,最后一行的最新日期时间戳为 12 月 23 日。现在,如果我想要返回 1 天的特定时间窗口并给我最后一次尝试,则“previous_attempt_at”列将为空,因为前一天应该在 1 月 22 日没有事件。这一切都取决于输入时间戳范围。

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
A grouping by (username,device) for the latest time an event occurred.

    +--------+------+-------------------+-------+-------------------+
    |username|device|         attempt_at|   stat|previous_attempt_at|
    +--------+------+-------------------+-------+-------------------+
    |   user2|iphone|2017-12-23 16:58:45|Success|               null|
    +--------+------+-------------------+-------+-------------------+

我有什么

val w = (Window.partitionBy("username", "device")
                 .orderBy(col("attempt_at").cast("timestamp").cast("long"))
                   .rangeBetween(-3600, -1)
                 )

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

注释我的代码对特定用户分组中的每一行进行窗口化。在处理大量数据时效率非常低,也没有给出最新的尝试。除了最后一行,我不需要所有行。

拉梅什·马哈詹

您所需要的只是一个额外的groupByaggregation但在此之前,您需要collect_list累积收集先前日期的udf函数和检查先前尝试的函数是否在时间限制内并将三列( "attempt_at", "stat", "previous_attempt_at")转换struct 为选择最后一列作为

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String])=> {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  val diffDates = timestamps.init.filter(x => LocalDateTime.from(LocalDateTime.parse(x, formatter)).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if(diffDates.size > 0) diffDates.last else null
})

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("username", "device").orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1.withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  .withColumn("struct", struct(col("attempt_at").cast("timeStamp").as("attempt_at"),col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))

这应该给你后面的例子

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
+--------+------+---------------------+-------+-------------------+

和以下对前一个输入d ATA

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+

你可以通过改变改变时间逻辑ChronoUnit.DAYSudf功能ChronoUnit.HOURS

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

使用JavaScript按键的检测时间

如何使用Spark Streaming读取流并在一个时间窗口内找到IP?

在多索引数据框中使用“正向填充”有效地重新索引一个级别

如何使用dask有效地并行化时间序列预测?

使用EventSystem进行按键事件

如何在RXJS 5+中按时间有效地对事件进行分组

使用另一个时间戳数据框过滤熊猫上的时间戳数据框

如何通过python中的第一个字符将元素有效地分组在一个巨大的列表中

Spark结构化流分组窗口-我希望第一个时间间隔从第一个时间戳开始

使用foreach为每组记录指定一个时间戳

从按键分组的连接节点中获取所有值

在PostgreSQL中,为另一个表中的每一行有效地使用一个表

如何使用Python中的Pandas有效地将数据帧重组为日期时间项?

Flink-如何与TimestampAssigner一起使用从事件有效负载中获取时间(不使用Kafka时间戳)

使用线程有效地逆转一个链表,该链表有效地拥有多达700万个节点

R:使用一个列表有效地修改另一个列表

如何使用SDL2有效地将按键值存储在数组中?

合并两个哈希并按键对它们进行分组

如何按键对一组对象进行分组?

如何使用python有效地填充“缺失的时间模式”并用特定值“填充它们”?

如何有效地获取每个资产的最后一个时间戳而无需对 timescaledb 进行顺序扫描?

使用 Turtle 绑定按键事件

如何仅使用一个 Excell 单元格有效地找到与表格相比的值的最小正差异?

如何按键对元素进行分组并使用 javascript 附加该特定键的值?

如何使用上一个时间戳的值填充空列

numpy:有效地获取按另一个数组元素分组的数组元素的统计信息

Kafka Stream 固定窗口未按键分组

使用来自另一个数组的索引有效地切片数组

如何在时间戳数组中有效地找到每天/每周/每月的第一个时间戳?