【问题标题】:How to create a spring integration flow with many producers如何与许多生产者一起创建 Spring 集成流
【发布时间】:2016-12-02 16:18:22
【问题描述】:

我阅读了这个SOF 问题,我有一个补充问题。

我今天有一个像这样工作的 Spring 集成流程:

@Bean
public IntegrationFlow udpSource1() {
    return IntegrationFlows.from(new MulticastReceivingChannelAdapter("224.0.0.1", 2000)).
            transform(new ObjectToStringTransformer("utf8")).channel("stringified").get();

}
@Bean
public IntegrationFlow udpSource2() {
    return IntegrationFlows.from(new MulticastReceivingChannelAdapter("224.0.0.1", 2001)).
            transform(new ObjectToStringTransformer("utf8")).channel("stringified").get();

}

我有几个 (10) 个 UDP 源。

我想用我的所有 UDP 源创建一个流,所有这些源都将数据推送到“字符串化”通道。

我想从一个 ArrayList 中获取我所有的 UDP 端口,然后遍历这个列表并创建 UDP 源...

有可能吗?

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    是的,它叫IntegrationFlowRegistrationhttps://spring.io/blog/2016/09/27/java-dsl-for-spring-integration-1-2-release-candidate-1-is-available

    查看Dynamic TCP Sample 以获得类似的解决方案。

    关键代码是这样的:

    private MessageChannel createNewSubflow(Message<?> message) {
            String host = (String) message.getHeaders().get("host");
            Integer port = (Integer) message.getHeaders().get("port");
            Assert.state(host != null && port != null, "host and/or port header missing");
            String hostPort = host + port;
    
            TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
            TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
            handler.setConnectionFactory(cf);
            IntegrationFlow flow = f -> f.handle(handler);
            IntegrationFlowRegistration flowRegistration =
                    this.flowContext.registration(flow)
                            .addBean(cf)
                            .id(hostPort + ".flow")
                            .register();
            MessageChannel inputChannel = flowRegistration.getInputChannel();
            this.subFlows.put(hostPort, inputChannel);
            return inputChannel;
        }
    

    【讨论】:

      【解决方案2】:

      这是我如何使用 Artem 提供的技巧来解决我的问题。

      @Configuration
      public class UdpSources {
          @Value("#{'${udp.nmea.listeningports}'.split(',')}")
          List<Integer> allUDPPorts;
      
          public static final String outChannel = "stringified";
      
          @Autowired
          private IntegrationFlowContext flowContext;
      
          @PostConstruct
          public void createAllUDPEndPoints(){
              for(int port : allUDPPorts){
                  flowContext.registration(getUdpChannel(port)).autoStartup(false).id("udpSource"+port).register();
              }
          }
      
          private IntegrationFlow getUdpChannel(int port){
              return IntegrationFlows.from(new MulticastReceivingChannelAdapter("224.0.0.1", port)).
                      transform(new ObjectToStringTransformer("UTF-8")).channel(outChannel).get();
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-08-15
        • 1970-01-01
        • 1970-01-01
        • 2017-12-17
        • 2017-01-27
        相关资源
        最近更新 更多