【问题标题】:How can a samza task consume more than one kafka partitioned streamssamza 任务如何消耗多个 kafka 分区流
【发布时间】:2017-08-15 17:42:36
【问题描述】:

我有一个典型的 samza 任务,它使用 2 个主题:dataconfig,并将来自 config 的消息作为本地状态存储在 Rocksdb 中,以检查来自 data 的消息是否正常。

如果这两个主题中的每一个都只有一个分区,则此任务可以正常工作。一旦我将data 分成十个分区并且config 仍然是一个分区,情况就发生了变化。默认情况下,samza 会创建 10 个任务来消费 data 主题的分区 0 ~ 9,并且只有任务 0 消费 config 主题:

task[0] -> config, data[0] task[1] -> data[1] ... task[9] -> data[9]

似乎每个任务都是用自己的rocksdb实例初始化的,所以只有task[0]将所有配置数据存储在它的rocksdb实例中,task[1~9]没有配置数据,因此无法找到它的配置信息传入的数据。

我的预期是每个任务都使用来自其数据分区和配置流的消息,如下所示:

task[0] -> config, data[0] task[1] -> config, data[1] ... task[9] -> config, data[9]

有什么办法可以做到吗?

【问题讨论】:

    标签: apache-kafka partition apache-samza


    【解决方案1】:

    输入流分区的分布由使用“job.systemstreampartition.grouper.factor”配置的可插入分组器控制。默认情况下,此类跨任务实例对传入流分区进行分组。默认情况下,我相信它会使用 GroupByPartitionId。这就是为什么您在 task[0] 中看到 data[0] 和 config[0] 的原因。

    您可以实现自定义 SSPGrouper。但是,您正在寻找的是将“数据”流视为常规输入流,将“配置”流视为“广播”输入流。广播意味着 Samza 作业中的每个任务都从该流的分区中读取。这样,每个任务实例都可以使用配置流的数据填充其本地 RocksDB。您可以将广播流配置为: task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]

    对于您的情况,您可以配置: task.inputs = <systemName>.data task.broadcast.inputs = <systemName>.config#0

    查看Broadcast Streams in Samza

    【讨论】:

    • 根据@NavinaRamesh 的建议,我遇到了一个异常:Exception in thread "main" java.lang.IllegalArgumentException: incorrect format in kafka.config#[0]. Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]',这意味着正确的格式应该是task.broadcast.inputs = &lt;systemName&gt;.config#0,并且在我更改为正确的格式后,一切都像魅力一样.谢谢和欢呼,@NavinaRamesh :)
    • 很高兴您能弄清楚 :) 我已在回复中修复了配置格式。谢谢!
    猜你喜欢
    • 2021-02-01
    • 1970-01-01
    • 2018-11-21
    • 2015-11-29
    • 2020-02-08
    • 2020-09-06
    • 1970-01-01
    • 2019-02-21
    • 1970-01-01
    相关资源
    最近更新 更多