【发布时间】:2017-02-22 22:50:11
【问题描述】:
我正在尝试编写一个生成一堆线程然后在最后加入线程的程序。我希望它是可中断的,因为我的计划是让它成为一个在 UNIX 服务中不断运行的程序。
这个想法是worker_pool 将包含所有已生成的线程,因此可以随时调用terminate 来收集它们。
我似乎找不到使用 chan_select 板条箱的方法来执行此操作,因为这需要我首先生成一个线程来生成我的子线程,一旦我这样做了,我就不能再使用 worker_pool 变量在中断时加入线程时,因为它必须被移出主循环。如果您注释掉中断中终止工作人员的行,它会编译。
我有点沮丧,因为这在 C 中真的很容易做到。我可以设置一个静态指针,但是当我尝试在 Rust 中这样做时,我得到了一个错误,因为我使用了一个向量作为我的线程,并且我无法在静态中初始化为空向量。我知道在中断代码中加入工作人员是安全的,因为执行会在此处停止等待信号。
也许有更好的方法来处理信号,或者我错过了一些我可以做的事情。
错误和代码如下:
MacBook8088:video_ingest pjohnson$ cargo run
Compiling video_ingest v0.1.0 (file:///Users/pjohnson/projects/video_ingest)
error[E0382]: use of moved value: `worker_pool`
--> src/main.rs:30:13
|
24 | thread::spawn(move || run(sdone, &mut worker_pool));
| ------- value moved (into closure) here
...
30 | worker_pool.terminate();
| ^^^^^^^^^^^ value used here after move
<chan macros>:42:47: 43:23 note: in this expansion of chan_select! (defined in <chan macros>)
src/main.rs:27:5: 35:6 note: in this expansion of chan_select! (defined in <chan macros>)
|
= note: move occurs because `worker_pool` has type `video_ingest::WorkerPool`, which does not implement the `Copy` trait
main.rs
#[macro_use]
extern crate chan;
extern crate chan_signal;
extern crate video_ingest;
use chan_signal::Signal;
use video_ingest::WorkerPool;
use std::thread;
use std::ptr;
///
/// Starts processing
///
fn main() {
let mut worker_pool = WorkerPool { join_handles: vec![] };
// Signal gets a value when the OS sent a INT or TERM signal.
let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);
// When our work is complete, send a sentinel value on `sdone`.
let (sdone, rdone) = chan::sync(0);
// Run work.
thread::spawn(move || run(sdone, &mut worker_pool));
// Wait for a signal or for work to be done.
chan_select! {
signal.recv() -> signal => {
println!("received signal: {:?}", signal);
worker_pool.terminate(); // <-- Comment out to compile
},
rdone.recv() => {
println!("Program completed normally.");
}
}
}
fn run(sdone: chan::Sender<()>, worker_pool: &mut WorkerPool) {
loop {
worker_pool.ingest();
worker_pool.terminate();
}
}
lib.rs
extern crate libc;
use std::thread;
use std::thread::JoinHandle;
use std::os::unix::thread::JoinHandleExt;
use libc::pthread_join;
use libc::c_void;
use std::ptr;
use std::time::Duration;
pub struct WorkerPool {
pub join_handles: Vec<JoinHandle<()>>
}
impl WorkerPool {
///
/// Does the actual ingestion
///
pub fn ingest(&mut self) {
// Use 9 threads for an example.
for i in 0..10 {
self.join_handles.push(
thread::spawn(move || {
// Get the videos
println!("Getting videos for thread {}", i);
thread::sleep(Duration::new(5, 0));
})
);
}
}
///
/// Joins all threads
///
pub fn terminate(&mut self) {
println!("Total handles: {}", self.join_handles.len());
for handle in &self.join_handles {
println!("Joining thread...");
unsafe {
let mut state_ptr: *mut *mut c_void = 0 as *mut *mut c_void;
pthread_join(handle.as_pthread_t(), state_ptr);
}
}
self.join_handles = vec![];
}
}
【问题讨论】:
-
欢迎来到 Stack Overflow!您是否已经理解为什么停止任意线程是Very Bad Idea(不是特定于语言的问题)?除此之外,您需要提供minimal reproducible example。现在,呈现的代码似乎更像是一个愿望清单和一个隐含的要求社区为您编写实现的请求。表面可见的问题似乎是
WorkerPool没有实现Copy,因此将其移动transfers ownership。 -
您还应该包含您收到的错误消息并显示research and attempts at fixing it you've already performed的内容。
-
感谢您的快速回复。我已经包含了 WorkerPool 结构的完整代码以及我在编译时收到的错误。我不想停止线程;我想通过加入来收集它们。我同意阻止他们不是一个好主意。
-
我能够使用here 的指导删除第一个错误。谢谢你的提示。不过,我希望我不必让它变得不安全。
-
此外,即使编译,连接也不起作用。看起来我正在获取 pthread_t 的内存地址,但连接从未完成。
标签: multithreading rust signals