【问题标题】:Why is this function not paralleled?为什么这个函数不并行?
【发布时间】:2022-01-12 08:09:40
【问题描述】:

我有一个数据框 df2,它是 df 的副本。对于 col_2 列中的每个唯一值 c。我想随机提取 col_2 中对应值为 c 的 2 行。如果可用行数小于 2,则提取所有行。然后我在batch 列中将选定的行从 1 标记到 2。

您能否解释一下为什么我的函数不能对列表['a', 'b', 'c'] 中的所有值执行这项工作。例如,我观察

这意味着函数没有实现 bc 的值。

import pandas as pd
import os
from multiprocessing import dummy
from random import sample
core = os.cpu_count()
P = dummy.Pool(processes = core)

data = np.array([(3, 'a'), (2, 'a'), (1, 'b'), (0, 'c'), (2, 'c'), (3, 'c')],
                dtype=[('col_1', 'i4'), ('col_2', 'U1')])
df = pd.DataFrame.from_records(data)
df['batch'] = 0
df2 = df.copy()

def func(c):
    idx = df.col_2 == c
    pop = list(df[idx].index)
    m = min(2, len(pop))
    r = list(sample(pop, m))    
    df2.loc[r, 'batch'] = list(range(1, m + 1, 1))
    
    
P.map(func, ['a', 'b', 'c'])
df2

【问题讨论】:

  • 因为你不能在主进程和其他进程之间共享任何变量。
  • @Corralien 你的意思是变量df2 在线程之间共享?有没有办法通过并行化获得类似的结果?我的数据集很大,所以按顺序做很慢。
  • 你的数据框有多大,有多少组?
  • @Corralien 它有 32717928 行和 2193 个组。

标签: python pandas multiprocessing


【解决方案1】:

我不确定multiprocessing 是不是正确答案。保存下面的代码并执行它。我创建了一个包含 40,000,000 条记录和 2500 个组的 DataFrame。在此代码中,您有 2 个实现,用于多处理和单处理。

输出:

Dataframe: 40000000 records for 2500 groups
[MP] Elapsed time: 5.66 seconds
[SP] Elapsed time: 4.48 seconds
import pandas as pd
import numpy as np
import multiprocessing
import time

def func_mp(col, df):
    print(f"Group: {col} ({len(df)} records)")
    out = df.sample(n=2) if len(df) >= 2 else df
    out['batch'] = np.arange(0, len(out))
    return out

def func_sp(df):
    print(f"Group: {df.name} ({len(df)} records)")
    out = df.sample(n=2) if len(df) >= 2 else df
    out['batch'] = np.arange(0, len(out))
    return out

if __name__ == '__main__':
    N = 40000000
    col_1 = np.random.randint(1, 1000, N)
    col_2 = np.random.randint(0, 2500, N)
    df = pd.DataFrame({'col_1': col_1, 'col_2': col_2})

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.starmap(func_mp, df.groupby('col_2'))
        out1 = pd.concat(data)
    end = time.time()
    timemp = end - start

    start = time.time()
    out2 = df.groupby('col_2', as_index=False).apply(func_sp)
    end = time.time()
    timesp = end - start

    print()
    print(f"Dataframe: {len(df)} records for {len(df['col_2'].unique())} groups")
    print(f"[MP] Elapsed time: {timemp:.2f} seconds")
    print(f"[SP] Elapsed time: {timesp:.2f} seconds")

【讨论】:

  • 我在笔记本电脑上运行您的代码。已经5分钟了,跑步还没有结束。我的笔记本电脑的 CPU 包含 6 个内核和 12 个线程。我想知道为什么代码在你的机器上运行只需要 6 秒。
  • 你用示例玩具运行我的代码?
  • 是的,我只是复制并运行它。
  • 我刚刚记录了过程here。请看一下。
  • 我不信任 Jupyter Notebook。请问可以从控制台运行脚本吗?
猜你喜欢
  • 2016-07-03
  • 1970-01-01
  • 2023-03-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-01-06
  • 2018-10-08
  • 1970-01-01
相关资源
最近更新 更多