使用Spark Scala为数据中的每个组进行窗口操作后选择最新的时间戳记录

anonymous_guy

我在一天的时间范围(86400)中对(user,app)的尝试次数进行了计算。我想提取具有最新时间戳的行以及删除不必要的先前计数。确保您的答案考虑了时间范围。一个拥有1台设备的用户每天或每周可以进行多次尝试,我想能够在每个特定窗口中检索带有最终计数的特定时刻。

我的初始数据集是这样的:

val df = sc.parallelize(Seq(
  ("user1", "iphone", "2017-12-22 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-22 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 09:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 09:15:12",  "failed"),
  ("user1", "android", "2017-12-20 09:25:20", "Success"),
  ("user1", "android", "2017-12-20 09:44:22", "Success"),
  ("user1", "android", "2017-12-20 09:58:22", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:20", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:25", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")

我运行的代码以及得到的内容

// Basically I'm looking 1 day which is 86400 seconds
val w1 = Window.partitionBy("username", "device")
               .orderBy(col("date_time").cast("date_time").cast("long").desc)
               .rangeBetween(-86400, 0) 


val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))

我现在有

// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:58:22|Success|       1|
|   user1|       android|  2017-12-20 09:44:22|Success|       2|
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 12:06:18|Success|       1|
|   user1|        iphone|  2017-12-22 11:15:12| failed|       2|
|   user1|        iphone|  2017-12-22 10:06:18|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 16:44:35|Success|       1|
|   user1|        iphone|  2017-12-20 16:44:25|Success|       2|
|   user1|        iphone|  2017-12-20 16:44:20|Success|       3|
|   user1|        iphone|  2017-12-20 12:06:18|Success|       4|
|   user1|        iphone|  2017-12-20 11:15:12| failed|       5|
|   user1|        iphone|  2017-12-20 10:06:18|Success|       6|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

我要什么因此,通过确保我处于同一时间窗口中,我希望获得最新的时间戳及其计数。

+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|  user1     |       android    |  2017-12-20 09:25:20|Success|       3|
|  user1     |        iphone    |  2017-12-22 09:15:12| failed|       4|
|  user1     |        iphone    |  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+**
拉梅什·马哈然(Ramesh Maharjan)

你快到了您已经通过查看一天的范围来算出计数。现在,您要做的就是找出该天范围内的最新记录,可以通过在同一窗口功能上使用last来完成,但范围倒转

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

def day(x: Int) = x * 86400

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(0, day(1))

val countEveryAttemptDF = df.withColumn("attempts", count("application_id").over(w1))
                            .withColumn("att", last("attempts").over(w2))
                            .filter(col("attempts") === col("att"))
                            .drop("att")

这应该给你

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1   |android       |2017-12-20 09:25:20  |Success|3       |
|user1   |iphone        |2017-12-22 09:15:12  | Failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | Failed|7       |
+--------+--------------+---------------------+-------+--------+

与以下评论中所述类似

1天有86400秒。我想回首1天。同样,3600秒是1个小时。1周内达到604,800秒

您可以如下将日功能更改为小时和周,并在窗口中使用它们 rangeBetween

def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800

我希望答案是有帮助的

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章