【问题标题】:netty: why code in handler can not use future.await()?netty:为什么处理程序中的代码不能使用future.await()?
【发布时间】:2012-03-20 15:11:07
【问题描述】:

大家好!

我使用 netty 3.1 构建了一个套接字调度服务器,它将套接字数据传输到另一个套接字服务器,所以我在第一条消息到达时在 netty 服务器处理程序中创建一个客户端连接,并等待连接完成,当下一个 messageRecv 事件到达时,我只是将缓冲区从服务器通道传输到客户端通道。但是我发现在使用 future.await*() 操作时在处理程序中是禁止的。如果我不使用 await(),因为 connectFuture 是同步的,有可能在下一条消息到达时,但连接未完成。我不知道如何处理这个问题。

如何确保客户端连接在下一个 messageRecv 事件到达之前完成?

现在,我只是做一个锁来同步两个代码,就像这样:

/**
 * server handler
*/
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {

  private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class);

  public ServerChannelHandler(ProxyClientFactory clientFactory) {
    this.clientFactory = clientFactory;
  }

/** factory connect another server */
private ProxyClientFactory clientFactory;

/** anotherchannel */
private Channel innerChannel;

private ChannelFuture connectFuture;


private ReentrantLock connectLock = new ReentrantLock();

private Condition notComplete = connectLock.newCondition();

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

    final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy();
    final Channel outChannel = ctx.getChannel();

    // first connect
    if (connectFuture == null) {

        final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel());

        ProxyClient client = clientFactory.retrieveClient();


        connectFuture = client.getConnectChannelFuture();


        connectFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {

                connectLock.lock();
                try {

                    if (future.isSuccess()) {
                        innerChannel = future.getChannel();

                        innerChannel.getPipeline().addLast("clientchannelhandler", cch);
                        innerChannel.write(buffer);
                    } else {

                        Channels.fireExceptionCaught(outChannel, future.getCause());
                    }
                } finally {

                    notComplete.signal();

                    connectLock.unlock();
                }
            }

        });

    } else {

        connectLock.lock();
        try {

            if (!connectFuture.isDone()) {

                if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                    throw new Exception("");
                }
            }


            if (connectFuture.isSuccess()) {
                if(innerChannel == null){
                    if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                        throw new Exception("");
                    }
                }
                innerChannel.write(buffer);
            } else {

                _logger.error("");
            }

        } finally {
            connectLock.unlock();
        }

    }

}

【问题讨论】:

    标签: netty


    【解决方案1】:

    你不能,因为你可能会死锁netty。您还会阻塞 IO-Worker 线程,这是一件坏事。处理您的情况的最佳方法是将消息“排队”直到连接完成,然后发送它们。

    另一种解决方案是在之前设置 Channel.setReadable(false) 时通过“channelOpen(..)”方法连接到代理客户端。连接完成后,您将再次调用 Channel.setReadable(true),以便处理 messageEvents。

    类似这样的:

        @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        // Suspend incoming traffic until connected to the remote host.
        final Channel inboundChannel = e.getChannel();
        inboundChannel.setReadable(false);
    
        // Start the connection attempt.
        ClientBootstrap cb = new ClientBootstrap(cf);
        cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
        ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
    
        outboundChannel = f.getChannel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // Connection attempt succeeded:
                    // Begin to accept incoming traffic.
                    inboundChannel.setReadable(true);
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }
    

    有关更多详细信息,请参阅代理示例 [1]。

    [1] https://github.com/netty/netty/tree/3.2/src/main/java/org/jboss/netty/example/proxy

    【讨论】:

    • 非常感谢,我选择了你的第二种方案,但是找到了函数inboundChannel.setReadable(false);是异步的,并且返回的 channelFuture bean 似乎没有调用任何添加到自身的侦听器。
    • 所以我进入netty 3.1代码,类org.jboss.netty.channel.socket.nio.NioWorker第659行到第732行,在函数“void setInterestOps(NioSocketChannel channel, ChannelFuture future, int interestOps)" 我发现,如果key == null或者selector为null,这个函数只是return,而不是设置future的状态,对吗?说实话,我在 netty 核心代码中没有更多的内容。但我认为“inboundChannel.setReadable(false)”中的设置操作只是函数调用,而不是异步的。我认为使用您提供的第二种解决方案是安全的,谢谢!
    • 对不起,我不明白你的问题。你能给我更详细的吗?
    • 你说得对,这是 setInterestedOps(..) 中的一个错误。我会修好它。谢谢!
    • 对不起,我来自中国,我的英语很差,非常感谢,你提供的例子很好,给了我很多积分。
    猜你喜欢
    • 2020-02-20
    • 1970-01-01
    • 2018-06-19
    • 2016-08-12
    • 1970-01-01
    • 2010-10-31
    • 2019-09-16
    • 1970-01-01
    • 2015-08-23
    相关资源
    最近更新 更多