这里的基本困难不是protobuf-net,而是V2 protocol buffer format。有两种方式可以对重复元素(例如字节数组或流)进行编码:
-
作为打包的重复元素。在这里,该字段的所有元素都被打包到一个键值对中,线类型为 2(长度分隔)。每个元素的编码方式与正常情况相同,只是前面没有标签。
protobuf-net 自动以这种格式编码字节数组,但是这样做需要提前知道总字节数。对于字节流,这可能需要将整个流加载到内存中(例如 StreamProperty.CanSeek == false 时),这违反了您的要求。
-
作为重复元素。这里编码的消息有零个或多个具有相同标签号的键值对。
对于字节流,使用这种格式会导致编码消息大量膨胀,因为每个字节都需要一个额外的整数密钥。
如您所见,两种默认表示都不能满足您的需求。相反,将大字节流编码为“相当大”块的序列是有意义的,其中每个块都被打包,但整体序列不是。
StreamObject 的以下版本执行此操作:
[ProtoContract]
class StreamObject
{
public StreamObject() : this(new MemoryStream()) { }
public StreamObject(Stream stream)
{
if (stream == null)
throw new ArgumentNullException();
this.StreamProperty = stream;
}
[ProtoIgnore]
public Stream StreamProperty { get; set; }
internal static event EventHandler OnDataReadBegin;
internal static event EventHandler OnDataReadEnd;
const int ChunkSize = 4096;
[ProtoMember(1, IsPacked = false, OverwriteList = true)]
IEnumerable<ByteBuffer> Data
{
get
{
if (OnDataReadBegin != null)
OnDataReadBegin(this, new EventArgs());
while (true)
{
byte[] buffer = new byte[ChunkSize];
int read = StreamProperty.Read(buffer, 0, buffer.Length);
if (read <= 0)
{
break;
}
else if (read == buffer.Length)
{
yield return new ByteBuffer { Data = buffer };
}
else
{
Array.Resize(ref buffer, read);
yield return new ByteBuffer { Data = buffer };
break;
}
}
if (OnDataReadEnd != null)
OnDataReadEnd(this, new EventArgs());
}
set
{
if (value == null)
return;
foreach (var buffer in value)
StreamProperty.Write(buffer.Data, 0, buffer.Data.Length);
}
}
}
[ProtoContract]
struct ByteBuffer
{
[ProtoMember(1, IsPacked = true)]
public byte[] Data { get; set; }
}
注意到OnDataReadBegin 和OnDataReadEnd 事件了吗?然后我添加了用于调试目的,以检查输入流是否实际流入输出 protobuf 流。以下测试类执行此操作:
internal class TestClass
{
public void Test()
{
var writeStream = new MemoryStream();
long beginLength = 0;
long endLength = 0;
EventHandler begin = (o, e) => { beginLength = writeStream.Length; Console.WriteLine(string.Format("Begin serialization of Data, writeStream.Length = {0}", writeStream.Length)); };
EventHandler end = (o, e) => { endLength = writeStream.Length; Console.WriteLine(string.Format("End serialization of Data, writeStream.Length = {0}", writeStream.Length)); };
StreamObject.OnDataReadBegin += begin;
StreamObject.OnDataReadEnd += end;
try
{
int length = 1000000;
var inputStream = new MemoryStream();
for (int i = 0; i < length; i++)
{
inputStream.WriteByte(unchecked((byte)i));
}
inputStream.Position = 0;
var streamObject = new StreamObject(inputStream);
Serializer.Serialize(writeStream, streamObject);
var data = writeStream.ToArray();
StreamObject newStreamObject;
using (var s = new MemoryStream(data))
{
newStreamObject = Serializer.Deserialize<StreamObject>(s);
}
if (beginLength >= endLength)
{
throw new InvalidOperationException("inputStream was completely buffered before writing to writeStream");
}
inputStream.Position = 0;
newStreamObject.StreamProperty.Position = 0;
if (!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable()))
{
throw new InvalidOperationException("!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable())");
}
else
{
Console.WriteLine("Streams identical.");
}
}
finally
{
StreamObject.OnDataReadBegin -= begin;
StreamObject.OnDataReadEnd -= end;
}
}
}
public static class StreamExtensions
{
public static IEnumerable<byte> AsEnumerable(this Stream stream)
{
if (stream == null)
throw new ArgumentNullException();
int b;
while ((b = stream.ReadByte()) != -1)
yield return checked((byte)b);
}
}
上面的输出是:
Begin serialization of Data, writeStream.Length = 0
End serialization of Data, writeStream.Length = 1000888
Streams identical.
这表明输入流确实被流式传输到输出,而没有立即完全加载到内存中。
原型fiddle.
是否有一种机制可以使用流中的字节增量地写出打包的重复元素,并提前知道长度?
似乎没有。假设您有一个CanSeek == true 的流,您可以将其封装在IList<byte> 中,该IList<byte> 枚举流中的字节,提供对流中字节的随机访问,并在IList.Count 中返回流长度。有一个示例小提琴here 显示了这种尝试。然而不幸的是,ListDecorator.Write() 只是枚举列表并在将其编码内容写入输出流之前对其进行缓冲,这会导致输入流完全加载到内存中。我认为发生这种情况是因为 protobuf-net 对 List<byte> 的编码不同于 byte [],即作为Base 128 Varints 的长度分隔序列。由于 byte 的 Varint 表示有时需要超过一个字节,因此无法从列表计数中提前计算长度。有关字节数组和列表编码方式差异的更多详细信息,请参阅this answer。应该可以像 byte [] 一样实现 IList<byte> 的编码——只是目前不可用。