【问题标题】:Rust Tokio mpsc::channel unexpected behavior for multi-task programRust Tokio mpsc::channel 多任务程序的意外行为
【发布时间】:2022-01-22 07:30:41
【问题描述】:

在以下程序中,我使用 Tokio 的 mpsc 频道。发送者被移动到名为input_message 的任务中,接收者被移动到另一个名为printer 的任务中。这两个任务在主函数中都是tokio::spawn()-ed。 input_message 任务是读取用户的输入并通过 Channel 发送。频道上的printer 任务recv() 获取用户的输入并将其简单地打印到标准输出:

use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    let printer = tokio::spawn(async move {
        loop {
            let res = rx.recv().await;          // (11) Comment this ..
            // let res = rx.try_recv();         // (12) Uncomment this ,,
            if let Some(m) = res {              // .. and this
            // if let Ok(m) = res {             // ,, and this
                if m.trim() == "q".to_string() {
                    break;
                } 
                println!("Received: {}", m.trim());
            }
        }
        println!("Printer exited");
    });

    let input_message = tokio::spawn(async move {
        let stdin = std::io::stdin();
        let mut bufr = std::io::BufReader::new(stdin);
        let mut buf = String::new();
        loop {
            // Let the printer thread print the string before asking the user's input.
            std::thread::sleep(std::time::Duration::from_millis(1));
            print!("Enter input: ");
            std::io::stdout().flush().unwrap();
            bufr.read_line(&mut buf).unwrap();

            if buf.trim() == "q".to_string() {
                tx.send(buf).unwrap();
                break;
            }
            tx.send(buf).unwrap();

            buf = String::new();
        }
        println!("InputMessage exited");
    });   

    tokio::join!(input_message, printer);
    Ok(())
}

程序的预期行为是:

  1. 要求用户随机输入(q 退出)
  2. 将相同的输入打印到标准输出

使用rx.recv().await(如第 11-13 行),程序似乎缓冲了代表用户输入的字符串:printer 任务不接收各种输入,因此不会将字符串打印到标准输出。一旦退出消息(即 q)被发送,input_message 任务退出并且消息似乎被冲出通道并且接收者一次处理它们,因此printer 任务一次打印所有输入.这是错误输出的示例:

Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited 

我的问题是,通道如何缓冲消息并仅在发送线程退出时一次性处理它们,而不是在发送时接收它们?

我尝试使用第 12-14 行中的 try_recv() 函数,它确实解决了问题。输出正确打印,这里是一个例子:

Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited 

鉴于此,我感到困惑。我知道recv().awaittry_recv() 函数之间的区别,但我认为在这种情况下我忽略了更多的东西,这使得后者工作而前者不起作用。有没有人能够阐明并详细说明这一点?为什么try_recv() 有效而recv().await 无效,为什么recv().await 在这种情况下无效?就效率而言,循环使用try_recv() 是坏的还是“坏的做法”?

【问题讨论】:

    标签: multithreading asynchronous rust rust-tokio


    【解决方案1】:

    这里有几点需要指出,但首先,您正在等待std::io::stdin() 上的行,这会阻塞线程,直到有行到达该流。在线程等待输入时,无法在此线程上执行其他未来,this blog post 是一个很好的资源,如果您想深入了解为什么不应该这样做。

    Tokio 的 io 模块为 stdin() 提供了一个异步句柄,您可以使用它作为快速修复,尽管 the documentation explicitly mentions 您应该为交互式用户输入启动一个专用(非异步)线程而不是使用异步句柄。

    std::io::stdin() 替换为tokio::io::stdin() 还需要将标准库BufReader 替换为包装R: AsyncRead 而不是R: Read 的tokio 实现。

    为防止输入任务和输出任务之间的交错写入,您可以使用响应器通道,该通道在输出已打印时向输入任务发出信号。您可以发送带有以下字段的Message,而不是通过频道发送String

    struct Message {
        payload: String,
        done_tx: oneshot::Sender<()>,
    }
    

    读取输入行后,通过通道将Message 发送到打印机任务。打印机任务打印String 并通过done_tx 发出信号,输入任务可以打印输入提示并等待换行。

    将所有这些与其他一些更改(例如等待消息的 while 循环)放在一起,您最终会得到这样的结果:

    use std::error::Error;
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
    use tokio::sync::{mpsc, oneshot};
    
    #[derive(Debug)]
    struct Message {
        done_tx: oneshot::Sender<()>,
        message: String,
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
    
        let printer = tokio::spawn(async move {
            while let Some(Message {
                message: m,
                done_tx,
            }) = rx.recv().await
            {
                if m.trim() == "q".to_string() {
                    break;
                }
                println!("Received: {}", m.trim());
                done_tx.send(()).unwrap();
            }
            println!("Printer exited");
        });
    
        let input_message = tokio::spawn(async move {
            let stdin = tokio::io::stdin();
            let mut stdout = tokio::io::stdout();
            let mut bufr = tokio::io::BufReader::new(stdin);
            let mut buf = String::new();
            loop {
                // Let the printer thread print the string before asking the user's input.
                stdout.write(b"Enter input: ").await.unwrap();
                stdout.flush().await.unwrap();
                bufr.read_line(&mut buf).await.unwrap();
    
                let end = buf.trim() == "q";
                let (done_tx, done) = oneshot::channel();
                let message = Message {
                    message: buf,
                    done_tx,
                };
                tx.send(message).unwrap();
                if end {
                    break;
                }
                done.await.unwrap();
    
                buf = String::new();
            }
            println!("InputMessage exited");
        });
    
        tokio::join!(input_message, printer);
        Ok(())
    }
    

    【讨论】:

    • 非常感谢您的帮助。需要明确的是,我报告的是重现我无法真正理解的行为的最小程序。在我正在进行的项目中,我没有使用std::io::stdin,而是使用termion async_stdin link。尽管阅读它不是您等待的未来。为什么recv().await 没有运行而async_stdin 没有阻塞?我不知道您是否可以在没有上下文的情况下回答这个问题,我不知道我是否能够在这个空间中提供。
    • 基本上我想要实现的是拥有一个专用于从 TcpConnection 读取的线程/任务和一个线程/任务读取交互式用户的输入。打印机线程/任务将根据从服务器接收到的内容或用户输入进行打印。我选择避免使用 tokio async stdio 的原因正是他们在文档中所说的警告,因为我需要交互式用户交互。他们的建议是启动一个线程,所以这意味着在 tokio 环境之外有一个本机线程(std::thread::spawn() vs tokio::spawn())对吗?
    • 最后一件事,我觉得我在帖子中找到的解决方案(使用try_recv())是虚假的。我觉得这是做我想做的事情的一种不好的方式,您是否同意这一点,或者您认为这样的解决方案是否可以接受。如果不是,你能解释一下为什么吗?谢谢。
    猜你喜欢
    • 2021-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-15
    • 2023-01-19
    • 1970-01-01
    • 2017-09-30
    • 2023-01-10
    相关资源
    最近更新 更多