【问题标题】:Kafka Streams (Scala): Invalid topology: StateStore is not added yetKafka Streams (Scala):无效的拓扑:尚未添加 StateStore
【发布时间】:2019-05-08 18:13:04
【问题描述】:

我有一个拓扑,其中有一个流 A

从该流A,我创建了一个WindowedStore S

       A  --> [S]

然后我想根据S 上的数据对A 中的对象进行转换,并且这些转换后的对象也到达WindowStore 逻辑(通过transformValues)。

为此,我为此创建了一个 Transformer,创建了一个 Stream A',并让窗口知道它(即现在,S 将来自 A',而不是来自 A)。

  A -> A'  --> [S]
       ^__read__|

但我不能这样做,因为当我创建拓扑时,会引发异常:

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.

有没有办法解决这个问题?这是一个限制吗?

代码示例:

  // A 
  val sessionElementsStream: KStream[K, SessionElement] = ...
  // A'
  val sessionElementsTransformed : KStream[K, SessionElementTransformed] = {
    // Here we use the sessionStoreName - but it is not added yet to the Topology
    sessionElementsStream.
      transformValues(sessionElementTransformerSupplier, sessionStoreName)
  }

  val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
    sessionElementsTransformed.
      groupByKey(sessionElementTransformedGroupedBy).
      windowedBy(sessionWindows)
  }

  val sessionStore : KTable[Windowed[K], List[WindowedSession]] = 
    sessionElementsWindowedStream.aggregate(
        initializer = List.empty[WindowedSession])(
        aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))     

最初的问题是,根据之前会话的值,我想在它之后更改会话。但是,如果我在会话之后在转换器中执行此操作,则可以更改这些转换后的会话并将其发送到下游-但它们不会在S 中反映它们的新状态-因此对商店的进一步请求将有旧值。

Kafka Streams 2.1,Scala 2.12.4。 共同划分的主题。

更新

在 DSL 中有一种方法可以做到这一点,使用额外的主题:

  • 发送A'to这个话题
  • 从此主题创建builder.stream 并以此为基础构建商店。
  • 在定义转换之前定义 Store(因此转换步骤可以使用 Store,因为它之前已经定义)。

但是,在这里必须使用额外的主题听起来很麻烦。没有其他更简单的方法来解决它吗?

【问题讨论】:

    标签: scala apache-kafka apache-kafka-streams


    【解决方案1】:

    但我不能这样做,因为当我创建拓扑时,会引发异常:

    Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.
    

    看起来您只是忘记将状态存储“添加”到您的处理拓扑中,然后将状态存储附加(“使可用”)到您的Transformer

    这里有一段代码 sn-p 演示了这一点(抱歉,在 Java 中)。

    将状态存储添加到您的拓扑中:

    final StreamsBuilder builder = new StreamsBuilder();
    final StoreBuilder<KeyValueStore<String, Long> myStateStore =
        Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("my-state-store-name"),
                 Serdes.String(),
                 Serdes.Long())
              .withCachingEnabled();
    builder.addStateStore(myStateStore);
    

    将状态存储附加到您的Transformer

    final KStream<String, Double> stream = builder.stream("your-input-topic", Consumed.with(Serdes.String(), Serdes.Double()));
    
    final KStream<String, Long> transformedStream =
        stream.transform(new YourTransformer(myStateStore.name()), myStateStore.name());
    

    当然,您的Transformer 必须集成状态存储,代码如下(这个Transformer 读取&lt;String, Double&gt; 并写入String, Long&gt;)。

    class MyTransformer implements TransformerSupplier<String, Double, KeyValue<String, Long>> {
    
      private final String myStateStoreName;
    
      MyTransformer(final String myStateStoreName) {
        this.myStateStoreName = myStateStoreName;
      }
    
      @Override
      public Transformer<String, Double, KeyValue<String, Long>> get() {
        return new Transformer<String, Double, KeyValue<String, Long>>() {
    
          private KeyValueStore<String, Long> myStateStore;
          private ProcessorContext context;
    
          @Override
          public void init(final ProcessorContext context) {
            myStateStore = (KeyValueStore<String, Long>) context.getStateStore(myStateStoreName);
          }
    
       // ...
      }
    }
    

    【讨论】:

    • 谢谢。我通过StoreBuilder 为其他 PAPI 创建的表执行此操作。但在这种情况下,我通过 DSL 中的聚合创建商店 - 我也可以通过供应商定义它以添加该商店吗?或者,有没有办法让通过 DSL 聚合创建的商店的供应商?
    • 您还可以在使用 DSL 时命名或提供状态存储。请查看 API 文档,我不记得了。
    • 您可以通过Materialized 命名 DSL 创建的商店,如果这就是您所指的,但您可以在创建后这样做。为了创建商店,我需要输入流,而输入流又使用这个商店——它还没有定义,所以它给出了这个错误。第 22 条军规。
    • 不,您不仅可以使用Materialized 命名州商店,还可以创建商店(通过传递供应商,例如KeyValueBytesStoreSupplier)。见docs.confluent.io/current/streams/javadocs/org/apache/kafka/…
    • 在这种情况下,如何获取尚未创建的基于KTable的商店的供应商?
    猜你喜欢
    • 2017-06-09
    • 1970-01-01
    • 2019-10-23
    • 2021-12-02
    • 2020-08-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-04
    相关资源
    最近更新 更多