[Question Title]: DAG utilizing ExternalTaskSensor run shows error "env_var.json not found"
[Posted]: 2020-11-23 11:56:39
[Question Description]:

I am new to Airflow and have run into this problem:

I have two DAGs in two separate files, and the second one should run after the first one finishes. To do this, I use an ExternalTaskSensor to check whether the first DAG has run.

DAG 1:

spark_dag = DAG(
    'SPARK_DAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

submit_spark_PROG_ACTN = BashOperator(
    task_id='Submit_PROG_ACTN',
    bash_command="gcloud dataproc jobs submit spark --cluster <CLUSTER_DETAILS>",
    dag=spark_dag)

submit_spark_PROG_ACTN

DAG 2:

create_table_dag = DAG(
    'CreateTableDAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='dataset_name',
    table_id='PROG_ACTN',
    project_id='maximal-muse-11256',
    gcs_schema_object='<gcs_details>',
    google_cloud_storage_conn_id='google_cloud_datastore_default',
    dag=create_table_dag)

check_spark = ExternalTaskSensor(
    task_id='check_spark',
    external_dag_id='SPARK_DAG',
    external_task_id='Submit_PROG_ACTN',
    execution_delta=timedelta(minutes=10),
    timeout=300,
    dag=create_table_dag)

CreateTable << check_spark

I am using Airflow on a Cloud Composer instance on GCP. Composer version: Composer 1.11.1, Airflow v1.10.9

Now when DAG 2 runs, it stays in the running state for a long time and then fails. Logs:

*** Reading remote log from gs://us-central1-schedulejobs-1ecdfd20-bucket/logs/CreateTableDAG/check_spark/2020-07-29T00:00:00+00:00/1.log.
[2020-07-30 12:03:38,864] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,905] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,906] {taskinstance.py:867} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,906] {taskinstance.py:868} INFO - Starting attempt 1 of 4
[2020-07-30 12:03:38,906] {taskinstance.py:869} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,943] {taskinstance.py:888} INFO - Executing <Task(ExternalTaskSensor): check_spark> on 2020-07-29T00:00:00+00:00
[2020-07-30 12:03:38,944] {base_task_runner.py:131} INFO - Running on host: airflow-worker-5557648985-67qrv
[2020-07-30 12:03:38,944] {base_task_runner.py:132} INFO - Running: ['airflow', 'run', 'CreateTableDAG', 'check_spark', '2020-07-29T00:00:00+00:00', '--job_id', '433', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/CreateTableDAG.py', '--cfg_path', '/tmp/tmp4zjj44qs']
[2020-07-30 12:03:40,939] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:40,939] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,548] {app.py:55} WARNING - Can't load Environment Variable overrides.
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/www/app.py", line 50, in <module>
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     with open('/home/airflow/gcs/env_var.json', 'r') as env_var_json:
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark FileNotFoundError: [Errno 2] No such file or directory: '/home/airflow/gcs/env_var.json'
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,549] {app.py:56} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-07-30 12:03:41,558] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,557] {configuration.py:618} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-07-30 12:03:41,575] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,575] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,966] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,965] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluster.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 12:03:41,967] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-07-30 12:03:41,968] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {dagbag.py:401} INFO - Filling up the DagBag from /home/airflow/gcs/dags/CreateTableDAG.py
[2020-07-30 12:03:41,989] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'BashOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,990] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   DeprecationWarning)
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'PythonOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   DeprecationWarning)
[2020-07-30 12:03:42,480] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Running <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [running]> on host airflow-worker-5557648985-67qrv
[2020-07-30 12:03:42,542] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:42,542] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:04:42,622] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:04:42,622] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:05:42,686] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:05:42,686] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:06:42,774] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:06:42,774] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:07:42,845] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:07:42,845] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:08:42,937] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,936] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     result = task_copy.execute(context=context)
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/bin/airflow", line 7, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     exec(compile(f.read(), __file__, 'exec'))
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/airflow", line 37, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     args.func(args)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/utils/cli.py", line 75, in wrapper
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     return f(*args, **kwargs)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 545, in run
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     _run(args, dag, ti)
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 465, in _run
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     pool=args.pool,
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     return func(*args, **kwargs)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     result = task_copy.execute(context=context)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.

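I cannot figure out why the DAG is not running. The log says something about env_var.json not being found, but I am not using it anywhere in my code. Am I missing something here, or is that warning the root of the problem?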

[Question Discussion]:

    Tags: python-3.x google-cloud-platform airflow


    [Solution 1]:

    In your sensor, you specified the setting timeout=300:

    check_spark = ExternalTaskSensor(
        task_id='check_spark',
        external_dag_id='SPARK_DAG',
        external_task_id='Submit_PROG_ACTN',
        execution_delta=timedelta(minutes=10),
        timeout=300,
        dag=create_table_dag)
    

    This setting is in seconds, which is why the sensor fails after 5 minutes:

    [2020-07-30 12:03:42,542] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:42,542] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
    ...
    [2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
    

    I would suggest increasing the timeout so that the task in DAG 1 has time to finish.
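The arithmetic behind the 5-minute failure can be checked with a small sketch. It uses only the Python standard library and the timestamps from the log excerpt above. The helper name `sensor_deadline` and the 2-hour replacement value are hypothetical, chosen only for illustration; pick a timeout that fits how long the task in DAG 1 actually takes.

```python
from datetime import datetime, timedelta


def sensor_deadline(first_poke, timeout_seconds):
    """Earliest moment at which a sensor with this timeout can give up.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    return first_poke + timedelta(seconds=timeout_seconds)


# First poke, taken from the log excerpt (fractional seconds dropped).
first_poke = datetime(2020, 7, 30, 12, 3, 42)

# timeout=300 is interpreted as seconds, i.e. 5 minutes.
deadline = sensor_deadline(first_poke, 300)
print(deadline)  # 2020-07-30 12:08:42, matching the "Snap. Time is OUT." entry

# A longer timeout expressed in seconds (2 hours is an assumed example value).
longer_timeout = int(timedelta(hours=2).total_seconds())
print(longer_timeout)  # 7200
```

Writing the timeout as `int(timedelta(...).total_seconds())` keeps the unit explicit, which makes it harder to mistake seconds for minutes.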

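    [Discussion]:

    • Thanks! That got rid of the error, but now the sensor times out because the poke gets no response from DAG 1. Can you tell me if something is wrong?
    • I would check two things: 1) Did the task in your DAG 1 finish and get marked as success? 2) The schedule_interval of your DAG 1 is daily; however, with your execution_delta you are specifically waiting for a DAG run 10 minutes before the execution date, i.e. "2020-07-28T23:50:00+00:00". Try removing the execution_delta parameter to target the correct DAG run (the daily schedule).
    • Getting rid of execution_delta did the trick, thanks! DAG 1 would run successfully, but in DAG 2 the sensor would always time out after the "timeout" period. As you pointed out, execution_delta was defined incorrectly.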
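The effect of execution_delta discussed in the comments can be illustrated with plain datetime arithmetic. This is a minimal sketch, not Airflow code: `poked_execution_date` is a hypothetical helper that models the sensor subtracting execution_delta from its own execution date, and the set of DAG 1 runs assumes both DAGs share the same daily schedule and start time.

```python
from datetime import datetime, timedelta


def poked_execution_date(execution_date, execution_delta=None):
    """Execution date the sensor looks for in the external DAG.

    Hypothetical model: with a delta, the sensor looks that far back from
    its own execution date; without one, it uses the same execution date.
    """
    if execution_delta is None:
        return execution_date
    return execution_date - execution_delta


# Execution date of the DAG 2 run, taken from the log.
dag2_run = datetime(2020, 7, 29, 0, 0)

# Assumption: DAG 1 runs daily on the same schedule, so its runs fall on midnight.
dag1_runs = {datetime(2020, 7, 28, 0, 0), datetime(2020, 7, 29, 0, 0)}

with_delta = poked_execution_date(dag2_run, timedelta(minutes=10))
without_delta = poked_execution_date(dag2_run)

print(with_delta.isoformat(), with_delta in dag1_runs)        # 2020-07-28T23:50:00 False
print(without_delta.isoformat(), without_delta in dag1_runs)  # 2020-07-29T00:00:00 True
```

Under these assumptions, the 10-minute delta points at 23:50 on the previous day, where no DAG 1 run exists, so the sensor keeps poking until it times out. Without the delta, it targets the DAG 1 run with the same execution date.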