【问题标题】:How to return response to webflux endpoint after pushing message into google cloud pubsub?将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?
【发布时间】:2020-08-16 18:07:02
【问题描述】:

我正在使用 Spring 集成创建一个简单的 Spring Boot 应用程序。下面是这个应用程序的三个主要结构:

  1. 入站网关:接受 http 请求的 WebFluxInboundEndpoint
  2. 出站网关:PubSubMessageHandler 将消息推送到谷歌云 pubsub 主题
  3. 消息通道:FluxMessageChannel 充当请求通道

谷歌云 PubSubMessageHandler 提供失败和成功回调,因为错误/成功响应没有返回到 webflux 端点并且请求无限期等待。

问:收到pubsub的响应后如何返回成功/失败响应?

应用程序的工作副本可在此处获得:https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

要运行应用程序,请将您的谷歌云服务密钥放入 serviceAccountKey.json 文件中并 然后提供环境变量 GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json

示例请求:curl -d "name=piyush" http://localhost:8080/createPerson

以下是接受上述请求并转换为spring消息后的示例文件,它推送到pubsub主题“人”

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setPublishCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                LOGGER.info("Message was sent successfully.");
            }
        });

        return handler;
    }

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    }
}

build.gradle 依赖项是:

plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.SR4")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-webflux'
    implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

将 PubSubMessageHandler 设置为同步并添加 ExpressionEvaluatingRequestHandlerAdvice 后的新应用程序文件,但这会在 MessagingGatewaySupport 创建 Correlator 时给出错误“'beanFactory' must not be null”。

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel replyChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "expressionAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    }

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());
        endpoint.setReplyChannel(replyChannel());
        endpoint.setErrorChannel(errorChannel());

        return endpoint;
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannel(replyChannel());
        advice.setFailureChannel(errorChannel());
        return advice;
    }
}

发送 http 请求后出现的错误堆栈跟踪:

2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1]  500 Server Error for HTTP POST "/createPerson"

java.lang.IllegalArgumentException: 'beanFactory' must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
        at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]

【问题讨论】:

    标签: java spring-boot spring-integration spring-webflux google-cloud-pubsub


    【解决方案1】:

    感谢@Artem。我通过提供自定义请求处理程序建议解决了这个问题,该建议在成功场景中从消息头中识别 replyChannel 并发送消息有效负载以响应 webflux 端点。

    对于错误场景,我依赖 ReactiveStreamsConsumer 的错误处理机制,它在内部使用 errorChannel 将错误发送回 webflux 端点。

    请告知这个实现是否正确。

    下面是 PubSubRequestHandlerAdvice 的代码:

    package com.example;
    
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
    import org.springframework.integration.message.AdviceMessage;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    
    public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
    
      private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    
      @Override
      protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
    
        Object result = callback.execute();
    
        Object evalResult = message.getPayload();
        MessageChannel successChannel = null;
        Object replyChannelHeader = message.getHeaders().getReplyChannel();
        if (replyChannelHeader instanceof MessageChannel) {
          successChannel = (MessageChannel) replyChannelHeader;
        }
    
        if (evalResult != null && successChannel != null) {
          AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
          this.messagingTemplate.send(successChannel, resultMessage);
        }
        return result;
      }
    }
    

    将 PubSubRequestHandlerAdvice 用于 PubSubMessageHandler 的最终应用程序文件。

    package com.example;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.aopalliance.aop.Advice;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
    import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
    import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.http.inbound.RequestMapping;
    import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    /**
     * Entry point into the sample application.
     *
     * @author Piyush Garg
     */
    @SpringBootApplication
    public class PubSubWebFluxApplication {
    
        private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
    
        private static final String TOPIC_NAME = "person";
    
        public static void main(String[] args) {
            SpringApplication.run(PubSubWebFluxApplication.class, args);
        }
    
        /**
         * bean to deserialize request payload.
         */
        @Bean
        public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
            return new JacksonPubSubMessageConverter(objectMapper);
        }
    
        @Bean
        public MessageChannel pubSubOutputChannel() {
            return MessageChannels.flux().get();
        }
    
        /**
         * Message handler which will consume messages from message channel.
         * Then it will send google cloud pubsub topic.
         */
        @Bean
        @ServiceActivator(
                inputChannel = "pubSubOutputChannel",
                adviceChain = "pubSubAdvice"
        )
        public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
            PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
            handler.setSync(true);
            return handler;
        }
    
        /**
         * Webflux endpoint to consume http request.
         */
        @Bean
        public WebFluxInboundEndpoint webFluxInboundEndpoint() {
    
            WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    
            RequestMapping requestMapping = new RequestMapping();
            requestMapping.setMethods(HttpMethod.POST);
            requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
            requestMapping.setPathPatterns("/createPerson");
            endpoint.setRequestMapping(requestMapping);
    
            endpoint.setRequestChannel(pubSubOutputChannel());
    
            return endpoint;
        }
    
        @Bean
        public Advice pubSubAdvice() {
            return new PubSubRequestHandlerAdvice();
        }
    
    }
    

    应用程序的工作副本可在此处获得:https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

    【讨论】:

      【解决方案2】:

      PubSubMessageHandler 不是为请求/回复行为而设计的。 在大多数情况下,它被用作 send-n-forget。

      由于您真的担心成功/失败回复,我只能建议如下:

      1. PubSubMessageHandler.setSync(true):

        /**
         * Set publish method to be synchronous or asynchronous.
         *
         * <p>Publish is asynchronous be default.
         * @param sync true for synchronous, false for asynchronous
         */
        public void setSync(boolean sync) {
        

      这样,您的PubSubMessageHandler 将等待pubsubFuture.get();,如果失败,将抛出MessageHandlingException

      1. 要处理此sync 场景的成功或失败,我建议查看ExpressionEvaluatingRequestHandlerAdvice 及其successChannelfailureChannel。 在onSuccessExpression 上我认为应该#root 指向requestMessageonFailureExpression 可以查询#exception SpEL 表达式变量,但仍将requestMessage 传播到failureChannel。我之所以谈论requestMessage,是因为它具有重要的replyChannel 来响应WebFluxInboundEndpoint 请求。 在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

      2. 那些successChannelfailureChannel 以及失败的子流应该正确回复,一些返回将它们的outputChannel 留空。

      但同时我完全同意将PubSubMessageHandler 设置为AbstractReplyProducingMessageHandler 返回一些ListenableFuture 让我们处理发布结果会容易得多。

      【讨论】:

      • 感谢@Artem 的建议。我已经创建了另外 2 个 FluxMessageChannel 作为 replyChannel() 和 errorChannel()。然后将这些通道附加到 WebFluxEndPoint 和 expressionAdvice。但是在发送 http 请求时,MessagingGatewaySupport 会创建 Correlator,但由于 beanFactory 为空而失败。在问题中附加了新的应用程序文件和堆栈跟踪以供参考。请指教,我是否正确配置了 expressionAdvice。
      • 不,您不需要endpoint.setReplyChannel(replyChannel());,因此不需要专用通道bean。您只需要依赖自动创建的 replyChannel 标头,它是一个 TemporaryReplyChannel 实例。您的 ExpressionEvaluatingRequestHandlerAdvice 应该发送到其他不会回复任何内容的频道 - 基本上是提到的 replyChannel 标头。您也不能依赖默认的Message::getPayload,因为您需要来自请求消息的标头,这些标头将包含提到的replyChannel
      • 正如我所说:现有的PubSubMessageHandler 行为将是一项艰巨的任务。您可以考虑将PublishSubscribeChannelPubSubMessageHandler 作为第一个订阅者,将其他内容作为success pub/sub 发布的第二个订阅者。
      • 对于错误变体,我真的会留在您的endpoint.setErrorChannel(errorChannel()),但您仍然需要处理该错误并将合理的内容返回到相同的replyChannel 标头中。
      猜你喜欢
      • 2021-02-01
      • 2022-07-26
      • 2021-09-30
      • 2021-12-03
      • 2018-07-17
      • 1970-01-01
      • 2019-04-24
      • 2018-01-31
      • 1970-01-01
      相关资源
      最近更新 更多