【发布时间】:2021-01-27 20:37:32
【问题描述】:
我们使用 Apache Beam 构建了一个 Dataflow,并部署在 GCP Dataflow 基础架构中。数据流实例第一次运行完美,并按预期创建分区表,但是当它第二次运行时,它会从数据集中清除结果,而不是用该特定分区中的新数据集替换。使用本地设置中的 Direct runner 运行时,作业可以完美运行。
代码示例:
pipeline.apply(
"Read from BigQuery (table_name) Table: ",
BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",
FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
.usingStandardSql()
.withoutValidation()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",
MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
如果知道我在这里缺少什么,请告诉我。提前致谢!
【问题讨论】:
标签: google-cloud-dataflow apache-beam apache-beam-io