【发布时间】:2017-04-08 04:27:02
【问题描述】:
我正在使用 Spark Streaming 和 Kafka(使用 Scala API),并希望使用 Spark Streaming 从一组 Kafka 主题中读取消息。
以下方法:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
从 Kafka 读取到最新的可用偏移量,但没有给我我需要的元数据(因为我正在从一组主题中读取,我需要读取该主题的每条消息)但是这种其他方法 @987654322 @ 明确想要一个我没有的偏移量。
我知道有这个 shell 命令可以给你最后的偏移量。
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
KafkaCluster.scala 是一个 API,它是为曾经公开的开发人员提供的,它可以为您提供我想要的东西。
提示?
【问题讨论】:
标签: apache-kafka spark-streaming