【发布时间】:2021-04-20 13:56:03
【问题描述】:
Google PubSub 服务器在确认截止时间(10 秒)后不重新发送消息
在这里我在 50 秒后确认,但没有收到任何重新发送的消息。但重启订阅者后会得到。
Subscriber subscriber = null;
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectIdArg,
subscriptionNameArg);
try {
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
int i=0;
while(i<=50){
try {
Thread.sleep(1000);
System.out.println(i+" : message id : "+messageId);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.ack();
};
ExecutorProvider executorProvider = InstantiatingExecutorProvider
.newBuilder()
.setExecutorThreadCount(5)
.build();
subscriber = Subscriber
.newBuilder(subscriptionName, receiver)
.setParallelPullCount(1)
.setExecutorProvider(executorProvider)
.build();
subscriber.startAsync().awaitRunning();
subscriber.awaitTerminated();
} catch (Exception e) {
log.error("Error in Subscribing Queue. " + e);
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
}
【问题讨论】:
-
您能否说明您如何验证 pub/sub 没有重新发送消息?根据您的 sn-p,您在确认消息之前添加了 50 秒的延迟,pub/sub 的确认截止日期为 10 秒,但是,它似乎可以延长 while the message is being processed, to then issue the ack or nack of such message when the processing is done。
-
你用一条消息测试过你的代码吗?这将为您提供一个清晰的场景,即如果 pub/sub 在时间循环完成后重新发送消息,那么考虑到您添加的延迟,它可能会生成 backlog in your subscription。如果你想强制 pub/sub 重新发送消息,也许最好发送一个 nack 而不是一个 ack 事件。
-
另外,您能否详细说明您的案例使用情况?目前尚不清楚您是否正在寻找extend the ack time(修改AckDeadline 方法),或者您是否希望在一段时间后使用这些消息(50 秒) 他们发布了,在第二种情况下,我建议你看看this approach。
-
@NoeRomero 正如您所说,由于该过程尚未完成,因此订阅者将 ack 时间段从 10 秒延长到该过程所花费的时间。我使用 maxAckExtensionPeriod 配置的最长时间,默认值为 1 小时,但如果任何其他进程占用的时间超过此时间,它会在一次重试后延长。
标签: java google-cloud-platform duplicates publish-subscribe