【问题标题】:Difference between using Future directly or wrapping in async move block直接使用 Future 或包装在异步移动块中的区别
【发布时间】:2021-05-23 12:43:54
【问题描述】:

我想知道为什么将代码从直接在 tokio::spawn 中使用 Future 更改为使用 async move 块可以编译代码。

直接使用:

struct ClientMsg {
    ...
    resp: oneshot::Sender<Bytes>,
}

async fn client_thread(
    mut rx: Receiver<ClientMsg>,
    client: Client,
) -> Result<(), Box<dyn Error>> {
    while let Some(msg) = rx.recv().await {
        ...
        let response = client.get(url).send().await?.bytes().await?;
        msg.resp.send(response).unwrap();
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    ...
    let (tx, rx) = mpsc::channel(5);
    tokio::spawn(client_thread(rx, client)); // <--- Difference is here

    Ok(())
}

异步块:

struct ClientMsg {
    ...
    resp: oneshot::Sender<Bytes>,
}

async fn client_thread(
    mut rx: Receiver<ClientMsg>,
    client: Client,
) -> Result<(), Box<dyn Error>> {
    while let Some(msg) = rx.recv().await {
        ...
        let response = client.get(url).send().await?.bytes().await?;
        msg.resp.send(response).unwrap();
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    ...
    let (tx, rx) = mpsc::channel(5);
    tokio::spawn(async move { client_thread(rx, client) }); // <-- Difference is here

    Ok(())
}

注意以下use

use bytes::Bytes;
use reqwest::Client;

use tokio::sync::{
    mpsc::{self, Receiver},
    oneshot,
};
use url::Url;

直接使用代码失败:

error[E0277]: `(dyn StdError + 'static)` cannot be sent between threads safely
   --> src/main.rs:44:5
    |
44  |     tokio::spawn(client_thread(rx, client, base_url));
    |     ^^^^^^^^^^^^ `(dyn StdError + 'static)` cannot be sent between threads safely
    | 
   ::: /home/jeanluc/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.2.0/src/task/spawn.rs:130:20
    |
130 |         T::Output: Send + 'static,
    |                    ---- required by this bound in `tokio::spawn`
    |
    = help: the trait `Send` is not implemented for `(dyn StdError + 'static)`
    = note: required because of the requirements on the impl of `Send` for `Unique<(dyn StdError + 'static)>`
    = note: required because it appears within the type `Box<(dyn StdError + 'static)>`
    = note: required because it appears within the type `std::result::Result<(), Box<(dyn StdError + 'static)>>`

client_thread 的返回类型与main 函数完全相同,但是在 Tokio 上运行没有任何问题。此外,error type from reqwest 实现了Send

【问题讨论】:

    标签: asynchronous rust send tokio


    【解决方案1】:

    按照您的做法将函数调用包装在 async {} 块中是行不通的。运行时,它调用创建未来的函数,然后返回它。它永远不会被轮询,所以它永远不会取得进展。直接使用它或与.await 一起使用可确保它被轮询。

    use futures::executor::block_on; // 0.3.12
    
    async fn f(s: &str) {
        println!("ran {}", s);
    }
    
    fn main() {
        block_on(f("first"));
        block_on(async { f("second") }); // this doesn't print
        block_on(async { f("third").await });
    }
    
    ran first
    ran third
    

    由于未来没有持久化,它最终不会影响async {} 块的特征,因此可以是Send.await 会再次遇到同样的问题。


    在您的情况下,您需要做的就是确保未来实现Send,以便它可以使用tokio::spawn 运行。解决方法是要求返回的 Error 特征实现 Send

    async fn client_thread(
        mut rx: Receiver<ClientMsg>,
        client: Client,
    ) -> Result<(), Box<dyn Error + Send>> {
        // ...                    ^^^^^^
    }
    

    【讨论】:

      猜你喜欢
      • 2020-03-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-23
      • 2021-11-18
      • 1970-01-01
      • 1970-01-01
      • 2020-05-31
      相关资源
      最近更新 更多