【问题标题】:Is there a good way to allow the clients of a server to access other clients directly without using a Mutex?有没有一种好方法可以让服务器的客户端直接访问其他客户端而不使用互斥锁?
【发布时间】:2021-08-04 19:28:13
【问题描述】:

我正在开发一个板条箱,其中有 ServerClient 结构,它们是用于发送和接收数据的包装器。我正在尝试为服务器找到一种方法来存储已连接客户端的列表,并让客户端处理程序闭包可以访问所述列表,但可以从单独的线程中访问。我想尽量不要使用Mutex,因为这可能会成为性能瓶颈。

我尝试过使用 crossbeam 的 SegQueue,但它只有 push()pop(),我需要能够像使用矢量一样访问和删除所有客户端。

这是服务器代码:

use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
use rayon::ThreadPoolBuilder;

pub enum ReadEvent {
    Disconnected,
    ReceivedError(String),
    InvalidPacket,
}

pub enum SendEvent {
    NotConnected,
}

pub enum ConnectionError {
    FailedToConnect(String),
}

pub struct Client {
    address: Option<String>,
    stream: Option<TcpStream>,
    pub(crate) disconnected: bool,
}

impl Client {

    /// Will connect to an IP and Port and return a new client object
    /// Clients only work with Notitia servers so this method can not be accessed by users
    pub(crate) fn new<S: Into<String>>(ip: S, port: S) -> Self {
        let address = format!("{}:{}", ip.into(), port.into());

        let stream_result = TcpStream::connect(address.clone());
        if stream_result.is_err() {
            panic!("Client failed to connect to address {}", address);
        }

        Self {
            address: Some(address),
            stream: Some(stream_result.unwrap()),
            disconnected: false
        }
    }

    /// Creates a client from an existing connection
    /// /// Clients only work with Notitia servers so this method can not be accessed by users
    pub(crate) fn from<S: Into<String>>(address: S, stream: TcpStream) -> Self {
        Self {
            address: Some(address.into()),
            stream: Some(stream),
            disconnected: false
        }
    }

    pub(crate) fn connect<S: Into<String>>(&mut self, ip: S, port: S) -> Result<(), ConnectionError> {
        self.disconnect();
        self.disconnected = false;
        let address = format!("{}:{}", ip.into(), port.into());
        self.address = Some(address.clone());
        let stream = TcpStream::connect(address);
        if stream.is_err() {
            return Err(ConnectionError::FailedToConnect(stream.unwrap_err().to_string()));
        }
        self.stream = Some(stream.unwrap());
        Ok(())
    }

    pub fn disconnect(&mut self) {
        self.stream = None;
        self.address = None;
        self.disconnected = true;
    }

    fn try_get_stream(&mut self) -> Option<&TcpStream> {
        self.disconnect(); // ensure the Client object is disconnected
        let stream = self.stream.as_ref().unwrap();
        Some(stream)
    }

    /// Sends data to the stream
    pub fn send<S: Into<String>>(&mut self, data: S) -> Result<(), SendEvent> {
        let stream = self.try_get_stream();
        if stream.is_none() {
            return Err(SendEvent::NotConnected);
        }
        // Writes to the stream here
        Ok(())
    }

    /// Sends an error to the client and if it should disconnect
    pub fn send_error<S: Into<String>>(&mut self, error: S, disconnect: bool) -> Result<(), SendEvent> {
        let stream = self.try_get_stream();
        if stream.is_none() {
            return Err(SendEvent::NotConnected);
        }
        // writes an error to the stream
        Ok(())
    }

    /// Waits until a message is received from the stream
    pub fn read(&mut self) -> Result<String, ReadEvent> {
        let stream = self.try_get_stream();
        if stream.is_none() {
            return Err(ReadEvent::Disconnected);
        }
        // Reads data from the stream and processes it into a string output
        Ok(format!(""))
    }
}

impl Clone for Client {
    fn clone(&self) -> Self {
        let address_clone = if self.address.is_none() {
            None
        } else {
            let addr = self.address.as_ref().unwrap();
            Some(addr.clone())
        };

        let stream = if self.stream.is_none() {
            None
        } else {
            let s = self.stream.as_ref().unwrap();
            let c = s.try_clone();
            if c.is_err() {
                None
            } else {
                Some(c.unwrap())
            }
        };

        Self {
            address: address_clone,
            stream,
            disconnected: self.disconnected
        }
    }
}

pub enum ServerError {
    FailedToBindAddr(String),
}

pub struct Server {
    address: String,
    connected: Vec<Client> // <--- The vector in question
}

impl Server {

    pub fn new<IP: Into<String>, PORT: Into<String>>(ip: IP, port: PORT) -> Self {
        Self {
            address: format!("{}:{}", ip.into(), port.into()),
            connected: Vec::new(),
        }
    }
 
    /// Starts the server
    /// max_connections: If Some, will limit the amount of connections to the server at a time
    /// client_handle: A closure that takes a reference to a client and a reference to the vector of clients that are connected
    pub fn start<'a, F: 'a + Fn(&Client, &Vec<Client>) + Send + Sync>(& mut self, max_connections: Option<usize>, client_handle: F) -> Result<(), ServerError> {
        // bind the listener
        let listener_res = TcpListener::bind(self.address.clone());
        if listener_res.is_err() {
            return Err(ServerError::FailedToBindAddr(listener_res.unwrap_err().to_string()));
        }
        let listener = listener_res.unwrap();

        for mut s in listener.incoming() {
            if s.is_err() {
                continue;
            }
            let stream = s.unwrap();

            let sockaddr = stream.peer_addr();
            if sockaddr.is_err() {
                // Write an error to the client
                continue;
            }
            let ip = sockaddr.unwrap().ip().to_string();

            let mut client = Client::from(ip, stream);

            let connections = self.connected.len();

            if max_connections.is_some() && connections >= max_connections.unwrap() {
                client.send_error("Server is full!", true);
                continue;
            }

            self.connected.push(client.clone());

            pool.install(|| { // Uses rayon for the threadpool
                client_handle(&client, &self.connected); // The issue
                client.disconnect();
                // TODO Remove client from self.connected
            });
        }

        Ok(())
    }

}

fn main() {
    thread::spawn(move || { // start the client after 5 seconds to give the server time to start up
        thread::sleep(Duration::from_millis(5000));
        let mut client = Client::new("localhost", "2277");
    });
    let mut server = Server::new("0.0.0.0", "2277");
    server.start(Some(20), |client, connected| {
        println!("Connected!")
    });
}

我正在考虑将connected 制作成哈希图来存储连接ID。有没有一种好方法可以让客户端直接访问其他连接的客户端而不使用Mutex?如果没有,有没有更直接的方法?

编辑:更新的代码能够按要求运行。 (需要人造丝箱运行)

【问题讨论】:

  • 很难回答您的问题,因为它不包含minimal reproducible example。我们无法分辨代码中存在哪些 crate(及其版本)、类型、特征、字段等。如果您尝试在 Rust Playground 上重现您的错误,如果可能的话,这将使我们更容易为您提供帮助,否则在全新的 Cargo 项目中,然后在 edit 您的问题中包含附加信息。您可以使用Rust-specific MRE tips 来减少您在此处发布的原始代码。谢谢!
  • 值得注意的是,items like network, crate::client, write_event_error are missing。请按照上述提示删除或减少这些缺失的部分。
  • 好的!我会尝试在生锈的操场上搞一个!

标签: rust tcp


【解决方案1】:

对于任何偶然发现这一点的人,我在 rust 的 RWLock 中找到了一个解决方案,它单独锁定数据的读取和写入,使其比互斥锁更高效。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-11-01
    • 2016-03-10
    • 1970-01-01
    • 2011-02-02
    • 2017-06-08
    • 1970-01-01
    • 2022-07-28
    • 1970-01-01
    相关资源
    最近更新 更多