【Question title】: How do I set up a State Store for a Transformer
【Posted】: 2018-05-26 16:30:56
【Question】:

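I'm trying to create a Transformer, but I'm running into problems initializing its StateStore. I looked at the example in How to register a stateless processor (that seems to require a StateStore as well)?, which makes sense, but I'm trying something different: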

KeyValueBytesStoreSupplier groupToKVStore_supplier = 
    Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< G, KeyValue< K, V > > > groupToKVStore_builder =
    Stores.keyValueStoreBuilder( groupToKVStore_supplier, Gserde, KVserde );
stream_builder.addStateStore( groupToKVStore_builder );

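My intent is to use String as the State Store key and KeyValue as the State Store value. Is the formulation above correct? I ask because when the stream containing my Transformer starts, it throws an exception that says: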

Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-TRANSFORM-0000000001 has no access to StateStore state_store_1582785598
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:72)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.init(WindowedTimeSorter.java:135)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.init(KStreamTransform.java:51)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:54)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:10

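Following Matthias's suggestion, I added the StateStore name as a parameter to the transform call in my stream, which seems to have gotten us past the error shown above. However, we now get the following exception: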

ERROR stream-thread [A.Completely.Different.appID-b04af4b4-fdbb-4353-9aa5-6d71f7c22f9e-StreamThread-1] Failed to process stream task 0_1 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105) 
java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:167)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:1)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)

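Alas, things are still not quite right. First, my Transformer's init method is called 3 times; it should be called only once, right? Second, the first time my Transformer's transform method tries to store something in the StateStore, I get a runtime error: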

INFO stream-thread [A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1124) 
Exception in thread "A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1" java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:155)

【Question comments】:

  • Looks correct to me. What is your actual question? Did you try it?
  • Added a description of the exception above.

Tags: apache-kafka-streams


【Answer 1】:

Adding the store to the topology alone is not enough. You also need to connect the store to the transformer by passing the store name to transform():

stream.transform(..., state_store_name);
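The answer's point, that a store must be both registered with the topology and connected to the processor that uses it, can be illustrated with a small plain-Java model. To keep it runnable without a Kafka dependency, `MiniTopology`, `addTransformer` and the processor names are hypothetical stand-ins that only mimic the shape of `StreamsBuilder#addStateStore` and `KStream#transform(supplier, String... stateStoreNames)`. The lookup check is a simplified version of the runtime check behind the "has no access to StateStore" error, not Kafka's actual implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java model of store wiring; no Kafka dependency.
// MiniTopology and its methods are hypothetical stand-ins, not Kafka APIs.
public class StoreWiringDemo {

    static final class MiniTopology {
        // store name -> store contents
        private final Map<String, Map<String, Object>> stores = new HashMap<>();
        // processor name -> names of the stores it is connected to
        private final Map<String, Set<String>> connections = new HashMap<>();

        // Step 1: register the store (cf. StreamsBuilder#addStateStore).
        void addStateStore(String storeName) {
            stores.put(storeName, new HashMap<>());
        }

        // Step 2: connect stores to a processor (cf. the varargs of KStream#transform).
        // Passing zero names is legal, so the compiler cannot catch a missing one.
        void addTransformer(String processorName, String... stateStoreNames) {
            connections.put(processorName, new HashSet<>(Arrays.asList(stateStoreNames)));
        }

        // Simplified runtime check: only registered AND connected stores are visible.
        Map<String, Object> getStateStore(String processorName, String storeName) {
            Set<String> connected =
                connections.getOrDefault(processorName, Collections.emptySet());
            if (!stores.containsKey(storeName) || !connected.contains(storeName)) {
                throw new IllegalStateException(
                    "Processor " + processorName + " has no access to StateStore " + storeName);
            }
            return stores.get(storeName);
        }
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology();
        topology.addStateStore("state_store_1");

        // Registered but not connected: compiles, fails on lookup.
        topology.addTransformer("TRANSFORM-1");
        try {
            topology.getStateStore("TRANSFORM-1", "state_store_1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Registered and connected: lookup succeeds.
        topology.addTransformer("TRANSFORM-2", "state_store_1");
        topology.getStateStore("TRANSFORM-2", "state_store_1").put("group-a", "value");
        System.out.println("TRANSFORM-2 wrote to state_store_1");
    }
}
```

Because the store names are varargs, `addTransformer("TRANSFORM-1")` compiles just as `stream.transform(...)` does without a name, which is why omitting the name only fails at runtime.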

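Update:

For the second exception, I assume that your TransformerSupplier#get() does not return a new object on each call, but returns the same object every time. As the "supplier pattern" implies, you need to create a new object each time #get() is called (otherwise a supplier would be pointless, and you could pass in a single object directly). Compare the FAQ: https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata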
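The supplier contract can be sketched the same way in plain Java. `TaskContext` and `SortingTransformer` are hypothetical stand-ins for `ProcessorContext` and the asker's `WindowedTimeSorter`, and `startTasks` loosely models the runtime calling `get()` and then `init()` once per task. Three partitions is an assumption, chosen to match the three `init()` calls reported in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the supplier pattern; no Kafka dependency.
// TaskContext and SortingTransformer are hypothetical stand-ins.
public class SupplierPatternDemo {

    static final class TaskContext {
        final String taskId;
        TaskContext(String taskId) { this.taskId = taskId; }
    }

    static final class SortingTransformer {
        private TaskContext context; // per-task state, set in init()
        int initCalls = 0;

        void init(TaskContext context) {
            this.context = context;   // a shared instance overwrites this each time
            initCalls++;
        }

        String boundTask() { return context.taskId; }
    }

    // Simulates one task per input partition: get() once, then init() on the result.
    static List<SortingTransformer> startTasks(Supplier<SortingTransformer> supplier,
                                               int partitions) {
        List<SortingTransformer> perTask = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            SortingTransformer t = supplier.get();
            t.init(new TaskContext("0_" + p));
            perTask.add(t);
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Wrong: the "supplier" hands out one shared instance.
        SortingTransformer shared = new SortingTransformer();
        List<SortingTransformer> broken = startTasks(() -> shared, 3);

        // Right: a new instance on every get() call.
        List<SortingTransformer> fixed = startTasks(SortingTransformer::new, 3);

        for (int p = 0; p < 3; p++) {
            System.out.println("task 0_" + p
                + " shared->" + broken.get(p).boundTask()
                + " fresh->" + fixed.get(p).boundTask());
        }
        System.out.println("init calls on shared instance: " + shared.initCalls);
    }
}
```

With the shared instance, all three entries are the same object, `init()` runs three times on it, and every task ends up bound to the last context (`0_2`). That mirrors a real transformer calling `context.timestamp()` through a context whose task is not currently processing a record. Returning a new object from each `get()` (here `SortingTransformer::new`) gives each task its own instance and its own context.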

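【Comments】:

  • I followed your suggestion and got past the first problem, but ran into a second one, which I've added to the original post. Also, shouldn't omitting state_store_name from the call to stream.transform cause a compile error?
  • Expanded my answer. Regarding the missing store name: the store name is optional because a Transformer can also be stateless. Hence, at compile time there is no way to check whether the Transformer passed in actually accesses a store.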