【问题标题】:Spring Cloud Dataflow errorChannel not workingSpring Cloud Dataflow errorChannel 不起作用
【发布时间】:2018-11-27 16:49:23
【问题描述】:

我正在尝试为我的 Spring Cloud Dataflow 流创建一个自定义异常处理程序,以路由一些要重新排队的错误和其他要 DLQ 的错误。

为此,我使用了全局 Spring 集成“errorChannel”和基于异常类型的路由。

这是 Spring Integration 错误路由器的代码:

package com.acme.error.router;

import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;


@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
   private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);

   public static final String ERROR_CHANNEL = "errorChannel";

   @Router(inputChannel = ERROR_CHANNEL)
    public String onError(Message<Object> message) {
      LOGGER.debug("ERROR ROUTER - onError");
      if(message.getPayload() instanceof MessageTransformationException) {
         MessageTransformationException exception = (MessageTransformationException) message.getPayload();
         Message<?> failedMessage = exception.getFailedMessage();
          if(exceptionChainContainsDlq(exception)) {
             return ErrorMessageChannels.DLQ_QUEUE_NAME;
          }
         return ErrorMessageChannels.REQUEUE_CHANNEL;
      }
      return ErrorMessageChannels.DLQ_QUEUE_NAME;
    }

    ...

}

每个流应用程序通过 Spring Boot 应用程序上的包扫描为每个流应用程序拾取错误路由器:

@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}

使用本地Spring Cloud Dataflow服务器(版本1.5.0-RELEASE)部署和运行时,抛出DlqException,消息成功路由到errorRouter中的onError方法,然后放入dlq主题.

但是,当它被部署为带有 SCDF Kubernetes 服务器(也是 1.5.0-RELEASE 版本)的 docker 容器时,永远不会触发 onError 方法。 (router开头的log语句从不输出)

在流应用程序的启动日志中,看起来 bean 被正确拾取并注册为 errorChannel 的侦听器,但由于某种原因,当抛出异常时,它们不会被我们的 onError 方法处理路由器。

启动日志:

o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router

我们使用 spring cloud stream 和 kafka binder 配置的所有默认设置:

spring.cloud:
  stream:
    binders:
      kafka:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
        environment.spring.cloud.stream.kafka.binder.zkNodes=zklist

编辑:添加来自kubectl describe &lt;pod&gt; 的 pod 参数

Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher

我们尝试的另一个想法是尝试使用 Spring Cloud Stream - spring 集成错误通道支持将错误发送到代理主题,但由于消息似乎根本没有登陆全局 Spring Integration errorChannel,那也没用。

我们需要在 SCDF Kubernetes 中做些什么来启用全局 Spring Integration errorChannel 吗?

我在这里错过了什么?

更新来自 cmets 的解决方案:

查看您的配置后,我现在很确定我知道什么 问题是。您有一个多绑定器配置方案。即使 你只处理单个 binder 实例的存在 spring.cloud.stream.binders.... 是什么将要制作框架 将其视为多粘合剂。基本上这是一个错误 - github.com/spring-cloud/spring-cloud-stream/issues/1384。尽你所能 看到它已修复,但您需要升级到 Elmhurst.SR2 或获取 最新快照(我们在 RC2 和 2.1.0.RELEASE 几周后 无论如何)——奥列格·朱拉库斯基

这确实是我们设置的问题。我们没有升级,而是暂时取消了 multi-binder 的使用,问题就解决了。

【问题讨论】:

  • 除了设置连接/凭据的 Kafka 配置设置外,我们不会在 K8s-server 中做任何特殊的事情,也不会乱搞属性。可能是您创建 docker 容器的方式可能会对属性的传播方式产生副作用。请参阅此处了解 entryPoint 选项 - 最好查看您的 Dockerfile 以及示例应用程序。
  • 如果我们使用默认的 errorChannel 设置,那有什么关系吗?它只是默认配置的一部分(我们不会以任何方式覆盖运行时环境变量)。入口点样式是否有其他方式会影响它?
  • 顺便说一下,我们使用的是 exec 样式
  • 在查看了您的配置后,我现在非常确定我知道问题所在。您有一个多绑定器配置方案。即使您只处理单个活页夹实例,spring.cloud.stream.binders.... 的存在也会使框架将其视为多活页夹。基本上这是一个错误 - github.com/spring-cloud/spring-cloud-stream/issues/1384。如您所见,它已修复,但您需要升级到 Elmhurst.SR2 或获取最新快照(我们处于 RC2 和 2.1.0。无论如何,RELEASE 将在几周内发布)
  • 我可以不在乎学分,我能够提供帮助已经足够了!

标签: kubernetes spring-integration spring-cloud-stream spring-cloud-dataflow


【解决方案1】:

更新来自 cmets 的解决方案:

查看您的配置后,我现在很确定我知道什么 问题是。您有一个多绑定器配置方案。即使 你只处理单个 binder 实例的存在 spring.cloud.stream.binders.... 是什么将要制作框架 将其视为多粘合剂。基本上这是一个错误 - github.com/spring-cloud/spring-cloud-stream/issues/1384。尽你所能 看到它已修复,但您需要升级到 Elmhurst.SR2 或获取 最新快照(我们在 RC2 和 2.1.0.RELEASE 几周后 无论如何)——奥列格·朱拉库斯基

这确实是我们设置的问题。我们没有升级,而是暂时取消了 multi-binder 的使用,问题就解决了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多