【问题标题】:kafka and Spark: Get first offset of a topic via APIkafka 和 Spark:通过 API 获取主题的第一个偏移量
【发布时间】:2017-04-08 04:27:02
【问题描述】:

我正在使用 Spark Streaming 和 Kafka(使用 Scala API),并希望使用 Spark Streaming 从一组 Kafka 主题中读取消息。

以下方法:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

从 Kafka 读取到最新的可用偏移量,但没有给我我需要的元数据(因为我正在从一组主题中读取,我需要读取该主题的每条消息)但是这种其他方法 @987654322 @ 明确想要一个我没有的偏移量。

我知道有这个 shell 命令可以给你最后的偏移量。

kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 

KafkaCluster.scala 是一个 API,它是为曾经公开的开发人员提供的,它可以为您提供我想要的东西。

提示?

【问题讨论】:

    标签: apache-kafka spark-streaming


    【解决方案1】:

    您可以使用 GetOffsetShell.scala kafka API documentation 中的代码

    val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
    val topicAndPartition = TopicAndPartition(topic, partitionId)
    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
    

    或者您可以创建具有唯一 groupId 的新消费者并将其用于获取第一个偏移量

    val consumer=new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
    consumer.partitionsFor(config.topic).foreach(pi => {
          val topicPartition = new TopicPartition(pi.topic(), pi.partition())
    
          consumer.assign(List(topicPartition))
          consumer.seekToBeginning()
          val firstOffset = consumer.position(topicPartition)
     ...
    

    【讨论】:

    • 谢谢@Natalia!在第一个 sn-p .. 什么是时间? time 是什么,nOffset 是什么?
    • 哦,我明白了,你从here得到了第一个sn-p
    • @salvob 这个问题的回答正确吗?如果是这样,请标记为已回答的问题,否则如果您能告诉我们您是如何解决问题的,我将不胜感激:)
    • 不幸的是,我无法获得最小的 kafka 偏移量并从中获取主题。我们最终编写了另一个模块,以便从不同的 kafka 主题中读取。没有很好的例子以不同的方式处理来自不同主题的数据。如果您找到任何解决方案,请告诉我。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-05-27
    • 1970-01-01
    • 2019-01-06
    • 1970-01-01
    • 2017-06-22
    • 2017-12-12
    • 2019-04-03
    相关资源
    最近更新 更多