【问题标题】:Multiprocessing Synchronization多处理同步
【发布时间】:2020-03-03 08:57:21
【问题描述】:

假设我需要并行运行 5 个进程,但进程 2 到 5 依赖于进程 1。如何确保进程 1 将在其他进程之前运行?我应该使用 Python 的 Multiprocessing Event() 还是 Lock() 还是两者都用?

示例 1:

process 1
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5

示例 2:

process 3
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5

具有 2 个依赖项的示例 3:

process 1
process 2 or 3 (run in parallel after 1)
process 4
process 5 or 6 (run in parallel after 1 and after 4)

所有进程调用相同的函数(msg),但都返回不同的值。

我需要一些指导,不一定是代码,如果你能提供,谢谢。

伪代码:

import Multiprocessing as mp

function(msg):
    return 1 if msg == "one"
    return 2 if msg == "two"
    return 3 if msg == "three"
    return 4 if msg == "four"
    return 5 if msg == "five"

msgs = ['one', 'two', 'three', 'four', 'five']

jobs = []
for msg in msgs:
    p = Process(target=function, args=(msg,))
    p.start()
    jobs.append(p)

for job in jobs:
    job.join()

在这种情况下,所有进程都将无序运行。

如果我想先完成流程 1:

可能的解决方案:

import Multiprocessing as mp

function(msg):
    return 1 if msg == "one"
    return 2 if msg == "two"
    return 3 if msg == "three"
    return 4 if msg == "four"
    return 5 if msg == "five"

msg = ['one']
p1 = Process(target=function, args=(msg,))
p1.start()
p1.join()


msgs = ['two', 'three', 'four', 'five']

jobs = []
for msg in msgs:
    p = Process(target=function, args=(msg,))
    p.start()
    jobs.append(p)

for job in jobs:
    job.join()

有没有更好的解决方案或者这样可以吗?它有效,但这并不意味着它不能以更好的方式完成(例如更少的代码重复)。

【问题讨论】:

  • 只在 1 之后产生进程,当 1 完成时?
  • 我这样做了,但似乎不是最好的解决方案,因为它创建了样板代码。如果进程号 X 应该排在第一位还是排在第二位怎么办?我一直在寻找使用多处理的更通用的解决方案。我会看看信号量。
  • 所以简单地在两个进程准备好之前不启动它们是“样板”,但是启动它们并且必须立即等待一些半外部、半内部管理的锁/信号量是紧凑的设计?提示:不,不是。
  • 我明白你的意思,但我终于设法使用 Lock() 并且我认为代码比重复调用更干净。我认为这取决于每种情况。无论如何,很高兴知道按照您 (@tevemadar) 和 @Josua Nixon 的建议去做并不是坏的设计。谢谢。

标签: python python-3.x python-multiprocessing


【解决方案1】:

不确定最后做了什么,但毕竟你可以使用Events 来达到这个目的:

import multiprocessing as mp

def function(msg,events):
  if msg == "one":
    print(1)
    events[0].set()
  if msg == "two":
    print("2 waiting")
    events[0].wait()
    events[1].wait()
    print("2 done")
  if msg == "three":
    print(3)
    events[1].set()
  if msg == "four":
    print(4)
  if msg == "five":
    print("5 waiting")
    events[0].wait()
    print("5 done")

if __name__ == '__main__':
  events = [mp.Event(),mp.Event()]
  jobs = []
  for item in ['one','two','three','four','five']:
    job = mp.Process(target=function, args=(item,events))
    job.start()
    jobs.append(job)
  for job in jobs:
    job.join()

这里我特意引入了第二个依赖:p2 依赖于 p1 和 p3(而 p5 仍然依赖于 p1)。这样,如果你运行它几次,它会显示出更多的变化(比单个依赖项):

python procy.py
2 waiting
4
1
5 waiting
5 done
3
2 done

python procy.py
1
5 waiting
2 waiting
4
5 done
3
2 done

python procy.py
1
4
3
5 waiting
5 done
2 waiting
2 done

【讨论】:

  • 谢谢 tevemadar。您的代码清晰简洁,正是我想要的。我用锁做了,但你的代码更清晰。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-19
  • 1970-01-01
  • 2011-08-14
  • 1970-01-01
  • 1970-01-01
  • 2023-03-09
相关资源
最近更新 更多