【问题标题】:Netty OrderedMemoryAwareThreadPoolExecutor not creating multiple threadsNetty OrderedMemoryAwareThreadPoolExecutor 没有创建多个线程
【发布时间】:2013-02-07 22:53:00
【问题描述】:

我将 Netty 用于多线程 TCP 服务器和单个客户端持久连接。 客户端发送许多二进制消息(在我的用例中为 10000 条),并且应该接收每条消息的答案。我在管道中添加了一个 OrderedMemoryAwareThreadPoolExecutor 来处理在多个线程上执行 DB 调用。

如果我在 messageReceived() 方法中运行 DB 调用(或使用 Thread.currentThread().sleep(50) 模拟它),那么所有事件都由 单线程 处理。 p>

    5 count of {main}
    1 count of {New
10000 count of {pool-3-thread-4}

对于 messageReceived() 的简单实现,服务器会按预期创建许多执行线程。

我应该如何配置 ExecutionHandler 以获得业务逻辑的多个线程执行器?

这是我的代码:

public class MyServer {

      public void run() {
            OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());  
            ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);        
            bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
      }
    }  



    public class ServerChannelPipelineFactory implements ChannelPipelineFactory {

      public ChannelPipeline getPipeline() throws Exception {

        pipeline.addLast("encoder", new MyProtocolEncoder());
        pipeline.addLast("decoder", new MyProtocolDecoder());
        pipeline.addLast("executor", executionHandler);
        pipeline.addLast("myHandler", new MyServerHandler(dataSource));

      }
    }

    public class MyServerHandler extends SimpleChannelHandler {

      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {


          // long running DB call simulation
          try {
            Thread.currentThread().sleep(50);
          } catch (InterruptedException ex) {

          }  

          // a simple message  
          final MyMessage answerMsg = new MyMessage();
          if (e.getChannel().isWritable()) {
            e.getChannel().write(answerMsg);
          }  
      }      
    }

【问题讨论】:

    标签: multithreading netty persistent-connection threadpoolexecutor


    【解决方案1】:

    OrderedMemoryAwareThreadPoolExecutor 保证来自单个通道的事件按顺序处理。您可以将其视为将通道绑定到池中的特定线程,然后处理该线程上的所有事件 - 尽管它比这更复杂一些,因此不要依赖始终由同一线程处理的通道。

    如果您启动第二个客户端,您会看到它(很可能)正在池中的另一个线程上进行处理。如果您真的可以并行处理单个客户端的请求,那么您可能需要 MemoryAwareThreadPoolExecutor,但请注意,这不能保证通道事件的顺序。

    【讨论】:

    • 我现在在 FrameDecoder 中遇到异常:工作线程无法处理来自客户端的消息数量。如果我在管道中的解码器之前添加 executionHandler 是否有意义? ERROR {New I/O worker #1} (MyProtocolDecoder.java:161) - decode exception: java.lang.UnsupportedOperationException at org.jboss.netty.buffer.CompositeChannelBuffer.array(CompositeChannelBuffer.java:166) ... at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:422)
    • 我一直试图将执行处理程序放置在尽可能远的管道上,以允许 I/O 线程做尽可能多的工作。它的位置取决于您的阻塞操作开始的位置以及对您的应用程序来说性能最高的位置。关于异常 - CompositeChannelBuffer 不支持 array() 方法。您可能需要 DynamicChannelBuffer - static.netty.io/3.6/api/org/jboss/netty/buffer/…。如果没有看到解码器,很难给出建议。但是,您可能想在其他任务中寻求有关解码器的帮助。
    • 谢谢。是的,错误出现在将 ChannelBuffer 转换为 Java 对象的 MyProtocolDecoder 中。它现在似乎可以工作了。
    猜你喜欢
    • 1970-01-01
    • 2012-04-11
    • 1970-01-01
    • 2019-03-01
    • 2020-03-06
    • 1970-01-01
    • 2013-11-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多