【问题标题】:Where does internal exception in Kafka Streams come from?Kafka Streams 的内部异常从何而来?
【发布时间】:2018-04-02 21:32:20
【问题描述】:

我在尝试聚合 KGroupedStream 时遇到问题 其中 TsdbObject 是一个 POJO,它有一个方法 Double getValue()。以下陈述 显示 groupBy 和尝试的聚合:

KGroupedStream< String, TsdbObject > assets_grouped_by_parents =
    kstream.groupBy( group_by_parent_mapper, Serialized.with( Serdes.String(), tsdb_object_serde ) );

KTable< String, Double > sums_of_groups_by_parents = 
    assets_grouped_by_parents.aggregate( new SummerInitializer(), new SummerAggregator() );                                                  

聚合由以下类完成:

private class SummerAggregator implements Aggregator< String, TsdbObject, Double > {
    @Override
        public Double apply(String key, TsdbObject value, Double aggregate) {
    System.out.println( "SummerAggregator.apply:  key is " + key + ", value is " + value +
                ", aggregate is " + aggregate );
    return aggregate + value.getValue();
    }
}

private class SummerInitializer implements Initializer< Double > {
    @Override
        public Double apply() {
    // TODO Auto-generated method stub
    System.out.println( "SummerInitializer" );
    return 0.0;
    }
}

当我执行应用程序时,我得到以下异常:

Encountered the following error during processing: 
java.lang.ClassCastException: [B cannot be cast to java.lang.Double
    at com.ui.kafka.experiments.metrics.TsdbObjectRollUp$SummerAggregator.apply(TsdbObjectRollUp.java:1)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:79)

KStreamAggregate 中的引用行是:

// try to add the new new value
if (value != null) {
    newAgg = aggregator.apply(key, value, newAgg);
}

奇怪的是newAgg的值,本来应该是Double,却是:

[0, 0, 0, 0, 0, 0, 0, 21]

这当然不能转换为 Double。 这个奇怪的值是从哪里来的?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    您需要使用可选参数Materialized.withValueSerde()assets_grouped_by_parents.aggregate(...) 的结果值类型传入DoubleSerde

    KTable<String, Double> sums_of_groups_by_parents = 
        assets_grouped_by_parents.aggregate(
            new SummerInitializer(),
            new SummerAggregator(),
            Materialized.withValueSerde(Serdes.DoubleSerde()));
    

    如果未在配置中将其设置为默认 serde,您可能还需要为密钥指定 StringSerde

    【讨论】:

    • 马蒂亚斯,谢谢。我最终使用: KTable sums_of_groups_by_parents = assets_grouped_by_parents.aggregate( new SummerInitializer(), new SummerAggregator(),Materialized.with( Serdes.String(), Serdes.Double()) );
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-05
    • 1970-01-01
    • 2018-10-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多