【问题标题】:why doesn't this callable get evaluated for each run inside a pathos multiprocessing Pool?为什么不对 pathos 多处理池中的每次运行进行评估?
【发布时间】:2020-03-24 09:38:36
【问题描述】:

我正在尝试并行化一些代码,这些代码使用部分函数为我正在进行的模拟生成随机数。使用以下代码:

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example(func1, func2):
    sleep(1)
    [a, b] = [func1(), func2()]
    return (f"arg #1 is {round(a,2)}, arg #2 is {round(b,2)} at {datetime.now().time()}")

rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...

print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(example, argsToRun)
print("\n".join(results))

我得到以下输出...

running with a for loop...
arg #1 is 134.5, arg #2 is 232.45 at 11:58:17.025493
arg #1 is 213.38, arg #2 is 306.7 at 11:58:18.027038
arg #1 is 107.3, arg #2 is 347.19 at 11:58:19.028476

Running with itertools.starmap...
arg #1 is 167.7, arg #2 is 247.96 at 11:58:20.030238
arg #1 is 235.97, arg #2 is 318.02 at 11:58:21.031543
arg #1 is 140.41, arg #2 is 387.51 at 11:58:22.032727

Running with pathos.mp.starmap...
arg #1 is 120.24, arg #2 is 208.23 at 11:58:23.100251  
arg #1 is 220.24, arg #2 is 308.23 at 11:58:23.112206  
arg #1 is 120.24, arg #2 is 308.23 at 11:58:23.126050  

问题是,当我并行化它时,随机函数每次都不会被不同地评估。如果您查看最后一个块,它们只被评估一次(或者结果以某种方式被一遍又一遍地重用......),传入的随机函数的值不会改变。我把时间戳放在那里让自己相信最后一个块实际上是并行执行的。

我确信这与何时以及如何评估函数调用的元组参数有关,但此时我迷路了。

非常高级的目标是能够构建一个(非常大的)参数列表以传递到 simPy 环境中,并让池在它们上并行执行。但在我弄清楚如何让随机性发挥作用之前,我只能以所需速度的 1/32 来做这件事。

【问题讨论】:

  • 同时从一个生成器跨多个进程生成随机数可能不是一个好主意。也许让生产者预先填充进程从中消费的随机值队列?
  • 重点是对于不同的运行有不同的随机分布。我不明白为什么这是一个坏主意 - 我需要数字的差异,而不是每次都相同的数字。
  • 因为通常情况下,除非对象是多进程/线程安全的,否则同时从多个进程/线程操作对象可能会弄乱对象的内部状态并导致奇怪的行为;喜欢它两次给出相同的输出。
  • 请参阅here 了解如何解决此问题。尝试创建多个生成器,而不是对所有内容都使用全局生成器。
  • 抱歉 - 我不明白这种机制如何/为什么会同时操作来自多个进程的对象。该函数被传递到它被调用的最终位置,然后应该在那里独立评估,对吗?我确定我缺少一些东西,但我还没有看到它。 (对不起!)

标签: functional-programming python-multiprocessing pathos


【解决方案1】:

所以我得到了这个工作,但我很确定它仍然是一个完全的黑客。我最终只获取函数的参数并将它们传递给使用随机本地实例的方法...

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example( func1, func2):
    sleep(0.5)
    [a, b] = [func1(), func2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")

def threadSafe(func1, func2):
    sleep(0.5)
    localRandom = random.Random()
    meth1 = getattr(localRandom, func1.func.__name__, func1.args)
    meth2 = getattr(localRandom, func2.func.__name__, func2.args)
    local_f1 = functools.partial(meth1, *func1.args)
    local_f2 = functools.partial(meth2, *func2.args)
    [a, b] = [local_f1(), local_f2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")


rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...


print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning threadSafe with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(threadSafe, argsToRun)
print("\n".join(results))

【讨论】:

    猜你喜欢
    • 2018-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-26
    • 2019-03-08
    • 2020-11-08
    • 2012-08-23
    • 1970-01-01
    相关资源
    最近更新 更多