【问题标题】:Gzip only after a threshold reached?仅在达到阈值后才使用 Gzip?
【发布时间】:2015-10-26 15:01:35
【问题描述】:

我需要归档每天用于构建报告的所有数据。我使用 gzip 压缩大部分数据,因为一些数据集可能非常大(10mb+)。我将每个单独的 protobuf 图写入文件。我还将一组固定的已知小对象类型列入白名单,并添加了一些代码来检测文件是否被 gzip 压缩,当我阅读它时。这是因为一个小文件,压缩后实际上可以比未压缩时更大。

不幸的是,仅仅由于数据的性质,我可能只有几个较大对象类型的元素,而白名单方法可能会出现问题。

有没有办法将一个对象写入一个流,并且只有当它达到一个阈值(比如 8kb),然后压缩它?我事先不知道对象的大小,有时我有一个带有IEnumerable<T> 的对象图,它的大小可能相当大。

编辑: 代码相当基本。我确实略过了我将其存储在filestream db 表中的事实。这对于实现目的来说并不重要。我删除了一些无关的代码。

public Task SerializeModel<T>(TransactionalDbContext dbConn, T Item, DateTime archiveDate, string name)
{
    var continuation = (await dbConn
        .QueryAsync<PathAndContext>(_getPathAndContext, new {archiveDate, model=name})
        .ConfigureAwait(false))
        .First();

    var useGzip = !_whitelist.Contains(typeof(T));

    using (var fs = new SqlFileStream(continuation.Path, continuation.Context, FileAccess.Write,
        FileOptions.SequentialScan | FileOptions.Asynchronous, 64*1024))
    using (var buffer = useGzip ? new GZipStream(fs, CompressionLevel.Optimal) : default(Stream))
    {
        _serializerModel.Serialize(stream ?? fs, item);
    }

    dbConn.Commit();
}

【问题讨论】:

  • 尝试读取8k,如果数据少,则不压缩输出。否则,gzip 整个流,包括最初的 8k?您的具体问题是什么?
  • 问题是我只有T model 并且我将它传递给protobuf .SerializeWithLengthPrefix,如果对象是1 字节或100mb 我不知道。
  • 你插入一个输出流。该流实例必须缓冲 8k 数据,然后决定要做什么。然后它可以将解压缩或压缩的数据传递给另一个流。
  • @MichaelB 假设你可以做到这一点。在不知道流是否已压缩的情况下,您将如何处理反序列化?
  • 这是最简单的部分。 Gzip 有一个很容易检测到的标头。

标签: c# protobuf-net gzipstream


【解决方案1】:

在序列化过程中,您可以使用中间流来完成您的要求。像这样的东西可以完成这项工作

class SerializationOutputStream : Stream
{
    Stream outputStream, writeStream;
    byte[] buffer;
    int bufferedCount;
    long position;
    public SerializationOutputStream(Stream outputStream, int compressTreshold = 8 * 1024)
    {
        writeStream = this.outputStream = outputStream;
        buffer = new byte[compressTreshold];
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return writeStream != null &&  writeStream.CanWrite; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position { get { return position; } set { throw new NotSupportedException(); } }
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return;
        var newPosition = position + count;
        if (this.buffer == null)
            writeStream.Write(buffer, offset, count);
        else
        {
            int bufferCount = Math.Min(count, this.buffer.Length - bufferedCount);
            if (bufferCount > 0)
            {
                Array.Copy(buffer, offset, this.buffer, bufferedCount, bufferCount);
                bufferedCount += bufferCount;
            }
            int remainingCount = count - bufferCount;
            if (remainingCount > 0)
            {
                writeStream = new GZipStream(outputStream, CompressionLevel.Optimal);
                try
                {
                    writeStream.Write(this.buffer, 0, this.buffer.Length);                            
                    writeStream.Write(buffer, offset + bufferCount, remainingCount);
                }
                finally { this.buffer = null; }
            }
        }
        position = newPosition;
    }
    public override void Flush()
    {
        if (buffer == null)
            writeStream.Flush();
        else if (bufferedCount > 0)
        {
            try { outputStream.Write(buffer, 0, bufferedCount); }
            finally { buffer = null; }
        }
    }
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (!disposing || writeStream == null) return;
            try { Flush(); }
            finally { writeStream.Close(); }
        }
        finally
        {
            writeStream = outputStream = null;
            buffer = null;
            base.Dispose(disposing);
        }
    }
}

并像这样使用它

using (var stream = new SerializationOutputStream(new SqlFileStream(continuation.Path, continuation.Context, FileAccess.Write,
        FileOptions.SequentialScan | FileOptions.Asynchronous, 64*1024)))
    _serializerModel.Serialize(stream, item);

【讨论】:

  • 这可能行得通,我希望避免编写新的流类。
【解决方案2】:

数据集可能非常大 (10mb+)

在大多数设备上,这不是很大。在决定是否压缩之前,您是否有无法读取整个对象的原因?还要注意@Niklas 的建议,即在决定是否压缩之前读取一个缓冲区的数据(例如 8K)。

这是因为一个小文件,压缩后实际上可以比未压缩的大。

使小文件可能变大的是 ZIP 标头,尤其是字典。一些 ZIP 库允许您使用在压缩和解压缩时已知的自定义字典。多年前我使用SharpZipLib

在编码和测试方面,使用这种方法需要付出更多的努力。如果您觉得这种好处是值得的,它可能会提供最好的方法。

请注意,无论您采用何种路径,您都将使用存储设备块大小的倍数来物理存储数据。

如果对象是 1 字节或 100mb 我不知道

注意协议缓冲区是not really designed for large data sets

协议缓冲区不是为处理大消息而设计的。作为一般经验法则,如果您要处理的消息大于 1 MB,则可能是时候考虑另一种策略了。

也就是说,Protocol Buffers 非常适合处理大型数据集中的单个消息。通常,大型数据集实际上只是小片段的集合,其中每个小片段都可能是结构化的数据片段。

如果您的最大对象可以轻松地序列化到内存中,请先将其序列化为 MemoryStream,然后将该 MemoryStream 写入您的最终目的地,或者通过 GZipStream 运行它,然后再将其运行到最终目的地。如果最大的对象不能舒适地序列化到内存中,我不确定要给出什么进一步的建议。

【讨论】:

  • 不幸的是,我并不总是有一个简单的方法来读取 8kb 的数据或任何东西。我只是一个T model 作为入口点。一般来说,我想避免阅读整个对象,主要是因为我不知道该怎么做。 10mb 是一个随机数,有些实际上是 1gb 左右,但任何超过 10mb 的东西都会导致 gc 相关压力的进程变慢。我不会将文件发送到任何地方,我只是将它们用作存档。为此,它们比 binarywriter 工作得更好。
  • 请显示您当前用于序列化对象的代码。
  • 尝试阅读例如8K 从fs 到 MemoryStream。如果你得到 8K,你就知道你想使用 GZipStream。如果流支持倒带,则倒带,否则将其丢弃并创建一个新的 SqlFileStream,然后使用倒带或新流进行反序列化。
  • Serialize 将写入所有内容(无论是 1 字节还是 100mb)。我想我可以指定属性来支持倒带。但我不想写整个文件然后倒带再做一次。
  • 我不是说要写整个文件。第一步,尝试将 8K 从 SqlFileStream 复制到 MemoryStream。不要在第一步中反序列化。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-11-29
  • 2018-07-04
  • 1970-01-01
  • 1970-01-01
  • 2021-04-10
  • 1970-01-01
  • 2018-07-12
相关资源
最近更新 更多