【问题标题】:Apache airflow - automation - how to run spark submit job with paramApache 气流 - 自动化 - 如何使用参数运行 spark 提交作业
【发布时间】:2018-04-21 15:41:50
【问题描述】:

我是火花和气流的新手,试图了解如何使用气流以及工作所需的参数来启动工作。我使用以下 spark-submit 命令在边缘节点中针对特定日期运行特定作业,如下所示,

EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x

/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client $EXECUTORS_NUM  --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G  --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY

如果我创建 .py 类似于下面的代码以在气流中运行作业,您能否告诉我? 这是您应该如何运行作业并传递参数的方式吗?

如何像在边缘节点中启动作业一样传递参数?

如果我将作业自动化为每天运行,我希望开始日期为“t-7”,因此如果今天的日期是 2018 年 4 月 20 日,那么作业的开始日期必须是 2018 年 4 月 13 日。我如何实现它?

###############.py file example ##############
**********************************************

    import BashOperator

    import os
    import sys

    os.environ['SPARK_HOME'] = '/path/to/spark/root'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

    import os
    import sys


    os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
    and add operator:

    spark_task = BashOperator(
        task_id='spark_java',
        bash_command='spark-submit --class test.core.Driver abc.jar',
        params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE' :'m' , 'jobname' : 'x'},
        dag=dag

)

################### EOF ######################
**********************************************

新的 .py 文件 - 如果有任何问题请纠正我

  • 如何传递参数以运行不同路径的 spark 版本?
  • 传递一个不同路径的jar
  • 如下所示的参数传递方式是否正确?
  • 是否可以手动传递某些开始和结束日期以运行作业?

    from airflow import DAG
    
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
    from airflow.utils import timezone
    
    
    DEFAULT_DATE = timezone.datetime(2017, 1, 1)
    
    args = {
        'owner': 'airflow',
        'start_date': DEFAULT_DATE
    }
    dag = DAG('test_dag_id', default_args=args)
    
    _config = {
        'config' : '/a/b/c/d/prod.config' 
        'master' : 'yarn'
        'deploy-mode' : 'client'
        'sparkMaster' : 'yarnclient'
        'class' : 'core.Driver' 
        'driver_classpath': 'parquet.jar',
        'jars': '/a/b/c/d/test.jar',
        'total_executor_cores': 4,
        'executor_cores': 1,
        'EXECUTORS_MEM': '8G',
        'EXECUTORS_NUM': 500,
        'executor-cores' : '1',
        'driver-memory' : '8G',
        'JOB_NAME' : ' ',
        'QUEUE' : ' ',
        'verbose' : ' '
        'start_date' : ' '
        'end_date' : ' '
        ]
    }
    
    operator = SparkSubmitOperator(
        task_id='spark_submit_job',
        dag=dag,
        **_config
    )
    

【问题讨论】:

  • 我在下面的答案中添加了一个示例。如果有什么不清楚的地方请告诉我!

标签: apache-spark airflow


【解决方案1】:

开始日期是您设置一次的日期,它的目的是绝对设置,而不是相对于当前日期。

像这样:

from airflow import DAG

dag = DAG(
    ...
    start_date=datetime.datetime(2018, 4, 13),
)

可以将开始日期设置为像datetime.timedelta(days=7) 这样的增量,但不建议这样做,因为如果您要删除 DAG(包括所有引用,例如 DAG 运行、任务实例等),它会更改开始日期) 并在另一天从头开始再次运行它。最佳实践是 DAG 是幂等的。

为了向 Spark 提交作业,有一个 SparkSubmitOperator 包装了 spark-submit shell 命令。那将是首选方案。也就是说,您基本上可以使用BashOperator 做任何事情,所以这也是一个可行的替代方案。

SparkSubmitOperator 的链接代码对于它接受的每个参数都有很好的记录。您可以使用 application kwarg 指向您的 .jar 文件,使用 conf 传递 Spark 配置。还有一些用于传递执行器核心和内存等信息的 kwargs。您可以使用application_args 将任意参数列表传递给您的 Spark 应用程序。

这是一个使用 SparkSubmitOperator 的示例,该示例从 Airflow 中的单元测试中复制并略微简化。请注意,它使用** 来分解字典中的 kwargs 以初始化 Spark 运算符,但这就是测试的结构。您可以轻松地将每个配置值作为 kwarg 传递。

from airflow import DAG

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone


DEFAULT_DATE = timezone.datetime(2017, 1, 1)

args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)

_config = {
    'conf': {
        'parquet.compression': 'SNAPPY'
    },
    'files': 'hive-site.xml',
    'py_files': 'sample_library.py',
    'driver_classpath': 'parquet.jar',
    'jars': 'parquet.jar',
    'packages': 'com.databricks:spark-avro_2.11:3.2.0',
    'exclude_packages': 'org.bad.dependency:1.0.0',
    'repositories': 'http://myrepo.org',
    'total_executor_cores': 4,
    'executor_cores': 4,
    'executor_memory': '22g',
    'keytab': 'privileged_user.keytab',
    'principal': 'user/spark@airflow.org',
    'name': '{{ task_instance.task_id }}',
    'num_executors': 10,
    'verbose': True,
    'application': 'test_application.py',
    'driver_memory': '3g',
    'java_class': 'com.foo.bar.AppMain',
    'application_args': [
        '-f', 'foo',
        '--bar', 'bar',
        '--start', '{{ macros.ds_add(ds, -1)}}',
        '--end', '{{ ds }}',
        '--with-spaces', 'args should keep embdedded spaces',
    ]
}

operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)

来源:https://github.com/apache/incubator-airflow/blob/f520990fe0b7a70f80bec68cb5c3f0d41e3e984d/tests/contrib/operators/test_spark_submit_operator.py

【讨论】:

  • 我能够构建一个类似于你的结构,但我不明白 .py 文件使用 SparkSubmitOperator 启动 spark 提交命令的部分。从您的示例中,我看到我们可以通过 _config 传递参数,但是如何运行位于特定路径中的 spark 提交,例如 /home/spark/spark-2.1.0-bin-hadoop2.6/bin/ 或者它只运行仅安装在您有气流的边缘节点上的火花 - SparkSubmitOperator 如何启动火花提交?我在我使用的 sprak submit 命令附近添加了我的 .py 文件,如果有任何错误,请帮助我更正脚本?
  • spark home path 应该在气流中指定为 spark 连接的额外参数(您可以使用默认的 as showed by @CTiPKA ):{"queue":"default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}。 SparkSubmitOperator 只需使用您作为 spark master 提供的连接 ID 运行 spark-submit 命令(请参阅 SparkSubmitHook )。 py_files 仅在使用 PySpark 时使用。
猜你喜欢
  • 1970-01-01
  • 2016-11-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-02
  • 2018-05-08
相关资源
最近更新 更多