我有一个带有时间戳的时间访问数据库
ID, time
1, 1493596800
1, 1493596900
1, 1493432800
2, 1493596800
2, 1493596850
2, 1493432800
我使用spark SQL,并且每个ID都需要有最长的连续日期序列
ID, longest_seq (days)
1, 2
2, 5
3, 1
我尝试调整此答案,以适合我的情况使用SQL检测连续的日期范围,但是我没有达到我的期望。
SELECT ID, MIN (d), MAX(d)
FROM (
SELECT ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) AS d,
ROW_NUMBER() OVER(
PARTITION BY ID ORDER BY cast(from_utc_timestamp(cast(time as timestamp), 'CEST')
as date)) rn
FROM purchase
where ID is not null
GROUP BY ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date)
)
GROUP BY ID, rn
ORDER BY ID
如果有人对如何解决此请求有任何线索,或者有什么问题,我将不胜感激。
[编辑]更明确的输入/输出
ID, time
1, 1
1, 2
1, 3
2, 1
2, 3
2, 4
2, 5
2, 10
2, 11
3, 1
3, 4
3, 9
3, 11
结果将是:
ID, MaxSeq (in days)
1,3
2,3
3,1
所有的访问都带有时间戳,但是我需要连续几天,因此每天的每次访问都按一天计算一次
我钟爱的窗口聚合函数就是这种情况!
我认为以下示例可以为您提供帮助(至少开始使用)。
以下是我使用的数据集。我将您的时间(长时间)转换为数字时间来表示日期(并避免弄乱Spark SQL中的时间戳,这可能会使解决方案更难理解...可能)。
在下面的visit
数据集中,time
列表示日期之间的天数,因此1
一一表示连续的天数。
scala> visits.show
+---+----+
| ID|time|
+---+----+
| 1| 1|
| 1| 1|
| 1| 2|
| 1| 3|
| 1| 3|
| 1| 3|
| 2| 1|
| 3| 1|
| 3| 2|
| 3| 2|
+---+----+
让我们定义窗口规范以将id
行分组在一起。
import org.apache.spark.sql.expressions.Window
val idsSortedByTime = Window.
partitionBy("id").
orderBy("time")
这样,您可以rank
对行和具有相同等级的行进行计数。
val answer = visits.
select($"id", $"time", rank over idsSortedByTime as "rank").
groupBy("id", "time", "rank").
agg(count("*") as "count")
scala> answer.show
+---+----+----+-----+
| id|time|rank|count|
+---+----+----+-----+
| 1| 1| 1| 2|
| 1| 2| 3| 1|
| 1| 3| 4| 3|
| 3| 1| 1| 1|
| 3| 2| 2| 2|
| 2| 1| 1| 1|
+---+----+----+-----+
这似乎(非常接近?)解决方案。您似乎完成了!
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句