【问题标题】:Spring Cloud Stream + Kafka Dynamic Destination + Error handlingSpring Cloud Stream + Kafka 动态目的地 + 错误处理
【发布时间】:2021-08-03 06:48:28
【问题描述】:

我们在 Spring Cloud Stream 上有一个与 Project Reactor 集成的应用程序。我们通过在 Message 头中设置 spring.cloud.stream.sendto.destination 来动态设置目标主题并发布消息。 我们正在寻求处理错误场景,例如 kafka 服务器间歇性关闭或发布时主题不可用。 我们已经实现了@ServiceActivator 来处理所有错误。动态设置主题时,ServiceActivator 不会捕获生产者错误,只有消费者错误会影响 ServiceActivator。 如果我们提前设置 out.destination 主题,那么也会捕获生产者错误。 有什么方法可以为动态目的地启用 errorChannel 吗? 下面的示例代码, 应用程序.yml

spring:
  cloud:
    function:
      definition: generate_flux;process
    stream:
      kafka:
        binder:
          brokers: localhost:9094
          required-acks: all
          producer-properties:
            retries: 2
      bindings:
        generate_flux-out-0:
          destination: source
        process-in-0:
          destination: source
          group: processors
        process-out-0:
          producer:
            error-channel-enabled: true

在应用程序类中,具有动态目标主题集的函数,

@Bean
 public Function<Flux<Message<String>>, Flux<Message<String>>> process() {
     return (msg) -> {
       return msg.flatMap(
           message -> {
             log.info("individual msg " + message);
             Map<String, Object> headers = new HashMap<>();
             headers.put("spring.cloud.stream.sendto.destination", "processed");
             MessageHeaders mh = new MessageHeaders(headers);
             return Mono.just(MessageBuilder.createMessage("vvvv processed ", mh));
           });
     };
 }

服务激活器:

 @ServiceActivator(inputChannel = "errorChannel")
  public void handle(final ErrorMessage em) {
    log.error("Error caught" + em.toString());
  }

启动spring boot应用,确保消息被消费和发布,然后删除topic“已处理”创建失败场景。

有了以上内容, Producer 看到了 TimeoutException,但 Service Activator 没有捕获到 errorChannel:

   Caused by: org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@f258f76]; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms., failedMessage=GenericMessage [payload=byte[15], headers={id=13dc2799-45c5-0c10-2e62-4e296f62d1dc, spring.cloud.stream.sendto.destination=processed, contentType=application/json, timestamp=1620856125106}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
    reactor.core.publisher.Flux.doOnNext
    org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Error has been observed at the following site(s):
    |_ Flux.doOnNext ⇢ at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Stack trace:
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
        at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
        at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:389)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:517)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
    at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)

但服务激活器只显示与消费者相关的错误,

 ERROR c.i.f.e.d.DedupeProcessorApplication - intuit_tid= Error caughtErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.process-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}], failedMessage=GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}], headers={kafka_data=ConsumerRecord(topic = source, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1620856185102, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@4cb51739), id=a2fef876-a0ab-c825-9ee6-50b355044e51, sourceData=ConsumerRecord(topic = source, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1620856185102, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@4cb51739), timestamp=1620856206192}] for original GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}]

只需将主题更改为非动态的,即从函数中删除以下行

// headers.put("spring.cloud.stream.sendto.destination", "processed");

并将目标主题添加到 out 配置中,

      process-out-0:
          destination: processed
          producer:
            error-channel-enabled: true

能够在 ServiceActivator 获得生产者错误。在下方登录

Error caughtErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: nested exception is org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for processed-0:120000 ms has passed since batch creation, failedMessage=GenericMessage [payload=byte[15], headers={contentType=application/json, id=0ba150e5-8e6f-81f1-b9c4-cda115171ce0, timestamp=1620856522352}] [record=ProducerRecord(topic=processed, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = true), key=null, value=[B@339a63df, timestamp=null)], headers={id=1acaece7-c596-c9ff-440b-37df824bd564, timestamp=1620856642357}]

我的问题是,当目标主题在消息的标头中动态设置且未配置时,如何处理错误?

【问题讨论】:

    标签: project-reactor spring-cloud-stream dynamic-binding


    【解决方案1】:

    我可能应该写一篇我可以参考的博客,但简短的回答是:在 sc-stream 方面,我们无能为力来处理由于反应函数而产生的错误,原因很简单——我们绝对没有控制流中发生的事情。

    响应式函数和命令式函数的根本区别在于命令式函数充当消息侦听器,并由我们(框架)在每条消息上调用。相比之下,响应式函数仅在应用程序初始化期间调用一次,我们将源流连接到用户函数公开的 Publisher,然后我们将完全控制权交给用户。

    所以这是一个工作单元的问题,在命令式函数中,这样的单元是消息,而在响应式函数中,这样的单元是整个消息流,可能是无限的。

    这一切意味着在编写响应式函数时,您必须依赖响应式 API 本身丰富的错误处理能力。

    【讨论】:

    • 感谢奥列格的回复。我们使用了 ProjectReactor 的 onErrorDropped 功能,但这有一个缺点,即流在一定时间后中断并且不再消耗消息。所以,这对我们来说还没有解决。您是否知道 Spring cloud stream kafka binder 最近的任何更改,可以启用自定义处理 errorChannel 中的消息,其中有目标主题的动态绑定?谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多