【问题标题】:How to run functions in parallel?如何并行运行函数?
【发布时间】:2019-12-30 07:09:41
【问题描述】:

我首先进行了研究,但找不到我的问题的答案。我正在尝试在 Python 中并行运行多个函数。

我有这样的事情:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

我想调用 func1 和 func2 并让它们同时运行。这些函数不会相互交互或在同一对象上交互。现在我必须等待 func1 在 func2 开始之前完成。如何执行以下操作:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

我希望能够几乎同时创建两个目录,因为我每分钟都在计算正在创建的文件数量。如果目录不存在,它会影响我的时间。

【问题讨论】:

  • 你可能想重新设计它;如果您每分钟计算文件/文件夹的数量,则您正在创建竞争条件。让每个函数更新一个计数器,或者使用一个锁文件来确保周期性进程在两个函数完成执行之前不会更新计数?

标签: python multithreading multiprocessing


【解决方案1】:

您可以使用threadingmultiprocessing

由于peculiarities of CPythonthreading 不太可能实现真正的并行性。因此,multiprocessing 通常是更好的选择。

这是一个完整的例子:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

启动/加入子进程的机制可以很容易地按照runBothFunc 的方式封装到一个函数中:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

【讨论】:

  • 我使用了你的代码,但功能仍然没有同时启动。
  • @Lamar McAdory:请解释一下您所说的“同时”到底是什么意思,或许可以举一个具体的例子说明您做了什么,您期望发生什么,以及实际发生了什么。跨度>
  • @Lamar:您永远无法保证“完全相同的时间”,并且认为您可以完全错误。根据您拥有的 CPU 数量、机器的负载、计算机上发生的许多事情的时间,都会对线程/进程的启动时间产生影响。此外,由于进程是在创建后立即启动的,因此创建进程的开销也必须以您看到的时间差来计算。
  • @Lamar McAdory:没有办法确保两个函数执行的完美同步。也许值得重新评估整体方法,看看是否有更好的方法来实现您想要做的事情。
  • 如果我的函数接受参数并且当我在从不同进程调用它们时传递参数时,它们不会同时运行。你能帮忙吗
【解决方案2】:

如果您的函数主要执行 I/O 工作(以及较少的 CPU 工作)并且您拥有 Python 3.2+,则可以使用 ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

如果您的函数主要执行 CPU 工作(以及较少的 I/O 工作)并且您拥有 Python 2.6+,则可以使用 multiprocessing 模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])

【讨论】:

  • 这是一个很好的答案。如何从使用 concurrent.futures 的 I/O 绑定任务的结果中识别哪个完成了?如果我们有普通函数,基本上不是lamba函数,如何识别映射到被调用函数的结果?
  • 没关系,我找到了一种方法 - 而不是这个 run_cpu_tasks_in_parallel([ lambda: print('CPU task 1 running!'), lambda: print('CPU task 2 running!'), ]) 使用这个- 结果 = run_io_tasks_in_parallel([ lambda: {'is_something1': func1()}, lambda: {'is_something2': func2()}, ])
  • 如果函数给出不同参数的输出,如何保存。实际上,应该放置什么来代替 lambda: print('CPU task 1 running!'), lambda: print('CPU task 2 running!'), 以将结果附加到变量 task1_outputtask2_output
【解决方案3】:

这可以通过Ray 优雅地完成,该系统可让您轻松并行化和分发 Python 代码。

要并行化您的示例,您需要使用 @ray.remote 装饰器定义您的函数,然后使用 .remote 调用它们。

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

如果您将相同的参数传递给两个函数并且参数很大,则更有效的方法是使用ray.put()。这避免了大参数被序列化两次并创建它的两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

重要 - 如果func1()func2()返回结果,则需要重写代码如下:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

multiprocessing 模块相比,使用Ray 有许多优点。特别是,相同的代码可以在单台机器上运行,也可以在一组机器上运行。有关 Ray 的更多优势,请参阅this related post

【讨论】:

  • 我发现这是更好的选择。我要补充的一件事,特别是如果你在 docker 中使用它,它是依赖于架构的。到目前为止,在 alpine linux 上不起作用(Centos 7 为我工作),你应该在运行后运行ray.shutdown(),因为如果你正在做的事情以任何方式复杂,你将很快耗尽内存。
  • 感谢雷的建议。我能够通过并行运行任务来减少我的 api 上的一些等待时间
【解决方案4】:

似乎您有一个函数需要调用两个不同的参数。这可以使用concurrent.futuresmap 与 Python 3.2+ 的组合来优雅地完成

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

现在,如果您的操作是 IO 绑定的,那么您可以像这样使用ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

注意这里如何使用mapmap 你的函数添加到参数列表中。

现在,如果您的函数受 CPU 限制,那么您可以使用 ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

如果您不确定,您可以简单地尝试两种方法,看看哪一种效果更好。

最后,如果您想打印结果,您可以这样做:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

【讨论】:

    【解决方案5】:

    2021年最简单的方法就是使用asyncio:

    import asyncio, time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
    
        task1 = asyncio.create_task(
            say_after(4, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(3, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    
    
    asyncio.run(main())
    

    参考资料:

    [1]https://docs.python.org/3/library/asyncio-task.html

    【讨论】:

    • 错误答案。 Asyncio 不适合并行性
    • 错误评论。它用于跨任务的并行性。是映射到真正的线程很难说,因为规范(没有语言和规范的标准。我的意思是链接中的文本)没有这么说。如果您的版本任务与线程不同,请使用 threading.Thread 创建自己的线程。
    • 如果我没记错的话,这不是真正的并行。 Asyncio 将使用阻塞时间来运行另一个任务.. 所以,一次。只有一个任务正在执行
    • @user3786340 是对的,你可以在这里看到这篇文章中的论点:towardsdatascience.com/… 它说:“但是 asyncio 的任务背后的想法与线程不同。事实上,任务运行在一个单线程。但是,如果第一个任务正在等待其响应而不是阻塞它,则每个任务都允许操作系统运行另一个任务。这就是异步 IO 的本质。(稍后对异步程序进行更彻底的演练文章)。”
    • 我也这么认为,我最近一直在研究一个算法,我正在计算 100 个数据示例的分数高斯噪声,每个数据的维度为 1024。如果我让我的代码异步,那么就没有明显的性能提升。原因是计算 FGN 的函数在所有 100 种情况下运行所需的时间相同,并且它不等待 I/O。如果您的代码正在等待 I/O,那么使用 async-await 是个好主意,否则我很有信心它不会达到目的。
    【解决方案6】:

    如果您是 windows 用户并使用 python 3,那么这篇文章将帮助您在 python 中进行并行编程。当您运行通常的多处理库的池编程时,您会收到有关程序中 main 函数的错误。这是因为 windows 没有 fork() 功能。下面的帖子给出了上述问题的解决方案。

    http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

    由于我用的是python 3,所以我把程序改了一点:

    from types import FunctionType
    import marshal
    
    def _applicable(*args, **kwargs):
      name = kwargs['__pw_name']
      code = marshal.loads(kwargs['__pw_code'])
      gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
      defs = marshal.loads(kwargs['__pw_defs'])
      clsr = marshal.loads(kwargs['__pw_clsr'])
      fdct = marshal.loads(kwargs['__pw_fdct'])
      func = FunctionType(code, gbls, name, defs, clsr)
      func.fdct = fdct
      del kwargs['__pw_name']
      del kwargs['__pw_code']
      del kwargs['__pw_defs']
      del kwargs['__pw_clsr']
      del kwargs['__pw_fdct']
      return func(*args, **kwargs)
    
    def make_applicable(f, *args, **kwargs):
      if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
      kwargs['__pw_name'] = f.__name__  # edited
      kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
      kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
      kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
      kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
      return _applicable, args, kwargs
    
    def _mappable(x):
      x,name,code,defs,clsr,fdct = x
      code = marshal.loads(code)
      gbls = globals() #gbls = marshal.loads(gbls)
      defs = marshal.loads(defs)
      clsr = marshal.loads(clsr)
      fdct = marshal.loads(fdct)
      func = FunctionType(code, gbls, name, defs, clsr)
      func.fdct = fdct
      return func(x)
    
    def make_mappable(f, iterable):
      if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
      name = f.__name__    # edited
      code = marshal.dumps(f.__code__)   # edited
      defs = marshal.dumps(f.__defaults__)  # edited
      clsr = marshal.dumps(f.__closure__)  # edited
      fdct = marshal.dumps(f.__dict__)  # edited
      return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
    

    这个函数之后,上面的问题代码也稍微改成这样:

    from multiprocessing import Pool
    from poolable import make_applicable, make_mappable
    
    def cube(x):
      return x**3
    
    if __name__ == "__main__":
      pool    = Pool(processes=2)
      results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
      print([result.get(timeout=10) for result in results])
    

    我得到的输出是:

    [1, 8, 27, 64, 125, 216]
    

    我认为这篇文章可能对某些 Windows 用户有用。

    【讨论】:

      【解决方案7】:

      无法保证两个函数会彼此同步执行,这似乎是您想要做的。

      您能做的最好的事情是将函数分成几个步骤,然后使用Process.join 等待两者在关键同步点完成,就像@aix 的回答中提到的那样。

      这比time.sleep(10) 好,因为你不能保证准确的时间。通过显式等待,您是说函数必须在执行该步骤之前完成,然后再进行下一步,而不是假设它将在 10 毫秒内完成,这不能保证基于机器上发生的其他事情。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-08-03
        • 2021-10-31
        • 2015-01-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-11-26
        相关资源
        最近更新 更多