【问题标题】:Share Arc between closures在闭包之间共享弧
【发布时间】:2019-04-02 09:16:10
【问题描述】:

我正在尝试编写一个简单的 tcp 服务器来读取和广播消息。
我正在使用 Tokio,但我认为这更像是一个通用的 Rust 问题。

我有一个共享状态的 Arc:
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

稍后我想生成 2 个线程,它们将使用对该状态的引用:

let server = listener.incoming().for_each(move |socket| {
    // error[E0382]: capture of moved value: `state`
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

let receive_sensor_messages = sensors_rx.for_each(move |line| {
    println!("Received sensor message, broadcasting: {:?}", line);

    // error[E0597]: borrowed value does not live long enough
    // error[E0507]: cannot move out of borrowed content 
    for (_, tx) in state.clone().lock().unwrap().clients {
        tx.unbounded_send(line.clone()).unwrap();
    }
    Ok(())
}).map_err(|err| {
    println!("line reading error = {:?}", err);
});

(playground)

据我所知,它试图告诉我的是state 在第一个闭包listener.incoming().for_each(move |socket| { 中被借用,所以当我尝试在sensors_rx.for_each(move |line| { 中再次这样做时,它说这是不可能的。

我的问题是如何解决? Arc 不是应该解决线程间共享变量的问题吗? 我尝试了clone 的不同组合(在闭包之外进行克隆,然后在内部再次进行clone),但没有一个有效。

干杯!

【问题讨论】:

  • 你能尝试提供一个 MCVE 而不是一个成熟的程序吗?
  • “无法移出借用内容”的发生是因为 for 循环消耗了它的参数(例如,参见 Looping through a RefCell wrapped Vec with Rust)。将该行更改为 for (_, tx) in &state.lock().unwrap().clients 会将您的问题减少到 Arc 问题。

标签: rust


【解决方案1】:

基本上,您的问题可以归结为to the following MCVE:

use std::sync::{Arc, Mutex};

struct Bar;

fn foo(_ : &Bar){
    println!("foo called");
}

fn main(){
    let example = Arc::new(Mutex::new(Bar));
    std::thread::spawn(move ||{
        let _ = example.clone();
    });
    // --- (1) ---

    std::thread::spawn(move ||{
        foo(&example.clone().lock().unwrap());
    });
}

现在,这里的第一个问题是example 被移动了。也就是说,只要我们越过(1),就会认为原来的example 被移出。相反,我们需要首先 clone然后 move

    let example = Arc::new(Mutex::new(Bar));
    let local_state = example.clone();
    std::thread::spawn(move ||{
        let _ = local_state; // now fine!
    });

另一个错误源于短暂的Arc。从本质上讲,它在底层Mutex 上对我们lock 的寿命只有足够长的时间。虽然 我们 知道至少有另一个 Arc 指向内存,但编译器无法证明这一点。但是,如果我们去掉 clone() 就可以了:

    let local_state = example.clone();        
    std::thread::spawn(move ||{
        foo(&local_state.lock().unwrap());
    });

但是,您还可以通过使用其内容(clients)来循环遍历您的容器。相反,请在此处使用 &,例如&local_state().unwrap().clients)。

您可以在or on the playground下面找到完整的固定代码:

use std::sync::{Arc, Mutex};

struct Bar;

fn foo(_ : &Bar){
    println!("foo called");
}

fn main(){
    let example = Arc::new(Mutex::new(Bar));
    let local_state = example.clone();
    std::thread::spawn(move ||{
        let _ = local_state;
    });
    let local_state = example.clone();
    std::thread::spawn(move ||{
        foo(&local_state.lock().unwrap());
    }).join();
}

【讨论】:

    【解决方案2】:

    对于每个闭包,您必须提供自己的Arc,因此您必须事先将clone 提供给您的Arc

    let state = Arc::new(Mutex::new(Shared::new(server_tx)));
    let state1 = Arc::clone(&state);
    let state2 = Arc::clone(&state);
    
    let server = listener.incoming().for_each(move |socket| {
        process(socket, state1.clone());
        Ok(())
    });
    
    let receive_sensor_messages = sensors_rx.for_each(move |line| {
        println!("Received sensor message, broadcasting: {:?}", line);
        let shared = state2.lock().unwrap();
        for (_, tx) in &shared.clients { // better: `for tx in shared.clients.values()`
            tx.unbounded_send(line.clone()).unwrap();
        }
        Ok(())
    });
    

    您可以在此处省略 state1,但我觉得这样做更简洁。

    这样做的原因是,您将值 state 移动到第一个闭包中,因此您不能在第二个闭包中使用它,因为它已经被移动(有意义,不是吗?)。

    【讨论】:

      猜你喜欢
      • 2012-08-18
      • 2016-07-31
      • 1970-01-01
      • 2016-02-11
      • 2015-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-18
      相关资源
      最近更新 更多