【问题标题】:Airflow spark submit operator气流火花提交操作员
【发布时间】:2021-01-04 09:50:47
【问题描述】:

我给 spark2 提交的命令为:

    value = Varibale.get('value')
    cmd = """
                    spark2-submit --master yarn --deploy-mode cluster 
                    --driver-memory=10G 
                    --conf spark.dynamicAllocation.minExecutors=5
                    --conf spark.dynamicAllocation.maxExecutors=10 
                    --queue test 
                    --executor-memory=10G
                    --executor-cores=2
                    --conf spark.yarn.driver.memoryOverhead=5120
                    --conf spark.driver.maxResultSize=2G 
                    --conf spark.yarn.executor.memoryOverhead=5120 
                    --conf spark.kryoserializer.buffer.max=1000m 
                    --conf spark.executor.extraJavaOptions=-XX:+UseG1GC 
                    --conf spark.network.timeout=15000s
                    --conf spark.executor.heartbeatInterval=1500s 
                    --conf spark.task.maxDirectResultSize=8G 
                    --principal test-host@test
                    --keytab /home/test-host.keytab 
                    --conf spark.ui.view.acls="*" 
                    /home/test/test.py {0}
                    """.format(value)

    test = SSHOperator(task_id='TEST',
                   ssh_conn_id='test-conn',
                   command=cmd
                   )

我希望将其转换为 SparkSubmitOperator。另外,我需要 spark2 提交。

如何将上述转换为 SparkSubmitOperator? 到目前为止,我已经尝试过:

          
                                      
SparkSubmitOperator(task_id='TEST',
conn_id='test-conn',
application=f'/home/test/test.py {0}'.format(value),
executor_cores=2,
executor_memory='10g',
)

【问题讨论】:

    标签: pyspark airflow-scheduler airflow


    【解决方案1】:

    Airflow 中SparkSubmitOperator 所需的选项可以在字典中发送。请记住,字典中的键应与函数的参数名称相同。

    创建以下两个字典:

    base_config = {
        "task_id":"TEST",
        "conn_id":"test-conn",
        "application": "/home/test/test.py"
        "executor-memory":"10G",
        "driver-memory":"10G",
        "executor-cores":2,
        "principal":"test-host@test",
        "keytab":"/home/test-host.keytab",
        "env_vars":{"SPARK_MAJOR_VERSION":2}
        }
    
    spark_config = {
        "spark.master": "yarn",
        "spark.submit.deployMode": "client",
        "spark.yarn.queue":"test",
        "spark.dynamicAllocation.minExecutors":5,
        "spark.dynamicAllocation.maxExecutors":10, 
        "spark.yarn.driver.memoryOverhead":5120,
        "spark.driver.maxResultSize":"2G",
        "spark.yarn.executor.memoryOverhead":5120,
        "spark.kryoserializer.buffer.max":"1000m",
        "spark.executor.extraJavaOptions":"-XX:+UseG1GC",
        "spark.network.timeout":"15000s",
        "spark.executor.heartbeatInterval":"1500s",
        "spark.task.maxDirectResultSize":"8G",
        "spark.ui.view.acls":"*"
    }
    
    SparkSubmitOperator(**base_config,conf=spark_config)
    

    这应该使您的流程配置受到驱动。

    【讨论】:

    • 是使用 spark2-submit 还是仅仅使用 spark-submit?
    • 这取决于您的 Spark 安装 这默认运行 spark-submit。您可以传递环境变量字典。我已将此更改添加到答案中
    猜你喜欢
    • 2021-09-20
    • 1970-01-01
    • 1970-01-01
    • 2022-01-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多