【问题标题】:Java: Google PubSub server not resending messages after acknowledgment deadline, which is 10 secondsJava:Google PubSub 服务器在确认截止日期(10 秒)后未重新发送消息
【发布时间】:2021-04-20 13:56:03
【问题描述】:

Google PubSub 服务器在确认截止时间(10 秒)后不重新发送消息

在这里我在 50 秒后确认,但没有收到任何重新发送的消息。但重启订阅者后会得到。

Subscriber subscriber = null;
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectIdArg,
                    subscriptionNameArg);
    try {
    MessageReceiver receiver =
                        (PubsubMessage message, AckReplyConsumer consumer) -> {
        
    int i=0;
            while(i<=50){
    
                try {
                    Thread.sleep(1000);
                    System.out.println(i+" : message id : "+messageId);
                    i++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
    consumer.ack();
    
     };
     ExecutorProvider executorProvider = InstantiatingExecutorProvider
                .newBuilder()
                .setExecutorThreadCount(5)
                .build();
    
     subscriber = Subscriber
                .newBuilder(subscriptionName, receiver)
                .setParallelPullCount(1)
                .setExecutorProvider(executorProvider)
                .build();
    
     subscriber.startAsync().awaitRunning();
                subscriber.awaitTerminated();

 } catch (Exception e) {
            log.error("Error in Subscribing Queue. " + e);
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }

【问题讨论】:

  • 您能否说明您如何验证 pub/sub 没有重新发送消息?根据您的 sn-p,您在确认消息之前添加了 50 秒的延迟,pub/sub 的确认截止日期为 10 秒,但是,它似乎可以延长 while the message is being processed, to then issue the ack or nack of such message when the processing is done
  • 你用一条消息测试过你的代码吗?这将为您提供一个清晰的场景,即如果 pub/sub 在时间循环完成后重新发送消息,那么考虑到您添加的延迟,它可能会生成 backlog in your subscription。如果你想强制 pub/sub 重新发送消息,也许最好发送一个 nack 而不是一个 ack 事件。
  • 另外,您能否详细说明您的案例使用情况?目前尚不清楚您是否正在寻找extend the ack time(修改AckDeadline 方法),或者您是否希望在一段时间后使用这些消息(50 秒) 他们发布了,在第二种情况下,我建议你看看this approach
  • @NoeRomero 正如您所说,由于该过程尚未完成,因此订阅者将 ack 时间段从 10 秒延长到该过程所花费的时间。我使用 maxAckExtensionPeriod 配置的最长时间,默认值为 1 小时,但如果任何其他进程占用的时间超过此时间,它会在一次重试后延长。

标签: java google-cloud-platform duplicates publish-subscribe


【解决方案1】:

我发现了背后发生的事情。即使确认截止时间是 10 秒,并且处理时间超过此时间,也会扩展到订阅者 maxAckExtensionPeriod 配置,其默认值为一小时。一小时后发布-订阅服务器将重试,此重试也会发生一次,因为如果进程耗时超过一小时,该值也会延伸到之前的最大处理时间。

【讨论】:

    猜你喜欢
    • 2021-09-14
    • 2019-06-23
    • 2020-10-18
    • 1970-01-01
    • 2020-08-08
    • 2019-12-25
    • 1970-01-01
    • 2018-01-01
    • 2019-05-01
    相关资源
    最近更新 更多