【问题标题】:Spring Cloud Stream @ServiceActivator not messaging to errorChannel on exceptionSpring Cloud Stream @ServiceActivator 不会在异常时向 errorChannel 发送消息
【发布时间】:2016-09-09 22:47:55
【问题描述】:

我正在使用 spring-cloud-starter-stream-kafka 使用 spring 云流。我已在application.properties 中将我的频道绑定到 kafka 主题,如下所示:

spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12

我无法让我的程序向错误通道生成异常消息。令人惊讶的是,它似乎甚至没有尝试生成它,即使我在不​​同的线程中(我有一个@MessagingGateway 将消息转储到gatewayOutput,然后其余的流程异步发生)。这是我的ServiceActivator的定义:

@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}

这是生成的日志(我已经截断了完整的异常)。没有……

  • 关于 errorChannel 没有任何订阅者的投诉
  • Kafka 生产者线程日志记录
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 收到消息:GenericMessage [payload=byte[ 400],标题={kafka_offset=17,kafka_messageKey=null,kafka_topic=redeemed,kafka_partitionId=0,kafka_nextOffset=18}] - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90 [endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers= {kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - { } 2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) 收到消息:GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f ,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type =com.test.system.poc.model.v3.TrivialRedemption}] - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - 返回单例 bean 'integrationEvaluationContext' 的缓存实例 - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - 返回单例 bean 'integrationConversionService' 的缓存实例 - {} 2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$$9dad62:42 - 已完成!!!!!! - {} 2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - 处理时出错:KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java. nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} ... ... 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 读取 40 个字节。 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - 从分区读取 [topic='enriched', id=0]@18 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 写入了 60 个字节。 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 读取 40 个字节。 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - 从分区读取 [topic='redeemed', id=0]@18 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 写入了 60 个字节。 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 读取 40 个字节。 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - 从分区读取 [topic='errors12', id=0]@0 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 写入了 60 个字节。 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 读取 40 个字节。 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 写入了 60 个字节。 - {}

编辑:这是我的频道类的内容:

public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";

    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";

    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";

    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();

【问题讨论】:

    标签: java spring-cloud-stream


    【解决方案1】:

    您没有显示您的 Channels 课程,但活页夹不知道您的“错误”通道是“特殊的”。

    绑定器可以配置重试并将异常路由到死信主题;请参阅 1.0.0.RELEASE 中的 this PR

    或者,您可以在服务激活器之前添加一个“中间流”网关 - 将其想象为 Java 中的“try/catch”块:

    @MessageEndpoint
    public static class GatewayInvoker {
    
        @Autowired
        private ErrorHandlingGateway gw;
    
        @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
        public void send(Message<?> message) {
            this.gw.send(message);
        }
    
    }
    
    @Bean
    public GatewayInvoker gate() {
        return new GatewayInvoker();
    }
    
    @MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
    public interface ErrorHandlingGateway {
    
        void send(Message<?> message);
    
    }
    

    将您的服务激活器的输入通道更改为toService

    您必须将@IntegrationComponentScan 添加到您的配置类中,这样框架才能检测到@MessagingGateway 接口并为其构建代理。

    编辑

    刚刚向我建议的另一种选择是在您的服务激活器的建议链中添加一个ExpressionEvaluatingAdvice

    【讨论】:

    • 另外,请关注github.com/spring-cloud/spring-cloud-stream/issues/538的进度,以获取Spring Cloud Stream中的未来解决方案。
    • 感谢您的全面回答,我还在消化它。我已经添加了我的 Channels.class,并且喜欢采用基于 Advice 的方法的建议,因为我也对有状态的重试行为感兴趣。我对您回答的某些观点感到有些困惑。您提到[可以配置活页夹重试并将异常路由到死信主题;请参阅 1.0.0.RELEASE.] 中的此 PR。我正在使用 1.0.0.RC2,它似乎早于 PR。 1.0.0.RELEASE 什么时候发布?
    • 想让您知道我已经使用可配置的 dlq 进行了这项工作。感谢您的所有帮助@GaryRussell
    猜你喜欢
    • 2021-06-12
    • 1970-01-01
    • 1970-01-01
    • 2021-04-27
    • 2022-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多