【问题标题】:Stream data using Spark from a partiticular partition within Kafka topics使用 Spark 从 Kafka 主题中的特定分区流式传输数据
【发布时间】:2018-11-16 22:50:54
【问题描述】:

我已经看到了和clickhere类似的问题

但我仍然想知道是否无法从特定分区流式传输数据?我在 Spark Streaming 订阅方法中使用了 Kafka 消费者策略

ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, 偏移量)

这是我尝试订阅主题和分区的代码sn-p,

val topics = Array("cdc-classic")
val topic="cdc-classic"
val partition=2;
val offsets= 
Map(new TopicPartition(topic, partition) -> 2L)//I am not clear with this line, (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams,offsets))

但是当我运行这段代码时,我得到了以下异常,

     Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)

PS:cdc-classic 是 17 个分区的主题名称

【问题讨论】:

    标签: apache-spark apache-kafka apache-spark-sql spark-streaming kafka-consumer-api


    【解决方案1】:

    Kafka 的分区是 Spark 的并行化单元。因此,即使从技术上讲它是可能的,它也没有任何意义,因为所有数据都将由单个执行程序处理。您可以简单地以KafkaConsumer 身份启动您的进程,而不是使用 Spark:

     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));
    

    (https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)

    如果您想从 Spark 自动重试中获益,您可以简单地使用它创建一个 Docker 映像,然后使用 Kubernetes 启动它并使用适当的重试配置。

    关于Spark,如果你真的要使用它,你应该检查你读取的分区的偏移量是多少。可能您提供了一个不正确的信息,它会返回“超出范围”的偏移消息(可能以 0 开头?)。

    【讨论】:

    • 我已经尝试过使用 Kafka 消费者。这很清楚。那么在 Spark Streaming 中,只有我们正确地知道偏移量才有可能?
    • 我整理好了,我们需要正确指定起始偏移数和分区。谢谢!
    【解决方案2】:

    在此行指定分区号和分区起始偏移量,

    Map(new TopicPartition(topic, partition) -> 2L)
    

    在哪里,

    • partition是分区号

    • 2L指的是分区的起始偏移数。

    然后我们可以从选定的分区流式传输数据。

    【讨论】:

    • 如果您解决了问题,请考虑将答案标记为accepted
    猜你喜欢
    • 1970-01-01
    • 2019-05-13
    • 2016-10-15
    • 2019-07-21
    • 2016-06-09
    • 2017-02-22
    • 1970-01-01
    • 2019-05-20
    相关资源
    最近更新 更多