【发布时间】:2018-04-21 15:41:50
【问题描述】:
我是火花和气流的新手,试图了解如何使用气流以及工作所需的参数来启动工作。我使用以下 spark-submit 命令在边缘节点中针对特定日期运行特定作业,如下所示,
EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x
/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client $EXECUTORS_NUM --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY
如果我创建 .py 类似于下面的代码以在气流中运行作业,您能否告诉我? 这是您应该如何运行作业并传递参数的方式吗?
如何像在边缘节点中启动作业一样传递参数?
如果我将作业自动化为每天运行,我希望开始日期为“t-7”,因此如果今天的日期是 2018 年 4 月 20 日,那么作业的开始日期必须是 2018 年 4 月 13 日。我如何实现它?
###############.py file example ##############
**********************************************
import BashOperator
import os
import sys
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
import os
import sys
os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
and add operator:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class test.core.Driver abc.jar',
params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE' :'m' , 'jobname' : 'x'},
dag=dag
)
################### EOF ######################
**********************************************
新的 .py 文件 - 如果有任何问题请纠正我
- 如何传递参数以运行不同路径的 spark 版本?
- 传递一个不同路径的jar
- 如下所示的参数传递方式是否正确?
-
是否可以手动传递某些开始和结束日期以运行作业?
from airflow import DAG from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator from airflow.utils import timezone DEFAULT_DATE = timezone.datetime(2017, 1, 1) args = { 'owner': 'airflow', 'start_date': DEFAULT_DATE } dag = DAG('test_dag_id', default_args=args) _config = { 'config' : '/a/b/c/d/prod.config' 'master' : 'yarn' 'deploy-mode' : 'client' 'sparkMaster' : 'yarnclient' 'class' : 'core.Driver' 'driver_classpath': 'parquet.jar', 'jars': '/a/b/c/d/test.jar', 'total_executor_cores': 4, 'executor_cores': 1, 'EXECUTORS_MEM': '8G', 'EXECUTORS_NUM': 500, 'executor-cores' : '1', 'driver-memory' : '8G', 'JOB_NAME' : ' ', 'QUEUE' : ' ', 'verbose' : ' ' 'start_date' : ' ' 'end_date' : ' ' ] } operator = SparkSubmitOperator( task_id='spark_submit_job', dag=dag, **_config )
【问题讨论】:
-
我在下面的答案中添加了一个示例。如果有什么不清楚的地方请告诉我!
标签: apache-spark airflow