[Question title]: Send multiple asynchronous requests on a Netty client
[Posted]: 2012-04-07 05:00:30
[Question]:

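First, let me explain the context:

I have to create a client that sends many HTTP requests to download images. These requests must be asynchronous because, as soon as an image is complete, it is added to a queue and then drawn on screen. Because images can be large and responses can be chunked, my handler has to aggregate each response into a buffer.

So I followed the Netty example code (the HTTP snoop example).

For the moment, I have three static maps that store, for each channel ID, the buffer, the chunked boolean, and my final object.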

private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();

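After that, I create my client bootstrap and a counter that counts down the number of pending requests. The final queue and the counter are passed to my handler, which uses them when a response image is complete.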

    final ClientBootstrap bootstrap = new ClientBootstrap(
            new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("reuseAddress", true);
    bootstrap.setOption("connectTimeoutMillis", 30000);


    final CountDownLatch latch = new CountDownLatch(downloadList.size()) {

        @Override
        public void countDown() {
            super.countDown();
            if (getCount() <= 0) {
                try {
                    queue.put(END_OF_QUEUE);
                    bootstrap.releaseExternalResources();
                } catch (InterruptedException ex) {
                    LOGGER.log(Level.WARNING, ex.getMessage(), ex);
                }
            }
        }
    };
    bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
    bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));

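After that, I create one channel for each image to download. When the channel is connected, the request is created and sent. The host and port have already been extracted earlier.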

for (final ImagePack pack : downloadList) {

        final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        future.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture cf) throws Exception {

                final Channel channel = future.getChannel();

                PACK_MAP.put(channel.getId(), pack);

                final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
                request.setHeader(HttpHeaders.Names.HOST, host);
                request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
                request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);

                if (channel.isWritable()) {
                    channel.write(request);
                }
            }
        });
    }

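Now, here is my ChannelHandler, an inner class that extends SimpleChannelUpstreamHandler. When a channel is connected, a new entry is created in BUFFER_MAP and CHUNKS_MAP. BUFFER_MAP holds the image buffers the handler uses to aggregate image chunks from each channel, and CHUNKS_MAP holds the chunked-response boolean. When a response is complete, the image InputStream is added to the queue, the latch is counted down, and the channel is closed.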

private class TileClientHandler extends SimpleChannelUpstreamHandler {

    private CancellableQueue<Object> queue;
    private CountDownLatch latch;

    public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
        this.queue = queue;
        this.latch = latch;
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
            BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
        }
        if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
            CHUNKS_MAP.put(ctx.getChannel().getId(), false);
        }
    }

    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        super.writeComplete(ctx, e);
        if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
            BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
        }
        if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
            CHUNKS_MAP.put(ctx.getChannel().getId(), false);
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Integer channelID = ctx.getChannel().getId();
        if (!CHUNKS_MAP.get(channelID)) {
            final HttpResponse response = (HttpResponse) e.getMessage();

            if (response.isChunked()) {
                CHUNKS_MAP.put(channelID, true);

            } else {
                final ChannelBuffer content = response.getContent();
                if (content.readable()) {
                    final ChannelBuffer buf = BUFFER_MAP.get(channelID);
                    buf.writeBytes(content);
                    BUFFER_MAP.put(channelID, buf);
                    messageCompleted(e);

                }
            }
        } else {
            final HttpChunk chunk = (HttpChunk) e.getMessage();
            if (chunk.isLast()) {
                CHUNKS_MAP.put(channelID, false);
                messageCompleted(e);
            } else {
                final ChannelBuffer buf = BUFFER_MAP.get(channelID);
                buf.writeBytes(chunk.getContent());
                BUFFER_MAP.put(channelID, buf);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        latch.countDown();
        e.getChannel().close();
    }

    private void messageCompleted(MessageEvent e) {
        final Integer channelID = e.getChannel().getId();
        if (queue.isCancelled()) {
            return;
        }

        try {
            final ImagePack p = PACK_MAP.get(channelID);
            final ChannelBuffer b = BUFFER_MAP.get(channelID);

            p.setBuffer(new ByteArrayInputStream(b.array()));
            queue.put(p.getTile());
        } catch (Exception ex) {
            LOGGER.log(Level.WARNING, ex.getMessage(), ex);
        }
        latch.countDown();
        e.getChannel().close();
    }
}

My problem is that when I run this code, I get these exceptions:

 java.lang.IllegalArgumentException: invalid version format: 3!}@
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

java.lang.IllegalArgumentException: invalid version format: 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
    at org.jboss.netty.channel.Channels.close(Channels.java:720)
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: 
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself.  Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
    at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
    at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
    at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
    at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
    at org.jboss.netty.channel.Channels.close(Channels.java:720)
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

Some NPEs also show up occasionally.

java.lang.NullPointerException
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

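All of this code works for a single request, but when many requests are sent, some strange data gets appended to the buffers.

Any idea what I am missing here? Thanks.

In my first version, I duplicated the bootstrap/handler for each requested image. It worked fine but was not very efficient.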

[Comments]:

    Tags: java client-server netty


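    [Answer 1]:

    The problem is that you are sharing a single HttpClientCodec between all your channels. The default pipeline specified in the bootstrap is cloned for every channel, so each channel sees the same instance of each handler. The HTTP codecs are stateful, so you are seeing the effect of different responses getting mixed together.

    The easiest solution is to pass a ChannelPipelineFactory to the bootstrap. It is called for each new channel, and in it you can create a pipeline with a new instance of HttpClientCodec. Nothing stops you from using the same TileClientHandler instance for every pipeline you create, if that is how it is intended to work.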
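
    The effect described above can be reproduced without Netty. The following is only a library-free sketch, not Netty code: `LineDecoder`, `run`, `withSharedDecoder`, and `withDecoderPerConnection` are hypothetical names invented for illustration, and the `Supplier` stands in for the role a ChannelPipelineFactory plays (in Netty 3 the corresponding hook is `ClientBootstrap.setPipelineFactory`). It feeds interleaved fragments from two connections through a stateful decoder, once with a shared instance and once with a fresh instance per connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PerChannelCodecSketch {

    /** Stateful decoder: keeps a partial line until the terminating newline arrives. */
    static final class LineDecoder {
        private final StringBuilder partial = new StringBuilder();

        List<String> feed(String data) {
            List<String> lines = new ArrayList<>();
            for (char c : data.toCharArray()) {
                if (c == '\n') {
                    lines.add(partial.toString());
                    partial.setLength(0);
                } else {
                    partial.append(c);
                }
            }
            return lines;
        }
    }

    /** Asks the factory for one decoder per connection, then interleaves two responses. */
    static List<String> run(Supplier<LineDecoder> decoderFactory) {
        LineDecoder a = decoderFactory.get(); // connection A
        LineDecoder b = decoderFactory.get(); // connection B
        List<String> out = new ArrayList<>();
        out.addAll(a.feed("HTTP/1.1 "));
        out.addAll(b.feed("HTTP/1.1 "));
        out.addAll(a.feed("200 OK\n"));
        out.addAll(b.feed("404 Not Found\n"));
        return out;
    }

    /** Every connection gets the same instance, like a handler placed in the default pipeline. */
    static List<String> withSharedDecoder() {
        LineDecoder only = new LineDecoder();
        return run(() -> only);
    }

    /** Every connection gets a fresh instance, which is what a per-channel factory provides. */
    static List<String> withDecoderPerConnection() {
        return run(LineDecoder::new);
    }

    public static void main(String[] args) {
        System.out.println(withSharedDecoder());
        System.out.println(withDecoderPerConnection());
    }
}
```

    With the shared instance, the two status lines run together into one malformed line, which is the same kind of corruption as the "invalid version format" errors in the question. With one decoder per connection, each response is decoded cleanly.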

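    I am curious, though. Given that you are making all the requests concurrently, wouldn't it be easier to add an HttpChunkAggregator upstream of the HttpClientCodec and let Netty aggregate all the chunks into a single HttpResponse? Then you would just grab the reassembled content from there.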
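
    To make the aggregation idea concrete, here is a small library-free sketch of what such an aggregator does: it collects chunks until the last one arrives and enforces a size limit that must be chosen up front. This is not the HttpChunkAggregator implementation; `ChunkAggregatorSketch`, `offer`, and the use of IllegalStateException for oversized content are assumptions made purely for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class ChunkAggregatorSketch {
    private final int maxContentLength;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    /** The limit has to be supplied when the aggregator is created. */
    ChunkAggregatorSketch(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    /** Appends one chunk; returns the complete body once last is true, otherwise null. */
    byte[] offer(byte[] chunk, boolean last) {
        if (body.size() + chunk.length > maxContentLength) {
            throw new IllegalStateException("content exceeds " + maxContentLength + " bytes");
        }
        body.write(chunk, 0, chunk.length);
        if (!last) {
            return null;
        }
        // toByteArray() copies only the bytes written so far, not the spare capacity.
        byte[] complete = body.toByteArray();
        body.reset();
        return complete;
    }

    public static void main(String[] args) {
        ChunkAggregatorSketch aggregator = new ChunkAggregatorSketch(16);
        aggregator.offer("tile-".getBytes(StandardCharsets.US_ASCII), false);
        byte[] whole = aggregator.offer("data".getBytes(StandardCharsets.US_ASCII), true);
        System.out.println(new String(whole, StandardCharsets.US_ASCII));
    }
}
```

    The trade-off is the fixed upper bound: a response larger than the configured limit is rejected rather than buffered, so the limit has to be sized for the largest expected image.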

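    [Comments]:

    • Hi johnstlr, thanks for this quick and useful answer. I now use a ChannelPipelineFactory to instantiate the HTTP codec and my tile handler. It works fine, but I still get the java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread. exception. Do you have any idea about that? For information, the reason I am not using HttpChunkAggregator is that you have to set a buffer size in the HttpChunkAggregator constructor.
    • You are calling bootstrap.releaseExternalResources from within CountDownLatch.countDown, which is invoked on an I/O thread from the handler methods. Unfortunately, you cannot do that. You need to call releaseExternalResources from a thread that is not in a thread pool used by Netty. One option might be to call releaseExternalResources in your own thread, the one that reads from your internal queue, once it has finished processing the queue. Also, you are quite right about HttpChunkAggregator. Sorry!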
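
    The pattern suggested in the last comment can be sketched with plain java.util.concurrent classes. This is an illustration under stated assumptions, not the asker's code: the cached thread pool stands in for Netty's I/O workers, `workers.shutdown()` stands in for `bootstrap.releaseExternalResources()`, and `ReleaseOffWorkerSketch` and `downloadAndRelease` are hypothetical names. Worker threads only enqueue results and the END_OF_QUEUE marker; the thread that drains the queue releases the pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReleaseOffWorkerSketch {
    static final Object END_OF_QUEUE = new Object();

    /** Runs the given number of fake downloads and returns how many results were consumed. */
    static int downloadAndRelease(int requests) {
        ExecutorService workers = Executors.newCachedThreadPool(); // stand-in for the I/O pool
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger pending = new AtomicInteger(requests);
        if (requests == 0) {
            queue.add(END_OF_QUEUE); // nothing to wait for
        }
        for (int i = 0; i < requests; i++) {
            final int id = i;
            workers.execute(() -> {
                queue.add("tile-" + id);
                // The worker only signals completion; it never releases resources itself.
                if (pending.decrementAndGet() == 0) {
                    queue.add(END_OF_QUEUE);
                }
            });
        }
        int consumed = 0;
        try {
            while (queue.take() != END_OF_QUEUE) {
                consumed++;
            }
            // We are on the consumer thread here, outside the pool, so releasing is safe.
            workers.shutdown();
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(downloadAndRelease(5));
    }
}
```

    Applied to the question's code, this would mean keeping `queue.put(END_OF_QUEUE)` in the overridden countDown() but moving the releaseExternalResources() call to the thread that consumes the queue, after it reads END_OF_QUEUE.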