【发布时间】:2021-11-24 01:13:19
【问题描述】:
我正在运行分析管道。
- 吞吐量约为每秒 11 条消息。
- 我的 Pub/Sub 主题安排了大约 200 万条消息。
- 80 个 GCE 实例正在并行拉取消息。
这是我的主题和订阅:
gcloud pubsub topics create pipeline-input
gcloud beta pubsub subscriptions create pipeline-input-sub \
--topic pipeline-input \
--ack-deadline 600 \
--expiration-period never \
--dead-letter-topic dead-letter
这是我拉消息的方式:
import { PubSub, Message } from '@google-cloud/pubsub'
const pubSubClient = new PubSub()
const queue: Message[] = []
const populateQueue = async () => {
const subscription = pubSubClient.subscription('pipeline-input-sub', {
flowControl: {
maxMessages: 5
}
})
const messageHandler = async (message: Message) => {
queue.push(message)
}
subscription.on('message', messageHandler)
}
const processQueueMessage = () => {
const message = queue.shift()
try {
...
message.ack()
} catch {
...
message.nack()
}
processQueueMessage()
}
processQueueMessage()
处理时间约为 7 秒。
这是许多类似的重复案例之一。 相同的消息被传递 5 (!!!) 次到不同的 GCE 实例:
- 03:37:42.377
- 03:45:20.883
- 03:48:14.262
- 04:01:33.848
- 05:57:45.141
所有 5 次消息都被成功处理并.ack()ed。输出包含的消息比输入多 50%!我很清楚"at least once" behavior,但我认为它可能会重复 0.01% 的消息,而不是 50%。
主题输入 100% 没有重复。我通过云监控验证了主题输入法和未确认消息的数量。数字匹配:发布/订阅主题中没有重复项。
更新:
- 看起来所有这些重复都是由于确认截止日期到期而创建的。我 100% 确信我在 600 秒截止日期之前确认了 99.9% 的消息。
【问题讨论】:
标签: google-cloud-platform google-cloud-pubsub