【问题标题】:Consuming Kafka fails due to "Timeout expired while fetching topic metadata"由于“获取主题元数据时超时已过期”,使用 Kafka 失败
【发布时间】:2019-12-25 15:56:51
【问题描述】:

我正在尝试使用 Apache Flink 来处理事件。 代码很基础,尝试用空格连接主题拆分词,打印到控制台。卡夫卡版本是 0.9。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka servers:9092...");
    props.setProperty("zookeeper.connect", "kafka servers:2181...");
    props.setProperty("group.id", "flinkPOC");
    FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

    DataStream<String> dataStream = env.addSource(consumer);

    DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
    wordDataStream.print();
    env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

    public void flatMap(String sentence, Collector<String> out) throws Exception {

        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }

}
}

该应用程序不会在屏幕上打印任何内容(尽管我向 Kafka 生成了事件)。我试图跳过 Splitter FlatMap 功能,但仍然没有任何反应。 Kafka 不需要 SSL。 当我将作业提交到集群时,我在日志超时异常中发现:

2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

我真的不知道我做错了什么:(

【问题讨论】:

    标签: java apache-kafka apache-flink flink-streaming


    【解决方案1】:

    这是一个常见错误,它只是告诉您您的客户端未能正确连接到 Kafka 集群。你知道集群是否启用了身份验证吗?您可以使用相同的zookeeper 属性通过kafka-topics 脚本连接到集群吗?

    我还会使用来自 Flink 主机的 Kafka 附带的 kafka-console-producerkafka-console-consumer 脚本来确保基本功能。

    【讨论】:

    • 您好,感谢您的快速响应。我可以使用内置的 kafka 脚本 kafka-console-consumer.sh 来消费事件。
    • 而且我还禁用了来自 Kafka 端的身份验证。
    • 我尝试运行相同的代码,但这次使用的是 Kafka 2 和 FlinkKafkaConsumer011,它运行良好。我怀疑问题是特定于 kafka 0.9 连接器的。
    • 好吧,Kafka 0.9 消费者已经很老了,可能不支持您的集群启用的身份验证机制。
    【解决方案2】:

    问题显然出在 Kafka 9 的 Flink 版本中,因为使用 Kafka 2 时相同的代码运行良好。 所以 Kafka 09 恐怕没有解决方案。

    【讨论】:

      猜你喜欢
      • 2021-10-07
      • 2019-06-12
      • 2020-09-18
      • 2015-08-16
      • 2019-11-22
      • 1970-01-01
      • 2021-03-19
      • 2018-05-14
      • 2017-11-10
      相关资源
      最近更新 更多