【问题标题】:Retry for specific exception in Reactor重试 Reactor 中的特定异常
【发布时间】:2021-10-22 18:34:04
【问题描述】:

我有微服务应用程序。为了协作,每个服务都使用异步消息传递。我知道,spring data jpa 默认使用乐观锁。但是如果这种锁定方法不是由用户调用,而是由另一个服务调用(在我的示例中,有验证服务,可以将对象的状态更新为有效或无效),我想处理异常并重试更新对象。我还必须在这个微服务上使用 webflux 堆栈。现在我有这样的代码:

public void updateStatus(String id, EventStatus status) {
    eventRepository.findById(id)
            .doOnNext(eventDocument -> {
                eventDocument.setStatus(status);
                eventRepository.save(eventDocument).subscribe();
            }).doOnError(OptimisticLockingFailureException.class, exception -> { //Retry in 2 sec if optimistic lock occurs on update
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
                updateStatus(id, status);
            })
            .subscribe();
}

我不喜欢这里的递归。有没有更好的解决方案?

【问题讨论】:

  • 您可以使用retry 运算符。但是,您的代码似乎存在几个问题。你使用 webflux 吗?为什么要手动订阅?
  • @lkatiforis 此服务方法由 Kafka 消息的回调调用。不是由控制器。这就是我手动使用订阅的原因

标签: java spring spring-data-jpa spring-webflux project-reactor


【解决方案1】:

如果抛出OptimisticLockingFailureException,则每2秒重试一次。

private static final int MAX_ATTEMPTS = Integer.MAX_VALUE;

public void updateStatus(String id, EventStatus status) {
    eventRepository.findById(id)
            .map(eventDocument -> {
                eventDocument.setStatus(status);
                return eventDocument;
            })
            .flatMap(event -> Mono.defer(() -> eventRepository.save(eventDocument)
                    .retryWhen(Retry.backoff(MAX_ATTEMPTS, Duration.ofSeconds(2))
                            .filter(t -> t instanceof OptimisticLockingFailureException))))
            .subscribe();
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-29
    • 1970-01-01
    • 2023-02-15
    • 2015-07-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多