【问题标题】:How to run Spark code in Airflow?如何在 Airflow 中运行 Spark 代码?
【发布时间】:2017-02-11 04:10:08
【问题描述】:

地球人你好! 我正在使用 Airflow 来安排和运行 Spark 任务。 这次我发现的只是 Airflow 可以管理的 python DAG。
DAG 示例:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

问题是我不擅长 Python 代码,并且有一些用 Java 编写的任务。我的问题是如何在 python DAG 中运行 Spark Java jar?或者也许还有其他方法可以做到这一点?我发现火花提交:http://spark.apache.org/docs/latest/submitting-applications.html
但我不知道如何将所有内容连接在一起。也许有人以前使用过它并且有工作示例。感谢您的宝贵时间!

【问题讨论】:

    标签: java python apache-spark directed-acyclic-graphs airflow


    【解决方案1】:

    有一个SparkSubmitOperator 在 kubernetes(minikube 实例)上使用 Spark 2.3.1 的示例:

    """
    Code that goes along with the Airflow located at:
    http://airflow.readthedocs.org/en/latest/tutorial.html
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
    from airflow.models import Variable
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'user@mail.com',
        'depends_on_past': False,
        'start_date': datetime(2018, 7, 27),
        'email': ['user@mail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        'end_date': datetime(2018, 7, 29),
    }
    
    dag = DAG(
        'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
    
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    print_path_env_task = BashOperator(
        task_id='print_path_env',
        bash_command='echo $PATH',
        dag=dag)
    
    spark_submit_task = SparkSubmitOperator(
        task_id='spark_submit_job',
        conn_id='spark_default',
        java_class='com.ibm.cdopoc.DataLoaderDB2COS',
        application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
        total_executor_cores='1',
        executor_cores='1',
        executor_memory='2g',
        num_executors='2',
        name='airflowspark-DataLoaderDB2COS',
        verbose=True,
        driver_memory='1g',
        conf={
            'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
            'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
            'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
            'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
            'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
            'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
            'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
            'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
            'spark.COS_BUCKET': 'data-ingestion-poc',
            'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
            'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
            'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
            },
        dag=dag,
    )
    
    t1.set_upstream(print_path_env_task)
    spark_submit_task.set_upstream(t1)
    

    使用存储在 Airflow 变量中的变量的代码:

    另外,您需要创建一个新的 spark 连接或编辑现有的“spark_default” 额外的字典{"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}:

    【讨论】:

    • 对 Airflow 中的 conf 属性选项有点困惑。从上面的代码看来,自定义 key=value 正在传递给 conf。这怎么可能?也许我不理解这个选项,但我认为它仅适用于通常在 spark-submits 中使用 --conf 标志传递的 spark 配置属性。
    • @horatio1701d conf 键它只是我们传递给 spark_submit 的 --conf 键的数组。它可以是 k8s、spark 或只是我们的自定义键
    • 奇怪的是没有 SparkSubmitHook 示例,因为现在已弃用。我的意思是任何地方。
    【解决方案2】:

    Airflow 从 1.8 版(今天发布)开始,有

    SparkSQLHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

    SparkSubmitHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

    请注意,这两个新的 Spark 运算符/钩子在 1.8 版本中位于“contrib”分支中,因此没有(很好地)记录。

    因此您可以使用 SparkSubmitOperator 提交您的 java 代码以供 Spark 执行。

    【讨论】:

    【解决方案3】:

    您应该可以使用BashOperator。保持其余代码不变,导入所需的类和系统包:

    from airflow.operators.bash_operator import BashOperator
    
    import os
    import sys
    

    设置所需路径:

    os.environ['SPARK_HOME'] = '/path/to/spark/root'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
    

    并添加运算符:

    spark_task = BashOperator(
        task_id='spark_java',
        bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
        params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
        dag=dag
    )
    

    您可以使用 Jinja 模板轻松扩展它以提供额外的参数。

    您当然可以通过将 bash_command 替换为适合您情况的模板来针对非 Spark 场景进行调整,例如:

    bash_command = 'java -jar {{ params.jar }}'
    

    并调整params

    【讨论】:

    • 如果我没记错的话,这意味着 Spark 正在运行 Airflow 的同一台机器上运行?在单独的 Spark 集群上运行怎么样?
    • @cryanbhu 如果您的意思是驱动程序,那么答案是肯定的(只要 Spark 在客户端模式下运行)。你可能想看看this question,虽然它不能解决问题。
    猜你喜欢
    • 2017-04-11
    • 2017-08-21
    • 2021-05-26
    • 2017-03-19
    • 2017-03-03
    • 2022-10-19
    • 1970-01-01
    • 2020-01-04
    • 1970-01-01
    相关资源
    最近更新 更多