【问题标题】:How to setup a starting point for the batchId of foreachBatch?如何为foreachBatch的batchId设置起点?
【发布时间】:2020-03-16 06:04:32
【问题描述】:

我面临的问题是我的流程依赖于 foreachBatch 的 batchId 作为对管道第二阶段准备好的某种控制。所以只有第一阶段(批次)完成后才会进入第二阶段。

我想保证万一出现问题,流可以从它停止的地方继续。

我们尝试通过将所有已完成的批次添加到增量表来进行一些控制,但是,我找不到设置初始批次 ID 的方法。

【问题讨论】:

    标签: apache-spark pyspark spark-structured-streaming azure-databricks


    【解决方案1】:

    尝试根据您提供的信息进行分析。可能会使用某种自定义检查点。对于每个批次,使用批次 ID 和状态列存储偏移范围。继续将状态更新为 RUNNING/COMPLETED。

    如果出现问题,请检查最后一批状态,如果未完成,则从该偏移量开始,否则从增量偏移量开始。

    【讨论】:

      【解决方案2】:

      我想保证万一出现问题,流可以从它停止的地方继续。

      这是 foreachBatch 接收器的 checkpointLocation 选项,在出现问题时用作预写日志 (WAL)。

      引用official documentation

      最后,系统通过检查点和预写日志确保端到端的精确一次容错保证。

      然后它在Recovering from Failures with Checkpointing中说:

      如果出现故障或故意关闭,您可以恢复之前查询的进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,并且查询会将所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(例如快速示例中的字数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时在 DataStreamWriter 中设置为选项。

      我认为这完全涵盖了您的用例。


      我找不到设置初始 batchId 的方法。

      这需要在流式查询的 checkpointLocation 选项中使用具有预期批次 ID 的预填充目录。

      您可以简单地自己创建必要的文件,然后让恢复的流式查询从目录开始。

      (我以前从未亲自尝试过,但看起来可行)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-08-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-06-24
        • 1970-01-01
        相关资源
        最近更新 更多