【问题标题】:Got wrong record for spark-executor-<groupid> <topic> 0 even after seeking to offset <number>spark-executor-<groupid> <topic> 0 的记录错误,即使在寻求偏移 <number> 之后也是如此
【发布时间】:2019-07-13 11:36:41
【问题描述】:

我的 Spark 作业引发如下异常:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-test-local-npp_consumer_grp_3 <topic> 0 even after seeking to offset 29599
    at scala.Predef$.assert(Predef.scala:170)

我已禁用 auto.commit (enable.auto.commit=false) 并使用 Kafka API 提交偏移量

((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges.get());`). 

出现这种错误的原因可能是什么? 出现此错误是由于 Kafka 消费者端的问题还是由于我的 spark-kafka 消费者程序?*

查看CachedKafkaConsumer source code 后,我认为这应该是由于连续的缓冲区未命中(我的缓冲区大小是默认大小 - 65536 - receive.buffer.bytes = 65536)但我没有看到缓冲区未命中消息 - Buffer miss for $groupId $topic $partition $offset在我的日志中。

所以,我想知道这是否是由于缓冲区大小的原因?

我尝试将receive.buffer.bytes 增加到655360 但我的spark-kafka 消费者失败并出现同样的错误。 这个错误可能是由于我的 Kafka 源由于大量数据而发送的吗

【问题讨论】:

    标签: apache-spark spark-streaming kafka-consumer-api


    【解决方案1】:

    我遇到了同样的问题,在 spark-streaming 的 CachedKafkaCounsumer 类中找到了以下源代码。 这显然是由于消费者轮询的偏移量和消费者寻求的偏移量不相等。

    我重现了这个问题,发现从一个 topicAndPartition 的偏移量在 Kafka 中是不连续的

    def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
    if (offset != nextOffset) {
      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
    }
    
    if (!buffer.hasNext()) { poll(timeout) }
    assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    var record = buffer.next()
    
    if (record.offset != offset) {
      logInfo(s"Buffer miss for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
      assert(buffer.hasNext(),
        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
      record = buffer.next()
      assert(record.offset == offset,
        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    }
    
    nextOffset = offset + 1
    record
    }
    

    【讨论】:

      【解决方案2】:

      当我阅读使用事务生产者填充的主题时,我遇到了同样的问题here。此问题是由 spark-streaming-kafka 无法读取的事务标记(提交/中止)引起的。当您在此主题上使用 --print-offsets 选项运行 SimpleConsumerShell 时,您应该会看到偏移之间的“间隙”。

      我现在看到的唯一解决方案是禁用事务生产者,因为尚未实现较新的 spark-streaming-kafka。

      【讨论】:

        【解决方案3】:

        我也遇到了这个问题,遇到了这个链接: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Got-wrong-record-after-seeking-to-offset-quot-issue-td30609.html

        此问题已在 2.4.0 版本中解决:https://issues.apache.org/jira/browse/SPARK-17147

        我正在使用来自压缩主题(压缩)的消息,并使用无法处理压缩的 spark-streaming-kafka-0-10_2 版本 2.3.0。

        通过转到 spark-streaming-kafka-0-10_2 的 2.4.0 版本,我能够解决它: org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0

        我还需要配置:spark.streaming.kafka.allowNonConsecutiveOffsets=true

        我的提交命令如下:

        spark-submit --class com.streamtest.Main --master spark://myparkhost:7077 --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,org.apache.spark:spark-streaming_2.11:2.3.0,org.apache.spark:spark-core_2.11: 2.3.0 --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true /work/streamapp/build/libs/streamapp.jar

        【讨论】:

          猜你喜欢
          • 2018-04-20
          • 2017-03-11
          • 2021-04-03
          • 2011-12-11
          • 1970-01-01
          • 1970-01-01
          • 2021-08-17
          • 2017-09-16
          • 2017-06-10
          相关资源
          最近更新 更多