【问题标题】:How to run an asynchronous task from a non-main thread in Tokio?如何从 Tokio 的非主线程运行异步任务?
【发布时间】:2020-04-18 16:20:35
【问题描述】:
use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}

编译时出现警告:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled

在执行时出现错误:

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9

【问题讨论】:

  • 您在这里的实际目标是什么?你为什么要在不同的线程上生成该任务?我认为答案真的取决于你想做什么。
  • @Frxstrem 我需要从非主线程 (t) 启动另一个线程,并且线程 (t) 应该继续运行。在此之前,我使用了线程池。 repl.it/repls/AssuredWellmadeParentheses 之类的东西现在我决定用 async/await Tokio 替换池
  • 真正的问题是:为什么要混合线程和任务?
  • 如果你使用tokio,最好到处使用。任务不是线程,不能从任何地方产生,只能从由 tokio 管理的线程产生。您也许可以使用 tokio 通道在两者之间进行通信,但这是大量的额外工作和非常不同的代码结构
  • 您需要获取运行时的句柄,您可以将其传递给线程。您可能必须手动创建运行时才能执行此操作。然后句柄将处理将任务从调用它的线程编组到 tokio 工作线程。

标签: asynchronous rust rust-tokio


【解决方案1】:

关键是您需要获得 Tokio Handle。这是对 Runtime 的引用,它允许您从运行时之外生成异步任务。

使用#[tokio::main] 时,获得Handle 的最简单方法是通过Handle::current 生成另一个线程然后将句柄分配给可能想要启动异步任务的每个线程:

use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}

您还可以创建Mutex&lt;Option&lt;Handle&gt;&gt;global, mutable singleton,将其初始化为None,然后在tokio::main 函数的早期将其设置为Some。然后,您可以获取该全局变量,将其解包,并在需要时克隆 Handle

use once_cell::sync::Lazy; // 1.5.2

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);
*HANDLE.lock().unwrap() = Some(Handle::current());
let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();

另见:

【讨论】:

    【解决方案2】:

    我有一个作业处理应用程序,它公开了一个 Web API 以添加作业并处理它们,但 API 请求不应等待作业完成(可能需要一段时间)。我使用服务器发送的事件来广播作业结果。这意味着主 API 服务器在 main#[tokio::main] 内执行,但我应该在哪里运行作业执行器?在作业执行器中,我将有很多等待:下载之类的东西。它们会干扰 Web API 服务器。关键问题是我如何才能并行启动两个执行?

    在这种情况下,您需要使用thread::spawn 创建一个单独的线程,您将在其中创建一个 Tokio 执行器。您得到的错误是在您的第二个线程中,没有 Tokio 执行程序(运行时)。您需要手动创建一个并告诉它运行您的任务。更简单的方法是使用Runtime API:

    use tokio::runtime::Runtime; // 0.2.23
    
    // Create the runtime
    let rt = Runtime::new().unwrap();
    
    // Spawn a future onto the runtime
    rt.spawn(async {
        println!("now running on a worker thread");
    });
    

    在您的主线程中,执行器已经可以使用#[tokio::main]。在添加此属性之前,运行时是手动创建的。

    如果你想坚持 async/await 理念,你可以使用join

    use tokio; // 0.2.23
    
    #[tokio::main]
    async fn main() {
        let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
    }
    

    这就是为什么大多数答案都在质疑您的方法。虽然非常罕见,但我相信在某些情况下,您希望异步运行时位于另一个线程上,同时还可以手动配置运行时。

    【讨论】:

      猜你喜欢
      • 2019-06-20
      • 1970-01-01
      • 2016-10-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-06
      • 1970-01-01
      相关资源
      最近更新 更多