【发布时间】:2017-03-09 22:59:44
【问题描述】:
我想从flink读取多个kafka。
我有 3 台用于 kafka 的计算机集群。有以下话题
Topic:myTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: myTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
我从 Flink 执行以下代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
properties.setProperty("group.id", "flink");
DataStream<T> stream = env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties)
stream.map(....)
env.execute()
我启动了 3 次相同的工作。
如果我使用一个代理执行此代码,它运行良好,但有 3 个损坏(在 3 台不同的机器上)只有一个分区被读取。
(In this question)提出的解决方案是
为每个集群创建单独的 FlinkKafkaConsumer 实例(这就是您已经在做的),然后合并生成的流
在我的情况下它不起作用。
所以我的问题是:
- 我错过了什么吗?
- 如果我们在 Kafka 集群中有一台新计算机,我们是否需要更改 flink 的代码来为新的 borker 添加消费者?或者我们可以在运行时自动处理吗?
【问题讨论】: