【Question title】: Airflow DAG prevents other DAGs from starting
【Posted】: 2018-05-10 21:12:49
【Question】:

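We are running Airflow v1.9.0 and are having an issue with one DAG. This SSHOperator DAG (called matching) is scheduled at 0 6 * * * and typically completes within an hour. Once a month, a large data ingest causes this task to take 7 hours. Unfortunately, when this happens the DAG prevents our other DAGs from starting until it has completed. It is the only DAG running during those 7 hours. This is not normal behavior for our other DAGs (while they run, other DAGs still start). We cannot find any table locks (PostgreSQL) that could be causing this.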

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
from os import path

PATH = '/path/to/code/'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 1, 24),
    'email': ['myemail@emails.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    }

with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
    nightly_matching_command = f.read()

dag = DAG('matching',
          template_searchpath = PATH,
          schedule_interval = '0 6 * * *', #10:00PM
          default_args = default_args)

nightly_matching = SSHOperator(task_id = 'nightly_matching',
                               ssh_conn_id = 'external_server_name',
                               command = nightly_matching_command,
                               do_xcom_push = True,
                               dag = dag)

Here are the query results from the Airflow database.

airflow=# select task_id, dag_id, start_date, execution_date,end_date, duration, state, pool, queue from task_instance where date_trunc('day', start_date)::DATE = '2018-05-09'::DATE order by start_date;

task_id: nightly_matching dag_id: matching start_date: 2018-05-09 06:00:04.486709 execution_date: 2018-05-08 06:00:00 end_date: 2018-05-09 12:50:52.509942 duration: 24648.023233 state: success pool: queue: default

The DAG that comes right after the problem DAG was scheduled to start at 10:40.

task_id: Task1 dag_id: dag2 start_date: 2018-05-09 12:51:04.963004 execution_date: 2018-05-08 10:40:00 end_date: 2018-05-09 12:51:07.060369 duration: 2.097365 state: success pool: queue: default
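
To make the delay concrete, the timestamps in the two rows above can be compared directly. This is a minimal sketch, assuming dag2 runs daily, so that the run with execution_date 2018-05-08 10:40:00 was due at 2018-05-09 10:40:00. The variable names are illustrative.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

# Timestamps copied from the task_instance rows above.
matching_start = datetime.strptime("2018-05-09 06:00:04.486709", FMT)
matching_end = datetime.strptime("2018-05-09 12:50:52.509942", FMT)
dag2_start = datetime.strptime("2018-05-09 12:51:04.963004", FMT)

# Assumption: dag2 is a daily DAG, so this run was due one day
# after its execution_date.
dag2_execution_date = datetime(2018, 5, 8, 10, 40)
dag2_due = dag2_execution_date + timedelta(days=1)

matching_runtime = matching_end - matching_start
dag2_delay = dag2_start - dag2_due
gap_after_matching = dag2_start - matching_end

print("matching ran for:", matching_runtime)
print("dag2 started late by:", dag2_delay)
print("dag2 started after matching ended by:", gap_after_matching)
```

Under that assumption, dag2 started a little over two hours late and roughly 12 seconds after matching finished.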

run_nightly_matching.sh runs a Python script that uses psycopg2 to connect to our database and match pairs of tables.

*** Reading local log.
[2018-05-09 06:00:04,352] {cli.py:374} INFO - Running on host airflow
[2018-05-09 06:00:04,486] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,587] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,588] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2018-05-09 06:00:04,617] {models.py:1428} INFO - Executing <Task(SSHOperator): nightly_matching> on 2018-05-08 06:00:00
[2018-05-09 06:00:04,617] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run matching nightly_matching 2018-05-08T06:00:00 --job_id 274416 --raw -sd DAGS_FOLDER/matching.py']
[2018-05-09 06:00:04,981] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:04,980] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-05-09 06:00:05,023] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,022] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-05-09 06:00:05,140] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,139] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-05-09 06:00:05,190] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,190] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/matching.py
[2018-05-09 06:00:05,445] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,444] {base_hook.py:80} INFO - Using connection to: external_server_name
[2018-05-09 06:00:05,511] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,509] {transport.py:1687} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2018-05-09 06:00:05,621] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,620] {transport.py:1687} INFO - Authentication (publickey) successful!
[2018-05-09 12:50:52,431] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 12:50:52,430] {ssh_operator.py:113} INFO - Matching: ('table_Z', 'table_Y')
[2018-05-09 12:50:52,432] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:00:58.172398
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-08 23:17:19.273990
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_R')
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:17:19.274092
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 03:16:46.339119
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'table_Y')
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 03:16:46.339228
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 04:49:16.901285
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'clients')
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 04:49:16.901410
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:30.502418
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('clients', 'table_R')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:30.502494
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:47.035880
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_B')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:47.035974
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:03:17.464931
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_E', 'table_Y')
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:03:17.465061
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:43:05.543177
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_P', 'table_Y')
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:43:05.543298
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:51:42.683216

airflow.cfg (via ansible)

【Comments】:

  • Can you post your airflow.cfg?
  • Yes, I just added a link to the cfg file. Thanks.

Tags: python airflow airflow-scheduler


【Answer 1】:

I think this might be related to the number of executing tasks and to this bit:

with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
    nightly_matching_command = f.read()

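If I'm not mistaken, this piece of code may be executed far more often than intended, because it is not part of a task and is not triggered by the schedule. It runs every time the Python file itself is executed. Depending on your setup, that might be never or very often. Since it opens a file handle, it could be related to the issue. It may be nothing, but it is worth checking.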
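
The difference between module-level code and code inside a function can be shown without Airflow. The sketch below is a hypothetical stand-in: `load_command` is an illustrative helper, not an Airflow API, and the temporary file with made-up contents replaces run_nightly_matching.sh. It only demonstrates that wrapping the read in a function defers the file access until the function is called.

```python
import os
import tempfile

read_count = 0  # counts how often the script file is actually opened


def load_command(script_path):
    """Read the shell script only when called, not when the module loads."""
    global read_count
    read_count += 1
    with open(script_path) as f:
        return f.read()


# Stand-in for run_nightly_matching.sh (hypothetical contents).
tmp_dir = tempfile.mkdtemp()
script_path = os.path.join(tmp_dir, "run_nightly_matching.sh")
with open(script_path, "w") as f:
    f.write("python match_tables.py\n")

# Defining load_command did not open the script.
reads_before_call = read_count

# The read happens only here, at the point the command is needed.
command = load_command(script_path)
reads_after_call = read_count

print(reads_before_call, reads_after_call, repr(command))
```

How to wire a deferred read into an SSHOperator depends on the Airflow version, and whether it changes the behaviour described in the question is untested.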

Also, I could not look at the cfg file, because it is stored on Dropbox and I would have to log in.

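The number of concurrent threads allowed could be the interesting part here. This sounds like a blocking issue. It is also possible that the DAG itself is not what stops the other tasks, and that the load simply rises enough to exhaust the available resources. It is hard to tell without more details.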
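
Since the cfg file was not visible, the settings that bound concurrency can be pulled out and checked by hand. This is a minimal Python 3 sketch, assuming the option names from the default airflow.cfg shipped with Airflow 1.9 (`parallelism`, `dag_concurrency` and `max_active_runs_per_dag` under `[core]`, `max_threads` under `[scheduler]`). The values in `SAMPLE_CFG` are illustrative and are not the asker's configuration, and `concurrency_settings` is a hypothetical helper.

```python
import configparser

# Illustrative values only; replace with the contents of the real airflow.cfg.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2
"""

CONCURRENCY_OPTIONS = [
    ("core", "parallelism"),
    ("core", "dag_concurrency"),
    ("core", "max_active_runs_per_dag"),
    ("scheduler", "max_threads"),
]


def concurrency_settings(cfg_text):
    """Return the concurrency-related options found in an airflow.cfg string."""
    # interpolation=None so '%' characters elsewhere in the file are left alone.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    found = {}
    for section, option in CONCURRENCY_OPTIONS:
        if parser.has_option(section, option):
            found[section + "." + option] = parser.getint(section, option)
    return found


settings = concurrency_settings(SAMPLE_CFG)
for name, value in sorted(settings.items()):
    print(name, "=", value)
```

If `parallelism` were set to 1, for instance, a single long-running task would occupy the only slot, which would match the symptom. That remains a guess until the real file is checked.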
