【Question title】: Kafka Streams App - count and sum aggregate
【Posted】: 2019-02-28 15:49:28
【Question】:

I am trying to create a KTable from a KGroupedStream that stores the sum of the values for each key.

 final StreamsBuilder builder = new StreamsBuilder();
 final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));

But I get the error:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)

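All the examples I have seen pass a Serde as the third argument, but I tried that and got a very similar error (I think those examples may come from an older version, since they do not match the signature of the current implementation?):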

final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Serdes.Long());

Error:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)

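What am I doing wrong?

Kafka version: 2.1.0

【Comments】:

  • Side note: to compute a sum, the initial value should be zero, not Long.MIN_VALUE; otherwise the result is wrong. The same applies to a count.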
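
To illustrate the side note above, here is a minimal plain-Java sketch with no Kafka dependency. The class SumSeedDemo and the helper sumFrom are hypothetical names made up for this illustration; they only mimic how an Aggregator folds each value into the running aggregate, starting from whatever the Initializer returns.

```java
// Sketch only: mimics an Initializer/Aggregator pair with a plain loop.
// SumSeedDemo and sumFrom are hypothetical names, not Kafka Streams APIs.
class SumSeedDemo {

    // Folds the values into a running sum, starting from the given initial value.
    static long sumFrom(long initial, long... values) {
        long aggregate = initial;
        for (long value : values) {
            aggregate = aggregate + value;
        }
        return aggregate;
    }

    public static void main(String[] args) {
        // Starting from zero gives the real sum.
        System.out.println(sumFrom(0L, 1L, 2L, 3L));             // prints 6
        // Starting from Long.MIN_VALUE offsets every result by that constant.
        System.out.println(sumFrom(Long.MIN_VALUE, 1L, 2L, 3L)); // prints -9223372036854775802
    }
}
```

With Long.MIN_VALUE as the starting point, every per-key sum is shifted by that constant, which is why zero is the right initial value for a sum or a count.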

Tags: java apache-kafka apache-kafka-streams


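【Answer 1】:

There are a few problems in your code:

  1. In Materialized.as, the key type of the KeyValueStore should be org.apache.kafka.common.utils.Bytes, not java.lang.Byte.
  2. You must not reassign a final variable: aggregate += value; does not compile because aggregate is declared final.
  3. You have to add the key and value types to the StreamsBuilder::stream call (builder.<String, Long>stream("streams-plaintext-input")). Without them the stream is typed KStream<Object, Object>, which is what the error message shows.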
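
Points 2 and 3 can be reproduced without Kafka. The following is a rough sketch in plain Java: Source, Folder, source, and with are hypothetical stand-ins invented for this illustration (loosely playing the roles of KStream, Aggregator, and StreamsBuilder#stream), and the fold is simplified to run over all records rather than per key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: every type here is a hypothetical stand-in, not a Kafka Streams API.
class TypeWitnessDemo {

    // Plays the role of Aggregator<K, V, VR>.
    interface Folder<K, V, R> {
        R apply(K key, V value, R aggregate);
    }

    // Plays the role of a typed stream; it just holds records in memory.
    static final class Source<K, V> {
        private final List<Map.Entry<K, V>> records = new ArrayList<>();

        Source<K, V> with(K key, V value) {
            records.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            return this;
        }

        // Simplified: folds over all records instead of grouping by key.
        <R> R fold(R initial, Folder<? super K, ? super V, R> folder) {
            R aggregate = initial;
            for (Map.Entry<K, V> record : records) {
                aggregate = folder.apply(record.getKey(), record.getValue(), aggregate);
            }
            return aggregate;
        }
    }

    // K and V appear only in the return type, so in a method chain the compiler
    // has nothing to infer them from and falls back to Object.
    static <K, V> Source<K, V> source(String topic) {
        return new Source<>();
    }

    static final Folder<String, Long, Long> ADDER = new Folder<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value, final Long aggregate) {
            // aggregate is final, so return a new value instead of "aggregate += value".
            return aggregate + value;
        }
    };

    static Long sumWithWitness() {
        // The explicit type witness fixes K = String and V = Long for the whole chain.
        return TypeWitnessDemo.<String, Long>source("demo-topic")
                .with("a", 1L)
                .with("a", 2L)
                .fold(0L, ADDER);
        // Without the witness, source("demo-topic") is a Source<Object, Object> and
        // fold(0L, ADDER) is rejected, because Folder<String, Long, Long> is not a
        // Folder<? super Object, ? super Object, Long>.
    }

    public static void main(String[] args) {
        System.out.println(sumWithWitness()); // prints 3
    }
}
```

The compile error in the question has the same shape: the Aggregator<String, Long, Long> is checked against Aggregator<? super Object, ? super Object, VR> because the stream's key and value types were inferred as Object.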

After these changes it should look roughly like this:

// The explicit <String, Long> gives the stream its key and value types.
KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
        .groupByKey()
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                // Start at zero (not Long.MIN_VALUE) so the sum is correct.
                return 0L;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value, final Long aggregate) {
                // Return a new value instead of reassigning the final parameter.
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

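【Comments】:

  • Awesome, thank you very much. 3 was the root cause, and it is obvious now! I was already doing 1 (that was not clear from the snippet). 2 showed up after I fixed 3. Thanks again.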