【发布时间】:2018-11-09 04:42:21
【问题描述】:
我有一个 Spark Streaming Scala 应用程序,它从 Kafka 主题读取数据并将其放置在 HDFS 上。我希望应用程序将读取消息的偏移量存储到 __consumer_offsets 主题,以便在应用程序失败时开始从中读取。该应用程序运行良好(我可以看到 HDFS 上的数据),但我看不到它对 __consumer_offsets 的提交。
这是我的 KafkaParams:
val kafkaParams = Map(
"metadata.broker.list" -> "xx.xxx.x.xx:6667",
"enable.auto.commit" -> "true",
"group.id" -> "reading_telemetry",
"offsets.storage" -> "kafka"
)
我用来从 __consumer_offsets 获取已提交偏移量的命令如下:
$ /usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --zookeeper xx.xxx.x.xx:2181 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
我得到了一些关于表单提交偏移量的信息
[test1,test,0]::[OffsetMetadata[55,NO_METADATA],CommitTime 1539603328309,过期时间6723603328309]
但我没有看到“reading_telemetry”组 ID 的任何提交。有什么想法,为什么?
我的环境:
卡夫卡:1.0.1 火花:2.3.1 斯卡拉:2.11.8
【问题讨论】: