【发布时间】:2019-02-26 04:01:25
【问题描述】:
我有一个永远不会完成的 Tokio 未来(rx 是 Receiver 和 sock 是 tokio UdpSocket)。它基本上从数据包队列中读取数据包并通过套接字传输它们:
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
它一直执行到poll_send_to 行(poll_send_to 之前的println! 执行,之后的println! 不执行)然后永远等待而不发送数据包。
我用下面的future替换了上面的future,以确保它不是套接字问题(之前我认为是片状通知有一些问题):
poll_fn(move || {
let packet = vec![0;10];
let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
这个future运行得很好——它按预期发送了数据包并退出了程序。
鉴于rx 可以成功poll 并打印println 消息,我认为问题不在于消息通道。鉴于第二个未来有效,我认为问题不在于套接字。我直接通过 Wireshark 观察数据包,所以我认为这也不是我观察的问题。
我对 Rust 和 Tokio 还很陌生,所以我可能忽略了一些基本事实(例如,不能在同一未来两次 try_ready,未来不会从之前中断的地方继续,等等)。
你能帮我弄清楚第一个未来的问题吗?
use futures::future::lazy;
use futures::stream::Stream;
use futures::try_ready;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::future::poll_fn;
use tokio::prelude::Future;
fn main() {
let mut sock = UdpSocket::bind(&SocketAddr::from_str("127.0.0.1:8000").expect("Parse error"))
.expect("Bind error");
let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, SocketAddr)>(2000);
tokio::run(lazy(move || {
//----------------- This future works ----------------//
// tokio::spawn(
// poll_fn(move || {
// let packet = vec![70; 10];
// let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
// try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// Ok(futures::Async::Ready(()))
// })
// .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
// );
//----------------- This future doesn't ----------------//
tokio::spawn(
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
// This is printed
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// This is never printed
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
//----------------- This future queues a packet ----------------//
tokio::spawn(
poll_fn(move || {
try_ready!(tx.poll_ready());
tx.try_send((
vec![70; 10],
SocketAddr::from_str("127.0.0.1:8001").expect("Parse error"),
))
.expect("Send error");
// Wait permanently so message channel doesn't get disconnected
// Achieved differently in production
Ok(futures::Async::NotReady)
})
.map_err(|e: tokio::sync::mpsc::error::SendError| println!("Error: {:?}", e)),
);
Ok(())
}));
}
【问题讨论】:
-
“例如不能
try_ready在同一个未来出现两次” - 该宏只是扩展为一个匹配表达式,它返回Pending或Err和否则解包结果,所以调用两次没有问题。
标签: rust future rust-tokio