【问题标题】:Attach kafka offset to each record in foreachRDD将kafka偏移量附加到foreachRDD中的每条记录
【发布时间】:2017-08-04 14:18:04
【问题描述】:

我想在 foreachRDD 方法中检索我的 RDD 的每条记录上的每个 kafka 偏移量。我的主题中有一个分区,所以我的 RDD 也有一个分区。我基本上是这样尝试的:

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    //get offset first value of the offset
    val firstOffset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges(0).fromOffset
    val rddWithOffset = rdd.map(_.value)
      .zipWithIndex()
      .map{ case (v,i) => (v,i + firstOffset)}
  }
}

例如,在我的生产者中,我使用循环发送消息,并将索引放在名为 position 的列中,如下所示:

+------+-----+--------+
|  name|  age|position|
+------+-----+--------+
|johnny|   26|       1|
| chloe|   42|       2|
| brian|   19|       3|
| eliot|   35|       4|
+------+-----+--------+

不幸的是,当我在消费者中添加偏移列时,我注意到订单没有得到维护:

+------+-----+--------+------+
|  name|  age|position|offset|
+------+-----+--------+------+
|johnny|   26|       1|     1|
| chloe|   42|       2|     3|
| brian|   19|       3|     4|
| eliot|   35|       4|     2|
+------+-----+--------+------+

似乎我在这个过程中失去了秩序。 你有什么主意吗?谢谢

顺便说一下,我的 Java 生产者是这样的:

KafkaRestProducer<String, Object> producer = new KafkaRestProducer<>(props);

ArrayList<String> names = new ArrayList<String>()
names.add("johnny")
names.add("chloe")
names.add("brian")
names.add("eliot")

ArrayList<Integer> ages = ArrayList<Integer>()
names.add(26)
names.add(42)
names.add(19)
names.add(35)

for (int i = 0; i < 3; ++i) {

    String name = names(i)
    Int age = ages(i)     
    Person person = Person
        .newBuilder()
        .setName(name)
        .setAge(age)
        .setPosition(i)
        .build();

    ProducerRecord<String, Object> record = new ProducerRecord<>("/apps/PERSON/streams:myTopic", name, person);

    producer.send(record, null);
    System.out.println(i);
}

【问题讨论】:

  • “丢弃订单”是什么意思?您观察到了什么,它与您的预期有何不同?
  • 感谢您的评论,我编辑我的问题以添加一个示例来说明我如何放松订单。你有什么想法吗?
  • kafka 主题上有多少个分区?
  • 我的主题中有一个分区
  • 你能把你制作人的代码加进去吗?

标签: scala apache-kafka spark-streaming mapr


【解决方案1】:

我的英语很差。我使用此代码:

    val Array(brokers, topic, groupId) = args
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
    val topicPartition = Map[TopicAndPartition, Long](TopicAndPartition(topic, 0) -> 1.toLong)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
        ssc, kafkaParams, topicPartition, messageHandler)

    kafkaStream.foreachRDD(rdd => rdd.foreach(println))

输出: (偏移量,lineOfMessage) ...

【讨论】:

  • 您好,感谢您的回答,但我没有找到具有此参数的构造函数 createDiirectStream。你用的是什么版本的kafka?
  • 我正在使用。火花 1.5.2 ,卡夫卡 0.8.2
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-02-14
  • 1970-01-01
  • 1970-01-01
  • 2022-01-06
  • 1970-01-01
  • 2018-07-19
  • 1970-01-01
相关资源
最近更新 更多