【发布时间】:2022-01-22 07:30:41
【问题描述】:
在以下程序中,我使用 Tokio 的 mpsc 频道。发送者被移动到名为input_message 的任务中,接收者被移动到另一个名为printer 的任务中。这两个任务在主函数中都是tokio::spawn()-ed。 input_message 任务是读取用户的输入并通过 Channel 发送。频道上的printer 任务recv() 获取用户的输入并将其简单地打印到标准输出:
use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let printer = tokio::spawn(async move {
loop {
let res = rx.recv().await; // (11) Comment this ..
// let res = rx.try_recv(); // (12) Uncomment this ,,
if let Some(m) = res { // .. and this
// if let Ok(m) = res { // ,, and this
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
}
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = std::io::stdin();
let mut bufr = std::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
std::thread::sleep(std::time::Duration::from_millis(1));
print!("Enter input: ");
std::io::stdout().flush().unwrap();
bufr.read_line(&mut buf).unwrap();
if buf.trim() == "q".to_string() {
tx.send(buf).unwrap();
break;
}
tx.send(buf).unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
程序的预期行为是:
- 要求用户随机输入(q 退出)
- 将相同的输入打印到标准输出
使用rx.recv().await(如第 11-13 行),程序似乎缓冲了代表用户输入的字符串:printer 任务不接收各种输入,因此不会将字符串打印到标准输出。一旦退出消息(即 q)被发送,input_message 任务退出并且消息似乎被冲出通道并且接收者一次处理它们,因此printer 任务一次打印所有输入.这是错误输出的示例:
Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited
我的问题是,通道如何缓冲消息并仅在发送线程退出时一次性处理它们,而不是在发送时接收它们?
我尝试使用第 12-14 行中的 try_recv() 函数,它确实解决了问题。输出正确打印,这里是一个例子:
Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited
鉴于此,我感到困惑。我知道recv().await 和try_recv() 函数之间的区别,但我认为在这种情况下我忽略了更多的东西,这使得后者工作而前者不起作用。有没有人能够阐明并详细说明这一点?为什么try_recv() 有效而recv().await 无效,为什么recv().await 在这种情况下无效?就效率而言,循环使用try_recv() 是坏的还是“坏的做法”?
【问题讨论】:
标签: multithreading asynchronous rust rust-tokio