【问题标题】:Datasource V2 Reader (Spark Structured Streaming) - offsets out of orderDatasource V2 Reader (Spark Structured Streaming) - 偏移量乱序
【发布时间】:2019-05-30 23:57:40
【问题描述】:

我目前正在使用 V2 api 实现两个自定义阅读器,用于 spark 结构化流作业。作业运行约 30-60 分钟后,它会爆炸:

Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982

我正在重新利用here 找到的示例,它正在第 206 行轰炸。

我没有使用示例中提供的 twitter 流,而是为 JMS 和 SQS 实现它。

我的问题是:有人遇到过这个问题吗?还是那个实现有问题?

代码sn-p:

override def commit(end: Offset): Unit = {
    internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")

    val newOffset = TwitterOffset.convert(end).getOrElse(
      sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    tweetList.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
}

我无法通过我常用的网点找到答案。但是,我确实看到了this。提出的一点是删除检查点数据——这在生产系统中似乎不是一个可行的解决方案。另一个是源系统不维护偏移信息?我的印象是 spark 会自己处理偏移信息。如果这第二点是问题,我如何确保源系统处理这个范式。

如果我可以提供更多信息,请告诉我。

编辑:查看 MicroBatchReader 界面,提交文档说:

    /**
     * Informs the source that Spark has completed processing all data for offsets less than or
     * equal to `end` and will only request offsets greater than `end` in the future.
     */
    void commit(Offset end);

所以问题就变成了,为什么 spark 会向我发送已经提交的提交偏移量?

【问题讨论】:

    标签: scala apache-spark amazon-emr spark-structured-streaming amazon-efs


    【解决方案1】:

    回答我自己的问题以防它帮助某人,

    我应该在问题中添加更多信息 - 此作业在 EMR 上运行,并使用 EFS 检查点数据。

    当我使用亚马逊的amazon-efs-utils 挂载EFS 时出现问题。由于某种原因,每个工作人员都无法看到其他工作人员的读取和写入 - 就好像 EFS 没有挂载一样。

    解决方案是切换到nfs-utils 挂载 EFS(根据 AWS 指令),以便每个工作人员都能准确读取检查点数据。

    【讨论】:

      猜你喜欢
      • 2018-11-02
      • 2021-05-22
      • 2020-09-03
      • 2019-07-30
      • 2018-04-26
      • 2023-03-13
      • 2017-12-31
      • 2020-10-07
      • 1970-01-01
      相关资源
      最近更新 更多