如何处理增量数据的行号窗口函数

大数据艺术家

我有一个表,它作为为某些 ID 运行的行号窗口函数。现在,每次新数据满载时,都会再次为它们分配新的行号。所以 Row Num 再次在整个数据集上运行,这是非常低效的,因为大量资源被消耗,并使其成为 CPU 密集型。这张桌子每 15 到 30 分钟建立一次。我试图实现同样的事情,但使用增量,然后将增量的结果添加到特定 customer_ID 的最后一个 row_count

因此,当新记录出现时,我想保存该特定记录的最大 row_num 假设 max_row_num = 4 ,现在有两个新记录用于 ID ,因此增量的 row_num 为 1,2。最终输出应该是 4+1 和 4+2 的东西。所以新的行号看起来像 1,2,3,4,5,6 加上 1 和 2 到前一个 Row_num 的最大值。

我实际上想在我的 Pyspark 中实现逻辑!但我对 python 解决方案持开放态度,然后可能会在以后转换为 pyspark DataFrame。

请帮助并提出可能的解决方案

满载——初始表

行数 顾客ID
1 ABC123
2 ABC123
3 ABC123
1 ABC125
2 ABC125
1 ABC225
2 ABC225
3 ABC225
4 ABC225
5 ABC225

增量负载

行数 顾客ID
1 ABC123
2 ABC123
1 ABC125
1 ABC225
2 ABC225
1 ABC330

期望的输出

行数 顾客ID
1 ABC123
2 ABC123
3 ABC123
4 ABC123
1 ABC125
2 ABC125
3 ABC125
1 ABC225
2 ABC225
3 ABC225
4 ABC225
5 ABC225
6 ABC225
1 ABC330
戈登·利诺夫

如果您尝试使用新行号插入值,您可以加入最大现有行号:

insert into full (row_num, customer_id)
    select i.row_number + coalesce(f.max_row_number, 0), i.customer_id
    from incremental i left join
         (select f.customer_id, max(row_number) as max_row_number
          from full f
          group by f.customer_id
         ) f
         on i.customer_id = f.customer_id;

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章