【发布时间】:2020-05-23 07:57:50
【问题描述】:
我有一个特定的用例,我试图在 Spanner 中写入 TB 的数据。我们正在从 DynamoDb 中提取该数据,并将该数据以 bzip2 格式导出到 Google Cloud Storage 中。所以基本上我们在 Spanner 中有主 ID,我们必须忽略 Spanner 中已经存在的行。所以我写了下面的代码来实现同样的效果。
Mutation.WriteBuilder mutation = Mutation.newInsertBuilder(spannerTable.get());
我已经编写了插入生成器,因为我不想更新 Spanner 中的现有行。使用以下代码通过设置 FailureMode 将行写入 Spanner。
results2.apply("Write Mutations to Spanner",SpannerIO.write()
.withInstanceId(spannerInstanceId)
.withDatabaseId(spannerDatabaseId)
//.withBatchSizeBytes(2000000)
//.withMaxNumMutations(maxNumMutations)
.withFailureMode(FailureMode.REPORT_FAILURES)
);
但代码的问题是,由于“已经存在”突变,Dataflow 代码正在重试整个批次。我不能使用 FailureMode.FAST_FAIL,因为它会停止整个管道。我还尝试设置最小 MaxNumMutation 以减小 batch_size(基本上是为了降低 Mutation 批处理中“已经存在”记录的概率),但整体性能受到了阻碍。那么有什么办法可以停止“已经存在”突变记录的重试机制?
【问题讨论】:
标签: google-cloud-platform amazon-dynamodb google-cloud-dataflow apache-beam google-cloud-spanner