【问题标题】:Multiprocessing on dataset from pyspark returns JVM error来自 pyspark 的数据集的多处理返回 JVM 错误
【发布时间】:2019-04-02 04:32:13
【问题描述】:

我需要在 Jupyter notebook 中并行运行一些聚类算法。我想要并行的集群功能在进行多线程或单独运行时工作。但是,它返回

raise Py4JError("{0} 在 JVM 中不存在".format(name))

当我尝试多处理时。我对多处理没有太多经验,我做错了什么?

聚类代码:

def clustering(ID, df):
    pandas_df = df.select("row", "features", "type") \
    .where(df.type == ID).toPandas()

    print("process " + str(ID) + ": preparing data for clustering")
    feature_series = pandas_df["features"].apply(lambda x: x.toArray())
    objs = [pandas_df, pd.DataFrame(feature_series.tolist())]
    t_df = pd.concat(objs, axis=1)

    print("process " + str(ID) + ": initiating clustering")
    c= #clustering algo here
    print("process " + str(ID) + " DONE!")

    return

多处理代码:

import multiprocessing as mp

k = 4

if __name__ == '__main__':
    pl = []
    for i in range(0,k):
        print("sending process:", i)
        process = mp.Process(target=clustering, args=(i, df))
        jobs.append(process)
        process.start()

    for process in pl:
        print("waiting for join from process")
        process.join()

【问题讨论】:

    标签: pandas pyspark python-multiprocessing


    【解决方案1】:

    错误是由于子进程无法访问相同的内存(pyspark 数据帧所在的内存)。

    通过将 pyspark 数据帧的访问权限放在另一个函数中,首先对数据集进行分区解决,如下所示:

        pandas_df = df.select("row", "features", "type") \
        .where(df.type == ID).toPandas()
    

    然后在分离的 Pandas 数据帧上运行集群。

    【讨论】:

    • 我正在分区如下,然后为 df1 和 df2 启动两个单独的进程,但我仍然收到 JVM 错误df1= fulldf.where(col('_c0').like('%name1%')) df2= fulldf.where(col('_c0').like('%name2%'))
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多