您的入站和出站已颠倒。
这是一个可以为您提供所需内容的示例;它使用发布/订阅频道进行广播...
@SpringBootApplication
public class So48213450Application {
private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
public static void main(String[] args) {
SpringApplication.run(So48213450Application.class, args);
}
@Bean
public PublishSubscribeChannel channel() {
return new PublishSubscribeChannel();
}
@Bean
public ApplicationRunner runner(PublishSubscribeChannel channel) {
return args -> {
makeANewUdpAdapter(1234);
makeANewUdpAdapter(1235);
channel.send(MessageBuilder.withPayload("foo\n").build());
registrations.values().forEach(r -> {
r.stop();
r.destroy();
});
};
}
@Autowired
private IntegrationFlowContext flowContext;
public void makeANewUdpAdapter(int port) {
System.out.println("Creating an adapter to send to port " + port);
IntegrationFlow flow = IntegrationFlows.from(channel())
.handle(Udp.outboundAdapter("localhost", port))
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
}
结果:
$ nc -u -l 1234 &
[1] 56730
$ nc -u -l 1235 &
[2] 56739
$ jobs
[1]- Running nc -u -l 1234 &
[2]+ Running nc -u -l 1235 &
$ foo
foo
您不能在运行时更改参数,您必须创建新参数。
编辑
在下面回应您的 cmets...
你不能混合和匹配 spring 集成 jars(2.1.x 和 5.0.x);它们都必须使用相同的版本。我上面的例子使用了 Boot 2.0.0.M7(boot 2 计划在下个月发布)。
Udp 工厂类在 5.0.0 中被添加到 spring-integration-ip 中。
这里有一个类似的例子(也添加了接收适配器),用于 boot 1.5.9 和 spring integration 4.3.13...
@SpringBootApplication
public class So482134501Application {
private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
@Autowired
private IntegrationFlowContext flowContext;
public static void main(String[] args) {
SpringApplication.run(So482134501Application.class, args);
}
@Bean
public PublishSubscribeChannel channel() {
return new PublishSubscribeChannel();
}
@Bean
public ApplicationRunner runner(PublishSubscribeChannel channel) {
return args -> {
makeANewUdpInbound(1234);
makeANewUdpInbound(1235);
makeANewUdpOutbound(1234);
makeANewUdpOutbound(1235);
Thread.sleep(5_000);
channel.send(MessageBuilder.withPayload("foo\n").build());
this.registrations.values().forEach(r -> {
r.stop();
r.destroy();
});
this.registrations.clear();
};
}
public void makeANewUdpOutbound(int port) {
System.out.println("Creating an adapter to send to port " + port);
IntegrationFlow flow = IntegrationFlows.from(channel())
.handle(new UnicastSendingMessageHandler("localhost", port))
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
public void makeANewUdpInbound(int port) {
System.out.println("Creating an adapter to receive from port " + port);
IntegrationFlow flow = IntegrationFlows.from(new UnicastReceivingChannelAdapter(port))
.<byte[], String>transform(String::new)
.handle(System.out::println)
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
}
结果:
GenericMessage [payload=foo
, headers={ip_packetAddress=localhost/127.0.0.1:54881, ip_address=127.0.0.1, id=db7dae61-078c-5eb6-dde4-f83fc6c591d1, ip_port=54881, ip_hostname=localhost, timestamp=1515764556722}]
GenericMessage [payload=foo
, headers={ip_packetAddress=localhost/127.0.0.1:54880, ip_address=127.0.0.1, id=d1f79e79-569b-637b-57c5-549051f1b031, ip_port=54880, ip_hostname=localhost, timestamp=1515764556722}]