【问题标题】:Force non blocking read with TcpStream使用 TcpStream 强制非阻塞读取
【发布时间】:2015-02-13 15:54:37
【问题描述】:

我有一个线程,它维护着一个套接字列表,我想遍历这个列表,看看是否有什么要读的,如果有的话 - 采取行动,如果没有 - 继续下一个。问题是,一旦我遇到第一个节点,所有执行都会停止,直到读取完成为止。

我正在使用std::io::Read::read(&mut self, buf: &mut [u8]) -> Result<usize>

来自doc

这个函数不提供任何关于它是否阻塞等待数据的保证,但是如果一个对象需要阻塞读取但不能,它通常会通过一个 Err 返回值来发出信号。

深挖源码,TcpStream Read实现是

impl Read for TcpStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
}

调用

pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
    let fd = self.fd();
    let dolock = || self.lock_nonblocking();
    let doread = |nb| unsafe {
        let flags = if nb {c::MSG_DONTWAIT} else {0};
        libc::recv(fd,
                   buf.as_mut_ptr() as *mut libc::c_void,
                   buf.len() as wrlen,
                   flags) as libc::c_int
    };
    read(fd, self.read_deadline, dolock, doread)
}

最后,致电read&lt;T, L, R&gt;(fd: sock_t, deadline: u64, mut lock: L, mut read: R)

在检索到数据或发生错误之前,我可以看到非阻塞读取的循环。

有没有办法通过TcpStream 强制进行非阻塞读取?

【问题讨论】:

  • 为什么不为每个套接字启动一个线程?
  • @ker 这个实现的估计连接时间为 5-30 分钟,应该能够处理大约 200k 的并发连接。我假设线程数量是一件坏事,但目前我正在尝试找出一种计算方法,因为这将是 B 计划
  • 您可能想看看 AsyncIO 库,例如 github.com/carllerche/mio
  • 看起来像一个可靠的库,但我不需要流上的所有 io 都是异步的。我想我会在这个项目中为这个特定目的实现一个单独的特征。

标签: sockets tcp rust


【解决方案1】:

更新答案

需要注意的是,从 Rust 1.9.0 开始,std::net::TcpStream 增加了功能:

fn set_nonblocking(&amp;self, nonblocking: bool) -&gt; Result&lt;()&gt;

原答案

不能完全用 TcpStream 得到它,并且不想为 IO 操作拉入一个单独的库,所以我决定在使用它之前将文件描述符设置为非阻塞,并执行一个系统调用来读取/写。虽然MIO 看起来不错,但绝对不是最安全的解决方案,但比实现一个新的 IO 库工作量少。

extern "system" {
    fn read(fd: c_int, buffer: *mut c_void, count: size_t) -> ssize_t;
}

pub fn new(user: User, stream: TcpStream) -> Socket {

    // First we need to setup the socket as Non-blocking on POSIX
    let fd = stream.as_raw_fd();
    unsafe {
        let ret_value = libc::fcntl(fd,
            libc::consts::os::posix01::F_SETFL,
            libc::consts::os::extra::O_NONBLOCK);

        // Ensure we didnt get an error code
        if ret_value < 0 {
            panic!("Unable to set fd as non-blocking")
        }
    }

    Socket {
        user: user,
        stream: stream
    }
}

pub fn read(&mut self) {
    let count = 512 as size_t;
    let mut buffer = [0u8; 512];
    let fd = self.stream.as_raw_fd();

    let mut num_read = 0 as ssize_t;
    unsafe {
        let buf_ptr = buffer.as_mut_ptr();
        let void_buf_ptr: *mut c_void = mem::transmute(buf_ptr);
        num_read = read(fd, void_buf_ptr, count);
        if num_read > 0 {
            println!("Read: {}", num_read);
        }

        println!("test");
    }
}

【讨论】:

    猜你喜欢
    • 2023-03-09
    • 1970-01-01
    • 1970-01-01
    • 2014-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-05
    • 2014-04-24
    相关资源
    最近更新 更多