【问题标题】:Working with deque object across multiple processes跨多个进程使用双端队列对象
【发布时间】:2019-06-27 22:23:56
【问题描述】:

我正在尝试减少读取大约 100,000 个条目的数据库的处理时间,但我需要以特定方式对它们进行格式化,为了做到这一点,我尝试使用 python 的 multiprocessing.map 函数完美,除了我似乎无法获得任何形式的队列引用来处理它们。

我一直在使用来自Filling a queue and managing multiprocessing in python 的信息来指导我跨多个进程使用队列,并使用Using a global variable with a thread 来指导我跨线程使用全局变量。我已经让软件工作了,但是当我在运行进程后检查列表/队列/字典/地图长度时,它总是返回零

我写了一个简单的例子来说明我的意思: 您必须将脚本作为文件运行,mapinitialize 函数在解释器中不起作用。

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

理论上,当我使用 pool 函数将队列对象引用从主线程传递给工作线程,然后使用给定函数初始化该线程的全局变量时,当我从map 函数之后,该对象引用仍应指向原始队列对象引用(长话短说,所有内容都应在同一个队列中结束,因为它们都指向内存中的同一位置)。

所以,我希望:

Hello World
Hello World
Hello World
1
2
3

当然,1, 2, 3 的顺序是任意的,但您会在输出中看到''

当我将对象引用传递给pool 函数时,为什么没有任何反应?

【问题讨论】:

  • dequeQueue 不同,多进程与多线程不同 — 你知道这一点,对吗?每个进程都在自己的“内存空间”中运行,这使得它们无法直接共享全局变量。可以使用multiprocessing.managers.SyncManager 间接为 some 类型创建代理,以允许共享记录的类型。不幸的是,deque 不在其中,但您可以自己实现一个。
  • 这是我正在谈论的概念,或者在谈论在不同线程上初始化全局变量时希望展示的概念,我只关心这样做,因为我已经看到 python 中的每个进程都有它自己的环境/解释器,所以理论上我想如果我将原始对象的引用传递给新的解释器,那么我应该能够将其全局变量重新初始化为旧的,至少这是我的猜测
  • iggy12345:我可以理解您是如何获得这种印象的,但事实是您不能将引用传递给其他进程,因为该对象位于另一个他们无法访问的内存空间中。请参阅我刚刚发布的答案,该答案显示了如何使用多处理 Manager 完成您想要的事情。

标签: python multiprocessing deque


【解决方案1】:

这是一个如何通过扩展multiprocessing.managers.BaseManager 类以支持deques 在进程之间共享内容的示例。

文档中有一个Customized managers 部分是关于创建它们的。

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")


def map_fn(i):
    process_shared_deque.append(i)  # deque's don't have a "put()" method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

输出:

Hello world
0
1
2
Hello world
Hello world

【讨论】:

  • 有什么理由不让DequeProxy 继承自dequeclass DequeProxy(deque): 然后使用DequeManager.register('DequeProxy', DequeProxy, exposed=dir(DequeProxy)),我只是好奇?
  • 您不能只使用dir 注册方法,由于某种原因register 不支持所有方法,但是这个答案解决了我的问题,并且比另一个快10 倍,谢谢!
  • iggy12345 关于您的第一条评论,不知道。我使用了文档中显示的内容以及在 managers.SyncManager 类中实现代理类型的方式。至于使用dir,如果您在第103 行查看multiprocessing.managers 的源文件,您可能会发现一些有趣的实用函数:all_methods()public_methods()。我考虑在我的回答中使用它们,但两者都没有达到我想要的效果——因为好吧,你知道,“简单胜于复杂”......;¬)
  • 我希望看到这两种解决方案的时间安排使这个解决方案的速度提高了 10 倍。也许是时候写信给多处理开发人员了,他们的标准解决方案即使对于简单的问题也很糟糕,并且可怜的社区需要破解 lib 以便在进程之间传递对象并使用诸如 global 之类的保存语句
  • 今天晚些时候我会试着给你安排时间
【解决方案2】:

您不能使用全局变量进行多处理。

传递给函数多处理队列。

from multiprocessing import Queue
queue= Queue() 

def worker(q):
    q.put(something)

此外,您可能会体验到代码没问题,但由于池创建单独的进程,甚至错误都是分开的,因此您看不到代码不仅无法正常工作,而且会引发错误。

您的输出是 '' 的原因是因为您的 q/global_q 没有附加任何内容。如果它被附加,那么只有一些变量,可能称为 global_q,但它与你的主线程中的 global_q 完全不同

尝试在要进行多进程处理的函数中打印('Hello world'),您会自己看到,实际上根本没有打印任何内容。该进程只是在您的主线程之外,访问该进程的唯一方法是通过多处理队列。您可以通过 queue.put('something') 和 something = queue.get() 访问队列

试着理解这段代码,你会做得很好:

import multiprocessing as mp

shared_queue = mp.Queue() # This will be shared among all procesess, but you need to pass the queue as an argument in the process. You CANNOT use it as global variable. Understand that the functions kind of run in total different processes and nothing can really access them... Except multiprocessing.Queue - that can be shared across all processes.


def channel(que,channel_num):
    que.put(channel_num)

if __name__ == '__main__':
    processes = [mp.Process(target=channel, args=(shared_queue, channel_num)) for channel_num in range(8)]

    for p in processes:
        p.start()


    for p in processes: # wait for all results to close the pool
        p.join()

    for i in range(8): # Get data from Queue. (you can get data out of it at any time actually)
        print(shared_queue.get())

【讨论】:

  • 多处理队列与双端队列并不完全相同,这正是 OP 实际上试图共享的。此外,您的代码中有一些非常长的单行 cmets,在我看来这不是很可读。
  • @martineau 你是在告诉我我应该建议他在需要柴油的汽车上倒汽油吗?绝对没有理由使用双端队列。而且我不会为了适合“deque”而破解多处理库,如果你注意的话,他使用它只是因为不真正了解多处理和线程
  • 我告诉你 OP 的问题是关于如何分享 deque,而不是 Queue。诚然,问题的标题在这方面有点误导,但它被标记为“deque”,这就是它包含的示例代码中使用的内容。
  • @martineau 问题不是关于双端队列,而是关于跨进程传递对象。你需要在两行之间阅读。顺便说一句,无论如何你都不能在他的情况下使用 deque。
  • @martineau 在示例代码中也使用了可笑的使用全局语句。我是否必须适应他的示例代码才能提供答案?我希望在你迂腐的时候给他最好的答案。你为什么不也发布答案?
猜你喜欢
  • 2014-06-01
  • 2018-01-16
  • 2015-01-19
  • 2018-06-25
  • 2011-08-12
  • 2018-07-02
  • 2020-05-18
  • 2013-05-21
  • 2012-07-27
相关资源
最近更新 更多