【发布时间】:2019-05-08 18:13:04
【问题描述】:
我有一个拓扑,其中有一个流 A。
从该流A,我创建了一个WindowedStore S。
A --> [S]
然后我想根据S 上的数据对A 中的对象进行转换,并且这些转换后的对象也到达WindowStore 逻辑(通过transformValues)。
为此,我为此创建了一个 Transformer,创建了一个 Stream A',并让窗口知道它(即现在,S 将来自 A',而不是来自 A)。
A -> A' --> [S]
^__read__|
但我不能这样做,因为当我创建拓扑时,会引发异常:
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.
有没有办法解决这个问题?这是一个限制吗?
代码示例:
// A
val sessionElementsStream: KStream[K, SessionElement] = ...
// A'
val sessionElementsTransformed : KStream[K, SessionElementTransformed] = {
// Here we use the sessionStoreName - but it is not added yet to the Topology
sessionElementsStream.
transformValues(sessionElementTransformerSupplier, sessionStoreName)
}
val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
sessionElementsTransformed.
groupByKey(sessionElementTransformedGroupedBy).
windowedBy(sessionWindows)
}
val sessionStore : KTable[Windowed[K], List[WindowedSession]] =
sessionElementsWindowedStream.aggregate(
initializer = List.empty[WindowedSession])(
aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))
最初的问题是,根据之前会话的值,我想在它之后更改会话。但是,如果我在会话之后在转换器中执行此操作,则可以更改这些转换后的会话并将其发送到下游-但它们不会在S 中反映它们的新状态-因此对商店的进一步请求将有旧值。
Kafka Streams 2.1,Scala 2.12.4。 共同划分的主题。
更新
在 DSL 中有一种方法可以做到这一点,使用额外的主题:
- 发送A'
to这个话题 - 从此主题创建
builder.stream并以此为基础构建商店。 - 在定义转换之前定义 Store(因此转换步骤可以使用 Store,因为它之前已经定义)。
但是,在这里必须使用额外的主题听起来很麻烦。没有其他更简单的方法来解决它吗?
【问题讨论】:
标签: scala apache-kafka apache-kafka-streams