【问题标题】:Threads not executing concurrently in Netty UDP server在 Netty UDP 服务器中未同时执行的线程
【发布时间】:2012-05-14 02:07:56
【问题描述】:

我正在分析的代码使用 Netty NioDatagramChannelFactory 创建 UDP 服务器。 它创建一个线程池:

ExecutorService threadPool = Executors.newCachedThreadPool();

然后是数据报通道、pipelineFactory & bootstrap:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

在 pipelineFactory 中,getPipeline() 添加自定义处理程序。

就像它所说的: Multi-threaded Handling of UDP Messages

只有一个线程处理收到的消息。在日志中,线程名称显示为 New I/O datagram worker #1,例如:

2012-04-20 09:20:51,853 新的 I/O 数据报工作程序 #1'-'1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:42] messageReceived |处理:V1TRAP[reqestID=0, ...]

我阅读了文档和这个条目:Lot of UDP requests lost in UDP server with Netty

然后我根据这些条目更改了一些代码。 现在创建线程池:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

还有 pipelineFactory 和 ExecutionHandler:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

getPipeline() 会添加如下所述的处理程序:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

现在,我在日志中获得了 4 个不同的线程名称。它们显示为 pool-2-thread-1pool-2-thread-2 等...

例如:

2012-05-09 09:12:19,589 pool-2-thread-1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:46] messageReceived |处理:V1TRAP[reqestID=0, ...]

但它们不会同时处理。 messageReceived() 下的处理必须在一个线程上完成,以便下一个线程处理下一条消息。 我从不同的客户端向服务器发送了大量消息,并且我得到的日志不是交错的。我也试过在messageReceived()里面Thread.sleep(),并确认了之前的。

我错过了什么吗? 有没有办法用 Netty 实现真正的多线程 UDP 服务器? 如何让不同的线程同时执行 messageReceived()?

【问题讨论】:

  • 如果没记错的话,OrderedMemoryAwareThreadPoolExecutor 在同一个线程中执行来自同一个客户端的请求。

标签: multithreading concurrency udp netty


【解决方案1】:

根据我的经验和对 UDP 的 Netty 的理解,只有一个线程处理 UDP 消息进行解码是正常的。由于 UDP 是无会话的,因此只有一个线程可以在一个 UDP 端口上接收数据并对其进行解码。

一旦您解码了数据并将其包装到缓冲区或特定的 java 对象中,您就可以将该对象放入将处理它的线程池(执行处理程序 -> 您的业务处理程序)。然后,一旦您将先前解码的数据释放到执行处理程序中,就可以解码 UDP 端口上即将出现的新数据。

您可以在创建 NioDatagramChannelFactory 时指定的池线程仅在您侦听多个端口上的数据时使用。每个端口只有一个线程有意义。即使您在该构造函数中指定了 100 个工作器,如果您配置了一个 UDP 端口,也只会使用一个。

【讨论】:

  • 看来你是对的,每个端口只有一个线程。但对我来说,这没有任何意义......似乎这就是 netty 的设计方式。当我使用一个处理具有数万亿选项的线程池的框架时,我必须添加我的“线程代码”才能做到这一点......我不敢相信......
  • 您愿意发布您的代码作为答案吗?我同意你的观点,即使只有一个接收端口,也可能有很多来源,所以多线程不仅有意义,而且似乎有必要为大量客户提供服务。
【解决方案2】:

让我大吃一惊的是,您将执行处理程序 first 放入管道中。我相信其意图是直到“应用程序”处理程序的整个管道应该由执行 IO 解码的 IO 线程执行。

因此,我会断言您首先要添加所有 SNMPTrap 解码处理程序,然后,当您有一个实际的 SNMPTrap 时,它会被移交给执行处理程序,该处理程序又将陷阱传递给实际的消费者的陷阱做一些有用的事情。

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

至少,ExecutionHandlerjavadoc 中是这样显示的,以上是我对它的解释。

【讨论】:

  • 是的。我尝试了这种方法,即文档中描述的方法,但它不起作用。然后我尝试使用 addFirst() 添加 executionHandler,就像问题的答案中描述的那样:Lot of UDP requests lost in UDP server with Netty。但它们都不起作用。
猜你喜欢
  • 1970-01-01
  • 2011-01-22
  • 1970-01-01
  • 2021-06-09
  • 2012-10-05
  • 1970-01-01
  • 2014-04-24
  • 2015-04-11
  • 1970-01-01
相关资源
最近更新 更多