【问题标题】:How to collect multiple results from concurrently working for loop?如何从同时工作的for循环中收集多个结果?
【发布时间】:2021-10-26 14:11:06
【问题描述】:

我想运行一个包含许多不同变量的函数

假设我的功能是:

async fn do_the_hard_job(my_input: u16) {
    ...
    // do the hard job that takes some time
    if my_condition {
        println!("{}", input);
    }
}

我想为许多输入运行这个函数;假设我的输入是

inputs = stream::iter(1..=u16::MAX);

由于该函数会花费一些周期,因此我希望尽可能同时运行所有输入。所以我可以这样运行

 inputs.for_each_concurrent(0, |input| do_the_hard_job(input))
       .await;

到目前为止一切顺利;我使用该函数运行所有输入并在stdout 上获得我的输出。但是如果我想将输出写入某个输出文件呢?

我无法在do_the_hard_job 函数中打开文件并将其附加到其中。那会搞砸的。我无法将文件添加为参数,因为该方法将同时运行,哪个将借用可变文件。

我尝试返回值而不是在方法中打印,然后收集返回的值;像这样:

let mut return_values:Vec<u16> = Vec::new();
inputs
    .for_each_concurrent(0, |input| async move {
        if let done = do_the_hard_job(port).await{
            if done > 0 {
                return_values.push(port);
            }
        }}).await;

很遗憾,这没有用。我可以尝试什么来实现我的目标?

编辑:我为这个问题准备了一个复制器:https://github.com/kursatkobya/bite-sized-qualms/tree/main/concurrent-write

【问题讨论】:

  • 瓶颈与异步无关。该方法正在执行一项繁重的任务,因此假设为单个输入值完成一次运行大约需要 2 秒。
  • 好吧,我猜你的do_the_hard_job 函数应该返回一些结果,否则你不能自己检索结果。我认为这里缺少部分或您的实现完全错误。
  • 我已经更改了该函数以返回内容,正如我在问题的后半部分中提到的那样。然而,当将结果推送到向量中时,它失败了。在这里我添加了一个复制代码:github.com/kursatkobya/bite-sized-qualms/tree/main/…
  • 最好使用rust playground而不是github
  • 你面对的是this problem。最简单的解决方案是用Arc&lt;Mutex&lt;_&gt;&gt; 包装消费者,请查看playground 的示例,对于可以使用tokio::fs 将异步值写入文件的文件

标签: rust rust-futures


【解决方案1】:

你可以结合thencollect得到结果:

use futures::{stream, StreamExt};
use std::{
    time::Duration,
};

#[tokio::main]
async fn main() {
    let inputs = stream::iter(0..=10);

    // Finaly the one below does not work
    let output_values: Vec<u16> = inputs
        .then(|input: u16| async move {
            let result = f(input).await;
            result
        }).collect::<Vec<u16>>().await;
    println!("{:?}", output_values);

}

async fn f(input: u16) -> u16 {
    tokio::time::sleep(Duration::from_millis(200)).await;
    input
}

Playground

【讨论】:

  • 感谢您的回答。虽然此解决方案修复了编译和构建问题,但它破坏了并发性。也许这个解决方案可以与另一个并发/多线程解决方案结合使用。 @OmerErden 在评论中给出的另一个解决方案有效。我可以对这两种解决方案进行更多研究,感谢您提供信息并为我提供建立基础以解决问题的想法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-01-28
  • 2021-11-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多