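【Posted】: 2017-05-18 09:44:15
【Problem description】:
I am using the low-level Processor API with a state store. It worked fine up to 0.10.0.1, but after upgrading Kafka Streams I get the error below. As far as I can tell it is caused by the changelog, which looks at the record context: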
java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)
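@Override
public void process(String arg0, List<Data> data) {
    data.forEach((x) -> {
        String rawKey = x.getId();
        // Look up the previously stored entry for this key, if any.
        Data stored = kvStore.get(rawKey);
        long bytesize = stored == null ? 0 : stored.getVolume();
        // Add the stored volume to the incoming entry, then write it back.
        x.addVolume(bytesize);
        kvStore.put(rawKey, x);
    });
}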
public void start() {
    builder = new KStreamBuilder();
    storeSupplier = Stores.create(getKVStoreName())
        .withKeys(getProcessorKeySerde())
        .withValues(getProcessorValueSerde())
        .persistent()
        .build();
    builder.addStateStore(storeSupplier);
    stream = builder.stream(Serdes.String(), serde(), getTopicName());
    processStream(stream);
    streams = new KafkaStreams(builder, props);
    streams.cleanUp();
    streams.start();
}
@Override
public void init(ProcessorContext context) {
    super.init(context);
    this.context = context;
    this.context.schedule(timeinterval);
    this.kvStore = (KeyValueStore) context.getStateStore(getKVStoreName());
}
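To make the stack trace easier to read, here is a minimal sketch of the dependency it shows: a `put()` on a change-logged store asks the context for the current record timestamp, and the context refuses when no record is in flight. This is a toy model, not Kafka's code; `ToyContext`, `ToyStore`, and `putOutsideRecordFails` are hypothetical names invented for illustration, and the sketch does not claim to explain why the context has no record in this particular application.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogTimestampSketch {

    /** Toy stand-in for a processor context: it only has a timestamp while a record is in flight. */
    static final class ToyContext {
        private Long currentRecordTimestamp; // null means "no record is being processed"

        void beginRecord(long timestamp) { currentRecordTimestamp = timestamp; }
        void endRecord() { currentRecordTimestamp = null; }

        long timestamp() {
            if (currentRecordTimestamp == null) {
                throw new IllegalStateException(
                    "timestamp() should only be called while a record is processed");
            }
            return currentRecordTimestamp;
        }
    }

    /** Toy key-value store that optionally stamps every write for a changelog. */
    static final class ToyStore {
        private final Map<String, Long> data = new HashMap<>();
        private final Map<String, Long> changelogTimestamps = new HashMap<>();
        private final ToyContext context;
        private final boolean loggingEnabled;

        ToyStore(ToyContext context, boolean loggingEnabled) {
            this.context = context;
            this.loggingEnabled = loggingEnabled;
        }

        void put(String key, long value) {
            if (loggingEnabled) {
                // The changelog entry needs a timestamp, so the write depends on the context.
                changelogTimestamps.put(key, context.timestamp());
            }
            data.put(key, value);
        }

        Long get(String key) { return data.get(key); }
    }

    /** Returns true if a put() made while no record is in flight throws. */
    static boolean putOutsideRecordFails(boolean loggingEnabled) {
        ToyStore store = new ToyStore(new ToyContext(), loggingEnabled);
        try {
            store.put("k", 1L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        ToyStore logged = new ToyStore(context, true);

        context.beginRecord(1000L);
        logged.put("a", 42L); // fine: a record is being processed
        context.endRecord();

        System.out.println(putOutsideRecordFails(true));  // prints true
        System.out.println(putOutsideRecordFails(false)); // prints false
    }
}
```

In this model a store without change logging never reaches `timestamp()`, so the same write succeeds, at the cost of having no changelog to restore from.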
【Discussion】:
-
If I disable the changelog, the problem does not occur. That is no good for failover, though :(
-
Can you share your code?
-
@Override public void process(String arg0, List<Data> data) { data.forEach((x) -> { String rawKey = x.getId(); Data stored = kvStore.get(rawKey); long bytesize = stored == null ? 0 : stored.getVolume(); x.addVolume(bytesize); kvStore.put(rawKey, x); }); }
-
Hard to say... Have you tried 0.10.2.1? It contains several bug fixes. Are your brokers also on 0.10.2?
-
Yes, my brokers are also on 0.10.2.0.