【发布时间】:2021-06-24 10:15:42
【问题描述】:
我正在尝试使用以下代码使用 dask 并行化 groupby 应用在 pandas 数据帧上。
import pandas as pd
import dask.dataframe as dd
def dummy_function(df):
"""
This function doing some python calculations
and manipulation to given dataframe
"""
df["new_column"] = df["existing_column"]
return df
given_df = pd.DataFrame({"Phone_no": ["123", "234", "123", "578"], "City": ["ABC", "BCD", "ABC", "EFG"]})
ddf = dd.from_pandas(given_df, npartitions=2)
工作代码:
output_df = ddf.groupby("Phone_no").apply(dummy_function).compute()
即使这段代码正在运行,但所有核心都没有被使用,在做了一些研究后,我发现由于 dask 的默认计算调度程序是线程化的,所以这是由于 python 的全局解释器锁 (GIL) 所有核心都没有被使用用过的。更多细节在这里。 https://realpython.com/python-gil/
因此我尝试使用“进程”调度程序。
output_df = ddf.groupby("Phone_no").apply(dummy_function).compute(scheduler="processes")
但这会返回以下错误
NotImplementedError('object proxy must define __reduce_ex__()')
我相信 multiprocessing 在某种程度上使用了 pickle,因此这个错误是从那里引起的。 我能找到的最接近的相关问题是这个
https://github.com/GrahamDumpleton/wrapt/issues/102#issue-227792648
其中一条评论建议了以下解决方案
https://github.com/GrahamDumpleton/wrapt/issues/102#issuecomment-456528633
我的问题:
上述解决方案有效吗? 如果是,那么如何实现它以及我应该在哪里进行编辑或定义 __reduce__ex 函数?
我什至尝试在 wrapt 模块中编辑 ObjectProxy 类,但不知何故无法使其工作。
或者有没有其他方法可以让进程调度器工作?
或者以 Dask 以外的任何其他方式进行多处理?
【问题讨论】:
标签: python pickle python-multiprocessing dask