【问题标题】:Parallelization/multiprocessing of conditional for loop条件for循环的并行化/多处理
【发布时间】:2016-06-21 13:38:00
【问题描述】:

我想在 Python 中使用多处理来加速 while 循环。

更具体地说:
我有一个矩阵(样本*特征)。我想选择 x 个样本子集,其随机特征子集的值不等于某个值(在本例中为 -1)。

我的序列号:

np.random.seed(43)
datafile = '...'
df = pd.read_csv(datafile, sep=" ", nrows = 89)

no_feat = 500
no_samp = 5
no_trees = 5
i=0
iter=0


samples = np.zeros((no_trees, no_samp))
features = np.zeros((no_trees, no_feat))

while i < no_trees:
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False)

    samp_idx = []
    a=0

#--------------
    #how to run in parallel?

    for j in iter_order:
        pot_samp = df.iloc[j, rand_feat]
        if len(np.where(pot_samp==-1)[0]) == 0:
            samp_idx.append(j)
        if len(samp_idx) == no_samp:
            print a
            break
        a+=1

#--------------

    if len(samp_idx) == no_samp:
        samples[i,:] = samp_idx
        features[i, :] = rand_feat
        i+=1
    iter+=1
    if iter>1000:   #break if subsets cannot be found
        break

搜索拟合样本是潜在的昂贵部分(j for 循环),理论上可以并行运行。在某些情况下,没有必要遍历所有样本来找到足够大的子集,这就是我在子集足够大时立即跳出循环的原因。
我正在努力寻找一种可以检查已经生成了多少有效结果的实现。有没有可能?

我以前用过joblib。如果我理解正确,这将使用pool 多处理方法作为仅适用于单独任务的后端?我认为queues 可能会有所帮助,但到目前为止我未能实现它们。

【问题讨论】:

  • 使用joblibmultiprocessing.pool 是有意义的。我会为每个核心运行一个进程,并创建一个共享计数器,由Lock 保护或实现为原子整数,将其递增直到达到特定计数(考虑到重复),然后所有进程都会完成,返回他们的结果。 (您可能可以使用apply_async())。
  • @advance512 感谢您提供这些方法供我研究。

标签: python parallel-processing python-multiprocessing


【解决方案1】:

我找到了一个可行的解决方案。我决定并行运行 while 循环,并让不同的进程通过共享计数器进行交互。此外,我对合适样本的搜索进行了矢量化。

矢量化产生了约 300 倍的加速,并且在 4 核上运行将计算速度提高了约两倍。

首先我尝试实现单独的进程并将结果放入queue。事实证明,这些并不是用来存储大量数据的。

如果有人在该代码中发现另一个瓶颈,我会很高兴有人指出这一点。

由于我对并行计算一无所知,我发现很难将这个问题放在一起,尤其是因为互联网上的示例都是非常基础的。不过我学到了很多 =)

我的代码:

import numpy as np
import pandas as pd
import itertools
from multiprocessing import Pool, Lock, Value
from datetime import datetime
import settings


val = Value('i', 0)
worker_ID = Value('i', 1)
lock = Lock()

def findSamp(no_trees, df, no_feat, no_samp):
    lock.acquire()
    print 'starting worker - {0}'.format(worker_ID.value)
    worker_ID.value +=1
    worker_ID_local = worker_ID.value
    lock.release()

    max_iter = 100000
    samp = []
    feat = []
    iter_outer = 0
    iter = 0
    while val.value < no_trees and iter_outer<max_iter:
        rand_feat = np.random.choice(df.shape[1], no_feat, replace=False

        #get samples with random features from dataset;
        #find and select samples that don't have missing values in the random features
        samp_rand = df.iloc[:,rand_feat]
        nan_idx = np.unique(np.where(samp_rand == -1)[0])
        all_idx = np.arange(df.shape[0])
        notnan_bool = np.invert(np.in1d(all_idx, nan_idx))
        notnan_idx = np.where(notnan_bool == True)[0]

        if notnan_idx.shape[0] >= no_samp:
            #if enough samples for random feature subset, select no_samp samples randomly
            notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False)
            rand_feat_rand = rand_feat

            lock.acquire()
            val.value += 1
            #x = val.value
            lock.release()
            #print 'no of trees generated: {0}'.format(x)
            samp.append(notnan_idx_rand)
            feat.append(rand_feat_rand)

        else:
            #increase iter_outer counter if no sample subset could be found for random feature subset
            iter_outer += 1

        iter+=1
    if iter >= max_iter:
        print 'exiting worker{0} because iter >= max_iter'.format(worker_ID_local)
    else:
        print 'worker{0} - finished'.format(worker_ID_local)
    return samp, feat

def initialize(*args):
    global val, worker_ID, lock
    val, worker_ID, lock  = args

def star_findSamp(i_df_no_feat_no_samp):
    return findSamp(*i_df_no_feat_no_samp)


if __name__ == '__main__':
    np.random.seed(43)
    datafile = '...'
    df = pd.read_csv(datafile, sep=" ", nrows = 89)
    df = df.fillna(-1)
    df = df.iloc[:, 6:]

    no_feat = 700
    no_samp = 10
    no_trees = 5000


    startTime = datetime.now()
    print 'starting multiprocessing'
    ncores = 4
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock))
    args = itertools.izip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp))

    result = p.map(star_findSamp, args)#, callback=log_result)
    p.close()
    p.join()

    print '{0} sample subsets for tree training have been found'.format(val.value)

    samples = [x[0] for x in result if x != None]
    samples = np.vstack(samples)
    features = [x[1] for x in result if x != None]
    features = np.vstack(features)
    print datetime.now() - startTime

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-10-07
    • 1970-01-01
    • 2021-05-04
    • 1970-01-01
    • 1970-01-01
    • 2017-01-28
    • 2021-12-12
    相关资源
    最近更新 更多