【问题标题】:Concurrent access to vector from multiple threads using a mutex lock使用互斥锁从多个线程并发访问向量
【发布时间】:2017-04-15 16:34:58
【问题描述】:

我正在使用 Tokio 库提供的示例,并尝试获取所有当前活动 TCP 连接的向量。最终,我希望能够通过循环访问每个活动连接并将消息写入套接字来向每个活动连接广播消息。

首先,我试图打印出一个线程中的当前连接数,同时接受另一个线程中的连接。

为此,我尝试使用共享向量。我还没有实现在断开连接时从向量中删除连接。

// A tiny async echo server with tokio-core
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::stdout;
use std::io::Write;

fn main() {
    // Create the event loop that will drive this server
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

    let mut connections = Arc::new((Mutex::new(Vec::new())));

    thread::spawn(move || {
        //Every 10 seconds print out the current number of connections
        let mut i;
        loop {              
          i = connections.lock().unwrap().len();
          println!("There are {} connections", i);
          stdout().flush();
          thread::sleep_ms(10000);
        }
    });



    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {

        connections.lock().unwrap().push(tcp);
        // Split up the read and write halves
        let (reader, writer) = tcp.split();

        // Future of the copy
        let bytes_copied = io::copy(reader, writer);

        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });

        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);

        Ok(())
    });

    // Spin up the server on the event loop
    core.run(server).unwrap();

}

目前无法构建并出现以下错误:

error[E0382]: capture of moved value: `connections`
  --> src/main.rs:36:42
   |
26 |     thread::spawn(move || {
   |                   ------- value moved (into closure) here
...
36 |     let server = tcp.incoming().for_each(|(tcp, _)| {
   |                                          ^^^^^^^^^^ value captured here after move
   |
   = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `tcp`
  --> src/main.rs:40:32
   |
38 |         connections.lock().unwrap().push(tcp);
   |                                          --- value moved here
39 |         // Split up the read and write halves
40 |         let (reader, writer) = tcp.split();
   |                                ^^^ value used here after move
   |
   = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait

是否有可能在不编写任何不安全代码的情况下实现这一点?

【问题讨论】:

    标签: rust


    【解决方案1】:

    由于移动关闭,您会收到第一个错误:

    let mut connections = Arc::new((Mutex::new(Vec::new())));
    thread::spawn(move || {
        let mut i = connections.lock().unwrap().len();
        ....
    }
    

    这实际上移动了整个Arc,而您只想移动它的“一部分”(也就是说,以引用计数递增的方式移动它,并且两个线程都可以使用它)。

    为此,我们可以使用Arc::clone:

    let mut connections = Arc::new((Mutex::new(Vec::new())));
    let conn = connections.clone();
    thread::spawn(move || {
        let mut i = conn.lock().unwrap().len();
        ....
    }
    

    这样,克隆的Arcconn 被移动到闭包中,而原始的Arcconnections 则不存在,因此仍然可用。

    我不确定你对第二个错误做了什么,但为了简单地计算你不需要push整个事情的连接。

    【讨论】:

    • 非常感谢您的回答!如问题中所述,最终我希望能够拥有所有当前活动连接的向量,并能够遍历该向量并向它们中的每一个广播消息。是否可以将每个 tcp 连接推送到向量上?
    • 您可以尝试将连接包装在Arc&lt;Mutex&lt;_&gt;&gt; 中,就像对矢量所做的那样,但这限制了您可以使用它执行的操作(例如,您不能在.split 调用中移动它) .
    • 我意识到我错过了第一个线程的循环。也许现在它会更有意义。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-19
    • 1970-01-01
    相关资源
    最近更新 更多