【发布时间】:2021-06-22 15:54:22
【问题描述】:
我正在努力为 BQ 写入完成后的写入状态找到一个好的解决方案。
每个数据流必须处理一个文件,并且在没有错误发生后,应该将状态写入Firestore。
我的代码如下所示:
PCollection<TableRow> failedInserts = results.getFailedInserts();
failedInserts
.apply("Set Global Window",
Window.<TableRow>into(new GlobalWindows()))
.apply("Count failures", Count.globally()).apply(ParDo.of(new DoFn<Long, ReportStatusInfo>() {
@ProcessElement
public void processElement(final ProcessContext c) throws IOException {
Long errorNumbers = c.element();
if (errorNumbers > 1) {
//set status to failed
} else if (numberOfErrors == 0) {
//set status to ok
}
insert();
}
}))
它似乎无法正常工作,因为我的印象是它不会等待整个 BQ 写入过程完成。
关于如何解决我在数据流中的问题或上述方法不起作用的任何其他想法?
【问题讨论】:
-
您观察到什么具体行为?这通常看起来像是观察失败插入的正确方法。
-
数据流失败,我的状态是“成功”。似乎使用 Streaming Inserts 方法可以更正常地工作 - 在测试此更改时没有观察到错误的结果,我确实在写入 BigQuery 仍在进行时强制数据流停止,结果符合预期。
标签: google-cloud-platform google-bigquery apache-beam dataflow