【问题标题】:How can I share or avoid sharing a websocket resource between two threads?如何在两个线程之间共享或避免共享 websocket 资源?
【发布时间】:2020-03-01 21:31:52
【问题描述】:

我正在使用 tungstenite 构建聊天服务器,而我想要这样做的方式依赖于让许多线程通过 mpsc 相互通信。我想为每个连接到服务器并将他们连接到 websocket 的用户启动一个新线程,并让该线程能够从 mpsc 读取,以便服务器可以通过该连接发送消息。

问题是 mpsc 读取阻塞了线程,但如果我想从中读取,我无法阻塞线程。我唯一能想到的解决方法是创建两个线程,一个用于入站消息,一个用于出站消息,但这需要我与两个工作人员共享我的 websocket 连接,我当然不能这样做。

这是我的代码的一个被严重截断的版本,我尝试在 match 语句的 Action::Connect 分支中创建两个工人,这给了 error[E0382]: use of moved value: 'websocket' 试图将其移动到第二个工人的闭包中:

extern crate tungstenite;
extern crate workerpool;

use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{self, Sender, Receiver};
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use tungstenite::server::accept;

pub enum Action {
  Connect(TcpStream),
  Send(String),
}

fn main() {
  let (main_send, main_receive): (Sender<Action>, Receiver<Action>) = mpsc::channel();
  let worker_pool = Pool::<ThunkWorker<()>>::new(8);
  {
    // spawn thread to listen for users connecting to the server
    let main_send = main_send.clone();
    worker_pool.execute(Thunk::of(move || {
      let listener = TcpListener::bind(format!("127.0.0.1:{}", 8080)).unwrap();
      for (_, stream) in listener.incoming().enumerate() {
        main_send.send(Action::Connect(stream.unwrap())).unwrap();
      }
    }));
  }

  let mut users: Vec<Sender<String>> = Vec::new();

  // process actions from children
  while let Some(act) = main_receive.recv().ok() {
    match act {
      Action::Connect(stream) => {
        let mut websocket = accept(stream).unwrap();
        let (user_send, user_receive): (Sender<String>, Receiver<String>) = mpsc::channel();
        let main_send = main_send.clone();

        // thread to read user input and propagate it to the server
        worker_pool.execute(Thunk::of(move || {
          loop {
            let message = websocket.read_message().unwrap().to_string();
            main_send.send(Action::Send(message)).unwrap();
          }
        }));

        // thread to take server output and propagate it to the server
        worker_pool.execute(Thunk::of(move || {
          while let Some(message) = user_receive.recv().ok() {
            websocket.write_message(tungstenite::Message::Text(message.clone())).unwrap();
          }
        }));
        users.push(user_send);
      }
      Action::Send(message) => {
        // take user message and echo to all users
        for user in &users {
          user.send(message.clone()).unwrap();
        }
      }
    }
  }
}

如果我只为该臂中的输入和输出创建一个线程,则 user_receive.recv() 会阻塞该线程,因此我无法使用 websocket.read_message() 读取任何消息,直到我从主服务器收到一条 mpsc 消息线。我怎样才能解决这两个问题?我考虑克隆 websocket,但它没有实现 Clone,而且我不知道是否只是用同一个流建立一个新连接是否是一个合理的尝试,这似乎很 hacky。

【问题讨论】:

    标签: multithreading websocket rust


    【解决方案1】:

    问题是mpsc读取阻塞了线程

    您可以使用try_recv 来避免线程阻塞。 mpsc 的另一个实现是crossbeam_channel。即使是 mpsc 的作者,该项目也是 recommended 的替代品

    我想为每个连接到服务器的用户启动一个新线程

    我认为从大多数潜在客户来看,asyn/await 方法会比thread per client 方法好得多。你可以阅读更多关于它there

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2010-12-18
      • 1970-01-01
      • 2012-11-21
      • 2012-04-07
      • 1970-01-01
      • 2011-03-10
      • 1970-01-01
      相关资源
      最近更新 更多