【问题标题】:Parallelize/vectorize computation of combinations from Pandas Dataframe对 Pandas Dataframe 中的组合进行并行化/矢量化计算
【发布时间】:2019-04-04 02:51:26
【问题描述】:

我有许多腌制的 pandas 数据帧,每个数据帧都有相当多的行数(~10k)。数据框的一列是一个 numpy ndarray 浮点数(是的,我特别选择将数组数据存储在单个单元格中 - 我读过这通常可能不是正确的方法,例如。here,但是在这种情况下,单个值是没有意义的,只有完整的值列表才有意义,所以我认为在这种情况下是有意义的)。我需要计算框架中每对行之间的欧几里得距离。我有这方面的工作代码,但我希望我能做一些事情来提高它的性能,因为现在它告诉我我的较小数据集将需要 > 一个月,但我很确定它需要在那之前我的所有记忆。

代码如下:

import pandas as pd
import sys
import getopt
import math
from scipy.spatial import distance
from timeit import default_timer as timer
from datetime import timedelta

id_column_1 = 'id1'
id_column_2 = 'id2'
distance_column = 'distance'
val_column = 'val'

# where n is the size of the set
# and k is the number of elements per combination
def combination_count(n, k):
    if k > n:
        return 0
    else:
        # n! / (k! * (n - k)!)
        return math.factorial(n)/(math.factorial(k) * math.factorial(n - k))

def progress(start, current, total, id1, id2):
    if current == 0:
        print('Processing combination #%d of #%d, (%d, %d)' % (current, total, id1, id2))
    else:
        percent_complete = 100 * float(current)/float(total)
        elapsed_time = timer() - start
        avg_time = elapsed_time / current
        remaining = total - current
        remaining_time = timedelta(seconds=remaining * avg_time)
        print('Processing combination #%d of #%d, (%d, %d). %.2f%% complete, ~%.2f s/combination, ~%s remaining' % (current, total, id1, id2, percent_complete, avg_time, remaining_time))

def check_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    distances = pd.DataFrame(columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    start = timer()
    for id1 in indexes:
        for id2 in indexes:
            # id1 is always < id2
            if id1 >= id2:
                continue
            progress(start, current_combination, total_combinations, id1, id2)
            distances.loc[(id1, id2), distance_column] = distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])
            current_combination+=1

(我排除了 main() 函数,它只是拉出 args 并根据它们加载到腌制文件中)

我最近才真正开始使用 Python 来完成这项任务,所以我很可能会遗漏一些简单的东西,有没有好的方法来处理这个问题?

【问题讨论】:

    标签: python pandas numpy


    【解决方案1】:

    有一些选项可以在纯 python 中并行计算数据帧。
    最完整的可能是dask
    一个更简单但更容易的选择是pandaral-lel

    【讨论】:

    • 您好,感谢您的回答 - 当我查看 pandaral-lel 的文档时,这里似乎没有什么帮助,因为正在进行的计算不是直接在单元格上完成的值本身,但基于单元格的索引,这些索引不适用于传递给 apply()/applymap() 的函数 - 还是我在这里遗漏了什么?
    • 实际上阅读 dask 似乎也适用于那里
    • 您需要为每一行计算到所有其他行的距离。您可以使用应用程序,因为对于每一行将有 N-1(N = 数据集的大小)输出,您只需要考虑如何存储它(并且,是的,您可能需要更改代码并删除指数)。如果仍然太难,你可以使用多处理库,创建一个池并手动完成(但你仍然需要更改一些代码)
    • 是的,实际上我在此期间最终使用了 joblib,它产生了巨大的影响(我认为它在我配置它的方式下使用了多处理库)
    【解决方案2】:

    所以解决方案最终是并行化,但我无法使用 Panda 特定的并行化库解决这个问题,因为预期结果不是现有单元格内容的转换,而是从另一个数据帧派生的新值。

    我抓住了joblib library并采取了以下步骤:

    首先,我创建了一个函数,给定两个 id,可以返回该索引的行(因为单独的工作人员无法在主进程中改变数据帧,我们不得不转向首先生成所有数据的范例,然后构建数据框):

    def get_distance(df, id1, id2):
        return [id1, id2, distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])]
    

    并对其应用了 joblib 并行化:

    def get_distances(df):
        indexes = df.index
        total_combinations = combination_count(len(indexes), 2)
        current_combination = 0
        print('There are %d possible inter-message relationships to compute' % total_combinations)
        data = Parallel(n_jobs=-1)(delayed(get_distance)(df, min(ids), max(ids)) for ids in combinations(indexes, 2))
        distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
        distances.set_index([id_column_1, id_column_2], inplace=True)
        return distances
    

    这使预期时间从几个月缩短到几天,但我怀疑传递完整的数据帧会产生巨大的开销。

    在修改函数以仅传入所需的值后,立即获得了不到一天(约 20 小时)的另一项改进:

    def get_distance(id1, id2, embed1, embed2):
        return [id1, id2, distance.euclidean(embed1, embed2)]
    
    # ...later, in get_distances()...
    
    data = Parallel(n_jobs=-1)(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in combinations(indexes, 2))
    

    最后,基于joblib's docs 以及大量数据仍在传输给工作人员的事实,我切换到多处理后端,并看到预期时间进一步下降到约 1.5 小时。 (我还添加了 tqdm 库,因此我可以比 joblib 提供的更好地了解进度)

    data = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in tqdm(combinations(indexes, 2), total=total_combinations))
    

    希望这有助于其他人首次涉足 Python 并行化!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-26
      • 2021-02-07
      • 2022-09-27
      • 2023-03-20
      • 2017-02-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多