【问题标题】:How to setup a starting point for the batchId of foreachBatch?如何为foreachBatch的batchId设置起点?
【发布时间】:2020-03-16 06:04:32
【问题描述】:
我面临的问题是我的流程依赖于 foreachBatch 的 batchId 作为对管道第二阶段准备好的某种控制。所以只有第一阶段(批次)完成后才会进入第二阶段。
我想保证万一出现问题,流可以从它停止的地方继续。
我们尝试通过将所有已完成的批次添加到增量表来进行一些控制,但是,我找不到设置初始批次 ID 的方法。
【问题讨论】:
标签:
apache-spark
pyspark
spark-structured-streaming
azure-databricks
【解决方案1】:
尝试根据您提供的信息进行分析。可能会使用某种自定义检查点。对于每个批次,使用批次 ID 和状态列存储偏移范围。继续将状态更新为 RUNNING/COMPLETED。
如果出现问题,请检查最后一批状态,如果未完成,则从该偏移量开始,否则从增量偏移量开始。
【解决方案2】:
我想保证万一出现问题,流可以从它停止的地方继续。
这是 foreachBatch 接收器的 checkpointLocation 选项,在出现问题时用作预写日志 (WAL)。
引用official documentation:
最后,系统通过检查点和预写日志确保端到端的精确一次容错保证。
然后它在Recovering from Failures with Checkpointing中说:
如果出现故障或故意关闭,您可以恢复之前查询的进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,并且查询会将所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(例如快速示例中的字数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时在 DataStreamWriter 中设置为选项。
我认为这完全涵盖了您的用例。
我找不到设置初始 batchId 的方法。
这需要在流式查询的 checkpointLocation 选项中使用具有预期批次 ID 的预填充目录。
您可以简单地自己创建必要的文件,然后让恢复的流式查询从目录开始。
(我以前从未亲自尝试过,但看起来可行)。