【问题标题】:Kafka Streams state store exception when putting a value放置值时Kafka Streams状态存储异常
【发布时间】:2017-05-18 09:44:15
【问题描述】:

我正在使用带有状态存储的低级处理器 API。直到 0.10.0.1 它工作正常,但我已经升级了 Kafka Streams,我得到了以下错误。我发现这是由于更改日志,它正在查看记录上下文:

java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at     org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)

@Override
    public void process(String arg0, List<Data> data {
        data.forEach((x) -> {
            String rawKey = x.getId();
            Data data = kvStore.get(rawKey);
            long bytesize = data == null ? 0 : data.getVolume();
            x.addVolume(bytesize);
            kvStore.put(rawKey, x);
        });
    }
  
public void start() {
        builder = new KStreamBuilder();
        storeSupplier =     Stores.create(getKVStoreName()).withKeys(getProcessorKeySerde()).withValues(getProcessorValueSerde()).persistent().build();
        builder.addStateStore(storeSupplier);
        stream = builder.stream(Serdes.String(), serde(),getTopicName());
        processStream(stream);
        streams = new KafkaStreams(builder, props);
        streams.cleanUp();
        streams.start();
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        this.context = context;
        this.context.schedule(timeinterval);
        this.kvStore = (KeyValueStore) context.getStateStore(getKVStoreName());
    }

【问题讨论】:

  • 如果我禁用了更改日志,则不会发生此问题。对于故障转移,这不好:(
  • 你能分享你的代码吗?
  • @Override public void process(String arg0, List data { data.forEach((x) -> { String rawKey = x.getId(); Data data = kvStore.get(rawKey ); long bytesize = val == null ? 0 : data.getVolume(); x.addVolume(bytesize); kvStore.put(rawKey, x); }); }
  • 很难说...你试过0.10.2.1 - 它包含几个错误修复。您的经纪人也在0.10.2?
  • 是的,我的经纪人也在 0.10.2.0

标签: apache-kafka-streams


【解决方案1】:

在多个流线程或分区中使用Processor相同实例时可能会出现此类异常。

确保您将新实例返回到ProcessorSupplier

new ProcesorSupplier(() -> new Processor(...

这同样适用于TransformerTransformerSupplier

广泛引用文档:

创建单个 Processor/Transformer 对象并在 ProcesorSupplier/TransformerSupplier#get() 中返回相同的对象引用将违反供应商模式并导致运行时异常。

【讨论】:

  • 可能会引发类似的异常,这是由同一件事引起的:org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. 通常当涉及持久状态存储时,写入更改日志的生产者可能会引发此异常。
猜你喜欢
  • 1970-01-01
  • 2019-07-03
  • 1970-01-01
  • 1970-01-01
  • 2018-10-24
  • 2021-01-04
  • 1970-01-01
  • 2019-07-14
  • 2023-03-23
相关资源
最近更新 更多