【发布时间】:2016-12-03 00:03:25
【问题描述】:
我正在使用带有 flink 的 kafka。 在一个简单的程序中,我使用了 flinks FlinkKafkaConsumer09,并为其分配了 group id。
根据 Kafka 的行为,当我在具有相同 group.Id 的同一主题上运行 2 个消费者时,它应该像消息队列一样工作。我认为它应该像这样工作: 如果向 Kafka 发送 2 条消息,则每个或一个 flink 程序将总共处理 2 条消息两次(假设总共 2 行输出)。
但实际结果是,每个程序会收到2条消息。
我尝试使用随 kafka 服务器下载一起提供的消费者客户端。它以记录的方式工作(处理了 2 条消息)。
我尝试在 flink 程序的同一个 Main 函数中使用 2 个 kafka 消费者。总共处理了 4 条消息。
我还尝试运行 2 个 flink 实例,并为每个实例分配相同的 kafka consumer 程序。 4 条消息。
有什么想法吗? 这是我期望的输出:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
这是我总是得到错误的输出:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
下面是这段代码:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink1 says: " + value;
}
}).print();
env.execute();
}
我曾尝试运行它两次,也以另一种方式运行: 在 Main 函数中为每个数据流创建 2 个数据流和 env.execute()。
【问题讨论】:
-
我也试过了,在 flink 外部使用 kafka 客户端运行 2 个消费者,在 flink 实例中使用 flink-kafka-connector 运行 2 个消费者。外面的2个消费者似乎工作正常,总共2个。但是 flink 内部的另外 2 个似乎是独立工作的(对外部,也对彼此),它们每个收到 2 条消息,所以总共 4 条。