【问题标题】:How to convert JSON to BSON using Json.net AS A STREAM如何使用 Json.net AS A STREAM 将 JSON 转换为 BSON
【发布时间】:2019-08-08 06:13:32
【问题描述】:

我想将 JSON 文件流式转换为 BSON 文件。考虑到JsonTextReaderBsonDataWriter的方法,这可能吗?

代码如下:

using ( StreamReader textReader = File.OpenText(@"k:\\BrokeredMessage_Alarmhub-Infra-Prd-Sbn_08-06-2019 11-13-34.json" ) )
using ( JsonTextReader jsonTextReader = new JsonTextReader( textReader ))
using ( FileStream oFileStream = new FileStream( @"k:\\output.bson", FileMode.CreateNew ) )
using ( BsonDataWriter datawriter = new BsonDataWriter (oFileStream) )
{
   ...
}

我不想反序列化 JSON 文件的全部内容,因为我想读取 JSON 文件并以最小的内存负载写入 BSON 文件。这可以通过使用流来实现吗?

【问题讨论】:

    标签: c# json json.net bson


    【解决方案1】:

    BsonDataWriter 继承自 JsonWriter,因此您可以使用 JsonWriter.WriteToken(JsonReader) 从 JSON 流复制到 BSON 流(反之亦然,使用 BsonDataReader):

    public static class JsonExtensions
    {
        public static void CopyToBson(string inputPath, string outputPath, FileMode fileMode = FileMode.CreateNew)
        {
            using ( var textReader = File.OpenText(inputPath) )
            using ( var jsonReader = new JsonTextReader( textReader ))
            using ( var oFileStream = new FileStream( outputPath, fileMode ) )
            using ( var dataWriter = new BsonDataWriter(oFileStream) )
            {
                dataWriter.WriteToken(jsonReader);
            }
        }
    }
    

    注意事项:

    1. 您可能希望添加错误处理以在发生错误时删除部分创建的输出文件。

    2. BSON 文档的根标记必须是对象或数组,因此仅包含原始值的 JSON 输入会导致此方法抛出错误。

    3. 根据BSON specification,数组是一个普通的 BSON 文档,键为整数值,从 0 开始并按顺序继续。 因此,如果您将包含数组的 JSON 转换为BSON,然后将 BSON 加载到JToken(或dynamic)中,您将得到一个带有数字键的对象而不是数组。

    4. BSON 支持已移至 Json.NET 10.0.1 中自己的包 Newtonsoft.Json.Bson。在早期版本中使用BsonWriter

    5. 即使您正在使用流,如 this answerOutOfMemory Exception with Streams and BsonWriter in Json.Net 中所述,您可能无法获得您希望的内存性能:

      根据BSON specification,每个对象或数组——在标准中称为documents——必须在开头包含一个包含字节总数的计数文件...

      Newtonsoft 的BsonDataWriter 和底层BsonBinaryWriter 通过缓存所有要写入树的tokens 来实现这一点,然后当根令牌的内容已经完成时,在写出树之前递归地计算大小。

    演示小提琴 #1 here.


    如果BsonDataWriter 创建的令牌缓存超出系统内存,您将需要手动实现从JsonReader 流式传输到 BSON 流的算法,在输出流中回溯以写出最终对象大小(s) 一旦完成。

    例如,假设您的根 JSON 容器是一个 JSON 对象数组。然后下面的方法会递增序列化数组,然后在流中回溯写入总大小:

    public static partial class BsonExtensions
    {
        public static void CopyJsonToBson(string inputPath, string outputPath, FileMode fileMode)
        {
            using ( var textReader = File.OpenText(inputPath) )
            using ( var jsonReader = new JsonTextReader( textReader ))
            using ( var oFileStream = new FileStream( outputPath, fileMode ) )
            {
                CopyJsonToBson(jsonReader, oFileStream);
            }
        }
    
        public static void CopyJsonToBson(JsonReader jsonReader, Stream stream)
        {
            var rootTokenType = jsonReader.ReadToContentAndAssert().TokenType;
            if (!stream.CanSeek || rootTokenType != JsonToken.StartArray)
            {
                using ( var dataWriter = new BsonDataWriter(stream) { CloseOutput = false } )
                {
                    dataWriter.WriteToken(jsonReader, stream.CanSeek);
                }
            }
            else
            {
                stream.Flush(); // Just in case.
    
                var initialPosition = stream.Position;
                var buffer = new byte[256];
    
                WriteInt(stream, 0, buffer); // CALCULATED SIZE TO BE CALCULATED LATER.
    
                ulong index = 0;
    
                while (jsonReader.ReadToContentAndAssert().TokenType != JsonToken.EndArray)
                {
                    var bsonType = GetBsonType(jsonReader.TokenType, jsonReader.ValueType);
                    stream.WriteByte(unchecked((byte)bsonType));
                    WriteString(stream, index.ToString(NumberFormatInfo.InvariantInfo), buffer);
                    using (var dataWriter = new BsonDataWriter(stream) { CloseOutput = false })
                    {
                        dataWriter.WriteToken(jsonReader);
                    }
                    index++;
                }
    
                stream.WriteByte((byte)0);
                stream.Flush();
    
                var finalPosition = stream.Position;
                stream.Position = initialPosition;
    
                var size = checked((int)(finalPosition - initialPosition));
                WriteInt(stream, size, buffer); // CALCULATED SIZE TO BE CALCULATED LATER.
    
                stream.Position = finalPosition;
            }
        }
    
        private static readonly Encoding Encoding = new UTF8Encoding(false);
    
        private static void WriteString(Stream stream, string s, byte[] buffer)
        {
            if (s != null)
            {
                if (s.Length < buffer.Length / Encoding.GetMaxByteCount(1))
                {
                    var byteCount = Encoding.GetBytes(s, 0, s.Length, buffer, 0);
                    stream.Write(buffer, 0, byteCount);
                }
                else
                {
                    byte[] bytes = Encoding.GetBytes(s);
                    stream.Write(bytes, 0, bytes.Length);
                }
            }
    
            stream.WriteByte((byte)0);
        }       
    
        private static void WriteInt(Stream stream, int value, byte[] buffer)
        {
            unchecked
            {
                buffer[0] = (byte) value;
                buffer[1] = (byte) (value >> 8);
                buffer[2] = (byte) (value >> 16);
                buffer[3] = (byte) (value >> 24);
            }
            stream.Write(buffer, 0, 4);
        }
    
        private static BsonType GetBsonType(JsonToken jsonType, Type valueType)
        {
            switch (jsonType)
            {
                case JsonToken.StartArray:
                    return BsonType.Array;
    
                case JsonToken.StartObject:
                    return BsonType.Object;
    
                case JsonToken.Null:
                    return BsonType.Null;
    
                // Add primitives as required.
    
                default:
                    throw new JsonWriterException(string.Format("BsonType for {0} not implemented.", jsonType));
            }
        }
    
        //Copied from: https://github.com/JamesNK/Newtonsoft.Json.Bson/blob/master/Src/Newtonsoft.Json.Bson/BsonType.cs
        //Original source: http://bsonspec.org/spec.html
        enum BsonType : sbyte
        {
            Number = 1,
            String = 2,
            Object = 3,
            Array = 4,
            Binary = 5,
            Undefined = 6,
            Oid = 7,
            Boolean = 8,
            Date = 9,
            Null = 10,
            Regex = 11,
            Reference = 12,
            Code = 13,
            Symbol = 14,
            CodeWScope = 15,
            Integer = 16,
            TimeStamp = 17,
            Long = 18,
            MinKey = -1,
            MaxKey = 127
        }       
    }
    
    public static partial class JsonExtensions
    {
        public static JsonReader ReadToContentAndAssert(this JsonReader reader)
        {
            return reader.ReadAndAssert().MoveToContentAndAssert();
        }
    
        public static JsonReader MoveToContentAndAssert(this JsonReader reader)
        {
            if (reader == null)
                throw new ArgumentNullException();
            if (reader.TokenType == JsonToken.None)       // Skip past beginning of stream.
                reader.ReadAndAssert();
            while (reader.TokenType == JsonToken.Comment) // Skip past comments.
                reader.ReadAndAssert();
            return reader;
        }
    
        public static JsonReader ReadAndAssert(this JsonReader reader)
        {
            if (reader == null)
                throw new ArgumentNullException();
            if (!reader.Read())
                throw new JsonReaderException("Unexpected end of JSON stream.");
            return reader;
        }
    }
    

    然后按如下方式使用:

    var inputPath = @"k:\\BrokeredMessage_Alarmhub-Infra-Prd-Sbn_08-06-2019 11-13-34.json";
    var outputPath = @"k:\\output.bson";
    
    BsonExtensions.CopyJsonToBson(inputPath, outputPath, FileMode.Create);
    

    注意事项:

    1. 我专门针对数组的情况实现了流+搜索,因为这似乎是大型 JSON 文件最常见的场景。

    2. 1234563
    3. 这样做之后,例程就可以递归地调用自身,这在根对象包含一个非常大的数组作为成员时可能很有用。 (不过,此时,您基本上已经编写了自己的BsonDataWriter 版本。)

      但是,这样做可能会导致在输出流中进行大量搜索,从而极大地影响性能。

    演示小提琴 #2 here.

    【讨论】:

    • 感谢您如此完整的回复。我得出的结论是 bson 格式不适合流式传输 json,最后我使用了 gzip 流。 Bson 是一种用于加快 MongoDb 数据库中 json 文档读取时间的格式。我认为 BsonDataWriter/BsonDataReader 类甚至不应该存在,因为它会导致计数器性能代码,或者它们应该只将 JObjects 作为输入/输出,而不是 JsonTextReader/JsonTextWriter。
    猜你喜欢
    • 2016-02-14
    • 2015-07-30
    • 1970-01-01
    • 1970-01-01
    • 2015-10-01
    • 2019-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多