【问题标题】:How to access a KStreams Materialized State Store from another Stream Processor如何从另一个流处理器访问 KStreams 物化状态存储
【发布时间】:2020-06-16 10:19:30
【问题描述】:

我需要能够从单独的流处理器中从 Ktable 中删除记录。今天我正在使用 aggregate() 并传递一个物化状态存储。在从“终止”主题读取的单独处理器中,我想在 .transform() 或不同的 .aggregate() 中查询物化状态存储并“删除”该键/值。每次我尝试从单独的流处理器访问物化状态时,它一直告诉我存储没有添加到拓扑中,所以我添加它并再次运行它,然后它告诉我它已经注册并且错误出去。

      builder.stream("topic1").map().groupByKey().aggregate(() -> null,
        (aggKey, newValue, aggValue) -> {
          //add to the Ktable
          return newValue;
        },
        stateStoreMaterialized);

在一个单独的流中,我想从该 stateStoreMaterialized 中删除一个键

builder.stream("topic2")
.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())

stateStoreDeleteTransformer 将查询密钥并将其删除。

//in ctor
        KeyValueBytesStoreSupplier stateStoreSupplier = Stores.persistentKeyValueStore("store1");
    stateStoreMaterialized = Materialized.<String, MyObj>as(stateStoreSupplier)
        .withKeySerde(Serdes.String())
        .withValueSerde(mySerDe);

我的 topic1 流对象值上没有可以触发删除的终端标志。它必须来自另一个流/主题。

当我尝试在两个单独的流处理器上使用相同的 Materialized Store 时,我得到了..

 Invalid topology: Topic STATE_STORE-repartition has already been registered by another source.
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:268)

编辑:

这是我收到的第一个错误。

原因:org.apache.kafka.streams.errors.StreamsException:处理器 KSTREAM-TRANSFORMVALUES-0000000012 无法访问 StateStore store1,因为存储未连接到处理器。如果您通过“.addStateStore()”手动添加商店,请确保通过将处理器名称提供给“.addStateStore()”或通过“.connectProcessorAndStateStores()”将它们连接到处理器。 DSL 用户需要将 store 名称提供给 '.process()'、'.transform()' 或 '.transformValues()' 以将 store 连接到相应的 operator。如果您不手动添加商店,请在https://issues.apache.org/jira/projects/KAFKA 提交错误报告。

在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104) 在 org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:85)

那么我这样做:

stateStoreSupplier = Stores.persistentKeyValueStore(STATE_STORE_NAME);
storeStoreBuilder = Stores.keyValueStoreBuilder(stateStoreSupplier, Serdes.String(), jsonSerDe);
stateStoreMaterialized = Materialized.as(stateStoreSupplier);

然后我得到这个错误:

原因:org.apache.kafka.streams.errors.TopologyException:无效拓扑:StateStore 'state-store' 已添加。 在 org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:520) 在 org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:512)


这是解决我的问题的代码。事实证明,在构建流时,顺序很重要。必须先设置实体化存储,然后在后续代码行中设置转换器。

  /**
   * Create the streams using the KStreams DSL - a method to configure the stream and add any state stores.
   */
  @Bean
  public KafkaStreamsConfig setup() {

    final JsonSerDe<Bus> ltaSerde = new JsonSerDe<>(Bus.class);
    final StudentSerde<Student> StudentSerde = new StudentSerde<>();
    //start lta stream
    KStream<String, Bus> ltaStream = builder
        .stream(ltaInputTopic, Consumed.with(Serdes.String(), ltaSerde));

    final KStream<String, Student> statusStream = this.builder
        .stream(this.locoStatusInputTopic,
            Consumed.with(Serdes.String(),
                StudentSerde));

    //create lta store
    KeyValueBytesStoreSupplier ltaStateStoreSupplier = Stores.persistentKeyValueStore(LTA_STATE_STORE_NAME);

    final Materialized<String, Bus, KeyValueStore<Bytes, byte[]>> ltaStateStoreMaterialized =
        Materialized.
            <String, Bus>as(ltaStateStoreSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(ltaSerde);

    KTable<String, Bus> ltaStateProcessor = ltaStream
        //map and convert lta stream into Loco / LTA key value pairs
        .groupByKey(Grouped.with(Serdes.String(), ltaSerde))
        .aggregate(
            //The 'aggregate' and 'reduce' functions ignore messages with null values FYI.
            // so if the value after the groupbykey produces a null value, it won't be removed from the state store.
            //which is why it's very important to send a message with some terminal flag indicating this value should be removed from the store.
            () -> null, /* initializer */
            (aggKey, newValue, aggValue) -> {
              if (null != newValue.getAssociationEndTime()) { //if there is an endTime associated to this train/loco then remove it from the ktable
                logger.trace("removing LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
                return null; //Returning null removes the record from the state store as well as its changelog topic. re: https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
              }
              logger.trace("adding LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
              return newValue;
            }, /* adder */
            ltaStateStoreMaterialized
        );

    // don't need builder.addStateStore(keyValueStoreStoreBuilder); and CANT use it
    // because the ltaStateStoreMaterialized will already be added to the topology in the KTable aggregate method above.
    // The below transformer can use the state store because it's already added (apparently) by the aggregate method.
    // Add the KTable processors first, then if there are any transformers that need to use the store, add them after the KTable aggregate method.


    statusStream.map((k, v) -> new KeyValue<>(v.getLocoId(), v))
        .transform(locoStatusTransformerSupplier, ltaStateStoreSupplier.name())
        .to("testing.outputtopic", Produced.with(Serdes.String(), StudentSerde));

    return this; //can return anything except for void.
  }

【问题讨论】:

  • 理论上,您应该能够将状态存储从 KTable 连接到转换器 - 请注意,尽管构建拓扑是“惰性的”,因此似乎在之后添加了 KTable 存储您尝试将其附加到变压器上——也许您可以更改对StreamsBuilder 进行相应调用的顺序?此外,您是否启用了拓扑优化(如果是,这也可能会更改顺序)。 -- 手动添加状态存储不适用于这种情况。
  • 我会尝试并报告回来。同时,直接从状态存储中删除它(通过转换)是否也会从 KTable 和更改日志主题中“删除”它(墓碑)?
  • 是的,会的。 KTable 只是存储周围的逻辑抽象,更改日志也是存储本身的一部分。
  • 我能够通过转换访问商店。事实证明,streambuilder 的顺序很重要。我移动了我的代码行,首先使用物化存储创建 KTable,然后创建另一个使用转换来访问存储的流处理器。我不需要手动将 store 添加到 builder.addStore() 中,aggregate() 在幕后完成。
  • 正如我所怀疑的 :) - 因为内部 KTable 状态是内部的,所以从不同的处理器访问它实际上是一种黑客行为......这就是为什么它没有真正记录如何做它。

标签: apache-kafka apache-kafka-streams


【解决方案1】:
  1. stateStoreMaterializedstateStoreSupplier.name()同名吗?

  2. 使用你的拓扑有错误

KStream.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())

您必须在 TransformerSupplier 中为每个 ProcessContext 提供新的 StateStoreDeleteTransformer 瞬间,如下所示:

KStream.transform(StateStoreDeleteTransformer::new, stateStoreSupplier.name()) 
or
KStream.transform(() -> StateStoreDeleteTransformerSupplier.get(), stateStoreSupplier.name())//StateStoreDeleteTransformerSupplier return new instant of StateStoreDeleteTransformer

  1. stateStoreDeleteTransformer 中你打算如何直接在变压器内部使用stateStoreMaterialized? 我有类似的用例,我使用KeyValueStore&lt;String, MyObj&gt;
public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<String, MyObj>) context.getStateStore("store1");
    }

【讨论】:

  • 我用更多上下文更新了我的答案。 1)是的,他们有相同的名字。 2)我解决了这个问题。 3)我认为使用状态存储的唯一方法是使用 context.getStateStore - 我认为它不会返回物化存储,而是 KeyValueStore。我不认为 KStreams 开发人员打算让用户在 aggregate() 方法之外修改物化状态存储。所以我认为不可能在转换操作中管理 KTable 和 Materialized 存储的内部状态。 :( 我想我会放弃使用 Ktable 连接并使用转换方法作为合成连接。
  • KeyValueBytesStoreSupplier 用于创建商店,然后将用于创建StoreBuilder 以添加到拓扑中,您​​能分享一下您是如何构建拓扑的
  • btw Materialized 只是用来描述你如何存储StateStore,然后你可以使用StateStoreTransformerProcessor 中使用getStateStore 来使用Processor
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-26
  • 1970-01-01
  • 2022-01-01
  • 2017-03-07
相关资源
最近更新 更多