【问题标题】:Java's GZIPOutputStream refuses to flushJava 的 GZIPOutputStream 拒绝刷新
【发布时间】:2021-03-23 21:37:32
【问题描述】:

这是一个演示我的问题的 JUnit 测试:

package stream;

import static org.junit.jupiter.api.Assertions.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.junit.jupiter.api.Test;

class StreamTest {
    public static class LoopbackStream {
        private final byte[] END_MARKER = new byte[0];
        private final ArrayBlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1024);

        public OutputStream getOutputStream() {
            return new OutputStream() {
                @Override
                public void write(int b) throws IOException {
                    this.write(new byte[] { (byte) b });
                }

                @Override
                public void write(byte[] b, int off, int len) {
                    try {
                        queue.put(Arrays.copyOfRange(b, off, len - off));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                public void close() {
                    try {
                        queue.put(END_MARKER);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
        }

        public InputStream getInputStream() {
            return new InputStream() {
                private boolean finished = false;
                private ByteBuffer current = ByteBuffer.wrap(new byte[0]);

                @Override
                public int read() {
                    if (ensureData()) {
                        return Byte.toUnsignedInt(current.get());
                    } else {
                        return -1;
                    }
                }

                @Override
                public int read(byte[] b, int off, int len) {
                    if (ensureData()) {
                        int position = current.position();
                        current.get(b, off, Math.min(len, current.remaining()));
                        return current.position() - position;
                    } else {
                        return -1;
                    }
                }

                private boolean ensureData() {
                    if (!finished && !current.hasRemaining()) {
                        try {
                            byte[] data = queue.take();
                            current = ByteBuffer.wrap(data);
                            finished = data == END_MARKER;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                    return !finished;
                }
            };
        }

    }

    @Test
    void testVanilla() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

        pw.println("Hello World!");

        assertEquals("Hello World!", br.readLine());
    }

    @Test
    void testVanilla2() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

        pw.println("Hello World!");
        assertEquals("Hello World!", br.readLine());

        pw.println("Hello Otherworld!");
        assertEquals("Hello Otherworld!", br.readLine());
    }

    @Test
    void testGzipped() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(new GZIPOutputStream(objectUnderTest.getOutputStream(), true)), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(objectUnderTest.getInputStream())));

        pw.println("Hello World!");

        assertEquals("Hello World!", br.readLine());
    }
}

有两个单独的测试。一个使用普通输入和输出流(工作正常),另一个将这些流包装在它们的 gzip 等效项中。

我使用了GZIPOutputStreamsyncFlush 选项,我希望在刷新父流时自动刷新流中的所有剩余字节。我使用PrintWriterautoFlush 选项在执行println 时刷新其数据。

有没有更好的方法来强制GZIPOutputStreamprintln 之后刷新其缓冲区?

【问题讨论】:

  • 这一切的目的是什么?你知道mark()reset() 吗?和ByteArrayInputStream()?
  • mark()reset() 如何影响GZIPOutputStream

标签: java stream gzipoutputstream


【解决方案1】:

我知道这不是您问题的完整答案,但是评论太长了...


更新:

经过进一步调查,似乎不是GZIPOutputStream 没有刷新(通过在public void write(byte[] b, int off, int len) 方法中添加System.out.println("xy"); 语句,您可以看到GZIPOutputStream 将两个字节数组写入您的OutputStream :一个是gzip流头,另一个是第一行文本的编码内容)。

似乎由于java.io.InputStreamReader(分别是它使用的sun.nio.cs.StreamDecoder)和GZIPInputStream之间的不良交互而导致读取过程阻塞。

基本上,如果StreamDecoder 需要从底层流中读取字节,它会尝试读取尽可能多的字节(只要底层流报告in.available() &gt; 0,这意味着底层流可以在不阻塞的情况下产生更多字节)

这个检查的问题是InflaterInputStreamGZIPInputStream 的超类)总是返回1 的可用字节数,即使它的源流有 no 字节可用 (see the source of InflaterInputStream.available())

看来,虽然您可以逐行写入GZIPOutputStream,但您无法轻松地逐行读取GZIPInputStream...


原答案:

问题不在于GZIPOutputStream,问题在于boolean ensureData() 方法拒绝读取多个块。

以下测试也因原版流而失败:

@Test
void testVanilla2() throws IOException {
    LoopbackStream objectUnderTest = new LoopbackStream();

    PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
    BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

    pw.println("Hello World!");
    assertEquals("Hello World!", br.readLine());

    pw.println("Hello Otherworld!");
    assertEquals("Hello Otherworld!", br.readLine());
}

【讨论】:

  • 你说得对,我的 LoopbackStream 类中有一个错误。谢谢!但是,修复错误并不能解决我的实际问题,因此我更新了原始问题中的代码。
  • @spierepf 我已经用GZIPOutputStream / GZIPInputStream 的发现更新了答案
猜你喜欢
  • 2023-03-14
  • 2011-04-08
  • 2017-10-04
  • 2012-04-15
  • 2012-04-05
  • 2020-11-11
  • 1970-01-01
  • 2018-09-23
相关资源
最近更新 更多