BsonDataWriter 继承自 JsonWriter,因此您可以使用 JsonWriter.WriteToken(JsonReader) 从 JSON 流复制到 BSON 流(反之亦然,使用 BsonDataReader):
public static class JsonExtensions
{
public static void CopyToBson(string inputPath, string outputPath, FileMode fileMode = FileMode.CreateNew)
{
using ( var textReader = File.OpenText(inputPath) )
using ( var jsonReader = new JsonTextReader( textReader ))
using ( var oFileStream = new FileStream( outputPath, fileMode ) )
using ( var dataWriter = new BsonDataWriter(oFileStream) )
{
dataWriter.WriteToken(jsonReader);
}
}
}
注意事项:
您可能希望添加错误处理以在发生错误时删除部分创建的输出文件。
BSON 文档的根标记必须是对象或数组,因此仅包含原始值的 JSON 输入会导致此方法抛出错误。
根据BSON specification,数组是一个普通的 BSON 文档,键为整数值,从 0 开始并按顺序继续。 因此,如果您将包含数组的 JSON 转换为BSON,然后将 BSON 加载到JToken(或dynamic)中,您将得到一个带有数字键的对象而不是数组。
BSON 支持已移至 Json.NET 10.0.1 中自己的包 Newtonsoft.Json.Bson。在早期版本中使用BsonWriter。
-
即使您正在使用流,如 this answer 到 OutOfMemory Exception with Streams and BsonWriter in Json.Net 中所述,您可能无法获得您希望的内存性能:
根据BSON specification,每个对象或数组——在标准中称为documents——必须在开头包含一个包含字节总数的计数文件...
Newtonsoft 的BsonDataWriter 和底层BsonBinaryWriter 通过缓存所有要写入树的tokens 来实现这一点,然后当根令牌的内容已经完成时,在写出树之前递归地计算大小。
演示小提琴 #1 here.
如果BsonDataWriter 创建的令牌缓存超出系统内存,您将需要手动实现从JsonReader 流式传输到 BSON 流的算法,在输出流中回溯以写出最终对象大小(s) 一旦完成。
例如,假设您的根 JSON 容器是一个 JSON 对象数组。然后下面的方法会递增序列化数组,然后在流中回溯写入总大小:
public static partial class BsonExtensions
{
public static void CopyJsonToBson(string inputPath, string outputPath, FileMode fileMode)
{
using ( var textReader = File.OpenText(inputPath) )
using ( var jsonReader = new JsonTextReader( textReader ))
using ( var oFileStream = new FileStream( outputPath, fileMode ) )
{
CopyJsonToBson(jsonReader, oFileStream);
}
}
public static void CopyJsonToBson(JsonReader jsonReader, Stream stream)
{
var rootTokenType = jsonReader.ReadToContentAndAssert().TokenType;
if (!stream.CanSeek || rootTokenType != JsonToken.StartArray)
{
using ( var dataWriter = new BsonDataWriter(stream) { CloseOutput = false } )
{
dataWriter.WriteToken(jsonReader, stream.CanSeek);
}
}
else
{
stream.Flush(); // Just in case.
var initialPosition = stream.Position;
var buffer = new byte[256];
WriteInt(stream, 0, buffer); // CALCULATED SIZE TO BE CALCULATED LATER.
ulong index = 0;
while (jsonReader.ReadToContentAndAssert().TokenType != JsonToken.EndArray)
{
var bsonType = GetBsonType(jsonReader.TokenType, jsonReader.ValueType);
stream.WriteByte(unchecked((byte)bsonType));
WriteString(stream, index.ToString(NumberFormatInfo.InvariantInfo), buffer);
using (var dataWriter = new BsonDataWriter(stream) { CloseOutput = false })
{
dataWriter.WriteToken(jsonReader);
}
index++;
}
stream.WriteByte((byte)0);
stream.Flush();
var finalPosition = stream.Position;
stream.Position = initialPosition;
var size = checked((int)(finalPosition - initialPosition));
WriteInt(stream, size, buffer); // CALCULATED SIZE TO BE CALCULATED LATER.
stream.Position = finalPosition;
}
}
private static readonly Encoding Encoding = new UTF8Encoding(false);
private static void WriteString(Stream stream, string s, byte[] buffer)
{
if (s != null)
{
if (s.Length < buffer.Length / Encoding.GetMaxByteCount(1))
{
var byteCount = Encoding.GetBytes(s, 0, s.Length, buffer, 0);
stream.Write(buffer, 0, byteCount);
}
else
{
byte[] bytes = Encoding.GetBytes(s);
stream.Write(bytes, 0, bytes.Length);
}
}
stream.WriteByte((byte)0);
}
private static void WriteInt(Stream stream, int value, byte[] buffer)
{
unchecked
{
buffer[0] = (byte) value;
buffer[1] = (byte) (value >> 8);
buffer[2] = (byte) (value >> 16);
buffer[3] = (byte) (value >> 24);
}
stream.Write(buffer, 0, 4);
}
private static BsonType GetBsonType(JsonToken jsonType, Type valueType)
{
switch (jsonType)
{
case JsonToken.StartArray:
return BsonType.Array;
case JsonToken.StartObject:
return BsonType.Object;
case JsonToken.Null:
return BsonType.Null;
// Add primitives as required.
default:
throw new JsonWriterException(string.Format("BsonType for {0} not implemented.", jsonType));
}
}
//Copied from: https://github.com/JamesNK/Newtonsoft.Json.Bson/blob/master/Src/Newtonsoft.Json.Bson/BsonType.cs
//Original source: http://bsonspec.org/spec.html
enum BsonType : sbyte
{
Number = 1,
String = 2,
Object = 3,
Array = 4,
Binary = 5,
Undefined = 6,
Oid = 7,
Boolean = 8,
Date = 9,
Null = 10,
Regex = 11,
Reference = 12,
Code = 13,
Symbol = 14,
CodeWScope = 15,
Integer = 16,
TimeStamp = 17,
Long = 18,
MinKey = -1,
MaxKey = 127
}
}
public static partial class JsonExtensions
{
public static JsonReader ReadToContentAndAssert(this JsonReader reader)
{
return reader.ReadAndAssert().MoveToContentAndAssert();
}
public static JsonReader MoveToContentAndAssert(this JsonReader reader)
{
if (reader == null)
throw new ArgumentNullException();
if (reader.TokenType == JsonToken.None) // Skip past beginning of stream.
reader.ReadAndAssert();
while (reader.TokenType == JsonToken.Comment) // Skip past comments.
reader.ReadAndAssert();
return reader;
}
public static JsonReader ReadAndAssert(this JsonReader reader)
{
if (reader == null)
throw new ArgumentNullException();
if (!reader.Read())
throw new JsonReaderException("Unexpected end of JSON stream.");
return reader;
}
}
然后按如下方式使用:
var inputPath = @"k:\\BrokeredMessage_Alarmhub-Infra-Prd-Sbn_08-06-2019 11-13-34.json";
var outputPath = @"k:\\output.bson";
BsonExtensions.CopyJsonToBson(inputPath, outputPath, FileMode.Create);
注意事项:
我专门针对数组的情况实现了流+搜索,因为这似乎是大型 JSON 文件最常见的场景。
1234563
-
这样做之后,例程就可以递归地调用自身,这在根对象包含一个非常大的数组作为成员时可能很有用。 (不过,此时,您基本上已经编写了自己的BsonDataWriter 版本。)
但是,这样做可能会导致在输出流中进行大量搜索,从而极大地影响性能。
演示小提琴 #2 here.