【问题标题】:How to use Prefect's resource manager with a spark cluster如何在 Spark 集群中使用 Prefect 的资源管理器
【发布时间】:2021-09-14 22:06:42
【问题描述】:

我一直在使用 Prefect 进行工作流管理,但被卡住了 与 Prefect 的资源管理器建立和终止 Spark 会话。

我浏览了 Prefects 文档,并提供了 Dusk 的示例:

from prefect import resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def init(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()
        
        
with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)        

但是我不知道如何使用 spark 会话来做同样的事情。

【问题讨论】:

    标签: apache-spark pyspark prefect


    【解决方案1】:

    最简单的方法是在本地模式下使用 Spark:

    from prefect import task, Flow, resource_manager
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    
    @resource_manager
    class SparkCluster:
        def __init__(self, conf: SparkConf = SparkConf()):
            self.conf = conf
    
        def setup(self) -> SparkSession:
            return SparkSession.builder.config(conf=self.conf).getOrCreate()
    
        def cleanup(self, spark: SparkSession):
            spark.stop()
    
    @task
    def get_data(spark: SparkSession):
        return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
    
    @task(log_stdout=True)
    def analyze(df):
        word_count = df.groupBy('word').count()
        word_count.show()
    
    
    with Flow("spark_flow") as flow:
        conf = SparkConf().setMaster('local[*]')
        with SparkCluster(conf) as spark:
            df = get_data(spark)
            analyze(df)
    
    if __name__ == '__main__':
        flow.run()
    

    您的setup() 方法返回被管理的资源,cleanup() 方法接受setup() 返回的相同资源。在这种情况下,我们创建并返回一个 Spark 会话,然后停止它。您不需要 spark-submit 或任何东西(尽管我发现以这种方式管理依赖项有点困难)。

    扩大规模变得越来越难,这是我仍在努力解决的问题。例如,Prefect 不知道如何序列化 Spark DataFrames 以进行输出缓存或持久化结果。此外,您必须小心将 Dask 执行器与 Spark 会话一起使用,因为它们不能被腌制,因此您必须将执行器设置为使用 scheduler='threads'(请参阅 here)。

    【讨论】:

    • 您是否获得了扩展到 Spark 集群的任何进展?
    • @MiguelRueda 什么样的火花簇?如果它只是一个通用的,那么您应该能够使用客户端/本地模式连接到它,如此处所示,根据需要更改 setMaster()。如果它在 k8s 上使用 spark,则涉及更多。不确定其他种类。
    猜你喜欢
    • 2018-08-23
    • 2016-09-06
    • 1970-01-01
    • 2017-02-02
    • 2021-05-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-19
    相关资源
    最近更新 更多