【发布时间】:2018-05-24 10:08:44
【问题描述】:
我目前在处理 Spring Cloud Streams (Elmhurst.RELEASE) 中的消息处理期间引发的异常时遇到问题。当我的应用在主处理方法中抛出异常时:
@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String process(String message) {
externalService();
return message.toUpperCase();
}
private void externalService() {
throw new RuntimeException("An external call failed");
}
}
我似乎完全无法在任何错误通道上捕获此异常。通过阅读文档,我希望我能够在全局“errorChannel”或特定的“input.myGroup.errors”通道上接收到 ErrorMessage。
我尝试使用 Spring Integrations 监听器:
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
以及 Spring Cloud Streams 侦听器:
@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
没有成功。我还玩过各种配置设置组合,例如“errorChannelEnabled:true”、“max-retries:1”等,对我能够捕获处理器抛出的错误的影响为零(使用“return”上的调试点消息”来检查)。按照 SCS 文档中的建议配置“bindings.error.destination: myErrorTopic”时,我什至没有收到主题中的任何错误消息。
似乎唯一有效的是在活页夹配置级别上的“enableDlq: true”。然而,这并不能满足能够确定原始异常的重要需求,因为发布到 DLQ 的内容只有一个带有完整堆栈跟踪的标头。我不想解析堆栈跟踪来找出原始异常的类型或消息。
我应该在这里采取更好的方法吗?还是我犯了一些愚蠢的错误?我的总体目标是将具有实际异常类型的异常消息和异常消息发送到 DLQ。
当然,我可以在每个处理器/源/接收器方法的整体周围放置 try/catch 语句,然后手动路由到不同的绑定通道,但这在我看来大大削弱了 SCS 框架的价值主张。
我发现 this example 的自定义 DLQ 处理是 this 早期 StackOverflow 问题的一部分,这似乎可以满足我的需求。然而,这似乎根本不适用于 SCS Elmhurst/2.0 和 Spring Kafka binder,即使在迁移配置以适用于更新的 SCS 之后也是如此。
编辑我添加了一个 Github repository 来重现我的错误,因为复制与 Gary 的答案相同的核心代码似乎不起作用。我开始怀疑这是否是 POM 依赖、配置或 Kafka binder 问题。此 repo 使用与 Gary 的答案相同的核心代码,因为我认为查看问题和调试会更简单一些。
在 GARY 的回答后编辑 我接受了 Gary 的回答,因为它解决了我最初的问题(当配置中有活页夹环境时解决当前的框架问题)。然而,DLQ 消息最终对我的情况毫无帮助。根据 Gary 的回答,我最终订阅了“errorChannel”,并从那里创建了一条自定义错误消息,并将其发送到正常绑定的 SCS 通道。
【问题讨论】:
标签: spring-integration spring-cloud-stream spring-kafka