【问题标题】:在 Java 中从 GCP 发布/订阅重试消息需要哪些设置
【发布时间】:2021-11-09 22:48:46
【问题描述】:

我是 GCP 发布/订阅的新手,并尝试重新发送未确认的消息 (ack/nack)。在 GCP 控制台仪表板的订阅中,我提到过:

在我的 java 代码中,我创建了一个订阅者

public Subscriber createSubscriber(String subscriptionId, MessageReceiver receiver) throws MessagingException {
        Subscriber subscriber = null;
        ProjectSubscriptionName subscriptionName = null;
        String projectId = getProjectId();
        if (Objects.isNull(projectId) || Objects.isNull(subscriptionId)) {
            throw new MessagingException(MessagingErrorCodes.MIX90810
                    + " Project Id/Subscription Id is null for subscriptionId = " + subscriptionId + " projectId= "
                    + projectId, MessagingErrorCodes.MIX90810);
        }
        try {
            subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
            subscriber = Subscriber.newBuilder(subscriptionName, receiver).setExecutorProvider(getExecutorProvider()).build();
        } catch (Exception e) {
            throw new MessagingException(MessagingErrorCodes.MAX34540
                            + " Error occurred while creating the subscriber for the subscriptionId = " + subscriptionId
                            + "projectId " + projectId + "subscriptionName= " + subscriptionName,
                    MessagingErrorCodes.MAX34540, e);
        }
    enter code here
        return subscriber;
    }

我第一次收到关于我的 receiveMessage(PubsubMessage 消息,AckReplyConsumer 消费者) 的消息,但如果我没有确认消息,则不会再次收到消息。但是如果发送 nack 它会再次发送消息。

@Service
public class MyMessageReceiver implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        System.out.println(message.getMessageId());
    }
}

我是否需要提及其他配置以启用重试以防不确认消息?

【问题讨论】:

  • 您的确认截止日期是 10 秒。
  • 超时后结束,你又收到消息了吗?
  • 我每隔 1 小时收到一次消息。正如在 Subscriber.java 中有 DEFAULT_MAX_ACK_EXTENSION_PERIOD= Duration.ofMinutes(60); 来进行自定义重试时间。我必须在 setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) 函数中设置持续时间。
  • @BoristheSpider 无论如何我可以看到延长的时间在日志或仪表板中工作正常

标签: java google-cloud-platform spring-cloud-gcp


【解决方案1】:

关于重试策略,documentation 表示 Pub/Sub 仅在确认截止日期到期或订阅者不接受消息时才会尝试重新传递消息。一旦确认截止日期过去,该消息将成为重新传递的候选者。重新交付可能不会立即进行,因为重新交付是在尽力而为的基础上执行的。

正如 cmets 中已经提到的,DEFAULT_MAX_ACK_EXTENSION_PERIODSubscriber.java 中设置为 60 分钟,这就是造成此延迟的原因。 Ack 截止日期将继续延长(由客户端库作为本机功能),直到达到此持续时间。意思是,未确认的消息被订阅者租用了 60 分钟,并且在此期间没有资格重新传递。 setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) 用于将自定义值设置为消息的确认截止日期将延长到的最大期限。

还请注意,这些值都不能保证邮件不会在该时间范围内重新传递。由于网络或服务器故障,消息可能会在maxAckExtensionPeriod 之前重新传递。

【讨论】:

  • 设置 setMaxAckExtensionPeriod(Duration.ofSeconds(20)) 后。我很早就收到了消息,但不是在 20 秒的时间间隔内。有时 1 分钟甚至更晚。
  • 嗨@RishabhJain,我根据您的评论更新了答案。
猜你喜欢
  • 2011-01-02
  • 2021-04-24
  • 2019-09-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-03
  • 2016-11-25
  • 1970-01-01
相关资源
最近更新 更多