【问题标题】:Spring Cloud Stream GCP How to do Re-queue Failed MessagesSpring Cloud Stream GCP如何重新排队失败的消息
【发布时间】:2019-07-25 12:40:50
【问题描述】:

根据文档here,当前支持的绑定器(Rabbit 和 Kafka)依赖于 RetryTemplate。对于 GCP

我的项目详情

Spring Boot 版本 2.1.3.RELEASE

依赖pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

application.properties

spring.cloud.stream.bindings.input.destination=inputtopic
spring.cloud.stream.bindings.output.destination=outputtopic

spring.cloud.gcp.project-id=testinggcp
spring.cloud.gcp.credentials.location=file:C:/Users/my_gcp_credentials.json

休息控制器

@EnableBinding({Source.class,Sink.class})
@RestController
public class SourceExample {

    @Autowired
    private Source source;


    @GetMapping("/newMessage")
    public UserMessage sendMessage(@RequestParam("messageBody") String messageBody,
                                   @RequestParam("username") String username) {
        UserMessage userMessage = new UserMessage(messageBody, username, LocalDateTime.now());

        this.source.output().send(new GenericMessage<>(userMessage));
        return userMessage;
    }


    @StreamListener(target = Sink.INPUT)
    public void handle(UserMessage userMessage) throws IOException {
        System.out.println(userMessage);
    }


}

【问题讨论】:

    标签: spring-boot spring-cloud spring-cloud-stream spring-messaging


    【解决方案1】:

    不,Spring Cloud GCP Pub/Sub Binder 不提供任何重试挂钩。

    @ServiceActivator 上使用RequestHandlerRetryAdvice 代替@StreamListener 很容易。因此,您在 POJO 方法中的所有失败都将根据您的配置进行重试。 RequestHandlerRetryAdvice 有一个 RecoveryCallback 选项,这可能只是简单的 ErrorMessageSendingRecoverer,您可以在其中配置一些错误处理并将错误消息发送到 GCP Pub/Sub 上的某些死信主题。

    在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#retry-advice

    还有一点关于Advising Endpoints Using Annotations.

    【讨论】:

    • 我真的很困惑 ServiceActivator 不起作用。如果抛出异常,AFAIK GCP 具有 pubSubAcknowledgementExecutor,它会自动调用句柄函数(Sink.INPUT)。意思是说,Re-queue Failed Messages 的实现就没有必要了?
    • 好吧,在那种情况下,我是这么认为的。如果消息已发送到侦听器,则 ack 是自动的。
    • 以及当生产者在this.source.output()这一行抛出异常时如何重试消息
    猜你喜欢
    • 2016-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-07
    • 2020-09-28
    • 2021-06-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多