【问题标题】:Tokio future that reads from a channel and uses poll_fn and try_ready never completes从通道读取并使用 poll_fn 和 try_ready 的 Tokio 未来永远不会完成
【发布时间】:2019-02-26 04:01:25
【问题描述】:

我有一个永远不会完成的 Tokio 未来(rxReceiversock 是 tokio UdpSocket)。它基本上从数据包队列中读取数据包并通过套接字传输它们:

poll_fn(move || {
    match try_ready!(rx
        .poll()
        .map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
    {
        Some((packet, to)) => {
            println!(
                "Rx: Received {} bytes for {}: {:?}",
                packet.len(),
                to,
                packet.as_slice(),
            );
            try_ready!(sock.poll_send_to(packet.as_slice(), &to));
            println!("Sent");
        }
        None => println!("Rx end"),
    }
    Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))

它一直执行到poll_send_to 行(poll_send_to 之前的println! 执行,之后的println! 不执行)然后永远等待而不发送数据包。

我用下面的future替换了上面的future,以确保它不是套接字问题(之前我认为是片状通知有一些问题):

poll_fn(move || {
    let packet = vec![0;10];
    let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
    try_ready!(sock.poll_send_to(packet.as_slice(), &to));
    Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))

这个future运行得很好——它按预期发送了数据包并退出了程序。

鉴于rx 可以成功poll 并打印println 消息,我认为问题不在于消息通道。鉴于第二个未来有效,我认为问题不在于套接字。我直接通过 Wireshark 观察数据包,所以我认为这也不是我观察的问题。

我对 Rust 和 Tokio 还很陌生,所以我可能忽略了一些基本事实(例如,不能在同一未来两次 try_ready,未来不会从之前中断的地方继续,等等)。

你能帮我弄清楚第一个未来的问题吗?

use futures::future::lazy;
use futures::stream::Stream;
use futures::try_ready;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::future::poll_fn;
use tokio::prelude::Future;

fn main() {
    let mut sock = UdpSocket::bind(&SocketAddr::from_str("127.0.0.1:8000").expect("Parse error"))
        .expect("Bind error");

    let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, SocketAddr)>(2000);

    tokio::run(lazy(move || {
        //----------------- This future works ----------------//
        // tokio::spawn(
        //     poll_fn(move || {
        //         let packet = vec![70; 10];
        //         let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
        //         try_ready!(sock.poll_send_to(packet.as_slice(), &to));
        //         Ok(futures::Async::Ready(()))
        //     })
        //     .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
        // );

        //----------------- This future doesn't ----------------//
        tokio::spawn(
            poll_fn(move || {
                match try_ready!(rx
                    .poll()
                    .map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
                {
                    Some((packet, to)) => {
                        // This is printed
                        println!(
                            "Rx: Received {} bytes for {}: {:?}",
                            packet.len(),
                            to,
                            packet.as_slice(),
                        );
                        try_ready!(sock.poll_send_to(packet.as_slice(), &to));
                        // This is never printed
                        println!("Sent");
                    }
                    None => println!("Rx end"),
                }
                Ok(futures::Async::Ready(()))
            })
            .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
        );

        //----------------- This future queues a packet ----------------//
        tokio::spawn(
            poll_fn(move || {
                try_ready!(tx.poll_ready());
                tx.try_send((
                    vec![70; 10],
                    SocketAddr::from_str("127.0.0.1:8001").expect("Parse error"),
                ))
                .expect("Send error");
                // Wait permanently so message channel doesn't get disconnected
                // Achieved differently in production
                Ok(futures::Async::NotReady)
            })
            .map_err(|e: tokio::sync::mpsc::error::SendError| println!("Error: {:?}", e)),
        );

        Ok(())
    }));
}

Repo

【问题讨论】:

  • “例如不能try_ready 在同一个未来出现两次” - 该宏只是扩展为一个匹配表达式,它返回 PendingErr 和否则解包结果,所以调用两次没有问题。

标签: rust future rust-tokio


【解决方案1】:

使用这个版本的你的未来会显示问题:

tokio::spawn(
    future::poll_fn(move || {
        eprintln!("Starting poll_fn");

        let from_channel = rx
            .poll()
            .map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error"));

        if let Some((packet, to)) = futures::try_ready!(dbg!(from_channel)) {
            futures::try_ready!(dbg!(sock.poll_send_to(packet.as_slice(), &to)));
        }
        Ok(futures::Async::Ready(()))
    })
    .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);

这是稍微清理过的输出:

Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)

Starting poll_fn
[src/main.rs:21] from_channel = Ok(Ready(Some(/* ... */)))
[src/main.rs:22] sock.poll_send_to(packet.as_slice(), &to) = Ok(NotReady)

Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)

言辞:

  1. 未来开始。
  2. 频道中没有准备好任何东西;频道注册通知。
  3. 未来回归。
  4. 通道获取值并通知任务。
  5. 未来重新开始。
  6. 通道中已准备好一个值。
  7. 在套接字上发送尚未准备好;套接字注册一个通知。
  8. 未来回归。
  9. 套接字被清除并通知任务。
  10. 未来重新开始。
  11. 频道没有准备好;频道注册通知。
  12. 未来回归。
  13. 不会向频道添加任何其他内容。

简而言之,您没有正确维护您未来的状态机。您需要知道未来最后一次运行时您走了多远,并在下次运行时从该点开始。

async / await 语法备受期待是有原因的:它会为您编写这些状态机。

我不知道为什么您选择使用较低级别的基于poll 的界面。我会使用更高级别的Future-based:

tokio::spawn({
    rx.fold(sock, |sock, (packet, to)| {
        sock.send_dgram(packet, &to)
            .inspect(|_| println!("Sent it!"))
            .map(|(sock, _)| sock)
            .map_err(|e| panic!("Error: {:?}", e))
    })
    .map(drop)
    .map_err(|e| panic!("Error: {:?}", e))
});

基于Future 的接口 [...] 出错时会破坏套接字(和缓冲区)

这是使用基于poll 的接口的一个很好的理由,但我仍然会深入研究它足够长的时间来实现您自己的未来。像这样的:

struct X(UdpSocket);
struct XSendGram<D> {
    sock: Option<UdpSocket>,
    data: D,
    addr: SocketAddr,
}

impl X {
    fn send_dgram<D>(self, data: D, addr: SocketAddr) -> XSendGram<D> {
        XSendGram {
            sock: Some(self.0),
            data,
            addr,
        }
    }
}

impl<D> Future for XSendGram<D>
where
    D: AsRef<[u8]>,
{
    type Item = (X, usize);
    type Error = (X, std::io::Error);

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        let mut sock = self.sock.take().expect("Future called after success or failure");

        match sock.poll_send_to(self.data.as_ref(), &self.addr) {
            Ok(Async::Ready(bytes)) => Ok(Async::Ready((X(sock), bytes))),
            Ok(Async::NotReady) => {
                self.sock = Some(sock); // Restore it for the next call
                Ok(Async::NotReady)
            }
            Err(e) => Err((X(sock), e)),
        }
    }
}
tokio::spawn({
    rx.fold(X(sock), |sock, (packet, to)| {
        sock.send_dgram(packet, to)
            .inspect(|(_, n)| println!("Sent {} bytes", n))
            .then(|r| match r {
                Ok((sock, _)) | Err((sock, _)) => future::ok(sock),
            })
    })
    .map(drop)
    .map_err(|e| panic!("Error: {:?}", e))
});

【讨论】:

  • 完美,谢谢!确实是状态机才是问题所在。我假设 async-await 之类的语义已经存在(基本上我的意思是“未来不会从之前中断的地方恢复”),而不是每次都从头开始执行轮询。至于基于poll 的接口,我只是从tokio(github.com/tokio-rs/tokio/blob/master/examples/echo-udp.rs) 中挑选了echo-udp.rs 示例开始使用,因为它看起来像一个标准的UDP 服务器。将开始切换到基于Future 的。
  • 所以,我查看了基于Future 的接口,它在出错时破坏了套接字(和缓冲区)。没有更好的选择吗?例如。可以使用基于poll 的接口而不破坏套接字来处理的错误?
猜你喜欢
  • 2013-12-14
  • 2020-12-22
  • 1970-01-01
  • 2021-12-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多