【问题标题】:What is proper way to use shared list in multiprocessing在多处理中使用共享列表的正确方法是什么
【发布时间】:2020-03-14 14:27:05
【问题描述】:

借助多处理的Manager, Lock,我在 Python(3.7 版)中实现了一个 SharedList。我已将它用作使用多处理Process 函数调用创建的进程之间的共享对象。 Shared List 用于存储每个进程共享它生成的值/对象。

SharedList的实现与ManagerLockmultiprocessing的Python

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            return list(self.results).copy()

使用创建的SharedList来存储多个使用multiprocessing创建的进程的值

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()

child_function的实现

while True:
  result = func()
  if not (results.append(result)):
     break

当我增加了限制时,某些场景的实现工作,但是挂断。 我使用的处理器数量少于CPU数量并且做相同的实验仍然挂在相同的位置。

有没有更好的方法来解决上述问题,我已经研究了不同的方法,例如使用队列,但是没有按预期工作,挂断吗?

使用队列添加了以前的实现

使用队列实现

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                            args=(tasks, results)
    processes.append(new_process)
    new_process.start()

sleep(5)
for i in range(limit):
    tasks.put(0)
sleep(1)

for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

child_function

while True:
    task_val = tasks.get()
    if task_val < 0:
        results.put(-1)
        break
    else:
        result = func()
        results.put(result)

更新

在发布此问题之前,我已经阅读了以下参考资料,但我无法获得所需的输出。我同意,这段代码导致了死锁状态,但是我无法在 python 中使用多处理找到没有死锁的实现

参考文献

  1. Multiprocessing of shared list

  2. https://pymotw.com/2/multiprocessing/basics.html

  3. Shared variable in python's multiprocessing

  4. https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

  5. https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

  6. http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/

  7. python multiprocessing/threading cleanup

根据建议,我可以使用 Queue

修改 SharedList
class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

这个实现工作正常,有以下实现变化

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

results.setup()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()

child_function的实现

while True:
  result = func()
  if not (results.append(result)):
     break

但是,经过一些迭代后,这再次陷入僵局,挂断了

【问题讨论】:

  • 您说您尝试使用multiprocessing.Queue,但正在“挂断”。也许您应该与问题分享代码并询问相关问题,因为这正是您想要做的事情的数据结构,它挂起可能是您的代码中的一些错误的结果。
  • 您的队列代码对我来说不会死锁。你能解释一下它在哪里死锁吗?对于SharedList,我想有趣的是你在child_function 中所做的事情。您也可以显示此代码吗?与队列实现相比,您现在似乎只有一个列表?
  • 我无法在此处重现死锁,我在您的代码中看不到明显的问题。我建议您使用调试器来确定进程究竟挂在哪里,以防发生死锁。也许它也有助于从child_function 打印不同进程正在做什么。或者问题出在函数func()(你没有显示)?
  • 这个代码块其实是对我的实现的粗略描述。我尝试打印不同的过程以及它们实际在做什么,所以我在运行几次迭代后观察到。 (意味着通过指定不同的限制值来运行多处理,例如 500、1000。限制意味着列表被填充时的大小)。它让一个特定的进程在一段时间后运行,并在几次迭代后挂断。我还使用了一些虚拟函数来隔离func() 可能导致的错误

标签: python multiprocessing python-3.7


【解决方案1】:

根据建议,我能够使用 Queue

修改 SharedList
class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

这个实现工作正常,有以下实现变化

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

results.setup()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()

child_function的实现

while True:
  result = func()
  if not (results.append(result)):
     break

【讨论】:

    【解决方案2】:

    我发现下面这篇基于 Ray 的文章,听起来很有趣,而且很容易实现并行计算,高效且省时

    https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8

    【讨论】:

      猜你喜欢
      • 2021-09-17
      • 1970-01-01
      • 1970-01-01
      • 2012-12-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-15
      • 1970-01-01
      相关资源
      最近更新 更多