【发布时间】:2020-08-16 18:07:02
【问题描述】:
我正在使用 Spring 集成创建一个简单的 Spring Boot 应用程序。下面是这个应用程序的三个主要结构:
- 入站网关:接受 http 请求的 WebFluxInboundEndpoint
- 出站网关:PubSubMessageHandler 将消息推送到谷歌云 pubsub 主题
- 消息通道:FluxMessageChannel 充当请求通道
谷歌云 PubSubMessageHandler 提供失败和成功回调,因为错误/成功响应没有返回到 webflux 端点并且请求无限期等待。
问:收到pubsub的响应后如何返回成功/失败响应?
应用程序的工作副本可在此处获得:https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
要运行应用程序,请将您的谷歌云服务密钥放入 serviceAccountKey.json 文件中并 然后提供环境变量 GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json
示例请求:curl -d "name=piyush" http://localhost:8080/createPerson
以下是接受上述请求并转换为spring消息后的示例文件,它推送到pubsub主题“人”
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler which will consume messages from message channel.
* Then it will send google cloud pubsub topic.
*/
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setPublishCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
LOGGER.info("Message was sent successfully.");
}
});
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
return endpoint;
}
}
build.gradle 依赖项是:
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-webflux'
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
test {
useJUnitPlatform()
}
将 PubSubMessageHandler 设置为同步并添加 ExpressionEvaluatingRequestHandlerAdvice 后的新应用程序文件,但这会在 MessagingGatewaySupport 创建 Correlator 时给出错误“'beanFactory' must not be null”。
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel errorChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler which will consume messages from message channel.
* Then it will send google cloud pubsub topic.
*/
@Bean
@ServiceActivator(
inputChannel = "pubSubOutputChannel",
adviceChain = "expressionAdvice"
)
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setSync(true);
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
endpoint.setReplyChannel(replyChannel());
endpoint.setErrorChannel(errorChannel());
return endpoint;
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannel(replyChannel());
advice.setFailureChannel(errorChannel());
return advice;
}
}
发送 http 请求后出现的错误堆栈跟踪:
2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1] 500 Server Error for HTTP POST "/createPerson"
java.lang.IllegalArgumentException: 'beanFactory' must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
【问题讨论】:
标签: java spring-boot spring-integration spring-webflux google-cloud-pubsub