【Question title】: Netty async write response and large data with unknown size
【Posted】: 2018-01-06 22:49:09
【Question】:

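I have developed a Netty HTTP server. I write the response in the ChannelInboundHandlerAdapter.channelRead0 method, but the response body comes from another server and its size is unknown, so the upstream HTTP response headers may carry either Content-Length or Transfer-Encoding: chunked. My plan is to buffer: if the buffer is large enough to hold the complete data (whether the upstream used Content-Length or chunked), I reply with Content-Length; otherwise I reply chunked.

  1. How do I keep hold of the write channel of the first connection and pass it to the second handler so that it can write the response? (I simply pass ctx along and write to it, but nothing is returned to the client.)

  2. How do I decide conditionally whether to write chunked data or plain data with Content-Length to the channel? (Adding a ChunkedWriteHandler when chunking turns out to be needed inside channelRead0 does not seem to work.)

Here is a simple piece of code as an example: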

```java

EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<Channel>(){

                @Override
                protected void initChannel(Channel ch) throws Exception
                {
                    System.out.println("Start, I accept client");
                    ChannelPipeline pipeline = ch.pipeline();

                    // Uncomment the following line if you want HTTPS
                    // SSLEngine engine =
                    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
                    // engine.setUseClientMode(false);
                    // pipeline.addLast("ssl", new SslHandler(engine));

                    pipeline.addLast("decoder", new HttpRequestDecoder());
                    // Uncomment the following line if you don't want to handle HttpChunks.
                    // pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("encoder", new HttpResponseEncoder());
                    // Remove the following line if you don't want automatic content
                    // compression.
                    //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
                    pipeline.addLast("deflater", new HttpContentCompressor());
                    pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                            {
                                System.out.println("msg=" + msg);

                                final ChannelHandlerContext ctxClient2Me = ctx;

                                // TODO: Implement this method
                                Bootstrap bs = new Bootstrap();
                                try{
                                //bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class,  DefaultDnsServerAddressStreamProvider.INSTANCE));
                                //.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
                                bs.resolver(DefaultAddressResolverGroup.INSTANCE);
                                }catch(Exception e){
                                    e.printStackTrace();
                                }

                                bs.channel(NioSocketChannel.class);
                                EventLoopGroup cg = workerGroup;//new NioEventLoopGroup();
                                bs.group(cg).handler(new ChannelInitializer<Channel>(){

                                        @Override
                                        protected void initChannel(Channel ch) throws Exception
                                        {
                                            System.out.println("start, server accept me");
                                            // TODO: Implement this method
                                            ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
                                            ch.pipeline().addLast(new HttpResponseDecoder());
                                            ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){

                                                    @Override
                                                    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                                                    {
                                                        // TODO: Implement this method
                                                        System.out.println("target = " + msg);
                                                        //
                                                        if(msg instanceof HttpResponse){
                                                            HttpResponse res = (HttpResponse) msg;
                                                            HttpUtil.isTransferEncodingChunked(res);
                                                            DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());

                                                            //resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                                            //resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");

                                                            ctxClient2Me.write(resClient2Me);
                                                        }
                                                        if(msg instanceof LastHttpContent){
                                                            // now response the request of the client, it wastes x seconds from receiving request to response
                                                            ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
                                                            ctx.close();
                                                        }else if( msg instanceof HttpContent){
                                                            //ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
                                                        }
                                                    }


                                                });

                                            System.out.println("end, server accept me");

                                        }

                                });

                                final URI uri = new URI("http://example.com/");
                                String host = uri.getHost();
                                ChannelFuture connectFuture= bs.connect(host, 80);

                                System.out.println("to connect me to server");

                                connectFuture.addListener(new ChannelFutureListener(){

                                        @Override
                                        public void operationComplete(ChannelFuture cf) throws Exception
                                        {
                                        }

                                });


                                ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io 
                                System.out.println("connected me to server");

                                DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
                                //req.headers().set(HttpHeaderNames.HOST, "");
                                connetedFuture.channel().writeAndFlush(req);

                                System.out.println("end of Client2Me channelRead0");
                                System.out.println("For the response of Me2Server, see SimpleChannelInboundHandler.channelRead0");
                            }

                    });
                    System.out.println("end, I accept client");
                }

            });

            System.out.println("========");

        ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

```

【Comments】:

    Tags: java netty


    【Answer 1】:

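    After trying to send the response from a thread other than the Netty event loop, I finally found the problem. If your client closes its output stream with socketChannel.shutdownOutput(), you need to set the ALLOW_HALF_CLOSURE option to true in Netty so that it does not close the channel. Here is a sample server. The client is left as an exercise for the reader :-)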

        final ServerBootstrap b = new ServerBootstrap();
    
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work: option() configures the server channel, not the accepted child channels
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
                    @Override
                    protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                                ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // This is important
                            }
    
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
                                String id = ctx.channel().id().asLongText();
    
                                // When Done reading all the bytes, send response 1 second later
                                timer.schedule(new TimerTask() {
                                    @Override
                                    public void run() {
                                        ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
                                        ctx.flush();
                                        ctx.close();
    
                                        log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
                                        startTimes.remove(id);
                                    }
                                }, 1000);
                            }
                        });
                    }
                });
        Channel ch = b.bind("localhost", PORT).sync().channel();
        ch.closeFuture().sync();
    

    Of course, as others in the thread have said, you cannot send a String; you need to send a ByteBuf, for example one created with Unpooled.copiedBuffer.
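Answer 1 leaves the client as an exercise. The following is a minimal sketch of such a client using plain JDK NIO rather than Netty. The class name `HalfCloseClientSketch` and the method `request` are hypothetical and only serve the illustration. It sends a payload, half-closes the connection with `shutdownOutput()`, and then reads the reply until the server closes its side.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical client sketch (plain JDK, not part of Netty).
final class HalfCloseClientSketch {

    /** Sends payload, half-closes the socket, then reads the whole reply. */
    static String request(String host, int port, String payload) throws IOException {
        // SocketChannel is in blocking mode by default, which keeps the sketch simple.
        try (SocketChannel ch = SocketChannel.open(new InetSocketAddress(host, port))) {
            ByteBuffer out = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                ch.write(out);
            }
            // Half-close: the server sees end-of-stream on its read side,
            // but the connection stays open for the server's reply.
            ch.shutdownOutput();

            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            ByteBuffer in = ByteBuffer.allocate(4096);
            while (ch.read(in) != -1) {
                in.flip();
                reply.write(in.array(), in.position(), in.remaining());
                in.clear();
            }
            return new String(reply.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

A server that closes the whole connection as soon as it sees end-of-stream never gets to send the reply, which is the behavior that ALLOW_HALF_CLOSURE is meant to prevent on the Netty side.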

    【Comments】:

      【Answer 2】:
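      1. Look at the Javadoc comments of Channel. You can keep the Channel received in ChannelInboundHandlerAdapter.channelRead(ChannelHandlerContext ctx, Object msg) (which does not release msg automatically after returning) or in SimpleChannelInboundHandler.channelRead0(ChannelHandlerContext ctx, I msg) (which releases the received message automatically after returning) for later use. You can refer to the example at the end, which passes the channel to another ChannelHandler.

      All I/O operations are asynchronous.

      All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which will notify you when the requested I/O operation has succeeded, failed, or canceled.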

      public interface Channel extends AttributeMap, Comparable<Channel> {
      
          /**
           * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
           * This method will not request to actual flush, so be sure to call {@link #flush()}
           * once you want to request to flush all pending data to the actual transport.
           */
          ChannelFuture write(Object msg);
      
          /**
           * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
           * This method will not request to actual flush, so be sure to call {@link #flush()}
           * once you want to request to flush all pending data to the actual transport.
           */
          ChannelFuture write(Object msg, ChannelPromise promise);
      
          /**
           * Request to flush all pending messages.
           */
          Channel flush();
      
          /**
           * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
           */
          ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
      
          /**
           * Shortcut for call {@link #write(Object)} and {@link #flush()}.
           */
          ChannelFuture writeAndFlush(Object msg);
      }
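The "returns immediately, notifies later" contract quoted above can be illustrated without Netty. The sketch below uses the JDK's `CompletableFuture` as a stand-in for `ChannelFuture`; this is a swapped-in analogy, not Netty's API. The class `AsyncWriteSketch` and its `write` method are hypothetical names invented for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// JDK-only analogy for an asynchronous write; not Netty code.
final class AsyncWriteSketch {

    /** Hypothetical asynchronous write: returns at once, completes once the gate opens. */
    static CompletableFuture<Void> write(String msg, List<String> wire, CountDownLatch gate) {
        return CompletableFuture.runAsync(() -> {
            try {
                gate.await(); // simulates I/O that has not been performed yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            wire.add(msg);
        });
    }

    /** Records the order of events around one asynchronous write. */
    static List<String> demo() {
        List<String> events = new CopyOnWriteArrayList<>();
        List<String> wire = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        CompletableFuture<Void> future = write("hello", wire, gate);
        // The listener plays the role of a ChannelFutureListener.
        CompletableFuture<Void> notified = future.whenComplete(
                (ignored, error) -> events.add(error == null ? "listener: success" : "listener: failure"));

        // The call has returned, but the operation itself is still pending.
        events.add("write() returned, done=" + future.isDone());

        gate.countDown();  // let the simulated I/O finish
        notified.join();   // wait until the listener has run
        return events;
    }
}
```

The point of the sketch is the ordering: the caller gets control back before the write has happened, so anything that must run after the write belongs in the listener, not on the line after the call.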
      
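      2. If you have added HttpResponseEncoder to the pipeline, you do not need to worry about this. It is a subclass of HttpObjectEncoder, which has a private field, private int state = ST_INIT;, that remembers whether to encode the HTTP body as chunked. The only thing you have to do is add the header "Transfer-Encoding: chunked", for example with HttpUtil.setTransferEncodingChunked(srcRes, true);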
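For readers unfamiliar with what "encoding the body as chunked" means on the wire, here is a minimal sketch of the HTTP/1.1 chunked framing written by hand in plain Java. It is not Netty's encoder and does not reproduce its internals; the class `ChunkedFramingSketch` and its methods are hypothetical names for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-written illustration of HTTP/1.1 chunked framing; not Netty code.
final class ChunkedFramingSketch {
    private static final byte[] CRLF = {'\r', '\n'};

    /** Frames one non-empty piece of body data as a single chunk. */
    static byte[] chunk(byte[] data) {
        if (data.length == 0) {
            // A chunk of size 0 is reserved for the end-of-body marker.
            throw new IllegalArgumentException("a zero-length chunk would end the body");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] size = Integer.toHexString(data.length).getBytes(StandardCharsets.US_ASCII);
        out.write(size, 0, size.length);   // chunk size in hexadecimal
        out.write(CRLF, 0, CRLF.length);
        out.write(data, 0, data.length);   // chunk payload
        out.write(CRLF, 0, CRLF.length);
        return out.toByteArray();
    }

    /** The terminating chunk (size 0, no trailers) that ends a chunked body. */
    static byte[] lastChunk() {
        return "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    }
}
```

Because every chunk carries its own length, the sender never needs to know the total body size in advance, which is why chunked encoding suits a relayed response of unknown size.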

      ```java

      public class NettyToServerChat extends SimpleChannelInboundHandler<HttpObject> {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyToServerChat.class);
        public static final String CHANNEL_NAME = "NettyToServer";
      
        protected final ChannelHandlerContext ctxClientToNetty;
        /** Determines if the response supports keepalive */
        private boolean responseKeepalive = true;
        /** Determines if the response is chunked */
        private boolean responseChunked = false;
      
        public NettyToServerChat(ChannelHandlerContext ctxClientToNetty) {
          this.ctxClientToNetty = ctxClientToNetty;
        }
      
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
          if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
      
            HttpResponseStatus resStatus = response.status();
            //LOGGER.info("Status Line: {} {} {}", response.getProtocolVersion(), resStatus.code(), resStatus.reasonPhrase());
      
            if (!response.headers().isEmpty()) {
              for (CharSequence name : response.headers().names()) {
                for (CharSequence value : response.headers().getAll(name)) {
                  //LOGGER.info("HEADER: {} = {}", name, value);
                }
              }
              //LOGGER.info("");
            }
      
            // Build the response head for the client; the encoder frames the body according to these headers.
            HttpResponse srcRes = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            if (HttpUtil.isTransferEncodingChunked(response)) {
              responseChunked = true;
              HttpUtil.setTransferEncodingChunked(srcRes, true);
            }
            // Write through the client-side context passed to the constructor.
            ctxClientToNetty.channel().write(srcRes);
          }
      
          if (msg instanceof LastHttpContent) { // prioritize the subclass interface
            ctx.close(); // the upstream connection is no longer needed
            HttpContent content = (HttpContent) msg;
            // SimpleChannelInboundHandler releases msg after channelRead0 returns, so retain() it before handing it on.
            // https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/SimpleChannelInboundHandler.java
            // Writing the last content also ends a chunked body; close() without a flush would fail the pending writes.
            ctxClientToNetty.writeAndFlush(content.retain()).addListener(ChannelFutureListener.CLOSE);
          } else if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            // retain() raises the reference count by 1 so the buffer survives the automatic release
            ctxClientToNetty.write(content.retain());
          }
        }
      }
      

      ```
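The question's own idea (buffer first, then pick Content-Length if the whole body fits and fall back to chunked otherwise) can be sketched independently of Netty. The class below is a hypothetical, JDK-only model: `FramingDecisionSketch`, its threshold parameter, and the strings it records are all invented for illustration, and the recorded strings merely stand in for what a real handler would write to the client channel.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "buffer, then decide" strategy; not Netty code.
final class FramingDecisionSketch {
    private final int threshold;                       // max bytes to buffer before giving up on Content-Length
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<String> written = new ArrayList<>(); // stand-in for writes to the client
    private boolean chunked;

    FramingDecisionSketch(int threshold) {
        this.threshold = threshold;
    }

    /** Called for every piece of upstream body data. */
    void onContent(byte[] part) {
        if (chunked) {
            written.add("chunk " + part.length);       // already streaming: forward immediately
            return;
        }
        buffer.write(part, 0, part.length);
        if (buffer.size() > threshold) {
            // Too large to hold: commit to chunked and flush what was buffered so far.
            chunked = true;
            written.add("head Transfer-Encoding: chunked");
            written.add("chunk " + buffer.size());
            buffer.reset();
        }
    }

    /** Called once when the upstream body is complete. */
    void onLastContent() {
        if (chunked) {
            written.add("last-chunk");
        } else {
            // The whole body fit in the buffer, so its exact length is known.
            written.add("head Content-Length: " + buffer.size());
            written.add("body " + buffer.size());
        }
    }

    List<String> written() {
        return written;
    }
}
```

The key design point is that the response head is not written until the decision is made, because the headers cannot be changed once they have been sent.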

      【Comments】:
