【发布时间】:2018-01-11 18:20:07
【问题描述】:
我正在使用流 DSL 对名为 users 的主题进行重复数据删除:
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("users"), byteStringSerde, userSerde));
KStream<ByteString, User> users = topology.stream("users", Consumed.with(byteStringSerde, userSerde));
users.transform(() -> new Transformer<ByteString, User, KeyValue<ByteString, User>>() {
private KeyValueStore<ByteString, User> store;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store = (KeyValueStore<ByteString, User>) context.getStateStore("users");
}
@Override
public KeyValue<ByteString, User> transform(ByteString key, User value) {
User user = store.get(key);
if (user != null) {
store.put(key, value);
return new KeyValue<>(key, value);
}
return null;
}
@Override
public KeyValue<ByteString, User> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}, "users");
鉴于此代码,Kafka Streams 为 users 存储创建一个内部更改日志主题。我想知道,有什么方法可以使用现有的 users 主题,而不是创建一个基本相同的变更日志主题?
附言。我看到StreamsBuilder 说这是可能的:
但是,由于原始输入主题可用于恢复,因此不会创建内部更改日志主题
但是按照InternalStreamsBuilder#table()和InternalStreamsBuilder#createKTable()的代码,我看不到它是如何实现这种效果的。
【问题讨论】:
-
考虑过这个问题一段时间,我明白原来的
users主题和users存储更改日志主题在语义/概念上有何不同。所以也许这就是问题的症结所在。
标签: apache-kafka apache-kafka-streams