【问题标题】:Channel communication between tasks任务之间的通道通信
【发布时间】:2019-07-30 10:30:48
【问题描述】:

我正在尝试在一个hyper 服务和一个tokio 流之间建立一个基于通道的通信。问题是编译器出现以下错误:

闭包是FnOnce,因为它将变量tx_queue移出 它的环境。

阅读rustc --explain E0525 提供的解释后,tokio::sync::mpsc::Sender 似乎实现了Clone,但没有实现Copy(除非我忽略了什么)。

所以我有点卡住了。如何让我的服务通过tokio::sync::mpsc 频道向 tokio 流发送消息?我确定我错过了一些明显但看不到的东西:/

有问题的代码摘录(根据@E_net4 的要求进行了修改以使其更短):

    extern crate hyper;
    extern crate tokio;
    extern crate tokio_signal;

    use futures::Stream;
    use hyper::rt::Future;
    use hyper::service::service_fn_ok;
    use hyper::{Body, Request, Response, Server};

    use futures::sink::Sink;
    use futures::sync::{mpsc, oneshot};
    use futures::{future, stream};

    fn main() {
        let mut runtime = tokio::runtime::Runtime::new().unwrap();

        let (tx1, rx1) = oneshot::channel::<()>();

        let (tx_queue, rx_queue) = mpsc::channel(10);

        // ----

        runtime.spawn(start_queue(rx_queue));

        // ----

        let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
            service_fn_ok(move |_: Request<Body>| {
                tx_queue.send(1);
                Response::new(Body::from("Hello World!"))
            })
        });

        let graceful = http_server
            .with_graceful_shutdown(rx1)
            .map_err(|err| eprintln!("server error: {}", err))
            .and_then(|_| {
                dbg!("stopped");
                // TODO: stop order queue listener
                Ok(())
            });

        dbg!("HTTP server listening ...");

        runtime.spawn(graceful);

        // ----

        tx1.send(()).unwrap();

        dbg!("exited");
    }

    pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
        #[derive(Eq, PartialEq)]
        enum Item {
            Value(usize),
            Tick,
            Done,
        }

        let items = rx
            .map(Item::Value)
            .chain(stream::once(Ok(Item::Done)))
            .take_while(|item| future::ok(*item != Item::Done));

        items
            .fold(0, |num, _item| {
                dbg!("x");
                future::ok(num)
            })
            .map(|_| ())
    }

完整的代码在这里:https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e

谢谢:)

【问题讨论】:

  • 请尝试制作一个更好的minimal reproducible example,以便我们重现问题所需的一切都在问题本身中。单独的代码 sn-p 无法编译。

标签: rust rust-tokio hyper


【解决方案1】:

futures::sync::mpsc::Sender::send 使用 Sender 并生成一个 Send 对象,这是一个必须运行到完成才能实际发送数据的未来。如果通道已满,它将阻塞,直到其他人从该通道接收。完成后,它会返回Sender,您可以使用它来发送更多数据。

在这种情况下,我认为您不能仅使用 Sender 的单个实例来构造代码。您需要克隆它,以便每次调用服务函数都有新的克隆。注意现在两个闭包都是move

    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        // This closure has one instance of tx_queue that was moved-in here.
        // Now we make a copy to be moved into the closure below.
        let tx_queue = tx_queue.clone();
        service_fn_ok(move |_: Request<Body>| {
            // This closure has one instance of tx_queue, but it will be called
            // multiple times, so it can not consume it. It must make a copy
            // before consuming it.
            tx_queue.clone().send(111);
            Response::new(Body::from("Hello World!"))
        })
    });

但是,这会给你以下警告:

warning: unused `futures::sink::send::Send` that must be used

正如我所说,send 只是为您提供了一个必须运行才能实际执行发送的未来。如果忽略返回值,则不会发生任何事情。在这种情况下,最好将spawn 它作为一个单独的任务(这样它就不会阻止对客户端的响应)。要生成它,您需要运行时中的 executor,还必须为内部闭包克隆它:

    let executor = runtime.executor();
    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        let tx_queue = tx_queue.clone();
        let executor = executor.clone();
        service_fn_ok(move |_: Request<Body>| {
            executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
                // TODO: Handle the error differenty!
                panic!("Error in mpsc {:?}", err);
            }));
            Response::new(Body::from("Hello World!"))
        })
    });

【讨论】:

  • 好的,我明白你的意思了。感谢您的帮助:)
  • 该死的错误信息是没有意义的。另外双克隆sux有点没找到为什么我不能修复那个错误原因
  • 克隆看起来很糟糕,但这是不可避免的。这两个闭包都可以被多次调用(它们是Fn,而不是FnOnce),因此它们不能给出自己的tx_queue 实例,因为下次调用它们时就没有它了。所以他们保留一个实例作为模板,并在调用时克隆它。然后克隆被移动/消耗。
猜你喜欢
  • 2021-08-30
  • 2019-08-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-10
  • 2013-04-10
  • 2018-08-23
相关资源
最近更新 更多