【问题标题】:Python multiprocessing for dataset preparation用于数据集准备的 Python 多处理
【发布时间】:2020-02-28 05:28:59
【问题描述】:

我正在寻找更短的方法来为机器学习任务准备我的数据集。我发现多处理库可能会有所帮助。但是,因为我是多处理的新手,所以我找不到合适的方法。

我先写了一些如下代码:

class DatasetReader:
    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = []

    def _ready_data(self, ex, idx):
        # Some complex functions that takes several minutes

    def _dataset_creator(self, queue):
        for idx, ex in enumerate(self.data_list):
            queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        total_mem = 0.0
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for idx in t:
            ins = queue.get()
            self.data.append(ins)
            gc.collect()

    def _build_dataset(self):
        queue = Queue()
        creator = Process(target=self._dataset_creator, args=(queue,))
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        creator.start()
        consumer.start()

        queue.close()
        queue.join_thread()

        creator.join()
        consumer.join()

但是,在我看来,因为_dataset_creator串行方式 处理数据(此处为_ready_data),这对减少时间消耗没有帮助。

所以,我修改了代码以生成处理一个数据的多个进程:

class DatasetReader:
    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = []

    def _ready_data(self, ex, idx):
        # Some complex functions that takes several minutes

    def _dataset_creator(self, ex, idx, queue):
        queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        total_mem = 0.0
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for idx in t:
            ins = queue.get()
            self.data.append(ins)
            gc.collect()

    def _build_dataset(self):
        queue = Queue()
        for idx, ex in enumerate(self.data_list):
            p = Process(target=self._dataset_creator, args=(ex, idx, queue,))
            p.start()
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        consumer.start()

        queue.close()
        queue.join_thread()

        consumer.join()

但是,这会返回错误:

Process Process-18:  
Traceback ~~~  
RuntimeError: can't start new thread  
Traceback ~~~  
OSError: [Errno 12] Cannot allocate memory  

您能帮我以并行方式处理复杂数据吗?

编辑 1:

感谢@tdelaney,我可以通过生成self.num_worker 进程(我的实验中有16 个)来减少时间消耗:

    def _dataset_creator(self, pid, queue):
        for idx, ex in list(enumerate(self.data_list))[pid::self.num_worker]:
            queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for _ in t:
            ins = queue.get()
            self.data[ins['idx']] = ins

    def _build_dataset(self):
        queue = Queue()
        procs = []
        for pid in range(self.num_worker):
            p = Process(target=self._dataset_creator, args=(pid, queue,))
            procs.append(p)
            p.start()
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        consumer.start()

        queue.close()
        queue.join_thread()

        for p in procs:
            p.join()
        consumer.join()

【问题讨论】:

  • 随机想法 - 你在过程 #18 失败 - 一次运行更少可能会有所帮助。 Read_Data_from_file 是您从文件中读取的一堆数据吗?您可以在此过程中进行读取吗? _ready_data 的结果大吗?它的计算是在 python 中完成的,还是在一些子系统中完成的,比如 pandasscipy 在发布 GIL 后在 C 中工作? gc.collect 可能不会做太多,除非你有很多循环引用的数据。但如果 tinx 很大,最好在完成后立即删除它们。
  • @tdelaney 我想我可以生成有限数量的进程,将数据拆分为每个进程的数量和处理部分数据。也许我应该试试。因为整个数据都在一个文件中,所以我认为很难拆分数据读取部分,除非我拆分数据文件。 _ready_data 的结果相当大(大约 200~300 KB)。我认为所有的计算都是在 python 中完成的。我想我不再需要gc.collect。谢谢你们的cmets! :)
  • 这是一个有趣的问题。这是进程分叉的mac / linux,还是产生进程的窗口?如果分叉,您不必将`ex`放在队列中,它已经在子进程空间中。
  • @tdelaney 我目前正在使用 Centos,这将是 Linux 系统,但我无法理解 fork 和 spawn 之间的区别。 :( 你能分享一些参考资料或可能有用的东西吗?谢谢你的帮助!
  • 在 fork 上,子进程在创建进程时具有父内存的写时复制视图。所以不要将self.data_list[idex] 传递给孩子,它已经在那里了。在生成时,python 必须腌制父状态并在子状态中取消腌制。所以,你必须通过self.data_list[idx]。但请注意,当您创建该子流程并想使用self 时,您可能会发现它复制了data_list全部

标签: python multithreading process multiprocessing queue


【解决方案1】:

我正在尝试勾勒出具有多处理池的解决方案会是什么样子。我完全摆脱了消费者进程,因为看起来父进程无论如何都在等待(并且最终需要数据),所以它可以成为消费者。所以,我设置了一个池并使用imap_unordered 来处理将数据传递给worker。

我猜想数据处理根本不需要 DatasetReader 并将其移到自己的函数中。在 Windows 上,要么整个 DataReader 对象被序列化到子进程(包括你不想要的数据),要么对象的子版本不完整,当你尝试使用它时可能会崩溃。

无论哪种方式,在子进程中对 DatasetReader 对象所做的更改都不会在父进程中看到。如果父对象依赖于该对象中的更新状态,这可能是出乎意料的。在我看来,最好将子流程中发生的事情严格括起来。

from multiprocessing import Pool, get_start_method, cpu_count

# moved out of class (assuming it is not class dependent) so that
# the entire DatasetReader object isn't pickled and sent to
# the child on spawning systems like Microsoft Windows

def _ready_data(idx_ex):
    idx, ex = idx_ex
    # Some complex functions that take several minutes
    result = complex_functions(ex)
    return (idx, result)


class DatasetReader:

    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = [None] * len(data_list)

    def _ready_data_fork(self, idx):
        # on forking system, call worker with object data
        return _ready_data((idx, self.data_list[idx]))

    def run(self):

        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ',
            bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) '
                '[{elapsed}<{remaining},{rate_fmt}{postfix}]')

        pool = Pool(min(cpu_count, len(self.data_list)))
        if get_start_method() == 'fork':
            # on forking system, self.data_list is in child process and
            # we only pass the index
            result_iter = pool.imap_unordered(self._ready_data_fork, 
                    (idx for idx in range(len(data_list))),
                    chunksize=1)
        else:
            # on spawning system, we need to pass the data
            result_iter = pool.imap_unordered(_ready_data,
                    enumerate(self.data_list,
                    chunksize=1)

        for idx, result in result_iter:
            next(t)
            self.data[idx] = result

        pool.join()

【讨论】:

  • 谢谢!我终于明白你试图解释了!另一方面,目前可能很难应用forking 功能,因为_ready_data 使用了大量self 函数和变量。 :( 也许我应该检查是否可以将其移出课堂,以避免腌制整个 DatasetReader 对象。无论如何,感谢您的帮助!
  • 在分叉系统上使用 self 不一定是坏事,只要知道您正在处理分叉时对象的快照。一个问题是长期维护——它很容易以一种在子流程中不起作用的方式更新代码。您可以通过将数据计算方法提取到不同的类来划分您的类。他们可能一开始只是为了方便而上这个班。通过创建更集中的类来分离关注点可能是一件好事。我不知道你的环境,只是笼统地说。
猜你喜欢
  • 2021-12-12
  • 1970-01-01
  • 2015-05-03
  • 1970-01-01
  • 2019-01-02
  • 1970-01-01
  • 2022-11-12
  • 2021-10-27
  • 1970-01-01
相关资源
最近更新 更多