【问题标题】:Apache Beam PubSubToBigQuery.java duplicate removal?Apache Beam PubSubToBigQuery.java 重复删除?
【发布时间】:2019-02-10 18:47:53
【问题描述】:

我正在使用 PubSubToBigQuery.java 代码,没有任何更改。有人可以告诉我如何在此过程中删除重复记录吗?

我知道诀窍是创建 Window 并使用 GroupBy 但真的不知道如何编写它。

谢谢

【问题讨论】:

标签: duplicates apache-beam google-cloud-pubsub


【解决方案1】:

假设您只想过滤成功解析事件的重复项。你需要在this line之后添加一些代码:

transformOut
    .get(TRANSFORM_OUT)
    .apply("keyed", WithKeys.of(/* choose your key from table row to identify duplicates */))
    .apply(GroupByKey.create())
    .apply("dedup", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
      public void ProcessElement(ProcessContext context) {
        // only output one element from list to dedup.
        context.output(context.element().getValue().iterator().next());
      }
    }
    ))
    .apply(Window.configure().triggering(/* choose your trigger */)
    .apply(
        "WriteSuccessfulRecords",
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .to(options.getOutputTableSpec()));

BeamSQL 实际尝试支持您的用例(检查PubsubToBigqueryIT.java)。 BeamSQL 允许您在 pubsub 主题和 bigquery 表上创建表。从 pubsub 读取、转换 pubsub 消息和写入 BQ 表已经由 BeamSQL 处理。 SQL 可以应用于从 pubsub 读取的数据。但是,BeamSQL 可能会错过一些功能(例如,如果您想在 SQL 中使用 group by 进行重复数据删除,则使用 ANY_VALUE 聚合函数)来完成您的任务。

【讨论】:

    猜你喜欢
    • 2021-01-27
    • 1970-01-01
    • 2021-12-23
    • 1970-01-01
    • 1970-01-01
    • 2018-12-14
    • 2019-06-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多