【问题标题】:Error sending message to Dlq Spring cloud stream with Kafka使用 Kafka 向 Dlq Spring 云流发送消息时出错
【发布时间】:2018-04-13 07:26:25
【问题描述】:

pom.xml

<dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
         </dependency>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        </dependencies>
        <dependencyManagement>
        <dependencies>
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR3</version>
                <type>pom</type>
                <scope>import</scope>
          </dependency>
          </dependencies>
          </dependencyManagement>

@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            LOG.error("Error Code "+ e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            throw e;
        }
    }
}
  1. 我正在使用 Spring Cloud Stream 来读取来自 kafka 主题的消息。正在从队列中读取消息并进行处理,如果在处理过程中失败,则消息应该进入配置的错误队列,但会出现以下错误。
  2. 从邮件中提取标头时出现异常,解决此问题的最佳方法是什么?
  3. Kafka版本为1.0,kafka客户端为2.11-1.0

application.properties

     spring.cloud.stream.bindings.orderEvent.destination=orderEvents
     spring.cloud.stream.bindings.orderEvent.content- 
     type=application/json
     spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
     spring.cloud.stream.bindings.orderEvent.consumer.back-off- 
     multiplier=5
     spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial- 
     interval=60000
     spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
     spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
     spring.cloud.stream.bindings.kafka.binder.brokers=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
     spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     enableDlq=true
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqName=dead-queue
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.key.
     serializer=org.apache.kafka.common.serialization.StringSerializer
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.value.
     serializer=org.apache.kafka.common.serialization.StringSerializer

org.springframework.messaging.MessageDeliveryException:无法将消息发送到通道'scm-orderEvents.scm-orderEvents-consumer.errors';嵌套异常是 java.lang.RuntimeException:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:4297 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 在 org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] 在 org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] 在 org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na] 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na] 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na] 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na] 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162] 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162] 在 java.lang.Thread.run(Thread.java:748) [na:1.8.0_162] 引起:java.lang.RuntimeException:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:4297 在 org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:380) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] 在 org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] ...省略了16个常用框架 引起:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:4297 在 java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_162] 在 java.lang.String.(String.java:425) ~[na:1.8.0_162] 在 org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] 在 org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] 在 org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] 在 org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:368) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] ...省略了20个常用框架

【问题讨论】:

    标签: java spring-boot apache-kafka spring-cloud-stream


    【解决方案1】:

    这是 kafka binder 的 1.3.2.RELEASE 中的一个 bug;它是fixed on master (1.3.3.BUILD-SNAPSHOT)。

    顺便说一句,最好的解决方案是使用 Spring Boot 2.0.1 和 SCSt Emlhurst.RELEASE(由云 FINCHLEY 引入 - 目前处于 M9 里程碑)。

    这些版本原生支持 Kafka 1.0。

    迁移到与 SCSt 1.3.x 兼容的 kafka11 binder artifact (1.3.0) 可能也会取得一些成功,例如 discussed on the Wiki

    【讨论】:

      猜你喜欢
      • 2020-07-14
      • 2020-12-03
      • 1970-01-01
      • 2021-01-27
      • 1970-01-01
      • 1970-01-01
      • 2019-02-27
      • 1970-01-01
      • 2017-01-19
      相关资源
      最近更新 更多