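【Posted】: 2018-04-02 21:32:20
【Question】:
I am running into a problem while trying to aggregate a KGroupedStream of TsdbObject values,
where TsdbObject is a POJO that has a method Double getValue(). The following statements
show the groupBy and the attempted aggregation: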
KGroupedStream< String, TsdbObject > assets_grouped_by_parents =
    kstream.groupBy( group_by_parent_mapper, Serialized.with( Serdes.String(), tsdb_object_serde ) );
KTable< String, Double > sums_of_groups_by_parents =
    assets_grouped_by_parents.aggregate( new SummerInitializer(), new SummerAggregator() );
The aggregation is done by the following classes:
private class SummerAggregator implements Aggregator< String, TsdbObject, Double > {
    @Override
    public Double apply(String key, TsdbObject value, Double aggregate) {
        System.out.println( "SummerAggregator.apply: key is " + key + ", value is " + value +
            ", aggregate is " + aggregate );
        return aggregate + value.getValue();
    }
}
private class SummerInitializer implements Initializer< Double > {
    @Override
    public Double apply() {
        System.out.println( "SummerInitializer" );
        return 0.0;
    }
}
When I run the application, I get the following exception:
Encountered the following error during processing:
java.lang.ClassCastException: [B cannot be cast to java.lang.Double
at com.ui.kafka.experiments.metrics.TsdbObjectRollUp$SummerAggregator.apply(TsdbObjectRollUp.java:1)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:79)
The line referenced in KStreamAggregate is:
// try to add the new value
if (value != null) {
    newAgg = aggregator.apply(key, value, newAgg);
}
The odd part is the value of newAgg, which should be a Double but is instead:
[0, 0, 0, 0, 0, 0, 0, 21]
This of course cannot be cast to a Double. Where does this strange value come from?
【Discussion】:
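A note on reading the exception: `[B` is the JVM's internal name for `byte[]`, so the aggregate arriving in `SummerAggregator.apply` is a raw byte array rather than a `Double`. The following is a minimal sketch in plain Java with no Kafka dependency; the class name `ByteArrayAggregateDemo` and its helper methods are hypothetical and exist only for illustration. It decodes the eight bytes reported in the question and reproduces the failing cast.

```java
import java.nio.ByteBuffer;

public class ByteArrayAggregateDemo {

    // Interpret 8 bytes as a big-endian long (ByteBuffer's default byte order).
    static long asLong(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    // Interpret the same 8 bytes as the bit pattern of an IEEE 754 double.
    static double asDouble(byte[] raw) {
        return ByteBuffer.wrap(raw).getDouble();
    }

    // Mimic handing a value of unknown runtime type to code that expects a Double.
    static boolean castFails(Object aggregate) {
        try {
            Double d = (Double) aggregate; // checked at runtime
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] raw = {0, 0, 0, 0, 0, 0, 0, 21}; // the value reported for newAgg

        System.out.println(raw.getClass().getName()); // the "[B" from the stack trace
        System.out.println(asLong(raw));               // the bytes read as a long
        System.out.println(asDouble(raw));             // the bytes read as a double
        System.out.println(castFails(raw));            // byte[] cannot be cast to Double
    }
}
```

A byte array in that position is consistent with the aggregate being read back from the state store without a `Double` serde, since the `aggregate` call in the question is given no serdes for the result. Assuming Kafka Streams 1.0 or later, one thing worth trying is passing them explicitly as a third argument, for example `Materialized.with( Serdes.String(), Serdes.Double() )`. That is a suggestion based on the symptoms, not a confirmed diagnosis of this application.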