【问题标题】:How do you wrap synchronous network I/O trivially with Tokio? [duplicate]你如何用 Tokio 简单地包装同步网络 I/O? [复制]
【发布时间】:2020-12-12 08:39:55
【问题描述】:

不幸的是,我对 Rust 并发开发的理解有一个明显的失误。这个问题源于数周反复努力解决一个看似“微不足道”的问题。


问题域

开发一个名为twistrs的Rust库,它是一个域名排列和枚举库。该库的目的和目标是提供一个根域(例如google.com)并生成该域的排列(例如guugle.com)并丰富该排列(例如它解析为123.123.123.123)。

它的一个目标是比它的Python counterpart 执行得更快。最值得注意的是,网络调用,例如但不限于 DNS 查找。

目前的设计方案

库背后的想法(除了作为学习场所之外)是开发一个非常简单的安全库,可以实现以满足各种需求。您(作为客户端)可以选择在内部直接与 permutationenrichment 模块进行交互,或者使用库提供的异步/并发实现。

请注意,内部存在 no 共享状态。这可能是非常低效的,但暂时有点没有意义,因为它可以防止很多问题。

当前问题

在内部,DNS 查找是同步完成的,并且本质上是阻塞的。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc 频道,并执行 spawn 单个 tokio 任务:

use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let domain = Domain::new("google.com").unwrap();

    let _permutations = domain.all().unwrap().collect::<Vec<String>>();

    let (mut tx, mut rx) = mpsc::channel(1000);

    tokio::spawn(async move {
        for (i, v) in _permutations.into_iter().enumerate() {
            let domain_metadata = DomainMetadata::new(v.clone());

            let dns_resolution = domain_metadata.dns_resolvable();

            if let Err(_) = tx.send((i, dns_resolution)).await {
                println!("receiver dropped");
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got: {:?}", i);
    }
}

也就是说,精明的读者会立即注意到这会阻塞,并以任何方式有效地同步运行 DNS 查找。

由于move 是在tx 上完成的(并且tx 不是impl Copy),因此无法尝试在for 循环中生成Tokio 任务:

for (i, v) in _permutations.into_iter().enumerate() {
    tokio::spawn(async move {
        let domain_metadata = DomainMetadata::new(v.clone());

        let dns_resolution = domain_metadata.dns_resolvable();

        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
    });
}

删除await 当然不会导致任何事情发生,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以独立运行并最终汇聚到一个集合中?

我遇到的一个类似的 Rust 项目是 batch_resolve,它在这方面做得非常好(!)。但是,我发现实现对于我想要实现的目标非常复杂(也许我错了)。非常感谢任何帮助或实现这一目标的见解。

如果您想快速重现此问题,您可以简单地克隆项目并使用本文中的第一个代码 sn-p 更新 examples/twistrs-cli/main.rs

【问题讨论】:

  • 但是为什么要使用同步 DNS 查找呢?您是否将 DNS 委托给操作系统?否则你可以将网络请求异步到 DNS 服务器?
  • 如果您使用同步 dns 请求,您将受限于 Tokio 使用的工作线程数,通常为 8 左右。有什么理由不使用 Tokio 的 lookup_host 进行 dns 查询?这将允许您使用标准并发原语同时运行大量它们。
  • 这两点很好,我实际上正在研究可能解决我问题的方法。

标签: rust dns rust-tokio


【解决方案1】:

编辑:我误解了您的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,并且只会由于阻塞代码而导致执行程序停止,但如果您切换到异步解析方法,我会保留它。如果适合您的需要,我建议使用 tokio 的异步 lookup_host()


异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore 来一次创建运行任务数量的上限。其代码可能如下所示:

let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

for (i, v) in _permutations.into_iter().enumerate() {
    let domain_metadata = DomainMetadata::new(v.clone());
    let mut tx = tx.clone(); // every task will have its own copy of the sender
    let permit = semaphore.acquire_owned().await; // wait until we have a permit

    let dns_resolution = domain_metadata.dns_resolvable();
    tokio::spawn(async move {
        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
        drop(permit); // explicitly release the permit, to make sure it was moved into this task
    }); // note: task spawn results and handle dropped here
}

while let Some(i) = rx.recv().await {
    println!("got: {:?}", i);
}

如果证明额外任务的开销太大,您可以尝试使用 FuturesUnordered 等来自 futures 板条箱的工具将这些任务组合到一个未来中。这允许您获取任意大的期货列表,并在单个任务中重复轮询它们。

【讨论】:

  • 谢谢@apetranzilla -- 打算试试这个并回复你。将寻求移动到lookup_host() 以及在 cmets 中的建议。
  • 我设法让 tokio::net::lookup_host 编译,这很棒。唯一的问题是domain_metadata.dns_resolvable() 现在返回一个Future,然后通过tx 推送。因此,当我们到达rx.recv().await 时,我们所拥有的只是一组 Futures(也无法调试/打印)。
  • 为了更好的上下文:impl std::future::Future cannot be formatted using {:?} because it doesn't implement std::fmt::Debug
  • 如果有帮助,这就是 dns_resolvable() 的实现方式 - play.rust-lang.org/…
  • 你应该可以在新任务中添加.await关键字,这样你就有dns_resolution.await并通过通道而不是期货本身发送Results .由于您在新生成的任务中等待它们,因此不会导致外部任务等待。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-02
  • 2018-08-08
  • 1970-01-01
  • 2020-01-03
  • 2013-03-07
  • 2012-09-14
相关资源
最近更新 更多