【问题标题】:How to process Row number window function on incremental data如何处理增量数据的行号窗口函数
【发布时间】:2021-11-13 11:51:19
【问题描述】:

我有一个表,它作为一些 ID 运行的行号窗口函数。 现在,每次新数据到来时,它都会完全加载,并再次为它们分配新的行号。所以 Row Num 再次在整个数据集上运行,这是非常低效的,因为大量资源被消耗并且它使得它成为 CPU 密集型的。这张桌子每 15 到 30 分钟建立一次。我试图实现相同的目标,但使用增量,然后将增量的结果添加到特定客户 ID 的最后一个 row_count

所以当新记录出现时,我想保存该特定记录的最大 row_num 让我们说 max_row_num = 4 ,现在有两个新记录用于 ID ,所以增量的 row_num 是 1,2。最终输出应该是 4+1 和 4+2 的东西。所以新的行号看起来像 1,2,3,4,5,6 加上 1 和 2 到前一个 Row_num 的最大值。

我实际上想在我的 Pyspark 中实现逻辑!但我对 python 解决方案持开放态度,以后可能会转换为 pyspark DataFrame。

请帮助并提出可能的解决方案

满载——初始表

Row_num customer_ID
1 ABC123
2 ABC123
3 ABC123
1 ABC125
2 ABC125
1 ABC225
2 ABC225
3 ABC225
4 ABC225
5 ABC225

增量负载

Row_num customer_ID
1 ABC123
2 ABC123
1 ABC125
1 ABC225
2 ABC225
1 ABC330

期望的输出

Row_num customer_ID
1 ABC123
2 ABC123
3 ABC123
4 ABC123
1 ABC125
2 ABC125
3 ABC125
1 ABC225
2 ABC225
3 ABC225
4 ABC225
5 ABC225
6 ABC225
1 ABC330

【问题讨论】:

    标签: sql python-3.x apache-spark pyspark apache-spark-sql


    【解决方案1】:

    如果您尝试插入具有新行号的值,您可以加入最大现有行号:

    insert into full (row_num, customer_id)
        select i.row_number + coalesce(f.max_row_number, 0), i.customer_id
        from incremental i left join
             (select f.customer_id, max(row_number) as max_row_number
              from full f
              group by f.customer_id
             ) f
             on i.customer_id = f.customer_id;
    

    【讨论】:

    • 非常感谢,它运行良好。感谢您的意见
    猜你喜欢
    • 2016-07-01
    • 2022-01-25
    • 1970-01-01
    • 1970-01-01
    • 2019-01-26
    • 1970-01-01
    • 2013-09-28
    • 1970-01-01
    • 2011-12-20
    相关资源
    最近更新 更多