【问题标题】:Kafka Streams – best way to get KTable and KStream on same topic?Kafka Streams – 在同一主题上获取 KTable 和 KStream 的最佳方式?
【发布时间】:2017-07-07 11:04:06
【问题描述】:

我对 Kafka Streams (0.10.1.1) 有疑问。我正在尝试在同一主题上创建 KStreamKTable

我尝试的第一种方法是简单地为同一主题的流和表调用KStreamBuilder 方法。这导致

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

好的,这似乎是 Kafka Streams 内置的一些限制。

我的第二种方法最初是创建一个KTable 并在其上使用toStream() 方法。这有KTables做一些内部缓冲/刷新的问题,所以如果一个键多次出现,输出流不会反映所有输入元素,就像我的例子一样。这是一个问题,因为我正在计算一个键的出现次数。

似乎可行的方法是首先创建一个KStream,按键对其进行分组,然后通过丢弃旧聚合并仅保留新值来“减少”它。我对这种方法不太满意,因为 a) 它看起来非常复杂 b) Reducer 接口没有指定哪个是已经聚合的值,哪个是新的。我遵循惯例并保留了第二个,但是……嗯。

所以问题归结为:有更好的方法吗?我是否遗漏了一些非常明显的东西?

请记住,我不是在研究正确的用例——这只是我了解 Streams-API 的目的。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    关于添加主题两次:这是不可能的,因为Kafka Streams应用程序是单个“消费者组”,因此只能提交一次主题的偏移量,而添加主题两次则表明该主题获得了消费者两次(和独立的进步)。

    对于KTable#toStream() 的方法,您可以通过StreamsConfig 参数cache.max.bytes.buffering == 0 禁用缓存。但是,这是一个全局设置,会禁用所有 KTables 的缓存/重复数据删除(参见 http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。

    更新:从 Kafka 0.11 开始,可以通过 Materialized 参数单独禁用每个 KTable 的缓存。

    groupBy 方法也有效,即使它需要一些样板文件。我们考虑将 KStream#toTable() 添加到 API 以简化此转换。是的,reduce 中的第二个参数是新值——因为 reduce 用于组合两个值,API 没有“旧”和“新”的概念,因此参数没有这样的命名。

    【讨论】:

    • 为什么不能重复使用之前添加的主题?
    • 如答案中所述。因为我们使用单个消费者,而消费者只能订阅一次主题。
    猜你喜欢
    • 1970-01-01
    • 2017-08-13
    • 2021-12-11
    • 1970-01-01
    • 2022-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多