【问题标题】:How to convert a Future into a Stream?如何将 Future 转换为 Stream?
【发布时间】:2020-01-31 04:50:19
【问题描述】:

我正在尝试使用async_std 从网络接收 UDP 数据报。

有一个实现async recv_fromUdpSocket,此方法返回一个未来,但我需要一个提供UDP 数据报流的async_std::stream::Stream,因为它是一个更好的抽象。

我发现 tokio::net::UdpFramed 完全符合我的需要,但在当前版本的 tokio 中不可用。

一般来说,问题是如何将Futures 从给定的异步函数转换为Stream

【问题讨论】:

  • 它已移至tokio-util crate。
  • @edwardw 这似乎是一个答案。

标签: asynchronous rust rust-tokio


【解决方案1】:

对于单个项目,使用FutureExt::into_stream

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

对于由闭包生成的多个期货的流,请使用stream::unfold

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}

在你的情况下,你可以使用stream::unfold:

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}

【讨论】:

    【解决方案2】:

    一般来说,问题是如何将Futures 从给定的异步函数转换为Stream

    FutureExt::into_stream,但不要被这个名字骗了;它不适合您的情况。

    有一个实现异步recv_fromUdpSocket,此方法返回一个未来,但我需要一个提供UDP 数据报流的async_std::stream::Stream,因为它是一个更好的抽象。

    这里不一定是更好的抽象。

    具体来说,async-stdUdpSocket::recv_from 返回一个输出类型为(usize, SocketAddr) 的未来——接收到的数据的大小和对等地址。如果您要使用into_stream 将其转换为流,它只会为您提供,而不是接收到的数据。

    我发现 tokio::net::UdpFramed 完全符合我的需要,但在当前版本的 tokio 中不可用。

    它已移至tokio-util crate。不幸的是,您也不能(轻松)使用它。它需要一个tokio::net::UdpSocket,它与async_std::net::UdpSocket 不同。

    当然,您可以使用诸如futures::stream::poll_fnfutures::stream::unfold 之类的future 实用程序函数为UdpSocket::recv_from 赋予futures::stream::Stream 外观,但是您将如何处理它呢?如果你最终调用StreamExt::next 来轮询其中的一个值,你可以直接使用recv_from

    仅当您使用的某些 API 需要 Stream 输入时才需要访问 Stream,例如 rusoto

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-02
      • 2021-04-20
      • 2017-01-27
      • 2021-07-22
      • 2023-03-31
      • 1970-01-01
      相关资源
      最近更新 更多