【发布时间】:2012-05-14 02:07:56
【问题描述】:
我正在分析的代码使用 Netty NioDatagramChannelFactory 创建 UDP 服务器。 它创建一个线程池:
ExecutorService threadPool = Executors.newCachedThreadPool();
然后是数据报通道、pipelineFactory & bootstrap:
int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();
ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));
在 pipelineFactory 中,getPipeline() 添加自定义处理程序。
就像它所说的: Multi-threaded Handling of UDP Messages
只有一个线程处理收到的消息。在日志中,线程名称显示为 New I/O datagram worker #1,例如:
2012-04-20 09:20:51,853 新的 I/O 数据报工作程序 #1'-'1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:42] messageReceived |处理:V1TRAP[reqestID=0, ...]
我阅读了文档和这个条目:Lot of UDP requests lost in UDP server with Netty
然后我根据这些条目更改了一些代码。 现在创建线程池:
int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);
还有 pipelineFactory 和 ExecutionHandler:
ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);
getPipeline() 会添加如下所述的处理程序:
public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {
private ExecutionHandler executionHandler = null;
public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) {
this.executionHandler = executionHandler;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addFirst("ExecutorHandler", executionHandler);
// Here the custom handlers are added
pipeline.addLast( ... )
}
现在,我在日志中获得了 4 个不同的线程名称。它们显示为 pool-2-thread-1、pool-2-thread-2 等...
例如:
2012-05-09 09:12:19,589 pool-2-thread-1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:46] messageReceived |处理:V1TRAP[reqestID=0, ...]
但它们不会同时处理。 messageReceived() 下的处理必须在一个线程上完成,以便下一个线程处理下一条消息。 我从不同的客户端向服务器发送了大量消息,并且我得到的日志不是交错的。我也试过在messageReceived()里面Thread.sleep(),并确认了之前的。
我错过了什么吗? 有没有办法用 Netty 实现真正的多线程 UDP 服务器? 如何让不同的线程同时执行 messageReceived()?
【问题讨论】:
-
如果没记错的话,OrderedMemoryAwareThreadPoolExecutor 在同一个线程中执行来自同一个客户端的请求。
标签: multithreading concurrency udp netty