我有一个表,它作为为某些 ID 运行的行号窗口函数。现在,每次新数据满载时,都会再次为它们分配新的行号。所以 Row Num 再次在整个数据集上运行,这是非常低效的,因为大量资源被消耗,并使其成为 CPU 密集型。这张桌子每 15 到 30 分钟建立一次。我试图实现同样的事情,但使用增量,然后将增量的结果添加到特定 customer_ID 的最后一个 row_count
因此,当新记录出现时,我想保存该特定记录的最大 row_num 假设 max_row_num = 4 ,现在有两个新记录用于 ID ,因此增量的 row_num 为 1,2。最终输出应该是 4+1 和 4+2 的东西。所以新的行号看起来像 1,2,3,4,5,6 加上 1 和 2 到前一个 Row_num 的最大值。
我实际上想在我的 Pyspark 中实现逻辑!但我对 python 解决方案持开放态度,然后可能会在以后转换为 pyspark DataFrame。
请帮助并提出可能的解决方案
满载——初始表
行数 | 顾客ID |
---|---|
1 | ABC123 |
2 | ABC123 |
3 | ABC123 |
1 | ABC125 |
2 | ABC125 |
1 | ABC225 |
2 | ABC225 |
3 | ABC225 |
4 | ABC225 |
5 | ABC225 |
增量负载
行数 | 顾客ID |
---|---|
1 | ABC123 |
2 | ABC123 |
1 | ABC125 |
1 | ABC225 |
2 | ABC225 |
1 | ABC330 |
期望的输出
行数 | 顾客ID |
---|---|
1 | ABC123 |
2 | ABC123 |
3 | ABC123 |
4 | ABC123 |
1 | ABC125 |
2 | ABC125 |
3 | ABC125 |
1 | ABC225 |
2 | ABC225 |
3 | ABC225 |
4 | ABC225 |
5 | ABC225 |
6 | ABC225 |
1 | ABC330 |
如果您尝试使用新行号插入值,您可以加入最大现有行号:
insert into full (row_num, customer_id)
select i.row_number + coalesce(f.max_row_number, 0), i.customer_id
from incremental i left join
(select f.customer_id, max(row_number) as max_row_number
from full f
group by f.customer_id
) f
on i.customer_id = f.customer_id;
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句