【问题标题】:What is the best approach to encapsulate blocking I/O in future-rs?在 future-rs 中封装阻塞 I/O 的最佳方法是什么?
【发布时间】:2017-06-15 09:18:27
【问题描述】:

我阅读了tokio documentation,我想知道将来封装昂贵的同步 I/O 的最佳方法是什么。

通过反应器框架,我们获得了绿色线程模型的优势:少数 OS 线程通过执行器处理大量并发任务。

tokio 的未来模型是需求驱动的,这意味着未来本身将轮询其内部状态以提供有关其完成的信息;允许背压和取消功能。据我了解,未来的轮询阶段必须是非阻塞的才能正常工作。

我要封装的 I/O 可以看作是一个长的原子且代价高昂的操作。理想情况下,独立任务将执行 I/O,而相关联的未来将轮询 I/O 线程以了解完成状态。

我看到的仅有的两个选项是:

  • 在未来的 poll 函数中包含阻塞 I/O。
  • 产生一个 OS 线程来执行 I/O 并使用未来机制来轮询其状态,如 shown in the documentation

据我了解,这两种解决方案都不是最优的,也没有充分利用绿色线程模型(首先在文档中不建议,其次不要通过反应器框架提供的执行器)。还有其他解决方案吗?

【问题讨论】:

  • 选项 3:让长时间运行的操作在线程池上运行,该线程池在完成时发出未来信号(从 IO 线程轮询)。 Afaik tokio 已经通过某种 ThreadPool 执行器(而不是 IO 执行器)对此提供了支持。
  • 对于@Matthias247 的评论,在线程池上运行期货的箱子是futures-cpupool

标签: performance rust future


【解决方案1】:

理想情况下,独立任务将执行 I/O,相关联的未来将轮询 I/O 线程以了解完成状态。

是的,这是异步执行的推荐方法。请注意,这不限于 I/O,而是适用于任何长时间运行的同步任务!

期货箱

ThreadPool 类型是为此创建的1

在这种情况下,您生成工作以在池中运行。池本身执行工作以检查工作是否已完成并返回满足 Future 特征的类型。

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    pool.spawn_with_handle(async {
        thread::sleep(Duration::from_secs(3));
        3
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}

可以看到总时间只有3秒:

% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

1 — 有一些discussion 表示当前的实现可能不是最好的阻塞操作,但现在就足够了。

东京

这里,我们使用task::spawn_blocking

use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

另请参阅 Tokio 文档中的 CPU-bound tasks and blocking code

附加点

请注意,这不是一种有效的睡眠方式,它只是一些阻塞操作的占位符。如果您确实需要睡觉,请使用futures-timertokio::time::sleep 之类的东西。详情请见Why does Future::select choose the future with a longer sleep period first?

这两种解决方案都不是最优的,也没有充分利用绿色线程模型

没错——因为你没有异步的东西!您正在尝试结合两种不同的方法,并且必须在它们之间进行转换。

第二次不要通过reactor框架提供的执行器

我不确定你在这里的意思。 block_ontokio::main 隐式创建了一个执行程序。线程池有一些内部逻辑来检查线程是否完成,但这应该只在用户的执行器polls 时触发。

【讨论】:

  • 感谢您的回答。执行者是指Core event loop,如果我做对了,它基本上不会产生任何线程。虽然我想我开始思考这个问题:核心事件循环和 cpu 池执行器是互补的东西,最有效的取决于情况(低级快速轮询的核心事件循环 - 像 OS 套接字轮询 - 和 cpu-pool用于网络或磁盘等繁重的 IO)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-05-21
  • 2021-05-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多