【问题标题】:parallelize dataframe splitting and processing并行化数据帧拆分和处理
【发布时间】:2018-11-05 14:52:51
【问题描述】:

问题陈述:我如何并行化一个将 pandas 数据帧分成两部分的 for 循环,同时将函数应用于每个部分,并将函数的组合结果存储到列表中循环结束后使用?

对于上下文,我正在尝试并行化我的决策树实现。我之前看到的与此问题相关的许多答案都需要将函数的结果应用为数据框,并且结果只是连接成一个大数据框。我相信这个问题稍微笼统一些。

例如,这是我要并行化的代码:

# suppose we have some dataframe given to us
df = pd.DataFrame(....)
computation_results = []
# I would like to parallelize this whole loop and store the results of the
# computations in computation_results. min_rows and total_rows are known
# integers.
for i in range(min_rows, total_rows - min_rows + 1):
    df_left = df.loc[range(0, i), :].copy()
    df_right = df.loc[range(i, total_rows), :].copy()
    # foo is a function that takes in a dataframe and returns some
    # result that has no pointers to the passed dataframe. The following
    # two function calls should also be parallelized.
    left_results = foo(df_left)
    right_results = foo(df_right)
    # combine the results with some function and append that combination
    # to a list. The order of the results in the list does not matter.
    computation_results.append(combine_results(left_results, right_results))
# parallelization is not needed for the following function and the loop is over
use_computation_results(computation_results)

【问题讨论】:

  • 标准 python 不太擅长基于线程的并行化。由于 IO,多进程选项成本很高,numpy 类型矢量化可能是唯一剩下的选项。不过我可能是错的......

标签: python pandas dataframe parallel-processing multiprocessing


【解决方案1】:

查看https://docs.python.org/3.3/library/multiprocessing.html#using-a-pool-of-workers中的示例。

所以在你的情况下:

with Pool(processes=2) as pool:                  # start 2 worker processes
  for i in range(min_rows, total_rows - min_rows + 1):
     df_left = df.loc[range(0, i), :].copy()
     call_left = pool.apply_async(foo, df_left)  # evaluate "foo(df_left)" asynchronously
     df_right = df.loc[range(i, total_rows), :].copy() 
     call_right = pool.apply_async(foo, df_left) # evaluate "foo(df_right)" asynchronously
     left_results = call_left.get(timeout=1)     # wait and get left result
     right_results = call_right.get(timeout=1)   # wait and get right result
     computation_results.append(combine_results(left_results, right_results))

【讨论】:

    猜你喜欢
    • 2017-07-24
    • 2010-11-26
    • 1970-01-01
    • 2021-01-29
    • 1970-01-01
    • 1970-01-01
    • 2018-07-13
    • 2014-01-27
    相关资源
    最近更新 更多