【问题标题】:Delaying a Tokio Stream延迟 Tokio 流
【发布时间】:2017-09-30 23:02:03
【问题描述】:

给定一个Stream,我想创建一个新的Stream,其中的元素在它们之间产生时间延迟。

我尝试使用tokio_core::reactor::TimeoutStreams 的and_then 组合器编写代码,但延迟不起作用:我立即获得所有元素,没有延迟。

这是一个自包含的示例 (playground):

extern crate tokio_core;
extern crate futures;

use std::time::Duration;

use futures::{Future, Stream, stream, Sink};
use self::futures::sync::{mpsc};
use tokio_core::reactor;

const NUM_ITEMS: u32 = 8;

fn main() {
    let mut core = reactor::Core::new().unwrap();
    let handle = core.handle();
    let chandle = handle.clone();

    let (sink, stream) = mpsc::channel::<u32>(0);

    let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
        .and_then(move |i: u32| {
                  let cchandle = chandle.clone();
                  println!("Creating a timeout object...");
                  reactor::Timeout::new(Duration::new(1,0), &cchandle)
                      .map_err(|_| ())
                      .and_then(|_| Ok(i))
        });

    let sink = sink.sink_map_err(|_| ());
    handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));

    let mut incoming_items = Vec::new();
    {
        let keep_messages = stream.for_each(|item| {
            incoming_items.push(item);
            println!("item = {}", item);
            Ok(())
        });

        core.run(keep_messages).unwrap();
    }
    assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
}

为了完整起见,这是我得到的输出:

Creating a timeout object...
Creating a timeout object...
item = 0
Creating a timeout object...
item = 1
Creating a timeout object...
item = 2
Creating a timeout object...
item = 3
Creating a timeout object...
item = 4
Creating a timeout object...
item = 5
Creating a timeout object...
item = 6
item = 7

我怀疑问题出在以下几行:

reactor::Timeout::new(Duration::new(1,0), &cchandle)
    .map_err(|_| ())
    .and_then(|_| Ok(i))

可能我并没有真正等待返回的Timeout 对象,尽管我不确定如何解决它。

【问题讨论】:

    标签: stream rust rust-tokio


    【解决方案1】:

    正如我所怀疑的,问题在于新创建的Timeout 的操作(使用and_then)。我们需要首先解开调用reactor::Timeout::new 的结果,如果手动完成,这可能会变得混乱,或者使用into_future,将结果转换为Future,然后使用Future 组合器处理它.

    解决问题的代码:

    extern crate tokio_core;
    extern crate futures;
    
    use std::time::Duration;
    
    use futures::{Future, Stream, stream, Sink, IntoFuture};
    use self::futures::sync::{mpsc};
    use tokio_core::reactor;
    
    const NUM_ITEMS: u32 = 8;
    
    fn main() {
        let mut core = reactor::Core::new().unwrap();
        let handle = core.handle();
        let chandle = handle.clone();
    
        let (sink, stream) = mpsc::channel::<u32>(0);
    
        let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
            .and_then(move |i: u32| {
                      let cchandle = chandle.clone();
                      println!("Creating a timeout object...");
                      reactor::Timeout::new(Duration::new(1,0), &cchandle)
                          .into_future()
                          .and_then(move |timeout| timeout.and_then(move |_| Ok(i)))
                          .map_err(|_| ())
            });
    
        let sink = sink.sink_map_err(|_| ());
        handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));
    
        let mut incoming_items = Vec::new();
        {
            let keep_messages = stream.for_each(|item| {
                incoming_items.push(item);
                println!("item = {}", item);
                Ok(())
            });
    
            core.run(keep_messages).unwrap();
        }
        assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
    }
    

    请注意,正在使用两个and_then。第一个解开调用reactor::Timeout::new 获得的Result。第二个实际上是在等待Timeout 触发。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-09-16
      • 2023-03-09
      • 2011-04-20
      • 2010-12-15
      • 2017-05-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多