【问题标题】:tokio Sender is not Sinktokio Sender 不是 Sink
【发布时间】:2023-02-18 17:55:27
【问题描述】:

我想知道如何 forward 一个频道到另一个频道,除了编写一个手动循环。想象tokio::mpsc::Sendertokio::broadcast::Sender都可能是impl Sink似乎很简单。令人惊讶的是,情况不仅如此。

例如,那里有tokio-stream,但没有tokio-sink。原因超出了我。仍然...也许我遗漏了某些东西或忽略了它。

让我们仔细检查以下代码:

use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

#[tokio::main]
async fn main() {
        let (mut tx0, mut rx0) = broadcast::channel::<u32>(10);
        let (mut tx1, mut rx1) = broadcast::channel::<u32>(10);
        tokio::task::spawn(async move {
            BroadcastStream::new(rx0).forward(tx1).await;
        });
        tx0.send(1);
        println!("{:?}", rx1.recv().await);
}

它会导致错误:

the trait `futures::Sink<u32>` is not implemented for `tokio::sync::broadcast::Sender<u32>

应该如何正确完成?

【问题讨论】:

    标签: rust rust-tokio


    【解决方案1】:

    mpsc 频道有一个 Sink 实现,可作为 tokio_util::sync::PollSender 使用。目前广播频道没有Sink实现。

    目前,转发到广播频道的惯用方法是使用循环。

    【讨论】:

      【解决方案2】:

      自己实现转发很容易:

      use tokio::sync::broadcast::{self, error::RecvError};
      
      #[tokio::main]
      async fn main() {
          let (tx0, mut rx0) = broadcast::channel(10);
          let (tx1, mut rx1) = broadcast::channel(10);
          tokio::task::spawn(async move {
              loop {
                  match rx0.recv().await {
                      Ok(v) => _ = tx1.send(v),
                      Err(RecvError::Closed) => break,
                      Err(RecvError::Lagged(_)) => continue,
                  }
              }
          });
          tx0.send(1_u32).unwrap();
          println!("{:?}", rx1.recv().await);
      }
      

      【讨论】:

      • 问题是除了写一个手动循环.
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-31
      • 1970-01-01
      • 2023-01-10
      相关资源
      最近更新 更多