【Posted】: 2020-06-16 10:19:30
【Question】:
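I need to be able to delete records from a KTable from a separate stream processor. Today I'm using aggregate() and passing it a materialized state store. In a separate processor that reads from a "termination" topic, I want to query that materialized state store in a .transform() or a different .aggregate() and "delete" the key/value. Every time I try to access the materialized state from a separate stream processor, it tells me the store hasn't been added to the topology. So I add it and run again, and then it tells me the store is already registered and errors out.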
builder.stream("topic1").map().groupByKey().aggregate(
        () -> null,
        (aggKey, newValue, aggValue) -> {
            // add to the KTable
            return newValue;
        },
        stateStoreMaterialized);
In a separate stream, I want to delete a key from that stateStoreMaterialized store:
builder.stream("topic2")
        .transform(stateStoreDeleteTransformer, stateStoreSupplier.name());
stateStoreDeleteTransformer looks up the key and deletes it.
// in the constructor
KeyValueBytesStoreSupplier stateStoreSupplier = Stores.persistentKeyValueStore("store1");
stateStoreMaterialized = Materialized.<String, MyObj>as(stateStoreSupplier)
        .withKeySerde(Serdes.String())
        .withValueSerde(mySerDe);
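The object values in my topic1 stream have no terminal flag that could trigger the delete. It has to come from another stream/topic.
When I try to use the same Materialized store in two separate stream processors, I get: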
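To make the intended data flow concrete, here is a plain-Java sketch that does not use the Kafka Streams API: a `HashMap` stands in for the store `store1` behind the KTable, the topic1 path upserts through the aggregate, and the topic2 path deletes the key from that same store. The class and method names (`KTableStoreModel`, `onTopic1`, `onTopic2`) are hypothetical. In a real `Transformer`, the store would be looked up with `context.getStateStore(...)` in `init()` and the key removed with `KeyValueStore.delete(key)`, and the store is only accessible if its name is passed to `.transform()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the pattern in the question; NOT the Kafka Streams API.
// A single map plays the role of the store "store1" that backs the KTable.
class KTableStoreModel {
    private final Map<String, String> store = new HashMap<>();

    // topic1 path: aggregate() with an adder that simply keeps the newest value.
    void onTopic1(String key, String newValue) {
        if (key == null || newValue == null) {
            return; // aggregate() skips records with a null key or value
        }
        store.put(key, newValue);
    }

    // topic2 path: the "delete transformer" removes the key from the SAME store.
    // In Kafka Streams this would be keyValueStore.delete(key) inside transform().
    void onTopic2(String key) {
        store.remove(key);
    }

    String get(String key) {
        return store.get(key);
    }

    int size() {
        return store.size();
    }
}
```

The design point the model captures is that both paths share one store; the two errors in the question come from how that single store gets registered with, and connected to, both processors.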
Invalid topology: Topic STATE_STORE-repartition has already been registered by another source.
at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:268)
EDIT:
This is the first error I got:
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-TRANSFORMVALUES-0000000012 has no access to StateStore store1 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added stores to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104)
at org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:85)
So then I do this:
stateStoreSupplier = Stores.persistentKeyValueStore(STATE_STORE_NAME);
storeStoreBuilder = Stores.keyValueStoreBuilder(stateStoreSupplier, Serdes.String(), jsonSerDe);
stateStoreMaterialized = Materialized.as(stateStoreSupplier);
And then I get this error:
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore 'state-store' is already added.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:520)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:512)
Here is the code that solved my problem. It turns out that order matters when building the streams: the materialized store must be set up first, and the transformer in later lines of code.
/**
 * Create the streams using the KStreams DSL: configure the streams and add any state stores.
 */
@Bean
public KafkaStreamsConfig setup() {
    final JsonSerDe<Bus> ltaSerde = new JsonSerDe<>(Bus.class);
    final StudentSerde<Student> studentSerde = new StudentSerde<>();

    // start the LTA stream
    KStream<String, Bus> ltaStream = builder
            .stream(ltaInputTopic, Consumed.with(Serdes.String(), ltaSerde));

    final KStream<String, Student> statusStream = this.builder
            .stream(this.locoStatusInputTopic, Consumed.with(Serdes.String(), studentSerde));

    // create the LTA store
    KeyValueBytesStoreSupplier ltaStateStoreSupplier = Stores.persistentKeyValueStore(LTA_STATE_STORE_NAME);
    final Materialized<String, Bus, KeyValueStore<Bytes, byte[]>> ltaStateStoreMaterialized =
            Materialized.<String, Bus>as(ltaStateStoreSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(ltaSerde);

    KTable<String, Bus> ltaStateProcessor = ltaStream
            // map and convert the LTA stream into Loco / LTA key-value pairs
            .groupByKey(Grouped.with(Serdes.String(), ltaSerde))
            .aggregate(
                    // FYI: aggregate() and reduce() ignore records with null values,
                    // so if the value after groupByKey() is null, it won't be removed from the state store.
                    // That is why it's very important to send a message with a terminal flag
                    // indicating that this value should be removed from the store.
                    () -> null, /* initializer */
                    (aggKey, newValue, aggValue) -> {
                        // if this train/loco association has an end time, remove it from the KTable
                        if (null != newValue.getAssociationEndTime()) {
                            logger.trace("removing LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
                            // Returning null removes the record from the state store as well as its changelog topic.
                            // re: https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
                            return null;
                        }
                        logger.trace("adding LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
                        return newValue;
                    }, /* adder */
                    ltaStateStoreMaterialized
            );

    // We don't need builder.addStateStore(keyValueStoreStoreBuilder), and CAN'T use it,
    // because aggregate() above already adds ltaStateStoreMaterialized to the topology.
    // The transformer below can use the state store because aggregate() (apparently) already added it.
    // Add the KTable processors first; then add any transformers that need the store after the KTable aggregate() call.
    statusStream.map((k, v) -> new KeyValue<>(v.getLocoId(), v))
            .transform(locoStatusTransformerSupplier, ltaStateStoreSupplier.name())
            .to("testing.outputtopic", Produced.with(Serdes.String(), studentSerde));

    return this; // can return anything except void
}
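The two rules that the comments in this code rely on can be checked with a small plain-Java model, assuming a `HashMap` in place of the persistent store: records with a null value never reach the aggregator, and an aggregator result of null removes the key. `AggregateSemanticsModel` and its trimmed-down `Bus` (only an end-time field) are hypothetical stand-ins, not the application's classes or the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of KGroupedStream.aggregate() semantics as described
// in the code comments: null-valued input is skipped; a null aggregate removes the key.
class AggregateSemanticsModel {
    // Hypothetical stand-in for Bus: only the field the adder inspects.
    static final class Bus {
        final String associationEndTime; // null while the association is still active

        Bus(String associationEndTime) {
            this.associationEndTime = associationEndTime;
        }
    }

    final Map<String, Bus> store = new HashMap<>();

    // Same decision as the adder above: an end time means "remove from the KTable".
    static Bus adder(Bus newValue, Bus aggValue) {
        return newValue.associationEndTime != null ? null : newValue;
    }

    void process(String key, Bus value) {
        if (value == null) {
            return; // a null value never reaches the adder, so nothing is deleted
        }
        Bus result = adder(value, store.get(key));
        if (result == null) {
            store.remove(key); // a null aggregate deletes the key from the store
        } else {
            store.put(key, result);
        }
    }
}
```

This is why the answer needs an explicit terminal flag (an end time) rather than a null value to remove an entry through the aggregate.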
【Discussion】:
-
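In theory, you should be able to connect the KTable's state store to the transformer. Note, though, that building the topology is "lazy", so it seems the KTable store is added after you try to attach it to the transformer. Maybe you can change the order of the corresponding calls on StreamsBuilder? Also, do you have topology optimization enabled? (If so, that might change the order as well.) Adding the state store manually won't work in this case.
-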
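I'll try that and report back. Meanwhile, will deleting the record directly from the state store (via the transform) also "delete" it (tombstone) from the KTable and the changelog topic?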
-
Yes, it will. The KTable is just a logical abstraction around the store, and the changelog is also part of the store itself.
-
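I was able to access the store via transform. It turns out the order of the StreamsBuilder calls matters. I moved my lines of code so that the KTable with the materialized store is created first, followed by the other stream processor that uses transform to access the store. I didn't need to add the store to the builder manually with builder.addStateStore(); aggregate() does that behind the scenes.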
-
As I suspected :) Because the KTable state is internal, accessing it from a different processor is really a bit of a hack... which is why how to do it isn't really documented.
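The point from the discussion, that deleting directly from the store also removes the record from the KTable and writes a tombstone to the changelog, can be illustrated with a hypothetical model of a change-logged store. It is a simplification under stated assumptions, not Kafka's implementation: every `put` is mirrored to an in-memory changelog list, every `delete` appends a `(key, null)` tombstone, and replaying the changelog rebuilds the same state, roughly as a restoring instance would.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a change-logged key/value store (NOT Kafka's classes):
// every write to the local map is mirrored to a changelog; deletes become tombstones.
class ChangeLoggedStoreModel {
    // One changelog entry; value == null marks a tombstone.
    static final class ChangelogEntry {
        final String key;
        final String value;

        ChangelogEntry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    final Map<String, String> local = new HashMap<>();
    final List<ChangelogEntry> changelog = new ArrayList<>();

    void put(String key, String value) {
        if (value == null) { // putting null is treated as a delete
            delete(key);
            return;
        }
        local.put(key, value);
        changelog.add(new ChangelogEntry(key, value));
    }

    String delete(String key) {
        String old = local.remove(key);
        changelog.add(new ChangelogEntry(key, null)); // tombstone lets compaction drop the key
        return old;
    }

    // Replaying the changelog rebuilds the same local state.
    static Map<String, String> restore(List<ChangelogEntry> log) {
        Map<String, String> rebuilt = new HashMap<>();
        for (ChangelogEntry e : log) {
            if (e.value == null) {
                rebuilt.remove(e.key);
            } else {
                rebuilt.put(e.key, e.value);
            }
        }
        return rebuilt;
    }
}
```

Because the tombstone travels with the store's own changelog, deleting through the store from the transformer is durable, whereas sending a null value into aggregate() is simply ignored.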
Tags: apache-kafka apache-kafka-streams