【问题标题】:Multiprocess Process Python3 - Return DataFrames多进程处理 Python3 - 返回数据帧
【发布时间】:2019-03-18 15:13:28
【问题描述】:

我有一个大型函数需要并行运行以节省计算时间。该函数采用名称和列表,并返回带有计算结果的数据框。 我需要将每个数据框返回并保存到列表(或其他东西)中。 我可以使用 PROCESS 让并行运行,但它返回一个累积数据帧。我怎样才能从每个进程中获取数据框而不是全部添加。 因此:

DF1 = A B
      1 2
      2 4


DF2 = A B
      1 3
      5 6

我得到的是:

DF1 = A B
      1 2
      2 4

DF2 = A B
      1 2
      2 4
      1 3
      5 6

我在下面粘贴了我的示例代码: 提前谢谢你 - 我似乎无法弄清楚这一点..

if __name__ == "__main__":

    SectorA = ["AAPL", "ATVI", "BIDU"]
    SectorB = ['AA','IBM','UAL']
    sectors = [SectorA,SectorB, Tech_Sector, TransP_Sector]
    sectorsNames = ['SectorA', 'SectorB']

    ######################################################################
    procs = []
    #proc = Process(target=RUN_Function)  # instantiating without any argument
    #procs.append(proc)
    #proc.start()

    # instantiating process with arguments
    for name, sec in itt.zip_longest(sectorsNames, sectors):
        info("My Main")
        proc = Process(target=RUN_Function, args=(name,sec))
        procs.append(proc)
        procs.start()
        #proc.join()

    '''
    for mpos in procs:
        print("Positions for the following: ".format(mpos))
    '''

    print("Finished")

    proc.terminate()

【问题讨论】:

    标签: python-3.x pickle python-multiprocessing


    【解决方案1】:

    谢谢德米特里,

    我终于解决了这个问题。

    我忽略了 .append()。

    这应该被排除并且应该在循环之后添加一个 .close() 。希望这对其他人有帮助。

    for name, sec in zip(sectorsNames, sectors):
        #info("My Main")
        proc = Process(target=RUN_Function, args=(name,sec))
        #procs.append(proc)---#EXCLUDE THIS
        proc.start()
    #procs.append(proc)
    proc.close() #ADD THE CLOSE
    

    【讨论】:

      【解决方案2】:

      我无法帮助您处理多处理问题,但也许 AsyncIO 可能会感兴趣,例如

      代码

      #!/usr/bin/env python3
      # -*- coding: utf-8 -*-
      
      from asyncio import ensure_future, gather, run
      from json import dumps
      
      
      async def calculate(data):
          tasks = list()
          result = dict()
          temp = None
      
          for df in data:
              task = ensure_future(calculate_one(df, data[df]))
              tasks.append(task)
      
              temp = await gather(*tasks)
      
          for element in temp:
              result[element['df']] = element['ds']
      
          return result
      
      
      async def calculate_one(df, dataset):
          result = dict()
          result['df'] = df
          result['ds'] = list()
          for element in dataset:
              result['ds'].append(element + '-processed')
      
          return result
      
      
      if __name__ == '__main__':
      
          src_data = {'df1': ['a1', 'b1', 'c1', 'd1'],
                      'df2': ['a2', 'b2', 'c2', 'd2']}
      
          res = run(calculate(src_data))
      
          print(dumps(res, indent=4))
      

      这个源码可以在here找到

      结果

      {
          "df1": [
              "a1-processed",
              "b1-processed",
              "c1-processed",
              "d1-processed"
          ],
          "df2": [
              "a2-processed",
              "b2-processed",
              "c2-processed",
              "d2-processed"
          ]
      }
      

      您的示例不是很清楚,因此如果您需要来自其他数据帧的数据进行处理,您可以使用全局变量,也可以将所有 scr_data 放入 calculate_one 并使用索引。

      希望对你有帮助!

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-09-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多