【Question title】: "Closed while Pending/Unready" warnings from Jetty
【Posted】: 2019-01-04 18:09:40
【Question】:

After fixing synchronization issues in our async servlet, we still occasionally get

java.io.IOException: Closed while Pending/Unready

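warnings from Jetty. With the fix above, the count in our production system dropped from about 90 per day to about 5 per day. That is rare and looks much better, but some small detail is probably still missing.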

Complete stack trace:

[jetty-63523] (HttpOutput.java:287)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
        at java.lang.Thread.run(Thread.java:748)

The only code that touches the output stream without synchronization is the following setContentType(), setStatus() and flushBuffer() calls:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

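This runs before our QueueWriteListener is set, so if flushBuffer() is synchronous, it should not be a problem.

In any case, looking at Jetty's source, Response.flushBuffer() calls HttpOutput.flush(), where the new AsyncFlush().iterate() call looks suspicious. However, when debugging doPost()/flushBuffer(), that case branch is never executed.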

Full code:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final AsyncContext asyncContext;
    private final ServletOutputStream output;

    @GuardedBy("this")
    private boolean completed = false;

    public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
        this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        writeImpl();
    }

    private synchronized void writeImpl() throws IOException {
        if (completed) {
            return;
        }
        while (output.isReady()) {
            final byte[] message = getNextMessage();
            if (message == null) {
                output.flush();
                return;
            }
            output.write(message);
        }
    }

    private synchronized void completeImpl() {
        // also stops DataFeederThread to call bufferArrived
        completed = true;
        asyncContext.complete();
    }

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }

    public void dataArrived() {
        try {
            writeImpl();
        } catch (RuntimeException | IOException e) {
            ...
        }
    }

    public void noMoreData() {
        completeImpl();
    }

    @Override
    public synchronized void onComplete(final AsyncEvent event) throws IOException {
        completed = true; // might not needed but does not hurt
    }

    @Override
    public synchronized void onTimeout(final AsyncEvent event) throws IOException {
        completeImpl();
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError", event.getThrowable());
    }

    ...
}

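So it seems that nobody can write to the output between our completing the request and Jetty's (asynchronous) closing of the output, and therefore its state should not change to pending or unready. Nevertheless, it somehow happens. What could be the cause of the Closed while Pending/Unready warning?

I have already checked our logs (there was nothing relevant).

onError(AsyncEvent event) is not yet synchronized in our code, but that is irrelevant because its log message never appears in our logs.

Related discussion on GitHub: https://github.com/eclipse/jetty.project/issues/2689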

【Comments】:

    Tags: servlets jetty embedded-jetty servlet-3.1


    【Solution 1】:

    I was able to reproduce the warning with the following code:

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setBufferSize(8192);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(10_000); // millis
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }
    
    private static class QueueWriteListener implements AsyncListener, WriteListener {
    
        private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);
    
        private final ServletOutputStream output;
    
        public QueueWriteListener(final ServletOutputStream output) {
            this.output = checkNotNull(output, "output cannot be null");
        }
    
        @Override
        public void onWritePossible() throws IOException {
            logger.info("onWritePossible()");
            long written = 0;
            while (output.isReady()) {
                final byte[] data = new byte[8 * 1024 * 1024];
                Arrays.fill(data, (byte) 'W');
                output.write(data);
                written += data.length;
                logger.info("write OK, written: {} KB", written / 1024);
            }
            logger.info("onWritePossible() end");
        }
    
        @Override
        public void onError(final Throwable t) {
            logger.info("Writer.onError -Error: {}, Message: {}", t.getClass().getName(), t.getMessage());
        }
    
        @Override
        public void onComplete(final AsyncEvent event) throws IOException {
            logger.warn("onComplete: {}", event);
        }
    
        @Override
        public void onTimeout(final AsyncEvent event) throws IOException {
            logger.warn("onTimeout()");
        }
    
        @Override
        public void onError(final AsyncEvent event) throws IOException {
            logger.error("onError: {}", event, event.getThrowable());
        }
    
        @Override
        public void onStartAsync(final AsyncEvent event) throws IOException {
            logger.info("onStartAsync: {}", event);
        }
    }
    

    And a slow curl client:

    curl --limit-rate 16 -XPOST http://localhost:35419/example2
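If curl is not available, a slow reader can be approximated in plain Java. The sketch below is an assumption-laden stand-in, not part of the original reproduction: `SlowReader` and `drainSlowly` are hypothetical names, and the method only throttles how fast it pulls bytes from whatever `InputStream` it is given. The actual back-pressure seen by the server also depends on socket buffer sizes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: reads a stream at a limited rate to imitate a slow client.
final class SlowReader {

    /**
     * Reads at most maxBytes from in, sleeping after each chunk so that the
     * average rate stays near bytesPerSecond. Returns the number of bytes read.
     */
    static long drainSlowly(InputStream in, int bytesPerSecond, long maxBytes) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        // Cap the chunk size so a large rate does not allocate a huge buffer.
        byte[] chunk = new byte[Math.min(bytesPerSecond, 8192)];
        long total = 0;
        try {
            while (total < maxBytes) {
                int want = (int) Math.min(chunk.length, maxBytes - total);
                int n = in.read(chunk, 0, want);
                if (n < 0) {
                    break; // end of stream
                }
                total += n;
                // Sleep long enough that n bytes "cost" n / bytesPerSecond seconds.
                Thread.sleep(1000L * n / bytesPerSecond);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop
        }
        return total;
    }

    public static void main(String[] args) {
        // In-memory stand-in for a response body; a real run would pass the
        // stream of an HTTP connection instead.
        InputStream body = new ByteArrayInputStream(new byte[64]);
        System.out.println(drainSlowly(body, 640, 48)); // prints 48
    }
}
```

To point it at the servlet, one option would be to pass the stream returned by `HttpURLConnection.getInputStream()` for a POST to the URL above, with a rate of 16 to match the curl command.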
    

    Result:

    10:39:29,063  INFO [jetty-16] {Example2Servlet.java:52} - onWritePossible()
    10:39:29,084  INFO [jetty-16] {Example2Servlet.java:59} - write OK, written: 8192 KB
    10:39:29,084  INFO [jetty-16] {Example2Servlet.java:61} - onWritePossible() end
    10:39:39,085  WARN [jetty-17] {Example2Servlet.java:76} - onTimeout()
    10:39:39,088  WARN [jetty-17] {HttpOutput.java:286} - java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
    10:39:39,090  INFO [jetty-17] {HttpOutput.java:287} - 
    java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
        at java.lang.Thread.run(Thread.java:748)
    10:39:39,091  WARN [jetty-17] {Example2Servlet.java:71} - onComplete: org.eclipse.jetty.server.AsyncContextEvent@3b51901
    

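    Notes:

    • It does not use any third-party threads; everything runs on Jetty's own threads.
    • In the original code there was a complete() call in onTimeout(), but it does not affect the behavior, so I removed it.
    • I also removed the synchronized keywords from the method declarations; they do not affect the behavior either.
    • I was able to reproduce the same behavior with different buffer and array sizes; the warning shows up sooner or later. (For example, a buffer size of 8192 bytes and an array size of 1024 bytes also reproduce the warning, but it takes more time.) This usually also produces an extra debug log like the following: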

      DEBUG [jetty-12] {HttpOutput.java:1271} - EOF of org.eclipse.jetty.server.HttpOutput$AsyncWrite@4b8dff34[PROCESSING]
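One reading of the log above: the 8 MB write is accepted, isReady() then returns false because the slow client has not drained it, and ten seconds later the timeout completes the request while that write is still outstanding. The toy model below sketches that sequence in plain Java. It uses no Jetty classes; `ToyAsyncOutput`, `ClosedWhilePendingDemo` and the state names are hypothetical, borrowed only from the wording of the warning, and this is not Jetty's HttpOutput implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a non-blocking servlet output; not Jetty code.
final class ToyAsyncOutput {
    enum State { READY, PENDING, UNREADY, CLOSED }

    private State state = State.READY;
    private final List<String> warnings = new ArrayList<>();

    /** Starts a write that the (slow) client has not drained yet. */
    synchronized void write(byte[] data) {
        if (state != State.READY) {
            throw new IllegalStateException("write while " + state);
        }
        state = State.PENDING;
    }

    /** Returns false while a write is in flight, as the servlet contract allows. */
    synchronized boolean isReady() {
        if (state == State.PENDING) {
            state = State.UNREADY; // the caller now expects a later callback
        }
        return state == State.READY;
    }

    /** Simulates the network layer reporting that the client drained the data. */
    synchronized void writeCompleted() {
        if (state == State.PENDING || state == State.UNREADY) {
            state = State.READY;
        }
    }

    /** Simulates the container closing the output when the request completes. */
    synchronized void close() {
        if (state == State.PENDING || state == State.UNREADY) {
            warnings.add("Closed while Pending/Unready");
        }
        state = State.CLOSED;
    }

    synchronized List<String> warnings() {
        return new ArrayList<>(warnings);
    }
}

final class ClosedWhilePendingDemo {
    /** The timeout fires while the last write is still in flight. */
    static List<String> timeoutDuringWrite() {
        ToyAsyncOutput out = new ToyAsyncOutput();
        while (out.isReady()) {
            out.write(new byte[8192]); // loop ends after one write: output is unready
        }
        out.close(); // timeout -> request completed -> output closed
        return out.warnings();
    }

    /** The client drains the data before the request is completed. */
    static List<String> timeoutAfterDrain() {
        ToyAsyncOutput out = new ToyAsyncOutput();
        while (out.isReady()) {
            out.write(new byte[8192]);
        }
        out.writeCompleted();
        out.close();
        return out.warnings();
    }

    public static void main(String[] args) {
        System.out.println(timeoutDuringWrite()); // prints [Closed while Pending/Unready]
        System.out.println(timeoutAfterDrain());  // prints []
    }
}
```

In this model no second thread is needed to trigger the warning: a single listener that leaves a write outstanding when the request is completed is enough, which is consistent with the reproduction not depending on the synchronized keywords.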
      

    【Comments】:
