【问题标题】:Async loop on a new thread in rust: the trait `std::future::Future` is not implemented for `()`rust 中新线程上的异步循环:`()` 未实现特征`std::future::Future`
【发布时间】:2021-06-14 08:17:00
【问题描述】:

我知道这个问题已经被问过很多次了,但我仍然不知道该怎么做(更多内容见下文)。

我正在尝试使用std::thread::spawn 生成一个新线程,然后在其中运行一个异步循环。

我要运行的异步函数:

#[tokio::main] 
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}

产生一个线程来运行:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

错误:

error[E0277]: `()` is not a future
  --> src/main.rs:89:9
   |
89 |         pull_tweets(pg_pool2, config2).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `std::future::Future` is not implemented for `()`
   = note: required by `poll`

The last comment here 在概括问题方面做得非常出色。似乎在某些时候会期望实现IntoFuture 的返回值,而我没有。我尝试将Ok(()) 添加到闭包和函数中,但没有运气。

  • 添加到闭包中实际上没有任何作用
  • 添加到异步函数给了我一个新的但听起来相似的错误:
`Result<(), ()>` is not a future

然后我注意到答案专门讨论了extension functions,我没有使用它。 This 还谈到了扩展功能。

其他一些 SO 答案:

  • This 是由缺少 async 引起的。
  • Thisthisreqwest 库特定的。

所以似乎没有一个工作。有人可以帮助我理解 1)为什么这里存在错误以及 2)如何解决它?

注意 1:所有这些都可以通过将 std::thread::spawn 替换为 tokio::task::spawn_blocking 来轻松解决。但我有意按照this article 尝试线程生成。

注 2:关于我想要实现的更广泛的背景:我正在异步循环中从 twitter 中提取 150,000 条推文。我想比较 2 个实现:在主运行时运行与在单独线程上运行。后者是我挣扎的地方。

注意 3:在我看来,线程和异步任务是两个不同的原语,它们不能混合使用。即产生一个新线程不会影响任务的行为方式,产生一个新任务只会增加现有线程的工作。如果这种世界观有误(以及我能读到什么),请告诉我。

【问题讨论】:

  • 等待 pull_tweets()。同时,#[tokio::main] 告诉 tokio pull_tweets() 是一个“主”函数,即一个同步函数,它将创建一个执行程序并阻止它。您不能同时拥有两者:它必须是异步的或主的。干掉#[tokio::main],然后看看它是如何工作的。
  • 感谢您的回答。我在另一个我现在找不到的 SO 帖子中提出了将 #[tokio::main] 添加到非主函数的想法。您的解决方案使代码编译,但不幸的是它没有做任何事情(即调度程序不运行)。但反过来 - 删除 async/await 并保持 #[tokio::main] 确实有效!
  • 是的,thread::spawn(|| async ...) 没有意义,你需要像 block_on() 这样的东西。你是对的,可以将tokio::main 用于不是main() 的东西,它只是一个将异步函数转换为同步的工具。但是请注意,默认的 tokio 执行器是多线程的,因此虽然您可能会考虑线程和异步不同的世界,但 tokio 不同意,至少默认情况下不同意。

标签: rust rust-tokio


【解决方案1】:

#[tokio::main] 将您的函数转换为以下内容:

#[tokio::main] 
pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let rt = tokio::Runtime::new();
    rt.block_on(async {
        let mut scheduler = AsyncScheduler::new();
        // ...
    });
}

请注意,它是一个同步函数,它产生一个新的运行时并运行内部未来直到完成。你不要await它,它是一个独立的运行时,有自己的专用线程池和调度器:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        pull_tweets(pg_pool2, config2);
    });
}

请注意,您的原始示例在另一个方面是错误的:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

即使pull_tweets 一个异步函数,线程也不会做任何事情,因为你所做的只是在async 块中创建另一个未来。创建的未来不会被执行,因为未来是惰性的(并且该线程中没有执行器上下文)。

我将构建代码以直接在新线程中生成运行时,并从那里调用您想要的任何 async 函数:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            pull_tweets(pg_pool2, config2).await;
        });
    });
}

pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // ...
}

【讨论】:

  • 嘿 - 感谢您的评论!关于删除 .await 的部分很重要。我必须多做一点(这很奇怪,而且我认为是 clkwerk 库特定的)——我必须删除循环周围的 tokio::spawn 块(所以只保留循环)。然后一切正常。顺便说一句,没有 .await 的 #[tokio::main] 也可以。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多