【问题标题】:java.lang.RuntimeException for Flink consumer connecting to Kafka cluster with multiple partitions连接到具有多个分区的 Kafka 集群的 Flink 消费者的 java.lang.RuntimeException
【发布时间】:2020-03-30 22:45:32
【问题描述】:

Flink 1.9.0 版

Scala 版本 2.11.12

Kafka 集群版本 2.3.0

我正在尝试将我所做的 flink 作业连接到具有 3 个分区的 kafka 集群。我已经针对在我的 localhost 上运行的 kafka 集群主题测试了我的工作,该主题有一个分区,它可以读取和写入本地 kafka。当我尝试连接到具有多个分区的主题时,我收到以下错误(topicName 是我尝试使用的主题的名称。奇怪的是,当我尝试生成多分区主题时我没有任何问题。

java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

我的消费者代码如下所示:

  def defineKafkaDataStream[A: TypeInformation](topic: String,
                                                env: StreamExecutionEnvironment,
                                                SASL_username:String,
                                                SASL_password:String,
                                                kafkaBootstrapServer: String = "localhost:9092",
                                                zookeeperHost: String = "localhost:2181",
                                                groupId: String = "test"
                                               )(implicit c: JsonConverter[A]): DataStream[A] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
    properties.setProperty("security.protocol" , "SASL_SSL")
    properties.setProperty("sasl.mechanism" , "PLAIN")
    val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
    val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
    properties.setProperty("sasl.jaas.config", jaasConfig)
    properties.setProperty("group.id", "MyConsumerGroup")

    env
      .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
      .map(x => x.convertTo[A](c))
  }

我应该设置另一个属性以允许单个作业从多个分区使用吗?

【问题讨论】:

  • 我只是在猜测-但这可能与多个分区无关,而是与集群中的身份验证有关吗?分区的发现可能是第一个因此失败的请求......
  • 从文档看来,当身份验证失败时应该抛出org.apache.kafka.common.errors.AuthenticationException
  • @TobiSH 我之前遇到了身份验证错误,而这些错误消息却大不相同。我还可以将消息从 Flink 接收到同一流,所以我认为我的身份验证代码可以正常工作。

标签: apache-kafka apache-flink


【解决方案1】:

在对我的流程中的所有内容进行挖掘和质疑之后,我发现了问题所在。

我查看了出现运行时异常的KafkaPartitionDiscoverer 函数的Java 代码。

我注意到一个部分处理了 RuntimeException

if (kafkaPartitions == null) {
    throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
    }

我正在使用一个我不维护的 kafka 集群,并且有一个我没有首先验证的主题名称。当我使用以下方法进行验证时:

kafka-topics --describe --zookeeper serverIP:2181 --topic topicName

它返回了一个响应:

Error while executing topic command : Topics in [] does not exist
ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:435)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:350)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

在我获得正确的主题名称后,一切正常。

【讨论】:

    猜你喜欢
    • 2018-12-03
    • 2018-12-31
    • 2017-01-04
    • 2023-03-20
    • 2018-10-15
    • 1970-01-01
    • 2018-04-02
    • 2018-02-28
    • 1970-01-01
    相关资源
    最近更新 更多