【问题标题】:InvalidGroupIdException for Kafka spout in StormStorm 中 Kafka 喷口的 InvalidGroupIdException
【发布时间】:2020-05-02 08:30:01
【问题描述】:

我已经使用来自 Kafka 的 spout 消费者定义了一个基本的 Storm 拓扑(生产者是在 Kafka 单独的模块中创建的)。但是,当我运行该应用程序时,我收到此错误:

java.lang.RuntimeException: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
    at org.apache.storm.utils.Utils$1.run(Utils.java:407) ~[storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

如何设置组 ID?我在本地使用 2.1.0 版本运行 Storm。

这是拓扑的代码:

val cluster = new LocalCluster()

val bootstrapServers = "localhost:9092"
val brokerHosts = new ZkHosts(bootstrapServers)
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "tweets").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

【问题讨论】:

    标签: scala apache-kafka apache-storm


    【解决方案1】:

    您应该使用setProp(java.lang.String, java.lang.Object)ConsumerConfig.GROUP_ID_CONFIGKafkaSpoutConfig 上添加消费者组ID

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-11-11
      • 2015-07-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-11-23
      • 1970-01-01
      • 2019-02-08
      相关资源
      最近更新 更多