【问题标题】:How can I stream elements from inside a JSON array using serde_json?如何使用 serde_json 从 JSON 数组中流式传输元素?
【发布时间】:2021-08-03 18:31:08
【问题描述】:

我有一个 5GB 的 JSON 文件,它是一组具有固定结构的对象:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  },
  ....
]

我知道我可以尝试使用How can I deserialize JSON with a top-level array using Serde?中显示的代码解析这个文件:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;

有多个问题:

  1. 首先将其作为一个整体读取
  2. 读取为字符串后,将其转换为 User 结构的向量(我不希望这样)

我试过How I can I lazily read multiple JSON values from a file/stream in Rust?,但它会在打印任何内容之前读取整个文件,并在循环内一次打印整个结构。我期待在循环中一次一个对象:

理想情况下,(已解析的)用户对象的解析和处理应该同时在两个单独的线程/任务/例程中或通过使用通道进行。

【问题讨论】:

  • 但它会在打印任何内容之前读取整个文件——你如何验证这一点?
  • 添加截图
  • 假设它从循环中打印出两个项目——你怎么知道它是否读入了整个文件?
  • 好的,不管文件有多大,这条线都没有问题。 println!("Before reader"); let iterator = deserializer.into_iter::&lt;serde_json::Value&gt;(); println!("after reader"); - 现在的问题是整个文件内容在第一次循环迭代时立即打印,所以我不能单独获取每个对象。 - & 除了上面提到的地方,我找不到在任何其他地方加载整个文件的任何用法

标签: rust serde-json


【解决方案1】:

从 JSON 数组流式传输元素是可能的,但需要一些繁琐的工作。您必须自己跳过前导的[ 和间歇的,,并检测最后的]。要解析单个数组元素,您需要使用 StreamDeserializer 并从中提取单个项目(这样您就可以删除它并重新获得对 IO 阅读器的控制权)。例如:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}

示例用法:

fn main() {
    let data = r#"[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  }
]"#;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    struct User {
        first: String,
        last: String,
        email: String,
    }

    for user in iter_json_array(io::Cursor::new(&data)) {
        let user: User = user.unwrap();
        println!("{:?}", user);
    }
}

Playground

在生产中使用它时,您可以将其打开为File,而不是将其读取为字符串。与往常一样,不要忘记将File 包装在BufReader 中。

【讨论】:

  • 谢谢!这正是我想要的。我刚刚在大约 5 分钟内解析了 1.03 GB 的文件,其中包含 10,000,000 个对象。对于阅读文件,我使用了let str = fs::read_to_string(path)?; (it doesn't have overhead) for recipe in iter_json_array(io::Cursor::new(str)) {
  • @RSun 将所有内容读入字符串在内存使用方面似乎不是最理想的;为什么不使用缓冲阅读器,就像在 golang 中一样? let reader = BufReader::new(File::open(path)?); for recipe in iter_json_array(reader) { ... }
  • 同意。我还是 Rust 的新手 :)
  • @RSun 我很好奇它是否会影响运行时间(在任何一个方向上)——有时最优雅的代码也不是最高性能的! (而且 5 分钟解析一个 5 GiB 的文件并不完全是出色的性能——在加快速度方面,也许还有一些唾手可得的成果。)
【解决方案2】:

is not directly possible 来自 serde_json 1.0.66。

一个workaround suggested 是实现您自己的使用通道的Visitor。随着数组反序列化的进行,每个元素都被推下通道。然后通道的接收端可以抓取每个元素并对其进行处理,从而为反序列化释放空间以推入另一个值。

【讨论】:

    猜你喜欢
    • 2013-12-01
    • 2016-04-13
    • 1970-01-01
    • 2019-06-23
    • 2012-09-09
    • 2022-01-13
    • 2014-09-18
    • 1970-01-01
    • 2016-01-21
    相关资源
    最近更新 更多