【问题标题】:How to use multiprocessing pool in a for loop while saving the data?保存数据时如何在for循环中使用多处理池?
【发布时间】:2018-10-04 17:55:37
【问题描述】:

我有一些数据正在尝试对其应用 multiprocessing.pool,因为我有一台配备 16 个处理器的机器。 我在这里生成一些伪数据:

y = pd.Series(np.random.randint(400, high=600, size=1250))
date_today = datetime.now()
x = pd.date_range(date_today, date_today + timedelta(1250), freq='D')
data = pd.DataFrame(columns=['Date','Price'])
data['Date'] = x
data['Price'] = y
d={name: group for name, group in data.groupby(np.arange(len(data)) // (len(data)))} 

我真正想要的是在 for 循环参数中应用 pool 。所以每个常量使用一个处理器:

parameters = range(300,550,50)
portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
for key, value in sorted(d.items()):
    for constante in parameters:
        print('Constante:',constante)
        # HERE I WANT TO USE MP.POOL()

在代码中,我使用某种移动窗口来执行计算。这是最简单的代码版本。所以我想在写入 DF 时为参数中的每个常量分配一个进程。如何实现这一目标?

【问题讨论】:

    标签: python multiprocessing pool


    【解决方案1】:

    您可能会想像这样使用multiprocessing.pool.map,尽管您可能需要根据自己的需要进行调整...

    from functools import partial
    from multiprocessing import Pool
    
    def pool_map_fn(value=None, constante=None, i=None):
        s = {'val': value[i:i+constante]}
        window = pd.concat([s['val']['Date'],s['val']['Price']], axis=1)
        window['Price'] = pd.to_numeric(window['Price'], errors='coerce').fillna(0)
        calc = window['Price'].mean()                                        
        date_variable = window['Date'].iloc[-1]
        price_var = window['Price'].iloc[-1]
        if price_var < calc:
            print('Parameter',constante,'Lower than average',date_variable,price_var,calc)  
            portfolio = portfolio.append({'Parameter': constante,
                                          'Date': date_variable, 
                                          'Price': price_var,
                                          'Calculation': calc}, ignore_index=True)
        if price_var > calc:
            print('Parameter',constante,'Higher than average',date_variable,price_var,calc)
    
    parameters = range(300,550,50)
    portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
    for key, value in sorted(d.items()):
        for constante in parameters:
            with Pool() as pool:
                results = pool.map(partial(pool_map_fn, value=value, constante=constante),
                                   range(len(value) - constante + 1))
    

    注意:这是未经测试的,但应该可以工作,如果您遇到错误,请尝试解决它们,因为这个概念应该是合理的。

    【讨论】:

    • TypeError: pool_map_fn() got multiple values for argument 'value';努力解决这个错误
    • 尝试将i arg 移动到该 fn 的第一个 arg,我可能搞砸了部分接收 args 的方式,这主要是我的想法。
    • 是的,但是得到了局部变量引用错误(组合);你建议使用全局参数吗?只是另一个问题;是否有一个变量参数可以用来指定要使用的特定处理器数量?
    • 编辑:您可能已经理解了这个问题。代码行' print('Parameter',constante,'Lower than average',date_variable,price_var,calc) '和 Parameter: constante 的输出不会不同,即并行参数 300、350、400 和 450如果我使用比方说 4 个处理器?
    • @JayDough 我将此答案视为低质量队列-我将您的最后一条评论标记为粗鲁。如果答案有错误:请求修复。如果答案不适合您,请投反对票。不要要求人们删除他们投入时间的东西,那只是没有完成。没有人强迫你接受这个答案。
    猜你喜欢
    • 2022-01-12
    • 1970-01-01
    • 2021-08-08
    • 2022-12-04
    • 1970-01-01
    • 2017-06-09
    • 2017-04-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多