【问题标题】:@MessagingGateway, Spring Cloud Stream, and error handling across both@MessagingGateway、Spring Cloud Stream 和两者的错误处理
【发布时间】:2018-06-05 12:25:23
【问题描述】:

关于为How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels? 发布的答案,处理可以从 Spring Cloud Stream 服务返回的@MessagingGateway 错误的正确方法是什么?

回顾一下,我有一个@MessagingGateway,它提供对使用 Spring Cloud Stream 构建的异步服务的同步访问。当 Spring Cloud Stream 服务层中发生错误时,我会创建一个错误响应并通过 SubscribableChannel 将其发送到处理错误的其他 @StreamListener 服务。

例如,当创建帐户时,我向accountCreated 频道发送消息。当发生错误时,我会向accountNotCreated 频道发送错误响应。

这很好用,但我也想向@MessagingGateway 的客户端发送错误响应,以便他们同步接收错误响应。 @MessagingGateway 注释具有errorChannel 属性,但@Gateway 注释没有。因此,@MessagingGateway 的客户端应该能够阻止并等待 1) 创建帐户或 2) 错误响应。

同样,这里的目标是构建“后端”服务,将 Spring Cloud Stream 用于事务服务(即创建、更新或删除数据的服务),同时为我们的客户提供“网关”访问该块并等待返回响应。 Artem Bilan 为我提供的解决方案适用于快乐的道路,但是当发生错误时,我不清楚 Spring Integration 如何最适合处理此问题。

更新代码示例

GatewayApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  @Component
  public interface GatewayChannels {
    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    public static final String ENRICH = "enrich";

    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    StringWrapper process(StringWrapper payload) throws MyException;
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
      try {
        StringWrapper result = gateway.process(new StringWrapper(string));
        // Instead of catching the exception in the below catch clause, here we have just a string
        // representation of the stack trace when an exception occurs.
        return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
      } catch (MyException e) {
        // Why is the exception not caught here?
        return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
            HttpStatus.INTERNAL_SERVER_ERROR);
      }
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(StreamGateway.ENRICH)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
  port: 8088

StringWrapper.java(在所有三个项目中使用)

package com.example.demo;

import com.fasterxml.jackson.annotation.JsonProperty;

public class StringWrapper {
  @JsonProperty
  private String string;

  @JsonProperty
  private long time = System.currentTimeMillis();

  public StringWrapper() {
    super();
  }

  public StringWrapper(String string) {
    this.string = string;
  }

  public String getString() {
    return string;
  }


  public long getTime() {
    return time;
  }

  public void setString(String string) {
    this.string = string;
  }

  @Override
  public String toString() {
    return "StringWrapper [string=" + string + ", time=" + time + "]";
  }

}

CloudStreamApplication.java

package com.example.demo;

import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  @Component
  public class Processor {

    @Autowired
    CloudStreamChannels channels;

    @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
    public void process(Message<StringWrapper> request) {
      StringWrapper uppercase = null;
      try {
        uppercase = toUppercase(request);
      } catch (MyException e) {
        channels.toUppercaseReply()
            .send(MessageBuilder.withPayload(e).setHeader("__TypeId__", e.getClass().getName())
                .copyHeaders(request.getHeaders()).build());
      }
      if (uppercase != null) {
        channels.toUppercaseReply()
            .send(MessageBuilder.withPayload(uppercase)
                .setHeader("__TypeId__", StringWrapper.class.getName())
                .copyHeaders(request.getHeaders()).build());
      }
    }

    private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
      Random random = new Random();
      int number = random.nextInt(50) + 1;
      if (number > 25) {
        throw new MyException("An error occurred.");
      }
      return new StringWrapper(request.getPayload().getString().toUpperCase());
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
  port: 8088

StreamListenerApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {

  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

  }

  public static void main(String[] args) {
    SpringApplication.run(StreamListenerApplication.class, args);
  }

  @Autowired
  CloudStreamChannels channels;

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public void processToUppercaseReply(Message<StringWrapper> message) {
    System.out.println("Processing message: " + message.getPayload());
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          group: stream-listener-to-uppercase-reply
server:
  port: 8089

【问题讨论】:

标签: spring-integration spring-cloud-stream


【解决方案1】:

@MessagingGateway 上只有一个全局errorChannel 用于所有@Gateway 方法。如果您的网关具有多个@Gateway 方法,则每个方法都可以设置一个消息头来指示哪个方法失败。

如果您将Message&lt;Throwable&gt; 发送到网关的回复通道(并且没有错误通道),则有效负载将被抛出给调用者。

如果网关方法具有throws 子句,则会尝试解开原因树以查找该异常。

如果您添加一个errorChannel,而不是向调用者抛出异常,一个带有异常的ErrorMessage 将其有效负载发送到错误通道 - 然后您可以在错误通道上进行任何进一步的后处理如果需要,流并向调用者抛出一些其他异常。不过,听起来你不需要那个。

所以,把它们放在一起......

  1. 让错误处理服务将一些消息发送到另一个目的地。
  2. 在网关服务中,为该目标添加@StreamListener
  3. @StreamListener 中构造一个带有Exception 有效负载的消息并将其发送到网关的回复通道。
  4. 网关随后会将有效负载扔给调用者。

这样的东西应该可以工作......

@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload) throws MyException;

.

@StreamListener(CloudStreamChannels.TO_UPPERCASE_FAILURES)
public void failed(Message<FailInfo> failed) { // payload has info about the failure
    Message m = MessageBuilder.withPayload(new MyException(failed.getPayload())).
         .copyHeaders(failed.getHeaders())
         .build();
    this.reply.send(m); // send directly to the gateway's reply channel (perhaps @Autowired)
}

无论涉及多少远程服务,端到端传播回复通道标头都很重要。

编辑

@SpringBootApplication
@EnableBinding(TwoAsyncPipes.class)
public class So47948454aApplication {

    public static void main(String[] args) {
        SpringApplication.run(So47948454aApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            System.out.println(gate.foo(new Foo("foo")));
            try {
                gate.foo(new Foo("fail"));
            }
            catch (MyException e) {
                System.out.println(e);
            }
        };
    }

    @MessagingGateway
    public interface Gate {

        @Gateway(requestChannel = "enrich", replyChannel = "transformed")
        Foo foo(Foo foo) throws MyException;

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from("enrich")
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel("gateOut").get();
    }

    @Bean
    public MessageChannel transformed() {
        return new DirectChannel();
    }

    @Transformer(inputChannel = "gateIn", outputChannel = "transformed")
    public Object jsonToObject(Message<?> in) {
        return jtot().transform(in);
    }

    @Bean
    public JsonToObjectTransformer jtot() {
        return new JsonToObjectTransformer();
    }

    @StreamListener("serviceIn")
    @SendTo("serviceOut")
    public Message<?> listen(Foo in) {
        if (in.foo.equals("fail")) {
            return MessageBuilder.withPayload(new MyException("failed"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            MyException.class.getName())
                    .build();
        }
        else {
            return MessageBuilder.withPayload(new Foo("bar"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            Foo.class.getName())
                    .build();
        }
    }

    public static class Foo {

        String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    @SuppressWarnings("serial")
    public static class MyException extends RuntimeException {

        private String error;

        public MyException() {
            super();
        }

        public MyException(String error) {
            this.error = error;
        }

        public String getError() {
            return this.error;
        }

        public void setError(String error) {
            this.error = error;
        }

        @Override
        public String toString() {
            return "MyException [error=" + this.error + "]";
        }

    }

    public interface TwoAsyncPipes {

        @Output("gateOut")
        MessageChannel gateOut();

        @Input("serviceIn")
        MessageChannel serviceIn();

        @Output("serviceOut")
        MessageChannel serviceOut();

        @Input("gateIn")
        MessageChannel gateIn();

    }

}

Foo [foo=bar]
MyException [error=failed]

POM

http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

<groupId>com.example</groupId>
<artifactId>so47948454a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>so47948454a</name>
<description>Demo project for Spring Boot</description>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

兔子活页夹 1.3.0.RELEASE Spring 集成 4.3.12

2017-12-26 13:56:18.121  INFO 39008 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SpringAMQP#7e87ef9e:0/SimpleConnection@45843650 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60995]
Foo [foo=bar]
MyException [error=failed]
2017-12-26 13:56:18.165  INFO 39008 --- [           main] com.example.So47948454aApplication       : Started So47948454aApplication in 3.422 seconds (JVM running for 3.858)

application.yml:

spring:
  cloud:
    stream:
      bindings:
        gateIn:
          destination: serviceOut
          content-type: application/json
        gateOut:
          destination: serviceIn
          content-type: application/json
        serviceIn:
          destination: serviceIn
          content-type: application/json
        serviceOut:
          destination: serviceOut
          content-type: application/json

【讨论】:

  • 感谢您的反馈,加里。我整理了一些示例代码,并将其作为更新包含在我的原始帖子中。这类似于我试图完成的事情,我不确定为什么在 GatewayApplication.java 中我没有碰到 @RestController 类的 catch 块。根据您的回答,我认为这应该可行。你看到我忘记了什么吗?
  • 异常通常对 JSON 不友好(例如,无参数 CTOR)。这就是为什么我建议发送一个简单的消息并在本地构建异常。您是否在网关服务器上的日志中看到任何错误?如果向 MyException 添加一个无参数 CTOR 和一个 setter 会发生什么?使用 DEBUG 日志运行双方应该会有所帮助。
  • Gary,我刚刚再次更新了我的代码。有点不对劲。我在几个地方引用了 String 而不是 StringWrapper 。现在我看到以下错误:{ "timestamp": 1514307185336, "status": 500, "error": "Internal Server Error", "exception": "org.springframework.core.convert.ConverterNotFoundException", "message": "No converter found capable of converting from type [java.lang.String] to type [com.example.demo.StringWrapper]", "path": "/string/hellobuddy" }。我不想在这里添加另一个问题,但是您可以运行我的示例代码来查看问题吗?
  • 快乐之路行得通吗?我看不到网关服务如何获得关于将 JSON 解组到哪个对象的任何线索。在任何情况下,由于您有两种返回类型(StringWrapperMyException),您将需要更复杂的技术,例如回复通道路径上的转换器和/或路由器。
  • 我在回答中的第一个示例正是这样做的(为异常的不同目的地添加了一个流侦听器。只需 @AutowiredMessageChannel 以及回复通道的名称并将异常发送给它.
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-10-03
  • 2018-05-27
  • 1970-01-01
  • 1970-01-01
  • 2019-06-09
  • 2021-08-03
  • 1970-01-01
相关资源
最近更新 更多