【问题标题】:Duplicates on Apache Beam / Dataflow inputs even when using withIdAttribute即使使用 withIdAttribute 也会在 Apache Beam / Dataflow 输入上重复
【发布时间】:2018-11-06 15:57:01
【问题描述】:

我正在尝试将来自第 3 方 API 的数据提取到 Dataflow 管道中。由于第 3 方不提供 webhook,我编写了一个自定义脚本,不断轮询其端点以获取更多数据。

数据每 15 分钟刷新一次,但由于我不想错过任何数据点并且我想在新数据可用时立即使用,因此我的“爬虫”每 1 分钟运行一次。然后该脚本将数据发送到 PubSub 主题。很容易看出,PubSub 将为源中的每个数据点收到大约 15 条重复消息。

我第一次尝试识别和丢弃那些重复的消息是为每条 PubSub 消息 (eventid) 添加一个自定义属性,该属性是从源中的 [ID + updated_time] 的哈希创建的。

const attributes = {
         eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
         timestamp: item.timestamp.toString()
      };
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)

然后我用withIdAttribute() 配置了Dataflow(这是新的idLabel(),基于Record IDs)。

PCollection<String> input = p
    .apply("ReadFromPubSub", PubsubIO
       .readStrings()
       .fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
       .withTimestampAttribute("timestamp")
       .withIdAttribute("eventid"))
   .apply("OutputToBigQuery", ...)

通过该实现,我期望当脚本第二次发送相同的数据点时,重复的eventid 将是相同的并且消息被丢弃。但由于某种原因,我仍然在输出数据集上看到重复项。

一些问题:

  1. 如果第三方 API 不提供 Webhook,是否有巧妙的方法将数据从该第三方 API 提取到数据流中?
  2. 关于为什么数据流在这种情况下不丢弃消息的任何想法?
    • 我知道数据流重复数据删除的 10 分钟限制,但即使在第二次插入(2 分钟)时我也看到重复数据。

任何帮助将不胜感激!

【问题讨论】:

  • 我不太确定 pubsub 提供的关于 eventid 的保证,但您必须能够使用 GroupByKey 操作或使用有状态操作进行重复数据删除
  • 嗯,没想到 GroupByKey 是一种重复数据删除的方法。如果我预计在 15 分钟内收到这些双倍数据,这意味着我需要一个至少有这个时间量的窗口,以便一个小组可以消除所有重复数据,对吗?

标签: duplicates google-cloud-dataflow apache-beam google-cloud-pubsub


【解决方案1】:

我认为你在正确的轨道上,而不是我建议使用时间戳的哈希。更好的方法是使用 Windows。查看此document,它过滤了窗口外的数据。

关于额外的重复数据,如果您使用拉取订阅并且在处理数据之前达到确认截止日期,则将按照at-least-once delivery 重新发送消息。在这种情况下更改确认截止时间,默认为 10 秒。

【讨论】:

    猜你喜欢
    • 2018-05-30
    • 2020-03-31
    • 2018-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-14
    • 2021-05-10
    • 1970-01-01
    相关资源
    最近更新 更多