【问题标题】:Spring Integraton RSocket and Spring RSocket interaction issuesSpring 集成 RSocket 和 Spring RSocket 交互问题
【发布时间】:2023-03-20 00:47:01
【问题描述】:

我创建了一个新示例并将代码放入客户端和服务器端。

完整的代码可以在here找到。

服务器端有3个版本。

  • 服务器 无 Spring Boot 应用,使用 Spring Integration RSocket InboundGateway。
  • server-boot 重用 Spring RSocket autconfiguration,并创建 ServerRSocketConnecterServerRSocketMessageHanlder
  • server-boot-messsagemapping 不使用 Spring Integration,只使用 Spring Boot RSocket autconfiguration,以及 @Controller@MessageMapping

客户端有2个版本。

  • 客户端,使用 Spring Integration Rocket OutboundGateway 发送消息。
  • client-requester 使用 RSocketRequester 发送消息,根本不使用 Spring Integration。

客户端和服务器交互方式为REQUEST_CHANNEL,通过TCP/localhost:7000连接服务器。

服务器

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

应用类:

@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

服务器启动

pom.xml 中的依赖项。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

application.properties

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

应用程序类。

@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

服务器启动消息映射

pom.xml 中的依赖项。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

application.properties。

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

应用程序类。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

客户

在客户端,pom.xml中的依赖是这样的。


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

应用类:


@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

在运行客户端和服务器应用程序时,尝试通过curl访问http://localhost:8080/hello

当使用 serverserver-boot 使用 InboundGateway 处理消息时,输出如下所示。

curl http://localhost:8080/hello

data:ABCD

当使用 server-boot-messagemapping 时,输出正常:

data:A
data:B
data:C
data:D

客户请求者

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

应用类:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

在运行这个客户端和3台服务器时,尝试通过curl访问http://localhost:8080/hello

当使用 serverserver-boot 使用 InboundGateway 处理消息时,它会抛出一个类转换异常。

当使用 server-boot-messagemapping 时,输出正常:

data:A
data:B
data:C
data:D

不知道InboundGateway和OutboundGateway的配置问题出在哪里?

【问题讨论】:

    标签: spring spring-boot spring-integration rsocket


    【解决方案1】:

    感谢您提供如此详细的示例!

    所以,我所看到的。两个客户端(普通的 RSocketRequester 和 Spring Integration)都可以很好地与普通的 RSocket 服务器配合使用。

    要使它们与 Spring Integration 服务器一起工作,您必须进行以下更改:

    1. 服务器端:

    .requestElementType(ResolvableType.forClass(String.class)) 添加到RSockets.inboundGateway() 定义中,这样它就会知道将传入的有效负载转换为什么。

    1. 客户端:

      .data(Flux.just("a\n", "b\n", "c\n", "d\n")).

    目前 Spring Integration 的服务器端不将传入的 Flux 视为独立有效负载流。因此,我们尝试将它们全部连接成一个值。 新的行分隔符是我们期望独立值的指示符。 Spring Messaging 则完全相反:它检查 multi-value 预期类型并在其 map() 中解码传入 Flux 中的每个元素,而不是尝试对整个 Publisher 解码。

    这将是一个重大的变化,但可能需要考虑修复 RSocketInboundGateway 逻辑以与常规 @MessageMapping 保持一致以支持 RSocket。随意提出 GH 问题!

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多