【问题标题】:Spawn processes and communicate between processes in a trio based Python application在基于 trio 的 Python 应用程序中生成进程并在进程之间进行通信
【发布时间】:2018-07-04 10:11:11
【问题描述】:

对于 Python 库 fluidimage 的实习,我们正在研究使用库 trio 编写具有客户端/服务器模型的 HPC 并行应用程序是否是一个好主意。

对于异步编程和 i/o,trio 确实很棒!

然后,我想知道如何

  1. 派生进程(执行 CPU-GPU 受限工作的服务器)
  2. 在进程之间通信复杂的 Python 对象(可能包含大型 numpy 数组)。

我没有在其文档中找到使用 trio 执行此操作的推荐方法(即使 the echo client/server tutorial 是一个好的开始)。

在 Python 中生成进程并进行通信的一种明显方法是使用 multiprocessing

在 HPC 环境中,我认为一个好的解决方案是使用 MPI (http://mpi4py.readthedocs.io/en/stable/overview.html#dynamic-process-management)。作为参考,我还不得不提到rpychttps://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html#zerodeploy)。

我不知道是否可以将这些工具与 trio 一起使用,以及这样做的正确方法是什么。

一个有趣的相关问题

备注PEP 574

在我看来,PEP 574(参见https://pypi.org/project/pickle5/)也可能是解决这个问题的好方法的一部分。

【问题讨论】:

    标签: python python-3.x numpy multiprocessing python-trio


    【解决方案1】:

    不幸的是,截至今天(2018 年 7 月),Trio 尚不支持生成子进程并与子进程通信,或任何类型的 MPI 高级包装器或其他高级进程间协调协议。

    这绝对是我们最终想要达到的目标,如果您想更详细地讨论需要实现的内容,那么您可以hop in our chatthis issue 概述了核心所需的内容子流程支持。但是,如果您的目标是在几个月内为您的实习提供一些工作,老实说,您可能需要考虑更成熟的 HPC 工具,例如 dask

    【讨论】:

    • 首先,这不是“我的实习” :-) 其次,如果我们使用同步函数生成进程并与其他工具(通常是 mpi4py)进行通信,那么使用 trio.run_sync_in_worker_thread 不是很好吗?
    • run_sync_in_worker_thread 的主要缺点是它不支持取消。 (这意味着你不能使用 trio 的超时支持来设置线程调用超时,而且 control-C 也不能开箱即用,因为正常的“关闭一切”代码使用取消 - 有一些技巧可以解决这个问题)。此外,线程比本机三重任务占用更多内存,但这并不重要,除非您尝试一次运行数十万个线程。如果你能解决这些问题,那么,run_sync_in_worker_thread 很好。
    【解决方案2】:

    截至 2018 年年中,Trio 还没有这样做。迄今为止,您最好的选择是使用 trio_asyncio 来利用 asyncio 对 Trio 仍需要学习的功能的支持。

    【讨论】:

    • 免责声明:我是trio-asyncio的主要作者。
    【解决方案3】:

    我发布了一个非常天真的代码示例,该示例使用多处理和三重奏(在主程序和服务器中)。它似乎有效。

    from multiprocessing import Process, Queue
    import trio
    import numpy as np
    
    async def sleep():
        print("enter sleep")
        await trio.sleep(0.2)
        print("end sleep")
    
    def cpu_bounded_task(input_data):
        result = input_data.copy()
        for i in range(1000000-1):
            result += input_data
        return result
    
    def server(q_c2s, q_s2c):
        async def main_server():
            # get the data to be processed
            input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
            print("in server: input_data received", input_data)
            # a CPU-bounded task
            result = cpu_bounded_task(input_data)
            print("in server: sending back the answer", result)
            await trio.run_sync_in_worker_thread(q_s2c.put, result)
    
        trio.run(main_server)
    
    async def client(q_c2s, q_s2c):
        input_data = np.arange(10)
        print("in client: sending the input_data", input_data)
        await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
        result = await trio.run_sync_in_worker_thread(q_s2c.get)
        print("in client: result received", result)
    
    async def parent(q_c2s, q_s2c):
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client, q_c2s, q_s2c)
            nursery.start_soon(sleep)
    
    def main():
        q_c2s = Queue()
        q_s2c = Queue()
        p = Process(target=server, args=(q_c2s, q_s2c))
        p.start()
        trio.run(parent, q_c2s, q_s2c)
        p.join()
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

      【解决方案4】:

      一个使用 mpi4py 的简单示例...从三人组的角度来看,这可能是一个糟糕的解决方法,但它似乎有效。

      通信是使用trio.run_sync_in_worker_thread 完成的,所以 (as written by Nathaniel J. Smith) (1) 没有取消(并且没有 control-C 支持)和 (2) 使用比三重任务更多的内存(但是一个 Python 线程不使用这么多内存)。

      但是对于涉及大型 numpy 数组的通信,我会从 communication of buffer-like objects is going to be very efficient with mpi4py 开始这样做。

      import sys
      from functools import partial
      
      import trio
      
      import numpy as np
      from mpi4py import MPI
      
      async def sleep():
          print("enter sleep")
          await trio.sleep(0.2)
          print("end sleep")
      
      def cpu_bounded_task(input_data):
          print("cpu_bounded_task starting")
          result = input_data.copy()
          for i in range(1000000-1):
              result += input_data
          print("cpu_bounded_task finished ")
          return result
      
      if "server" not in sys.argv:
          comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                      args=['trio_spawn_comm_mpi.py', 'server'])
      
          async def client():
              input_data = np.arange(4)
              print("in client: sending the input_data", input_data)
              send = partial(comm.send, dest=0, tag=0)
              await trio.run_sync_in_worker_thread(send, input_data)
      
              print("in client: recv")
              recv = partial(comm.recv, tag=1)
              result = await trio.run_sync_in_worker_thread(recv)
              print("in client: result received", result)
      
          async def parent():
              async with trio.open_nursery() as nursery:
                  nursery.start_soon(sleep)
                  nursery.start_soon(client)
                  nursery.start_soon(sleep)
      
          trio.run(parent)
      
          print("in client, end")
          comm.barrier()
      
      else:
          comm = MPI.Comm.Get_parent()
      
          async def main_server():
              # get the data to be processed
              recv = partial(comm.recv, tag=0)
              input_data = await trio.run_sync_in_worker_thread(recv)
              print("in server: input_data received", input_data)
              # a CPU-bounded task
              result = cpu_bounded_task(input_data)
              print("in server: sending back the answer", result)
              send = partial(comm.send, dest=0, tag=1)
              await trio.run_sync_in_worker_thread(send, result)
      
          trio.run(main_server)
          comm.barrier()
      

      【讨论】:

        【解决方案5】:

        您还可以查看tractor,它似乎终于发布了第一个 alpha 版本。

        它具有使用 TCP 和 msgpack 的内置功能集中式 RPC 系统(很像 trio)(但我认为他们计划了更多传输)。您只需直接调用其他进程中的函数并以各种不同的方式流式传输/获取结果。

        这是他们的第一个例子:

        """
        Run with a process monitor from a terminal using::
        
            $TERM -e watch -n 0.1  "pstree -a $$" \
                & python examples/parallelism/single_func.py \
                && kill $!
        
        """
        import os
        
        import tractor
        import trio
        
        
        async def burn_cpu():
        
            pid = os.getpid()
        
            # burn a core @ ~ 50kHz
            for _ in range(50000):
                await trio.sleep(1/50000/50)
        
            return os.getpid()
        
        
        async def main():
        
            async with tractor.open_nursery() as n:
        
                portal = await n.run_in_actor(burn_cpu)
        
                #  burn rubber in the parent too
                await burn_cpu()
        
                # wait on result from target function
                pid = await portal.result()
        
            # end of nursery block
            print(f"Collected subproc {pid}")
        
        
        if __name__ == '__main__':
            trio.run(main)
        

        【讨论】:

          猜你喜欢
          • 2017-03-19
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-08-14
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多