【问题标题】:Iterable gzip deflate/inflate in JavaJava中的可迭代gzip放气/膨胀
【发布时间】:2012-11-14 20:59:16
【问题描述】:

在 Internet 中是否有根据 ByteBuffers 进行 gzip 压缩的库?允许我们推送原始数据然后提取压缩数据的东西?我们已经搜索过了,但只找到了处理 InputStreams 和 OutputStreams 的库。

我们的任务是创建 gzip 过滤器,用于压缩管道架构中的 ByteBuffer 流。这是一种拉取架构,其中最后一个元素从较早的元素中提取数据。我们的 gzip 过滤器处理 ByteBuffer 流,没有可用的单个 Stream 对象。

我们曾尝试将数据流调整为某种 InputStream,然后使用 GZipOutputStream 来满足我们的要求,但适配器代码的数量至少可以说是烦人。

接受后编辑:为了记录,我们的架构类似于 GStreamer 等。

【问题讨论】:

  • 我模糊的印象是你必须同时拥有整个输入才能正确地压缩它——这可能就是它需要一个 InputStream 的原因,而 ByteBuffers 通常用于存储中间数据,而不是保存整个文件。
  • 不是真正的 gzip,但有 JZlib 旨在以“灵活”块的形式提供(解)编码,而不是 Java 的 ZIP 支持功能。 Zlib is not gzip,当然,但也许你仍然可以以某种方式利用它。
  • @Hanno Java 7 包含用于有效传输压缩数据块的关键 Deflater.SYNC_FLUSH 常量。
  • @Hanno,我们可以使用类似 zlib 的库中的原语来实现 gzip,是的。如果我们没有找到预先存在的解决方案,我们可能会向 Github 贡献一个。

标签: java gzip nio


【解决方案1】:

我不明白“隐藏在互联网中”的部分,但zlib 会进行内存中 gzip 格式的压缩和解压缩。 java.util.zip API 提供了一些对 zlib 的访问,尽管它是有限的。由于接口限制,您不能要求 zlib 直接生成和使用 gzip 流。但是,您可以使用nowrap 选项来生成和使用原始放气数据。然后使用java.util.zip 中的CRC32 类很容易滚动您自己的gzip 标头和预告片。您可以预先添加一个固定的 10 字节标头,添加四字节 CRC,然后添加四字节未压缩长度(模 232),两者都以 little-endian 顺序排列,您可以去吧。

【讨论】:

  • “隐藏在互联网中”部分是指“我们已经搜索过了”,事实上我们找不到任何不需要 Stream 对象可用的东西。
  • 哇。 JZLib作者的回答。 :-D
  • 我是 zlib 的两位作者之一。 JZlib 是由 Atsuhiko Yamanaka 完成的 zlib 到 Java 的翻译/改编。
  • 啊,好吧,我完全看错了——我认为 ByteBuffers 以某种方式隐藏在 Internet 中,因为它说“ByteBuffers hidden in the Internet”。
  • 嗯。重新阅读我的问题,您的困惑是有道理的。我的错。
【解决方案2】:

非常感谢 Mark Adler 提出了这种方法,这比我原来的答案要好得多。

package stack;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class BufferDeflate2 {
    /** The standard 10 byte GZIP header */
    private static final byte[] GZIP_HEADER = new byte[] { 0x1f, (byte) 0x8b,
            Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0 };

    /** CRC-32 of uncompressed data. */
    private final CRC32 crc = new CRC32();

    /** Deflater to deflate data */
    private final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION,
            true);

    /** Output buffer building area */
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /** Internal transfer space */
    private final byte[] transfer = new byte[1000];

    /** The flush mode to use at the end of each buffer */
    private final int flushMode;


    /**
     * New buffer deflater
     * 
     * @param syncFlush
     *            if true, all data in buffer can be immediately decompressed
     *            from output buffer
     */
    public BufferDeflate2(boolean syncFlush) {
        flushMode = syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH;
        buffer.write(GZIP_HEADER, 0, GZIP_HEADER.length);
    }


    /**
     * Deflate the buffer
     * 
     * @param in
     *            the buffer to deflate
     * @return deflated representation of the buffer
     */
    public ByteBuffer deflate(ByteBuffer in) {
        // convert buffer to bytes
        byte[] inBytes;
        int off = in.position();
        int len = in.remaining();
        if( in.hasArray() ) {
            inBytes = in.array();
        } else {
            off = 0;
            inBytes = new byte[len];
            in.get(inBytes);
        }

        // update CRC and deflater
        crc.update(inBytes, off, len);
        deflater.setInput(inBytes, off, len);

        while( !deflater.needsInput() ) {
            int r = deflater.deflate(transfer, 0, transfer.length, flushMode);
            buffer.write(transfer, 0, r);
        }

        byte[] outBytes = buffer.toByteArray();
        buffer.reset();
        return ByteBuffer.wrap(outBytes);
    }


    /**
     * Write the final buffer. This writes any remaining compressed data and the GZIP trailer.
     * @return the final buffer
     */
    public ByteBuffer doFinal() {
        // finish deflating
        deflater.finish();

        // write all remaining data
        int r;
        do {
            r = deflater.deflate(transfer, 0, transfer.length,
                    Deflater.FULL_FLUSH);
            buffer.write(transfer, 0, r);
        } while( r == transfer.length );

        // write GZIP trailer
        writeInt((int) crc.getValue());
        writeInt((int) deflater.getBytesRead());

        // reset deflater
        deflater.reset();

        // final output
        byte[] outBytes = buffer.toByteArray();
        buffer.reset();
        return ByteBuffer.wrap(outBytes);
    }


    /**
     * Write a 32 bit value in little-endian order
     * 
     * @param v
     *            the value to write
     */
    private void writeInt(int v) {
        System.out.println("v="+v);
        buffer.write(v & 0xff);
        buffer.write((v >> 8) & 0xff);
        buffer.write((v >> 16) & 0xff);
        buffer.write((v >> 24) & 0xff);
    }


    /**
     * For testing. Pass in the name of a file to GZIP compress
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        File inFile = new File(args[0]);
        File outFile = new File(args[0]+".test.gz");
        FileChannel inChan = (new FileInputStream(inFile)).getChannel();
        FileChannel outChan = (new FileOutputStream(outFile)).getChannel();

        BufferDeflate2 def = new BufferDeflate2(false);

        ByteBuffer buf = ByteBuffer.allocate(500);
        while( true ) {
            buf.clear();
            int r = inChan.read(buf);
            if( r==-1 ) break;
            buf.flip();
            ByteBuffer compBuf = def.deflate(buf);
            outChan.write(compBuf);
        }

        ByteBuffer compBuf = def.doFinal();
        outChan.write(compBuf);

        inChan.close();
        outChan.close();
    }
}

【讨论】:

  • 这看起来像是我们的首选解决方案。
  • 嗯。看来您这里有错误; needsInput() 是否需要添加更多的导入检查,而finished() 是否有更多的输出检查,因此您对needsInput() 的使用应更改为finished()
【解决方案3】:

处理 ByteBuffers 并不难。请参阅下面的示例代码。您需要知道缓冲区是如何创建的。选项有:

  1. 每个缓冲区都是独立压缩的。这很容易处理,我认为情况并非如此。您只需将缓冲区转换为字节数组并将其包装在 GZIPInputStream 内的 ByteArrayInputStream 中。
  2. 每个缓冲区都由写入器以 SYNC_FLUSH 结束,因此包含流中的整个数据块。写入器写入缓冲区的所有数据都可以立即被读取器读取。
  3. 每个缓冲区只是 GZIP 流的一部分。无法保证读者可以从缓冲区中读取任何内容。

GZIP 生成的数据必须按顺序处理。 ByteBuffer 必须按照它们生成的顺序进行处理。

示例代码:

package stack;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;

public class BufferDeflate {

    static AtomicInteger idSrc = new AtomicInteger(1);

    /** Queue for transferring buffers */
    final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<ByteBuffer>();

    /** The entry point for deflated buffers */
    final Pipe.SinkChannel bufSink;

    /** The source for the inflater */
    final Pipe.SourceChannel infSource;

    /** The destination for the inflater */
    final Pipe.SinkChannel infSink;

    /** The source for the outside world */
    public final SelectableChannel source;



    class Relayer extends Thread {
        public Relayer(int id) {
            super("BufferRelayer" + id);
        }


        public void run() {
            try {
                while( true ) {
                    ByteBuffer buf = buffers.take();
                    if( buf != null ) {
                        bufSink.write(buf);
                    } else {
                        bufSink.close();
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }



    class Inflater extends Thread {
        public Inflater(int id) {
            super("BufferInflater" + id);
        }


        public void run() {
            try {
                InputStream in = Channels.newInputStream(infSource);
                GZIPInputStream gzip = new GZIPInputStream(in);
                OutputStream out = Channels.newOutputStream(infSink);

                int ch;
                while( (ch = gzip.read()) != -1 ) {
                    out.write(ch);
                }
                out.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * New buffer inflater
     */
    public BufferDeflate() throws IOException {
        Pipe pipe = Pipe.open();
        bufSink = pipe.sink();
        infSource = pipe.source();

        pipe = Pipe.open();
        infSink = pipe.sink();
        source = pipe.source().configureBlocking(false);

        int id = idSrc.incrementAndGet();

        Thread thread = new Relayer(id);
        thread.setDaemon(true);
        thread.start();

        thread = new Inflater(id);
        thread.setDaemon(true);
        thread.start();
    }


    /**
     * Add the buffer to the stream. A null buffer closes the stream
     * 
     * @param buf
     *            the buffer to add
     * @throws IOException
     */
    public void add(ByteBuffer buf) throws IOException {
        buffers.offer(buf);
    }
}

只需将缓冲区传递给add 方法并从公共source 通道读取。在处理给定数量的字节后可以从 GZIP 读取的数据量是无法预测的。因此,我将source 通道设为非阻塞,因此您可以在添加字节缓冲区的同一线程中安全地读取它。

【讨论】:

  • 您的解决方案看似正确,但需要的资源太多。这就是我们在尝试调整 java.util.zip 中的 gzip 流类时发现的。我们使用类似组件(例如加密过滤器)的经验告诉我们,这必须更简单、更便宜。
  • 资源太多怎么办?在处理 I/O 阻塞时,上面的内容尽可能简单。如果您想要便宜,请选择 Mark Adler 的答案。他是使用 Java 处理 zip 格式的专家。
  • 队列、管道和线程对于这个问题来说似乎过多。
  • 正如我已经暗示的那样,如果您想简单地使用 Mark Adler 的回答 :)
猜你喜欢
  • 1970-01-01
  • 2011-11-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-29
  • 1970-01-01
  • 2010-11-09
  • 2014-11-09
相关资源
最近更新 更多