【问题标题】:Python dask groupby apply not working with "processes" schedulerPython dask groupby 不适用于“进程”调度程序
【发布时间】:2021-06-24 10:15:42
【问题描述】:

我正在尝试使用以下代码使用 dask 并行化 groupby 应用在 pandas 数据帧上。

import pandas as pd
import dask.dataframe as dd
def dummy_function(df):
    """
    This function doing some python calculations
    and manipulation to given dataframe
    """
    df["new_column"] = df["existing_column"]
    return df
given_df = pd.DataFrame({"Phone_no": ["123", "234", "123", "578"], "City": ["ABC", "BCD", "ABC", "EFG"]})
ddf = dd.from_pandas(given_df, npartitions=2)

工作代码

output_df = ddf.groupby("Phone_no").apply(dummy_function).compute()

即使这段代码正在运行,但所有核心都没有被使用,在做了一些研究后,我发现由于 dask 的默认计算调度程序是线程化的,所以这是由于 python 的全局解释器锁 (GIL) 所有核心都没有被使用用过的。更多细节在这里。 https://realpython.com/python-gil/

因此我尝试使用“进程”调度程序。

output_df = ddf.groupby("Phone_no").apply(dummy_function).compute(scheduler="processes")

但这会返回以下错误

NotImplementedError('object proxy must define __reduce_ex__()')

我相信 multiprocessing 在某种程度上使用了 pickle,因此这个错误是从那里引起的。 我能找到的最接近的相关问题是这个

https://github.com/GrahamDumpleton/wrapt/issues/102#issue-227792648

其中一条评论建议了以下解决方案

https://github.com/GrahamDumpleton/wrapt/issues/102#issuecomment-456528633

我的问题

上述解决方案有效吗? 如果是,那么如何实现它以及我应该在哪里进行编辑或定义 __reduce__ex 函数?

我什至尝试在 wrapt 模块中编辑 ObjectProxy 类,但不知何故无法使其工作。

或者有没有其他方法可以让进程调度器工作?

或者以 Dask 以外的任何其他方式进行多处理?

【问题讨论】:

    标签: python pickle python-multiprocessing dask


    【解决方案1】:

    您可以尝试查看dask.distributed,以便将任务提交给调度程序以在工作人员上运行。

    您需要从 shell 启动您的 dask-scheduler

    dask-scheduler &
    

    然后,使用dask-scheduler 仍然来自 shell 的地址选择您打算使用的工人数量:

    dask-worker 172.17.0.2:8786 & 
    

    然后,在您的脚本中,您通过传递调度程序的 IP 和端口来创建 client 实例。然后,您可以使用此客户端实例通过dask-scheduler 向工作人员提交任务。

    client=Client('localhost:8786')
    

    然后,您可以计算结果

    output_df=client.submit(result)
    client.shutdown()
    

    【讨论】:

      猜你喜欢
      • 2022-08-02
      • 1970-01-01
      • 2020-09-05
      • 2017-03-30
      • 2013-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-02
      相关资源
      最近更新 更多