【问题标题】:kafka incremental aggregationkafka增量聚合
【发布时间】:2020-02-23 17:53:30
【问题描述】:

我有一个 kafka 主题中的数字流 delta,需要以特殊方式聚合,即:

aggregate[0] = 0
aggregate[N] = aggregate[N-1] * (N - 1) / N + delta[N - 1] / N

(确切的公式无关紧要,但请注意 aggregate 中对前一个元素的依赖)

本质上,我需要同时订阅两个kafka主题,其中我同时在两个主题中前进:当我阅读delta主题中的一个项目时,我需要从@987654325中阅读相应的项目@topic 也一样,并将结果写入aggregate topic,在delta topic 中的下一项被消费之前。

这在kafka中是否可能? ksql 有一个聪明的 join 帮助吗?

【问题讨论】:

  • 请注意,不能保证主题中元素的顺序。您只能在分区级别使用它。
  • @TobiSH 我想这对我来说没问题,只要它不会导致竞争条件

标签: apache-kafka ksqldb


【解决方案1】:

我想知道我的伪代码是否有帮助。 假设有两个主题,“delta”和“aggregate”。 并且两个主题的分区都是1以简化情况(以便我们获得全局顺序)

# this is just pseudocode to show my thoughts
def demo():
    delta_consumer = Consumer("delta")
    aggregate_consumer = Consumer("aggregate")
    aggregate_producer = Producer("aggregate")

    is_pre_aggregate_result_exists = aggregate_consumer.get_offset() != 0 # check whether it's first running 
    for delta_data in delta_consumer.poll():
        if not is_pre_aggregate_result_exists:
            last_aggregate_result = 0
        else:
            last_aggregate_result = aggregate_consumer.get_last_record()
        new_aggregate_result = user_define_function(delta_data, last_aggregate_result)
        aggregate_producer.producer(new_aggregate_result)
        is_pre_aggregate_result_exists = True

同时,我想 kafka+structurd-steaming 可以解决您的问题,因为您问题的内部需要是在流表上获取 aggregate_result,然后将结果输出到 kafka 主题,其中 kafka+structured-steraming 是一个完美的解决方案.

【讨论】:

  • 这行得通,我想说,但前提是只有一个这样的工人,对吧?如果我们想扩展它,这不会导致两个工作人员同时从aggregate 读取值,然后继续计算新值,然后写入aggregate 的竞争条件。还是我在这里遗漏了一些重要的东西?
  • 其实我觉得mutil-worker是不需要的更好的计划。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-09-03
  • 1970-01-01
  • 2023-03-26
  • 1970-01-01
  • 2019-06-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多