【问题标题】:Parallelise for loop in Python [duplicate]在Python中并行化for循环[重复]
【发布时间】:2021-08-08 17:25:04
【问题描述】:

我正在研究机器学习模型,使用回归来预测各种数据类别的未来值。数据本身非常复杂,因此我在下面提供了一个示例来模仿我想要实现的目标:

df = 
category       date   data
1        2021-06-19   94.9
1        2021-06-20   93.3
1        2021-06-21   91.6
...      ...          ...
2        2021-06-19   13.1
2        2021-06-20   11.9
2        2021-06-21   10.4
...      ...          ...
3        2021-06-19   53.9
3        2021-06-20   55.3
3        2021-06-21   59.3
...      ...          ...

我目前正在使用 for 循环,在每个类别上运行我的预测模型:

categories = df.category.unique()

for category in categories:
  # run my model
  # save results

但是,这很耗时,因为我要循环访问大约 4000 个类别。每个类别预测都独立于其他类别。

有没有一种简单的方法来并行化这项工作,而不是循环遍历每个类别,按顺序执行预测?

在线搜索时,Spark 是一个受欢迎的结果,但这似乎是一个很大的学习曲线(并且可能会失去一些在 python/pandas 中可访问的功能),我希望我可以在 python 库中使用一些东西更合适。

【问题讨论】:

  • 请在循环内发布代码,即你对每个类别做什么。
  • 这听起来是可并行的,而且 CPU 很便宜,所以归根结底是你想学什么和做什么——我们无法从你的脑海中挖掘出来。 Google Cloud 按需提供的 96 CPU 机器大约是 5 到 6 美元/小时(如果您可以容忍它被召回,则为 1 美元/小时),所以诀窍是要有一个启动脚本来从云存储下载你的东西,得到它完成,保存回云存储,并终止机器。您可以从更小、更便宜的机器开始,只需更改 GUI 上的 CPU 数量。从云存储下载到 Google Cloud 以外的其他地方的其他费用约为 0.12 美元/GB。

标签: python pandas machine-learning parallel-processing


【解决方案1】:

你可以这样做

# The joblib module provides Parallel and delayed methods
from joblib import Parallel, delayed

'''
The Parallel method is to parallelize the process over n cores using the n_jobs argument (-1 means max possible value). The delayed function wraps the actual function and passes the values of a list to the function in parallel.
'''

def predict_cat(category):
    # category_dataset = # filter on basis of that category
    # preds = model.predict(category_dataset)
    # I will write some random preds as I don't have actual values
    preds = [1,2,3]
    with open('pred_file.txt', 'a') as file:
        file.write(str(category) + " " + str(preds) + "\n")

# Here instead of range you will use the list of unique categories. 
Parallel(n_jobs=-1)(delayed(predict_cat)(c) for c in range(5));

结束后你可以从pred_file读取值。

【讨论】:

    【解决方案2】:

    由于 Spark 不是你的首选方法,我能想到的有两种方法,让我与你分享,

    1. 您可以使用 Joblib

    Python 有一个很棒的包,它使并行性变得异常简单。 参考:https://joblib.readthedocs.io/en/latest/

    基本的使用模式是:

    from joblib import Parallel, delayed
    
    def myfun(arg):
         do_stuff
         return result
    
    results = Parallel(n_jobs=-1, verbose=verbosity_level, backend="threading")(
                 map(delayed(myfun), arg_instances))
    

    现在,arg_instances 是 myfun 并行计算的值的列表。主要限制是 myfun 必须是顶级函数。 backend 参数可以是“threading”或“multiprocessing”。

    您可以将其他常用参数传递给并行化函数。 myfun 的主体还可以引用初始化的全局变量,这些值将提供给子级。

    参数和结果几乎可以是线程后端的任何内容,但结果需要通过多处理后端可序列化。

    1. Numba

    Numba 可以自动并行化 for 循环。

    参考:http://numba.pydata.org/numba-doc/latest/user/parallel.html#explicit-parallel-loops

    from numba import jit, prange
    
    @jit
    def parallel_sum(A):
        sum = 0.0
        for i in prange(A.shape[0]):
            sum += A[i]
    
        return sum
    

    值得一读的博客:http://blog.dominodatalab.com/simple-parallelization/

    [Honorable Mention] Dask 也提供了类似的功能。如果您正在处理核心数据之外的数据,或者您正在尝试并行化更复杂的计算,这可能会更可取。 参考:https://dask.org/

    【讨论】:

      猜你喜欢
      • 2017-03-17
      • 2012-07-22
      • 2023-02-16
      • 2020-10-18
      • 2013-09-02
      • 1970-01-01
      相关资源
      最近更新 更多