【问题标题】:How to find if tokio::sync::mpsc::Receiver has been closed?如何查找 tokio::sync::mpsc::Receiver 是否已关闭?
【发布时间】:2020-09-28 10:13:31
【问题描述】:

我有一个循环,我在其中做一些工作并使用Sender 发送结果。这项工作需要时间,如果失败,我需要重试。可能在我重试时,接收器已关闭,我的重试将浪费时间。因此,我需要一种方法来检查 Receiver 是否可用而不发送消息。

在理想情况下,我希望我的代码在伪代码中看起来像这样:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};

问题是Sender 没有is_closed 方法。它确实有pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>,但我不知道Context 是什么或在哪里可以找到它。

当我没有要发送的值时,如何检查发件人是否能够发送?

【问题讨论】:

    标签: rust future rust-tokio


    【解决方案1】:

    Sender 有一个try_send 方法:

    尝试立即在此发件人上发送消息

    此方法与发送不同,如果通道的缓冲区已满或没有接收器正在等待获取某些数据,则立即返回。与send相比,这个函数有两种失败情况,而不是一种(一种断开连接,一种缓存满)。

    使用它代替send 并检查错误:

    if let Err(TrySendError::Closed(_)) = tx.send(result).await {
        break;
    }
    

    使用poll_fn from futures crate 可以做你想做的事。它使返回Poll 的函数适应返回Future

    use futures::future::poll_fn; // 0.3.5
    use std::future::Future;
    use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
    use tokio::time::delay_for; // 0.2.22
    
    fn wait_until_ready<'a, T>(
        sender: &'a mut Sender<T>,
    ) -> impl Future<Output = Result<(), ClosedError>> + 'a {
        poll_fn(move |cx| sender.poll_ready(cx))
    }
    
    #[tokio::main]
    async fn main() {
        let (mut tx, mut rx) = channel::<i32>(1);
    
        tokio::spawn(async move {
            // Receive one value and close the channel;
            let val = rx.recv().await;
            println!("{:?}", val);
        });
    
        wait_until_ready(&mut tx).await.unwrap();
        tx.send(123).await.unwrap();
    
        wait_until_ready(&mut tx).await.unwrap();
        delay_for(std::time::Duration::from_secs(1)).await;
        tx.send(456).await.unwrap(); // 456 likely never printed out,
                                     // despite having a positive readiness response
                                     // and the send "succeeding"
    }
    

    但是请注意,在一般情况下,这很容易受到TOCTOU 的影响。尽管Senderpoll_ready 在通道中保留了一个槽供以后使用,但接收端有可能在就绪检查和实际发送之间关闭。我试图在代码中指出这一点。

    【讨论】:

    • 但我没有要发送的值。那就是问题所在。我需要知道值是否仍然需要获取和发送之前。如果我有一个值,我不会对旧的常规发送方法有任何问题。
    • 我已经用poll_ready 的可能解决方案更新了我的答案。
    【解决方案2】:

    发送一个接收者忽略的空消息。它可以是任何东西。例如,如果您现在发送T,您可以将其更改为Option&lt;T&gt;,并让接收者忽略Nones。

    是的,这行得通,虽然我不太喜欢这种方法,因为我需要更改通信格式。

    我不会纠结于通讯格式。这不是一个定义良好的网络协议,应该与实现细节隔离开来;它是您自己的两段代码之间的内部通信机制。

    【讨论】:

    • 是的,这行得通,虽然我不太喜欢这种方法,因为我需要更改通信格式。
    猜你喜欢
    • 2012-01-29
    • 1970-01-01
    • 2021-07-09
    • 2023-01-10
    • 1970-01-01
    • 2014-10-23
    • 1970-01-01
    • 1970-01-01
    • 2015-10-23
    相关资源
    最近更新 更多