【问题标题】:How to Handle/Intercept Exceptions Thrown in Spring Cloud Streams (Elmhurst/2.0)如何处理/拦截 Spring Cloud Streams 中抛出的异常 (Elmhurst/2.0)
【发布时间】:2018-05-24 10:08:44
【问题描述】:

我目前在处理 Spring Cloud Streams (Elmhurst.RELEASE) 中的消息处理期间引发的异常时遇到问题。当我的应用在主处理方法中抛出异常时:

@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(String message) {
        externalService();
        return message.toUpperCase();
    }

    private void externalService() {
        throw new RuntimeException("An external call failed");
    }
}

我似乎完全无法在任何错误通道上捕获此异常。通过阅读文档,我希望我能够在全局“errorChannel”或特定的“input.myGroup.errors”通道上接收到 ErrorMessage。

我尝试使用 Spring Integrations 监听器:

@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

以及 Spring Cloud Streams 侦听器:

@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

没有成功。我还玩过各种配置设置组合,例如“errorChannelEnabled:true”、“max-retries:1”等,对我能够捕获处理器抛出的错误的影响为零(使用“return”上的调试点消息”来检查)。按照 SCS 文档中的建议配置“bindings.error.destination: myErrorTopic”时,我什至没有收到主题中的任何错误消息。

似乎唯一有效的是在活页夹配置级别上的“enableDlq: true”。然而,这并不能满足能够确定原始异常的重要需求,因为发布到 DLQ 的内容只有一个带有完整堆栈跟踪的标头。我不想解析堆栈跟踪来找出原始异常的类型或消息。

我应该在这里采取更好的方法吗?还是我犯了一些愚蠢的错误?我的总体目标是将具有实际异常类型的异常消息和异常消息发送到 DLQ。

当然,我可以在每个处理器/源/接收器方法的整体周围放置 try/catch 语句,然后手动路由到不同的绑定通道,但这在我看来大大削弱了 SCS 框架的价值主张。

我发现 this example 的自定义 DLQ 处理是 this 早期 StackOverflow 问题的一部分,这似乎可以满足我的需求。然而,这似乎根本不适用于 SCS Elmhurst/2.0 和 Spring Kafka binder,即使在迁移配置以适用于更新的 SCS 之后也是如此。

编辑我添加了一个 Github repository 来重现我的错误,因为复制与 Gary 的答案相同的核心代码似乎不起作用。我开始怀疑这是否是 POM 依赖、配置或 Kafka binder 问题。此 repo 使用与 Gary 的答案相同的核心代码,因为我认为查看问题和调试会更简单一些。

在 GARY 的回答后编辑 我接受了 Gary 的回答,因为它解决了我最初的问题(当配置中有活页夹环境时解决当前的框架问题)。然而,DLQ 消息最终对我的情况毫无帮助。根据 Gary 的回答,我最终订阅了“errorChannel”,并从那里创建了一条自定义错误消息,并将其发送到正常绑定的 SCS 通道。

【问题讨论】:

    标签: spring-integration spring-cloud-stream spring-kafka


    【解决方案1】:

    为什么你有return message;?你希望它去哪里?

    这对我来说很好......

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So50506586Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50506586Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(byte[] in) {
            throw new RuntimeException("fail");
        }
    
        @ServiceActivator(inputChannel = "errorChannel")
        public void errors(ErrorMessage em) {
            System.out.println(em);
        }
    
    }
    

    ErrorMessage [payload=org.springframework.messaging.MessagingException: 调用 com.example.So50506586Application#listen[1 args] 时抛出异常;嵌套异常是 java.lang.RuntimeException: fail, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=0, ...

    ...

    原因:java.lang.RuntimeException: 失败

    ...

    编辑

    这是使用多绑定器支持时的错误(通过environment 的东西)。全局错误通道不可见。

    这行得通...

    spring:
      cloud:
          stream:
            bindings:
              input:
                destination: input
                group: myGroup7
                binder: kafka
    #          error:
    #            destination: errors
            kafka.bindings.input.consumer.autoCommitOnError: true
            kafka:
              binder:
                brokers: localhost:9092
    

    (您不再需要使用 Elmhurst 的 zkNodes)。

    另外,您不应该使用旧的 error.destination 东西,因为发送到错误通道的消息现在是一个包含大量信息的丰富 ErrorMessage 对象,请使用 dlq 东西自动发布到主题。

    我们需要删除那些遗留的错误目标属性。

    这也不行……

    @ServiceActivator(inputChannel = "input.myGroup.errors")
    public void errors(ErrorMessage em) {
        System.out.println(em);
    }
    

    ...因为服务激活器与绑定位于不同的应用程序上下文中。

    目前,唯一的解决方法是不使用environment 配置样式。如果您需要多个活页夹支持,我没有适合您的解决方案。

    顺便说一句,使用 Elmhurst(和 kafka 0.11 或更高版本),堆栈跟踪不再嵌入到发送到 DLQ 的消息的负载中,而是作为 kafka 标头发送。

    【讨论】:

    • 我不打算在那里返回任何东西,只是放在一起足以在该行设置断点。我将整理一个示例应用程序并发布,因为我似乎无法让相同的代码工作。
    • 我编辑了我的原始问题以包含一个 github 存储库来重现该问题。我看到实际 MessagingError 异常的堆栈跟踪被抛出到控制台,而不是记录的 ErrorMessage。
    • 查看我的答案的编辑。 Issue opened.
    • 感谢框架问题的确认,开始认为我要疯了,没有在 errorChannel 上收到任何消息。我看到了 DLQ 格式,不幸的是,它在决定如何处理错误消息时并不是很有用,因为我没有可以为异常类型或异常消息使用的简单字符串。我会用我的一些发现来更新我的问题。
    猜你喜欢
    • 1970-01-01
    • 2012-07-05
    • 1970-01-01
    • 2018-07-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-04
    相关资源
    最近更新 更多