【发布时间】:2018-11-06 15:57:01
【问题描述】:
我正在尝试将来自第 3 方 API 的数据提取到 Dataflow 管道中。由于第 3 方不提供 webhook,我编写了一个自定义脚本,不断轮询其端点以获取更多数据。
数据每 15 分钟刷新一次,但由于我不想错过任何数据点并且我想在新数据可用时立即使用,因此我的“爬虫”每 1 分钟运行一次。然后该脚本将数据发送到 PubSub 主题。很容易看出,PubSub 将为源中的每个数据点收到大约 15 条重复消息。
我第一次尝试识别和丢弃那些重复的消息是为每条 PubSub 消息 (eventid) 添加一个自定义属性,该属性是从源中的 [ID + updated_time] 的哈希创建的。
const attributes = {
eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
timestamp: item.timestamp.toString()
};
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)
然后我用withIdAttribute() 配置了Dataflow(这是新的idLabel(),基于Record IDs)。
PCollection<String> input = p
.apply("ReadFromPubSub", PubsubIO
.readStrings()
.fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
.withTimestampAttribute("timestamp")
.withIdAttribute("eventid"))
.apply("OutputToBigQuery", ...)
通过该实现,我期望当脚本第二次发送相同的数据点时,重复的eventid 将是相同的并且消息被丢弃。但由于某种原因,我仍然在输出数据集上看到重复项。
一些问题:
- 如果第三方 API 不提供 Webhook,是否有巧妙的方法将数据从该第三方 API 提取到数据流中?
- 关于为什么数据流在这种情况下不丢弃消息的任何想法?
- 我知道数据流重复数据删除的 10 分钟限制,但即使在第二次插入(2 分钟)时我也看到重复数据。
任何帮助将不胜感激!
【问题讨论】:
-
我不太确定 pubsub 提供的关于 eventid 的保证,但您必须能够使用 GroupByKey 操作或使用有状态操作进行重复数据删除
-
嗯,没想到 GroupByKey 是一种重复数据删除的方法。如果我预计在 15 分钟内收到这些双倍数据,这意味着我需要一个至少有这个时间量的窗口,以便一个小组可以消除所有重复数据,对吗?
标签: duplicates google-cloud-dataflow apache-beam google-cloud-pubsub