【问题标题】:kafka-stream aggregation on multiple inputs with cogroup and filter使用 cogroup 和过滤器对多个输入进行 kafka-stream 聚合
【发布时间】:2022-10-02 20:55:24
【问题描述】:

我正在尝试在多个 (4) 输入主题上实现 kafka-stream 聚合。

让我们的主题是:A、B、C、D;

拓扑应该:

  • 从A和B拉2条单条消息,应用聚合,应用过滤器,存储在KTable上
  • 从C和D拉N条消息,应用聚合,存储在KTable上

未提供聚合器代码,但行为是:

  • 来自 B 的消息包含一个值,我们称 X
  • n 来自 C 和 D 的消息被处理为计数器递增,聚合对象应该对来自 C 的计数器 +1 和 +1 对来自 D 的计数器和最终
  • 过滤器应验证 X = C_counter + D_counter
  • 等式验证后,存储在 KTable 中
  • 过滤/存储后终于做点什么

这里的代码sn-p:

private Topology buildTopology() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    // create the 4 streams, reading strings
    KStream<String, String> streamA_AsString = streamsBuilder.stream(DemoTopic_A);
    KStream<String, String> streamC_AsString = streamsBuilder.stream(DemoTopic_C);
    KStream<String, String> streamB_AsString = streamsBuilder.stream(DemoTopic_B);
    KStream<String, String> streamD_AsString = streamsBuilder.stream(DemoTopic_D);

    // map the strings to java object (the entity used for aggregation)
    KStream<String, DemoEntity> streamA = streamA_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamC = streamC_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamB = streamB_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamD = streamD_AsString.map(demoKeyValueMapper);

    // group the message/object by key
    final KGroupedStream<String, DemoEntity> streamA_Grouped = streamA.groupByKey();
    final KGroupedStream<String, DemoEntity> streamProgressGrouped = streamC.groupByKey();
    final KGroupedStream<String, DemoEntity> streamPushingGrouped = streamB.groupByKey();
    final KGroupedStream<String, DemoEntity> streamErrorGrouped = streamD.groupByKey();

    // instance the aggregator
    DemoAggregator demoAggregator = new DemoAggregator();

    // build the aggregation chain
    // using cogroup to group previous kgrouped, providing the aggregator
    streamA_Grouped
        .cogroup(demoAggregator)
        .cogroup(streamProgressGrouped, demoAggregator)
        .cogroup(streamPushingGrouped, demoAggregator)
        .cogroup(streamErrorGrouped, demoAggregator)
        // provide the initializer
        .aggregate(demoInitializer)
        // apply the filter and, at same time, store into KTable
        .filter(isCompleted, Named.as(DemoCompletionStorageTableName))
        // transform to stateless KStream for further usage
        // from here, no more stateful by changelog
        .toStream()
        .foreach((key, value) -> {
            // use values
            log.info(\"here we would use values for: { key:{}, message:{} }\", () -> key, () -> value);
        });

    return streamsBuilder.build();
}

不幸的是,拓扑不会启动,这是错误:

原因:org.apache.kafka.streams.errors.TopologyException:无效拓扑:处理器 COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition-filter 已添加。

似乎它已经将 COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition-filter 添加到对象 NodeFactory 中,因此例外。 来自 Kafka 依赖项的类是方法 \"addProcessor\" 上的 \"InternalTopologyBuilder\"。

在谷歌上搜索那个错误字符串,我只找到了 KafkaStreams 的源代码......没有其他 stackoverflow 问题,也没有论坛,什么都没有......

任何想法?

提前致谢

    标签: java apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream


    【解决方案1】:

    你找到解决办法了吗? Kafka Streams v3.3.0 (Java 11/17) 遇到了完全相同的问题

    【讨论】:

    • 请不要添加我也是作为答案。它实际上并没有提供问题的答案。如果你有一个不同但相关的问题,那么ask它(如果它有助于提供上下文,请参考这个问题)。如果您对这个特定问题感兴趣,您可以upvote 它,留下comment,或者在您有足够的reputation 时启动bounty
    猜你喜欢
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-16
    • 1970-01-01
    • 2014-09-18
    相关资源
    最近更新 更多