【问题标题】:How to get server response with netty client如何使用 netty 客户端获取服务器响应
【发布时间】:2014-06-01 10:26:31
【问题描述】:

我想写一个基于 netty 的客户端。它应该有方法 public String send(String msg); 应该从服务器或将来返回响应 - 没关系。它也应该是多线程的。像这样:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

在调用 writeAndFlush(); 后,我不知道如何从服务器检索响应;我该怎么办?

我也使用 Netty 4.0.18.Final

【问题讨论】:

    标签: java client-server netty


    【解决方案1】:

    为该方法返回一个Future&lt;String&gt; 很简单,我们将实现以下方法签名:

    public Futute<String> sendMessage(String msg) {
    

    当您熟悉异步编程结构时,这相对容易做到。为了解决设计问题,我们将执行以下步骤:

    1. 写入消息时,将Promise&lt;String&gt; 添加到ArrayBlockingQueue&lt;Promise&gt;

      这将作为最近发送的消息列表,并允许我们更改 Future&lt;String&gt; 对象的返回结果。

    2. 当消息返回到处理程序时,针对Queue 的头部解析它

      这让我们能够得到正确的未来来改变。

    3. 更新Promise&lt;String&gt;的状态

      我们调用promise.setSuccess() 最终设置对象的状态,这将传播回未来的对象。

    示例代码

    public class ClientHandler extends SimpleChannelInboundHandler<String> {
        private ChannelHandlerContext ctx;
        private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            super.channelActive(ctx);
            this.ctx = ctx;
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            super.channelInactive(ctx);
            synchronized(this){
                Promise<String> prom;
                while((prom = messageList.poll()) != null) 
                    prom.setFailure(new IOException("Connection lost"));
                messageList = null;
            }
        }
    
        public Future<String> sendMessage(String message) {
            if(ctx == null) 
                throw new IllegalStateException();
            return sendMessage(message, ctx.executor().newPromise());
        }
    
        public Future<String> sendMessage(String message, Promise<String> prom) {
            synchronized(this){
                if(messageList == null) {
                    // Connection closed
                    prom.setFailure(new IllegalStateException());
                } else if(messageList.offer(prom)) { 
                    // Connection open and message accepted
                    ctx.writeAndFlush(message).addListener();
                } else { 
                    // Connection open and message rejected
                    prom.setFailure(new BufferOverflowException());
                }
                return prom;
            }
        }
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) {
            synchronized(this){
                if(messageList != null) {
                     messageList.poll().setSuccess(msg);
                }
            }
        }
    }
    

    文档分类

    • private ChannelHandlerContext ctx;

      用于存储我们对 ChannelHandlerContext 的引用,我们使用它来创建 Promise

    • private BlockingQueue&lt;Promise&lt;String&gt;&gt; messageList = new ArrayBlockingQueue&lt;&gt;();

      我们将过去的消息保留在此列表中,以便我们可以更改未来的结果

    • public void channelActive(ChannelHandlerContext ctx)

      连接激活时由 netty 调用。在这里初始化我们的变量。

    • public void channelInactive(ChannelHandlerContext ctx)

      当连接因错误或正常连接关闭而变为非活动状态时由 netty 调用。

    • protected void messageReceived(ChannelHandlerContext ctx, String msg)

      当有新消息到达时由netty调用,这里挑出队列的头部,然后我们对其调用setsuccess。

    警告建议

    当使用futures时,你需要注意一件事,如果future还没有完成,不要从1个netty线程调用get(),不遵循这个简单的规则会导致死锁或BlockingOperationException

    【讨论】:

    • 对此有两个警告:1) 使用的协议必须保证服务器按照接收请求的顺序发送响应,2) 请求只会发送到和接收自单个服务器(否则 1 会中断,因为各个服务器之间的排序可能不再是同步的)。由于单个 Bootstrap 可用于连接到多个服务器,因此第二个可能是一个问题,尽管每个连接都会产生自己的通道,因此应该可以为每个通道设置单独的队列来解决这个假设 (1) 成立。
    • 内存可见性怎么样——channelActive 中的ctx 的分配保证被调用sendMessage(String) 的线程看到?
    • 还有其他人收到此错误吗?在return sendMessage(message, ctx.newPromise()); 中,newPromise 的类型为io.netty.channel.ChannelPromise,需要时为io.netty.util.concurrent.Promise&lt;String&gt;。经过一些演员,我得到ClassCastException: java.lang.String cannot be cast to java.lang.Void
    • 我在 return sendMessage(message, ctx.newPromise());行
    • @mertaksu 它应该是`ctx.executor().newPromise()` 而不是ctx.newPromise(),不知道旧代码是如何编译的,也许我使用了不同的网络版本
    【解决方案2】:

    您可以在 netty 项目中找到示例。 我们可以将结果保存到最后一个处理程序的自定义字段中。在下面的代码中,我们想要的是 handler.getFactorial()。

    参考http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all

    FactorialClient.java

    public final class FactorialClient {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
        static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new FactorialClientInitializer(sslCtx));
    
                // Make a new connection.
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                // Get the handler instance to retrieve the answer.
                FactorialClientHandler handler =
                    (FactorialClientHandler) f.channel().pipeline().last();
    
                // Print out the answer.
                System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
    
        private ChannelHandlerContext ctx;
        private int receivedMessages;
        private int next = 1;
        final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
    
        public BigInteger getFactorial() {
            boolean interrupted = false;
            try {
                for (;;) {
                    try {
                        return answer.take();
                    } catch (InterruptedException ignore) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            this.ctx = ctx;
            sendNumbers();
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
            receivedMessages ++;
            if (receivedMessages == FactorialClient.COUNT) {
                // Offer the answer after closing the connection.
                ctx.channel().close().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        boolean offered = answer.offer(msg);
                        assert offered;
                    }
                });
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    
        private void sendNumbers() {
            // Do not send more than 4096 numbers.
            ChannelFuture future = null;
            for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
                future = ctx.write(Integer.valueOf(next));
                next++;
            }
            if (next <= FactorialClient.COUNT) {
                assert future != null;
                future.addListener(numberSender);
            }
            ctx.flush();
        }
    
        private final ChannelFutureListener numberSender = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    sendNumbers();
                } else {
                    future.cause().printStackTrace();
                    future.channel().close();
                }
            }
        };
    }
    

    【讨论】:

      【解决方案3】:

      调用channel.writeAndFlush(msg); 已经返回一个ChannelFuture。要处理此方法调用的结果,您可以像这样向未来添加一个侦听器:

      future.addListener(new ChannelFutureListener() {
          public void operationComplete(ChannelFuture future) {
              // Perform post-closure operation
              // ...
          }
      }); 
      

      (取自 Netty 文档,参见:Netty doc

      【讨论】:

      • 但是如何从 ChannelFuture 获取服务器响应?
      • 你需要在频道中注册一个ChannelInboundHandler。事实上,您可能已经这样做了 -> 查看您的 ClientHandler。这个处理程序可以实现一个public void channelRead(ChannelHandlerContext ctx, Object msg) {...} 方法。它处理来自服务器的响应。
      • 我了解netty的基础知识。在服务器上,这很简单。但我仍然不知道如何连接此代码: public String sendMessage(String msg) { channel.writeAndFlush(msg);返回 ??????????; } 与 channelRead(...)
      • 在netty中无法通过这种方法获取服务器响应。数据(在您的情况下是来自服务器的响应)由连接到通道的 InboundHandlers 处理。在这些处理程序中,您可以将服务器响应转发到代码的另一部分。永远记住,netty 是一个异步框架!
      猜你喜欢
      • 2012-02-21
      • 2016-12-03
      • 1970-01-01
      • 2019-05-18
      • 2018-12-02
      • 2019-12-18
      • 2019-02-05
      • 1970-01-01
      • 2012-11-28
      相关资源
      最近更新 更多