【发布时间】:2018-08-21 11:48:56
【问题描述】:
我有以下函数,可让我在两个数据帧(data 和 ref)的行之间进行一些比较,并在匹配时返回两行的索引。
def get_gene(row):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
作为一个需要时间的过程(data 中的 160 万行需要 25 分钟,ref 中的行数为 20K),我试图通过并行计算来加快速度。由于 pandas 本身不支持多处理,因此我使用了我在 SO 上找到的这段代码,它与我的函数 get_gene 一起工作正常。
def _apply_df(args):
df, func, kwargs = args
return df.apply(func, **kwargs)
def apply_by_multiprocessing(df, func, **kwargs):
workers = kwargs.pop('workers')
pool = multiprocessing.Pool(processes=workers)
result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)])
pool.close()
df = pd.concat(list(result))
return df
它让我的计算时间减少到 9 分钟。但是,如果我理解正确,这段代码只是将我的数据帧data 分解为 4 块,并将每块发送到 CPU 的每个核心。因此,每个核心最终都会在 400K 行(从data 拆分为 4)与 20K 行(ref)之间进行比较。
我真正想做的是根据其中一列中的值拆分两个数据帧,以便我只计算同一“组”的数据帧之间的比较:
data.get_group(['a'])与ref.get_group(['a'])data.get_group(['b'])与ref.get_group(['b'])data.get_group(['c'])与ref.get_group(['c'])等等……
这将减少计算量。 data 中的每一行只能与 ref 中的约 3K 行匹配,而不是全部 20K 行。
因此,我尝试修改上面的代码,但无法使其正常工作。
def apply_get_gene(df, func, **kwargs):
reference = pd.read_csv('genomic_positions.csv', index_col=0)
reference = reference.groupby(['Chr'])
df = df.groupby(['Chr'])
chromosome = df.groups.keys()
workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=workers)
args_list = [(df.get_group(chrom), func, kwargs, reference.get_group(chrom)) for chrom in chromosome]
results = pool.map(_apply_df, args_list)
pool.close()
pool.join()
return pd.concat(results)
def _apply_df(args):
df, func, kwarg1, kwarg2 = args
return df.apply(func, **kwargs)
def get_gene(row, ref):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
我很确定这与 *args 和 **kwargs 通过不同函数传递的方式有关(因为在这种情况下,我必须考虑到我想传递拆分后的 @987654342 @ 数据框与拆分 data 数据框..)。
我认为问题在于函数_apply_df。我以为我明白它的真正作用,但 df, func, kwargs = args 行仍然困扰着我,我认为我未能正确修改它..
感谢所有建议!
【问题讨论】:
标签: python pandas dataframe apply python-multiprocessing