【问题标题】:Parallelizing comparisons between two dataframes with multiprocessing使用多处理并行化两个数据帧之间的比较
【发布时间】:2018-08-21 11:48:56
【问题描述】:

我有以下函数,可让我在两个数据帧(dataref)的行之间进行一些比较,并在匹配时返回两行的索引。

def get_gene(row):

    m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)

    return ref.index[m] if m.any() else None

作为一个需要时间的过程(data 中的 160 万行需要 25 分钟,ref 中的行数为 20K),我试图通过并行计算来加快速度。由于 pandas 本身不支持多处理,因此我使用了我在 SO 上找到的这段代码,它与我的函数 get_gene 一起工作正常。

def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):

    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)

    result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)])

    pool.close()

    df = pd.concat(list(result))

    return df

它让我的计算时间减少到 9 分钟。但是,如果我理解正确,这段代码只是将我的数据帧data 分解为 4 块,并将每块发送到 CPU 的每个核心。因此,每个核心最终都会在 400K 行(从data 拆分为 4)与 20K 行(ref)之间进行比较。

我真正想做的是根据其中一列中的值拆分两个数据帧,以便我只计算同一“组”的数据帧之间的比较:

  • data.get_group(['a'])ref.get_group(['a'])

  • data.get_group(['b'])ref.get_group(['b'])

  • data.get_group(['c'])ref.get_group(['c'])

  • 等等……

这将减少计算量。 data 中的每一行只能与 ref 中的约 3K 行匹配,而不是全部 20K 行。

因此,我尝试修改上面的代码,但无法使其正常工作。

def apply_get_gene(df, func, **kwargs):

    reference = pd.read_csv('genomic_positions.csv', index_col=0)
    reference = reference.groupby(['Chr'])

    df = df.groupby(['Chr'])
    chromosome = df.groups.keys()



    workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=workers)


    args_list = [(df.get_group(chrom), func, kwargs, reference.get_group(chrom)) for chrom in chromosome]

    results = pool.map(_apply_df, args_list)

    pool.close()                                                          
    pool.join()                                                           

    return pd.concat(results)


def _apply_df(args):

    df, func, kwarg1, kwarg2 = args

    return df.apply(func, **kwargs)


def get_gene(row, ref):

    m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)

    return ref.index[m] if m.any() else None

我很确定这与 *args**kwargs 通过不同函数传递的方式有关(因为在这种情况下,我必须考虑到我想传递拆分后的 @987654342 @ 数据框与拆分 data 数据框..)。 我认为问题在于函数_apply_df。我以为我明白它的真正作用,但 df, func, kwargs = args 行仍然困扰着我,我认为我未能正确修改它..

感谢所有建议!

【问题讨论】:

    标签: python pandas dataframe apply python-multiprocessing


    【解决方案1】:

    看看starmap()

    starmap(func, iterable[, chunksize]) 与 map() 类似,只是 iterable 的元素应该是解包为参数的可迭代对象。

    因此 [(1,2), (3, 4)] 的可迭代结果为 [func(1,2), func(3,4)]。

    这似乎正是您所需要的。

    【讨论】:

    • 这意味着我可以摆脱刚刚出现的_apply_df 函数,因为map() 只接受一个参数,而不像 apply ?我去看看,谢谢!
    • 确实,如果由于某种原因它不起作用,请告诉我!
    • 也许我错过了一些东西,但我不确定starmap() 的行为是否像apply()_apply_df 函数的好处是它允许返回类似 df.apply(func, axis=1) 的东西,我不确定我是否可以使用 starmap() ..
    • 我想出了一些可行的方法。有需要请看我的帖子。感谢您的帮助!
    【解决方案2】:

    我为可能偶然发现这篇文章的读者发布了我想出的答案:

    正如@Michele Tonutti 所说,我只需要使用starmap() 并在这里和那里进行一些调整。权衡是它仅应用我的自定义函数get_gene 和设置axis=1,但如果需要,可能有一种方法可以使其更灵活。

    def Detect_gene(data):
    
        reference = pd.read_csv('genomic_positions.csv', index_col=0)
        ref = reference.groupby(['Chr'])
    
        df = data.groupby(['Chr'])
        chromosome = df.groups.keys()
    
        workers = multiprocessing.cpu_count()
        pool = multiprocessing.Pool(processes=workers)
    
    
        args = [(df.get_group(chrom), ref.get_group(chrom)) 
                for chrom in chromosome]
    
        results = pool.starmap(apply_get_gene, args)
    
        pool.close()                                                          
        pool.join()                                                           
    
        return pd.concat(results)
    
    
    def apply_get_gene(df, a):
    
        return df.apply(get_gene, axis=1, ref=a)
    
    
    def get_gene(row, ref):
    
        m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
    
        return ref.index[m] if m.any() else None
    

    现在使用旧版本的代码需要约 5 分钟而不是约 9 分钟,而无需多处理则需要约 25 分钟。

    【讨论】:

      猜你喜欢
      • 2019-01-02
      • 1970-01-01
      • 2020-12-12
      • 2018-01-26
      • 2020-10-02
      • 2021-10-03
      • 2021-06-19
      • 2021-04-24
      • 2019-07-21
      相关资源
      最近更新 更多