【问题标题】:tokio::timeout on Stream always occurs流上的 tokio::timeout 总是发生
【发布时间】:2020-02-15 20:45:59
【问题描述】:

我正在尝试接受 UDP 消息,但只有当它发生在 5 秒内时,我才能通过手动实现 Stream 和使用期货库中的组合器构建一个 Stream 抽象。无论哪种方式,在recv_from 未来解决后,持续时间将到期并且流将返回Err(Elapsed(()))。这不是预期的行为,如果返回值,则不应返回任何错误。

预期的行为是流将解决超时或 Vec,但不是一个,然后在 5 秒后解决另一个。

use futures::{pin_mut, ready, stream::unfold, FutureExt};
use tokio::{
    net::{udp, UdpSocket},
    stream::{Stream, StreamExt},
    time::{self, Duration},
};

use std::{
    io,
    net::SocketAddr,
    pin::Pin,
    task::{Context, Poll},
};

#[derive(Debug)]
pub(crate) struct UdpStream {
    stream: udp::RecvHalf,
}

impl UdpStream {
    fn new(stream: udp::RecvHalf) -> Self {
        Self { stream }
    }

    fn stream(self) -> impl Stream<Item = io::Result<(Vec<u8>, SocketAddr)>> {
        unfold(self.stream, |mut stream| async move {
            let mut buf = [0; 4096];
            match time::timeout(Duration::from_secs(5), stream.recv_from(&mut buf)).await {
                Ok(Ok((len, src))) => {
                    Some((Ok((buf.iter().take(len).cloned().collect(), src)), stream))
                }
                e => {
                    println!("{:?}", e);
                    None
                }
            }
        })
    }
}

impl Stream for UdpStream {
    type Item = io::Result<(Vec<u8>, SocketAddr)>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let socket = &mut self.stream;
        pin_mut!(socket);
        let mut buf = [0u8; 4096];
        let (len, src) = ready!(Box::pin(socket.recv_from(&mut buf)).poll_unpin(cx))?;
        Poll::Ready(Some(Ok((buf.iter().take(len).cloned().collect(), src))))
    }
}

async fn listen_udp(addr: SocketAddr) -> io::Result<()> {
    let udp = UdpSocket::bind(addr).await?;
    let (mut udp_recv, mut udp_send) = udp.split();

    let mut msg_stream = Box::pin(UdpStream::new(udp_recv).stream());
    // use the manually implemented stream with this:
    // let mut msg_stream = UdpStream::new(udp_recv).timeout(Duration::from_secs(5));

    while let Some(msg) = msg_stream.next().await {
        match msg {
            Ok((buf, src)) => {
                udp_send.send_to(&buf, &src).await?;
                println!("Message recv: {:?}", buf);
            }
            Err(e) => {
                eprintln!("timed out: {:?}", e);
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    listen_udp("127.0.0.1:9953".parse()?).await?;
    Ok(())
}

您可以尝试运行此代码并使用 echo "foo" | nc 127.0.0.1 9953 -udig 发出 udp 请求

cargo.toml

[package]
name = "udp_test"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", features = ["full"] }
futures = "0.3"

【问题讨论】:

  • 抱歉,我认为这很明显,因为我使用的是 async/await。东京 0.2 和期货 0.3。添加了 cargo.toml
  • 显式优于隐式,这些板条箱仍然不稳定,因此为未来的读者提供更好的显式版本
  • 它们不再不稳定,但谢谢,是的,我下次会包括它们。
  • 版本 1 下的一切都相当不稳定,这就是 semver 的意义所在。

标签: rust stream rust-tokio


【解决方案1】:

在您的流返回第一个(也是唯一的)元素后,它会返回等待下一个;在超时之前它永远不会结束。

这里基本上不需要流抽象。包裹在超时中的future 可以:

use std::{io, net::SocketAddr};
use tokio::{
    net::UdpSocket,
    time::{self, Duration},
};

async fn listen_udp(addr: SocketAddr) -> io::Result<()> {
    let mut udp = UdpSocket::bind(addr).await?;
    let mut buf = [0; 4096];

    match time::timeout(Duration::from_secs(5), udp.recv_from(&mut buf)).await? {
        Ok((count, src)) => {
            udp.send_to(&buf[..count], &src).await?;
            println!("Message recv: {:?}", &buf[..count]);
        }
        Err(e) => {
            eprintln!("timed out: {:?}", e);
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    listen_udp("127.0.0.1:9953".parse()?).await?;
    Ok(())
}

【讨论】:

  • 感谢您的回复。这只会监听一个响应然后退出。我认为流在这里很有用,因为我想不断地收听,在实际应用程序中,我不只是分配给 Vec,我正在做一些额外的解码并创建一个值流。我正在寻找的所需输出是,在 recv_from 开始后,如果它无法在 5 秒内读取任何内容,则取消 recv 未来并返回超时。
  • 抱歉,不是分配给 Vec,而是“填充缓冲区”*
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-09-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-30
相关资源
最近更新 更多