【发布时间】:2020-02-28 05:28:59
【问题描述】:
我正在寻找更短的方法来为机器学习任务准备我的数据集。我发现多处理库可能会有所帮助。但是,因为我是多处理的新手,所以我找不到合适的方法。
我先写了一些如下代码:
class DatasetReader:
def __init__(self):
self.data_list = Read_Data_from_file
self.data = []
def _ready_data(self, ex, idx):
# Some complex functions that takes several minutes
def _dataset_creator(self, queue):
for idx, ex in enumerate(self.data_list):
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
total_mem = 0.0
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for idx in t:
ins = queue.get()
self.data.append(ins)
gc.collect()
def _build_dataset(self):
queue = Queue()
creator = Process(target=self._dataset_creator, args=(queue,))
consumer = Process(target=self._dataset_consumer, args=(queue,))
creator.start()
consumer.start()
queue.close()
queue.join_thread()
creator.join()
consumer.join()
但是,在我看来,因为_dataset_creator 以串行方式 处理数据(此处为_ready_data),这对减少时间消耗没有帮助。
所以,我修改了代码以生成处理一个数据的多个进程:
class DatasetReader:
def __init__(self):
self.data_list = Read_Data_from_file
self.data = []
def _ready_data(self, ex, idx):
# Some complex functions that takes several minutes
def _dataset_creator(self, ex, idx, queue):
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
total_mem = 0.0
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for idx in t:
ins = queue.get()
self.data.append(ins)
gc.collect()
def _build_dataset(self):
queue = Queue()
for idx, ex in enumerate(self.data_list):
p = Process(target=self._dataset_creator, args=(ex, idx, queue,))
p.start()
consumer = Process(target=self._dataset_consumer, args=(queue,))
consumer.start()
queue.close()
queue.join_thread()
consumer.join()
但是,这会返回错误:
Process Process-18:
Traceback ~~~
RuntimeError: can't start new thread
Traceback ~~~
OSError: [Errno 12] Cannot allocate memory
您能帮我以并行方式处理复杂数据吗?
编辑 1:
感谢@tdelaney,我可以通过生成self.num_worker 进程(我的实验中有16 个)来减少时间消耗:
def _dataset_creator(self, pid, queue):
for idx, ex in list(enumerate(self.data_list))[pid::self.num_worker]:
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for _ in t:
ins = queue.get()
self.data[ins['idx']] = ins
def _build_dataset(self):
queue = Queue()
procs = []
for pid in range(self.num_worker):
p = Process(target=self._dataset_creator, args=(pid, queue,))
procs.append(p)
p.start()
consumer = Process(target=self._dataset_consumer, args=(queue,))
consumer.start()
queue.close()
queue.join_thread()
for p in procs:
p.join()
consumer.join()
【问题讨论】:
-
随机想法 - 你在过程 #18 失败 - 一次运行更少可能会有所帮助。
Read_Data_from_file是您从文件中读取的一堆数据吗?您可以在此过程中进行读取吗?_ready_data的结果大吗?它的计算是在 python 中完成的,还是在一些子系统中完成的,比如pandas或scipy在发布 GIL 后在 C 中工作?gc.collect可能不会做太多,除非你有很多循环引用的数据。但如果t和inx很大,最好在完成后立即删除它们。 -
@tdelaney 我想我可以生成有限数量的进程,将数据拆分为每个进程的数量和处理部分数据。也许我应该试试。因为整个数据都在一个文件中,所以我认为很难拆分数据读取部分,除非我拆分数据文件。
_ready_data的结果相当大(大约 200~300 KB)。我认为所有的计算都是在 python 中完成的。我想我不再需要gc.collect。谢谢你们的cmets! :) -
这是一个有趣的问题。这是进程分叉的mac / linux,还是产生进程的窗口?如果分叉,您不必将`ex`放在队列中,它已经在子进程空间中。
-
@tdelaney 我目前正在使用 Centos,这将是 Linux 系统,但我无法理解 fork 和 spawn 之间的区别。 :( 你能分享一些参考资料或可能有用的东西吗?谢谢你的帮助!
-
在 fork 上,子进程在创建进程时具有父内存的写时复制视图。所以不要将
self.data_list[idex]传递给孩子,它已经在那里了。在生成时,python 必须腌制父状态并在子状态中取消腌制。所以,你必须通过self.data_list[idx]。但请注意,当您创建该子流程并想使用self时,您可能会发现它复制了data_list的全部。
标签: python multithreading process multiprocessing queue