【发布时间】:2019-02-10 18:47:53
【问题描述】:
我正在使用 PubSubToBigQuery.java 代码,没有任何更改。有人可以告诉我如何在此过程中删除重复记录吗?
我知道诀窍是创建 Window 并使用 GroupBy 但真的不知道如何编写它。
谢谢
【问题讨论】:
-
是的。就是代码
标签: duplicates apache-beam google-cloud-pubsub
我正在使用 PubSubToBigQuery.java 代码,没有任何更改。有人可以告诉我如何在此过程中删除重复记录吗?
我知道诀窍是创建 Window 并使用 GroupBy 但真的不知道如何编写它。
谢谢
【问题讨论】:
标签: duplicates apache-beam google-cloud-pubsub
假设您只想过滤成功解析事件的重复项。你需要在this line之后添加一些代码:
transformOut
.get(TRANSFORM_OUT)
.apply("keyed", WithKeys.of(/* choose your key from table row to identify duplicates */))
.apply(GroupByKey.create())
.apply("dedup", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
public void ProcessElement(ProcessContext context) {
// only output one element from list to dedup.
context.output(context.element().getValue().iterator().next());
}
}
))
.apply(Window.configure().triggering(/* choose your trigger */)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.to(options.getOutputTableSpec()));
BeamSQL 实际尝试支持您的用例(检查PubsubToBigqueryIT.java)。 BeamSQL 允许您在 pubsub 主题和 bigquery 表上创建表。从 pubsub 读取、转换 pubsub 消息和写入 BQ 表已经由 BeamSQL 处理。 SQL 可以应用于从 pubsub 读取的数据。但是,BeamSQL 可能会错过一些功能(例如,如果您想在 SQL 中使用 group by 进行重复数据删除,则使用 ANY_VALUE 聚合函数)来完成您的任务。
【讨论】: