【发布时间】:2018-03-21 14:05:28
【问题描述】:
我正在使用 Kafka 构建一个相关系统。假设有一个服务 A 执行数据处理,并且有成千上万的客户端 B 向它提交作业。 B是短暂的,它们出现在网络上,将数据推送到A,然后发生两件重要的事情:
- B 将立即收到来自 A 的状态;
- B 然后要么 完全退出,保持在线以接收更多更新 状态,或者偶尔会弹出来检查状态。
(这与网格计算或 mpi 没有什么不同)。
这两点都应该使用众所周知的correlationId 概念来实现:B 拥有一个唯一的 id(在我的例子中是 UUID),它会发送给 A在标头中,然后将其用作Reply-To 主题以向其发送状态更新。这意味着它必须动态创建主题,它们无法预先确定。
我打开了auto.create.topics.enable,它确实动态地创建了主题,但现有消费者不知道它们并且需要重新启动[如果我理解文档正确,我想获取主题元数据]。我还检查了消费者的metadata.max.age.ms 设置,但它似乎无济于事,即使我将其设置为非常低的值。
据我所知,这尚未得到答复,即:kafka filtering/Dynamic topic creation、kafka consumer to dynamically detect topics added、Can a Kafka producer create topics and partitions? 或回答不满意。
由于有数百个 A 和数千个 B,我不可能使用共享主题或类似的东西,以免我的网络过载。我可以使用 Kafka 的 AdminTools 或任何其他名称来预先创建主题,但我觉得它有点愚蠢(尽管我看到了人们使用它与 Zookeeper 和 Kafka 基础设施本身对话的真实例子)。
所以问题是,有没有一种方法可以动态地创建 Kafka 主题,让消费者和生产者都知道它而无需重新启动或其他任何东西?而且,在最坏的情况下,AdminTools 真的会帮助它吗?我必须在哪一侧使用它 - A 还是 B?
Kafka 0.11, Java 8
更新
无论出于何种原因,使用AdminClient 创建主题都无济于事,当我尝试订阅时,消费者仍然会抛出LEADER_NOT_AVAILABLE。
【问题讨论】:
标签: apache-kafka kafka-consumer-api