【问题标题】:Spring Integration multiple UDP inbound/outbound channelsSpring集成多个UDP入站/出站通道
【发布时间】:2018-06-21 03:50:03
【问题描述】:

我正在尝试使用 Spring Boot 构建一个要部署在多个节点上的模块。由于特定应用程序的时间限制,我必须使用 UDP,不能依赖 Spring 提供的更易于使用的 REST 工具。

我必须能够将数据报发送到可能随时间变化的一组节点(即该组可能会增长或缩小,或者某些节点可能会移动到新的 ip/port“坐标” )。通信必须是单播

我一直在阅读有关 TCP 和 UDP 支持TCP and UDP support 的官方文档,但它相当...紧凑且不透明。 org.springframework.integration 类上的 javadocs 也相当简短。 据我所知,“入站”通道用于发送数据包,而出站通道用于接收数据包。

到目前为止,我无法找到以下入站问题的答案(即“发送”渠道,如果我理解得很好的话): - 如何在运行时创建更多通道,将数据包发送到多个目的地? - 如果主机被移动,我应该直接销毁通道并设置一个新通道,还是可以在运行时更改通道的参数(目标 IP/端口)?

对于出站渠道(“接收”渠道,如果我理解得很好的话),我有与上述类似的问题,如: - 如何在运行时设置多个通道? - 如何在运行时更改现有频道的目的地,而不必将其拆除并重新设置? - 我应该只打开/关闭“原始”UDP 套接字吗?

【问题讨论】:

    标签: java spring udp spring-integration


    【解决方案1】:

    您的入站和出站已颠倒。

    这是一个可以为您提供所需内容的示例;它使用发布/订阅频道进行广播...

    @SpringBootApplication
    public class So48213450Application {
    
        private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
    
        public static void main(String[] args) {
            SpringApplication.run(So48213450Application.class, args);
        }
    
        @Bean
        public PublishSubscribeChannel channel() {
            return new PublishSubscribeChannel();
        }
    
        @Bean
        public ApplicationRunner runner(PublishSubscribeChannel channel) {
            return args -> {
                makeANewUdpAdapter(1234);
                makeANewUdpAdapter(1235);
                channel.send(MessageBuilder.withPayload("foo\n").build());
                registrations.values().forEach(r -> {
                    r.stop();
                    r.destroy();
                });
            };
        }
    
        @Autowired
        private IntegrationFlowContext flowContext;
    
        public void makeANewUdpAdapter(int port) {
            System.out.println("Creating an adapter to send to port " + port);
            IntegrationFlow flow = IntegrationFlows.from(channel())
                    .handle(Udp.outboundAdapter("localhost", port))
                    .get();
            IntegrationFlowRegistration registration = flowContext.registration(flow).register();
            registrations.put(port, registration);
        }
    
    }
    

    结果:

    $ nc -u -l 1234 &
    [1] 56730
    $ nc -u -l 1235 &
    [2] 56739
    $ jobs
    [1]-  Running                 nc -u -l 1234 &
    [2]+  Running                 nc -u -l 1235 &
    $ foo
    foo
    

    您不能在运行时更改参数,您必须创建新参数。

    编辑

    在下面回应您的 cmets...

    你不能混合和匹配 spring 集成 jars(2.1.x 和 5.0.x);它们都必须使用相同的版本。我上面的例子使用了 Boot 2.0.0.M7(boot 2 计划在下个月发布)。

    Udp 工厂类在 5.0.0 中被添加到 spring-integration-ip 中。

    这里有一个类似的例子(也添加了接收适配器),用于 boot 1.5.9 和 spring integration 4.3.13...

    @SpringBootApplication
    public class So482134501Application {
    
        private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
    
        @Autowired
        private IntegrationFlowContext flowContext;
    
        public static void main(String[] args) {
            SpringApplication.run(So482134501Application.class, args);
        }
    
        @Bean
        public PublishSubscribeChannel channel() {
            return new PublishSubscribeChannel();
        }
    
        @Bean
        public ApplicationRunner runner(PublishSubscribeChannel channel) {
            return args -> {
                makeANewUdpInbound(1234);
                makeANewUdpInbound(1235);
                makeANewUdpOutbound(1234);
                makeANewUdpOutbound(1235);
                Thread.sleep(5_000);
                channel.send(MessageBuilder.withPayload("foo\n").build());
                this.registrations.values().forEach(r -> {
                    r.stop();
                    r.destroy();
                });
                this.registrations.clear();
            };
        }
    
        public void makeANewUdpOutbound(int port) {
            System.out.println("Creating an adapter to send to port " + port);
            IntegrationFlow flow = IntegrationFlows.from(channel())
                    .handle(new UnicastSendingMessageHandler("localhost", port))
                    .get();
            IntegrationFlowRegistration registration = flowContext.registration(flow).register();
            registrations.put(port, registration);
        }
    
        public void makeANewUdpInbound(int port) {
            System.out.println("Creating an adapter to receive from port " + port);
            IntegrationFlow flow = IntegrationFlows.from(new UnicastReceivingChannelAdapter(port))
                    .<byte[], String>transform(String::new)
                    .handle(System.out::println)
                    .get();
            IntegrationFlowRegistration registration = flowContext.registration(flow).register();
            registrations.put(port, registration);
        }
    
    }
    

    结果:

    GenericMessage [payload=foo
    , headers={ip_packetAddress=localhost/127.0.0.1:54881, ip_address=127.0.0.1, id=db7dae61-078c-5eb6-dde4-f83fc6c591d1, ip_port=54881, ip_hostname=localhost, timestamp=1515764556722}]
    GenericMessage [payload=foo
    , headers={ip_packetAddress=localhost/127.0.0.1:54880, ip_address=127.0.0.1, id=d1f79e79-569b-637b-57c5-549051f1b031, ip_port=54880, ip_hostname=localhost, timestamp=1515764556722}]
    

    【讨论】:

    • 非常感谢。我还不能真正使用 Udp.outboundAdapter(...)。我想我的 pom 文件可能有问题(这是一个 maven 项目)。父工件是 spring-boot-starter-parent (1.5.9 RELEASE),对于集成我有 spring-integration-core (5.0.0 RELEASE) 和 spring-integration-ip (2.1.3 发布)。我正在使用 Java 8。我需要什么来解决这种依赖关系?
    • 在 s-i-ip 5.0 中添加了 Udp 工厂。请参阅我对启动 1.5.9 版本的回答的编辑。
    • 我找到了丢失的包,现在我设法让它工作了。再次感谢你。标记为正确答案。
    猜你喜欢
    • 2015-07-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-02
    • 1970-01-01
    • 2012-12-17
    • 2014-12-07
    • 2023-03-21
    相关资源
    最近更新 更多