【问题标题】:Deserialize from tokio socket从 tokio 套接字反序列化
【发布时间】:2018-07-11 12:48:13
【问题描述】:

我正在使用 tokio 实现一个服务器,该服务器与使用 serde(bincode)序列化的消息进行通信。如果没有异步和期货,我会这样做

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy, this is a complex struct with derived Serizalize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

fn main() {

    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);

    println!("{:?}", decode(&mut reader))
}

但我需要的是一个decode 函数,它可以与异步套接字一起工作

// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:   
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

我唯一的想法是在编码期间手动将长度写入缓冲区,然后使用 read_exact:

// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error>{
    let size = serialized_size(item);
    let mut buf = serialize(&size, Infinite).unwrap();
    let ser = serialize(item, Infinite).unwrap();
    buf.extend(ser);
    Ok(buf)
}

// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {

    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        let size = deserialize::<u64>(&mut &buf[..]).unwrap();
        Ok((reader, size as usize))
    }).and_then(|(reader, size)| {
        read_exact(reader, vec![0u8; size])
    }).and_then(|(reader, buf)| {
        let item = deserialize(&mut &buf[..]).unwrap();
        Ok(item)
    });

    Box::new(read)
}

fn main() {

    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();

    println!("{:?}", dec)
}

有没有更好的方法来实现解码?

【问题讨论】:

    标签: asynchronous rust serde rust-tokio


    【解决方案1】:

    deserialize_from 无法处理 IO 错误,尤其是异步(非阻塞)Readers 在等待更多数据时返回的那种WouldBlock。这受到接口的限制:deserialize_from 不返回Future 或部分状态,它返回完整解码的Result,并且不知道如何将Reader 与事件循环结合起来处理WouldBlock 没有忙循环。

    理论上,可以实现async_deserialize_from,但不能使用serde提供的接口,除非你提前读取完整数据进行解码,否则会达不到目的。

    如果您知道“无休止”流(或在后面跟着其他数据)。

    【讨论】:

      【解决方案2】:

      Stefan 的回答是正确的,但是您可能有兴趣查看为您执行此操作的 tokio-serde-* 板条箱系列,特别是 tokio-serde-bincode。来自自述文件:

      使用 serde 轻松实现 Tokio Bincode 传输所需的实用程序,用于帧值的序列化和反序列化。基于 tokio-serde。

      箱子里有几个examples的使用方法。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-07-10
        • 2012-11-02
        • 2020-07-15
        相关资源
        最近更新 更多