【问题标题】:How to process multiple pyspark dataframes in parallel如何并行处理多个pyspark数据帧
【发布时间】:2020-01-29 12:33:55
【问题描述】:

我有一个包含数百万条记录和数百列的 pyspark 数据框(以下示例)

clm1, clm2, clm3
code1,xyz,123
code2,abc,345
code1,qwe,456

我想根据 clm1 将其划分为多个数据帧,即 clm1=code1 的单独数据帧和 clm1=code2 的单独数据帧等等,然后处理它们并将结果写入单独的文件中。我想并行执行此操作以加快该过程。我正在使用以下代码:

S1 = myclass("code1")
S2 = myclass("code2")


t1 = multiprocessing.Process(target=S1.processdata,args=(df,))
t2 = multiprocessing.Process(target=S2.processdata,args=(df,))
t1.start()
t2.start()

t1.join()
t2.join()

但我遇到了错误

Method __getstate__([]) does not exist

如果我使用 threading.Thread 而不是 multiprocessing.Process 它工作正常,但这似乎并没有减少总时间

【问题讨论】:

    标签: python dataframe pyspark


    【解决方案1】:

    关于错误

    方法getstate([])不存在

    这是一个py4j.Py4JExceptionmultiprocessing.Process 出现此错误,因为此模块使用进程。另一方面 threading.Thread 使用使用相同内存的线程,因此它们可以共享数据帧对象。

    也看看那个 SO 问题答案:Multiprocessing vs Threading Python


    一般建议

    我知道您可能是 Spark 世界的新手,我建议您使用我的解决方案来解决您的问题。您询问了如何进行多处理,但如果您有 Spark,这可能不是最佳实践。

    您拥有 Spark-并行处理框架,您无需手动并行化您的任务。

    Spark 设计用于集群中的并行计算,但它在大型单节点中运行得非常好。多处理库在 Python 计算任务中很有用,在 Spark/Pyspark 中,所有计算在 JVM 中并行运行。

    在 python_code.py 中

    import pyspark.sql.functions as f
    
    
    # JOB 1
    df1 = df.filter(f.col('clm1')=='code1')
    ... many transformations
    df1.write.format('..')..
    
    # JOB 2
    df2 = df.filter(f.col('clm1')=='code2')
    ... many transformations
    df2.write.format('..')..
    

    然后使用你的所有核心(* = 所有核心)通过 spark-submit 运行此代码

    # Run application locally on all cores
    ./bin/spark-submit --master local[*] python_code.py
    

    通过这种方法,您可以使用 Spark 的强大功能。这些作业将按顺序执行,但您将拥有: 始终使用 CPU 并行处理 计算时间较短

    【讨论】:

    • 当它开始写入文件No space left on device ERROR TaskMemoryManager: error while calling spill() 处理 1 小时后,我遇到了错误
    • 好的,这个错误,这是一个 Linux 错误。您的机器中没有空间。您可以在 bash 中运行“du -sh”并查看可用空间。同样,错误与问题无关。答案是关于设计的,这个错误以及可能发生的其他错误是实现细节,如果它们在 Stackoverflow 中不存在,它们将在不同的问题中回答
    • @user0204 关于设计,你还好吗?了解 Pyspark(一般 Spark)背后的概念吗?
    • 但是我在 s3 上以无限空间写入文件,而不是在我运行脚本的同一台机器上,那么为什么我会收到此错误?我理解这个概念,但在接受您的回答之前我必须尝试一下。
    • @user0204 你说得对,我没有解释你为什么会出现这个错误:-) 因为这不是我在 Spark 中的答案的最佳实践。现在我更新了关于您的错误的答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-01-29
    • 2016-09-15
    • 2015-07-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-17
    • 1970-01-01
    相关资源
    最近更新 更多