【问题标题】:How can I create a state store that is restorable from an existing changelog topic?如何创建可从现有变更日志主题恢复的状态存储?
【发布时间】:2018-01-11 18:20:07
【问题描述】:

我正在使用流 DSL 对名为 users 的主题进行重复数据删除:

topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("users"), byteStringSerde, userSerde));                                                                                                                                                   

KStream<ByteString, User> users = topology.stream("users", Consumed.with(byteStringSerde, userSerde));                                                                                                                                                                      

users.transform(() -> new Transformer<ByteString, User, KeyValue<ByteString, User>>() {                                                                                                                                                                                     

    private KeyValueStore<ByteString, User> store;                                                                                                                                                                                                                          

    @Override                                                                                                                                                                                                                                                               
    @SuppressWarnings("unchecked")                                                                                                                                                                                                                                          
    public void init(ProcessorContext context) {                                                                                                                                                                                                                            
        store = (KeyValueStore<ByteString, User>) context.getStateStore("users");                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public KeyValue<ByteString, User> transform(ByteString key, User value) {                                                                                                                                                                                               
        User user = store.get(key);                                                                                                                                                                                                                                         
        if (user != null) {                                                                                                                                                                                                                                                 
            store.put(key, value);                                                                                                                                                                                                                                          
            return new KeyValue<>(key, value);                                                                                                                                                                                                                              
        }                                                                                                                                                                                                                                                                   
        return null;                                                                                                                                                                                                                                                        
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public KeyValue<ByteString, User> punctuate(long timestamp) {                                                                                                                                                                                                           
        return null;                                                                                                                                                                                                                                                        
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public void close() {                                                                                                                                                                                                                                                   

    }                                                                                                                                                                                                                                                                       
}, "users"); 

鉴于此代码,Kafka Streams 为 users 存储创建一个内部更改日志主题。我想知道,有什么方法可以使用现有的 users 主题,而不是创建一个基本相同的变更日志主题?

附言。我看到StreamsBuilder 说这是可能的:

但是,由于原始输入主题可用于恢复,因此不会创建内部更改日志主题

但是按照InternalStreamsBuilder#table()InternalStreamsBuilder#createKTable()的代码,我看不到它是如何实现这种效果的。

【问题讨论】:

  • 考虑过这个问题一段时间,我明白原来的users 主题和users 存储更改日志主题在语义/概念上有何不同。所以也许这就是问题的症结所在。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

并非 DSL 所做的所有事情都可以在处理器 API 级别上实现——它使用了一些不属于公共 API 的内部结构来实现您所描述的。

调用InternalTopologyBuilder#connectSourceStoreAndTopic() 可以解决问题(参见InternalStreamsBuilder#table())。

对于您关于重复数据删除的用例,您似乎需要两个主题(取决于您应用的重复数据删除逻辑)。通过更改日志主题进行恢复会执行基于键的更新,因此不考虑值(这也可能是您的重复数据删除逻辑的一部分)。

【讨论】:

  • 不过,如果我很好地理解了它们,我并不擅长使用内部 API。似乎有点奇怪,我需要复制一个流只是为了加入它自己。 connectSourceAndTopic() 是我在该代码路径中没有探索的少数路线之一。也许我会看看:)
  • 查看connectSourceAndTopic()——似乎它只是将主题添加到storeToChangelogTopic 映射中,如果它认为有必要,KafkaStreams 必须在启动时使用恢复。很高兴知道。谢谢。
猜你喜欢
  • 2018-05-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-22
  • 2021-01-28
  • 2021-10-16
  • 2018-11-28
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多