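【Posted】: 2018-05-10 21:12:49
【Question】:
We are running Airflow v1.9.0 and are having a problem with one DAG. This SSHOperator DAG (called matching) is scheduled at 0 6 * * * and usually finishes within an hour. Once a month, a large data ingest makes this task take about 7 hours. Unfortunately, when that happens, the DAG prevents our other DAGs from starting until it has finished. It is the only DAG running during those 7 hours. Our other DAGs do not behave this way: while they are running, other DAGs still start. We cannot find any table locks (PostgreSQL) that could be causing this.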
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
from os import path

PATH = '/path/to/code/'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 1, 24),
    'email': ['myemail@emails.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
    nightly_matching_command = f.read()

dag = DAG('matching',
          template_searchpath=PATH,
          schedule_interval='0 6 * * *',  # 10:00PM
          default_args=default_args)

nightly_matching = SSHOperator(task_id='nightly_matching',
                               ssh_conn_id='external_server_name',
                               command=nightly_matching_command,
                               do_xcom_push=True,
                               dag=dag)
Here are the query results from the Airflow database.
airflow=# select task_id, dag_id, start_date, execution_date,end_date, duration, state, pool, queue from task_instance where date_trunc('day', start_date)::DATE = '2018-05-09'::DATE order by start_date;
task_id: nightly_matching
dag_id: matching
start_date: 2018-05-09 06:00:04.486709
execution_date: 2018-05-08 06:00:00
end_date: 2018-05-09 12:50:52.509942
duration: 24648.023233
state: success
pool:
queue: default
The DAG that runs immediately after the problem DAG is scheduled to start at 10:40.
task_id: Task1
dag_id: dag2
start_date: 2018-05-09 12:51:04.963004
execution_date: 2018-05-08 10:40:00
end_date: 2018-05-09 12:51:07.060369
duration: 2.097365
state: success
pool:
queue: default
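The timestamps in the two rows above can be compared directly. The following is a minimal sketch, not part of the original post. It assumes dag2 runs on a daily schedule, so that its run with execution_date 2018-05-08 10:40:00 becomes due at 10:40 on 2018-05-09; the variable names are illustrative.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

# Values copied from the task_instance rows above.
matching_start = datetime.strptime("2018-05-09 06:00:04.486709", FMT)
matching_end = datetime.strptime("2018-05-09 12:50:52.509942", FMT)
dag2_start = datetime.strptime("2018-05-09 12:51:04.963004", FMT)

# Assumption: dag2 is scheduled daily, so the run with execution_date
# 2018-05-08 10:40:00 becomes due one schedule interval later.
dag2_due = datetime(2018, 5, 8, 10, 40) + timedelta(days=1)

matching_runtime = matching_end - matching_start
dag2_delay = dag2_start - dag2_due
gap_after_matching = dag2_start - matching_end

print("matching runtime:", matching_runtime)
print("dag2 delay past its due time:", dag2_delay)
print("gap between matching end and dag2 start:", gap_after_matching)
```

Under that assumption, Task1 started roughly 2 hours 11 minutes late and about 12 seconds after nightly_matching finished, which is consistent with the blocking described in the question.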
run_nightly_matching.sh runs a Python script that uses psycopg2 to connect to our database and match pairs of tables.
*** Reading local log.
[2018-05-09 06:00:04,352] {cli.py:374} INFO - Running on host airflow
[2018-05-09 06:00:04,486] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,587] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,588] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------
[2018-05-09 06:00:04,617] {models.py:1428} INFO - Executing <Task(SSHOperator): nightly_matching> on 2018-05-08 06:00:00
[2018-05-09 06:00:04,617] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run matching nightly_matching 2018-05-08T06:00:00 --job_id 274416 --raw -sd DAGS_FOLDER/matching.py']
[2018-05-09 06:00:04,981] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:04,980] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-05-09 06:00:05,023] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,022] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-05-09 06:00:05,140] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,139] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-05-09 06:00:05,190] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,190] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/matching.py
[2018-05-09 06:00:05,445] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,444] {base_hook.py:80} INFO - Using connection to: external_server_name
[2018-05-09 06:00:05,511] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,509] {transport.py:1687} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2018-05-09 06:00:05,621] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,620] {transport.py:1687} INFO - Authentication (publickey) successful!
[2018-05-09 12:50:52,431] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 12:50:52,430] {ssh_operator.py:113} INFO - Matching: ('table_Z', 'table_Y')
[2018-05-09 12:50:52,432] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:00:58.172398
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-08 23:17:19.273990
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_R')
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:17:19.274092
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 03:16:46.339119
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'table_Y')
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 03:16:46.339228
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 04:49:16.901285
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'clients')
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 04:49:16.901410
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:30.502418
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('clients', 'table_R')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:30.502494
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:47.035880
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_B')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:47.035974
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:03:17.464931
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_E', 'table_Y')
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:03:17.465061
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:43:05.543177
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_P', 'table_Y')
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:43:05.543298
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:51:42.683216
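The remote script's own "Started at" / "Finished at" lines can be used to see which table pair takes the longest. The following is a minimal sketch, not part of the original post: `LOG` holds the first two entries from the log above with the Airflow prefix stripped, and `pair_durations` is a hypothetical helper.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

# First two entries of the script output, copied from the log above
# with the Airflow "[...] INFO - Subtask:" prefix removed.
LOG = """\
Matching: ('table_Z', 'table_Y')
Started at: 2018-05-08 23:00:58.172398
Finished at: 2018-05-08 23:17:19.273990
Matching: ('table_Y', 'table_R')
Started at: 2018-05-08 23:17:19.274092
Finished at: 2018-05-09 03:16:46.339119
"""


def pair_durations(log_text):
    """Return a list of (pair, duration) tuples parsed from the script output."""
    results = []
    pair = start = None
    for line in log_text.splitlines():
        line = line.strip()
        if line.startswith("Matching: "):
            pair = line[len("Matching: "):]
        elif line.startswith("Started at: "):
            start = datetime.strptime(line[len("Started at: "):], FMT)
        elif line.startswith("Finished at: "):
            end = datetime.strptime(line[len("Finished at: "):], FMT)
            results.append((pair, end - start))
    return results


durations = pair_durations(LOG)
for pair, duration in durations:
    print(pair, duration)
```

For these two entries, the ('table_Y', 'table_R') match accounts for just under four hours. Note that every script line carries the same Airflow timestamp (12:50:52), so the output reached the Airflow log only once the remote command had finished, and the script's own timestamps appear to come from a clock in a different time zone than the Airflow log prefix.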
【Comments】:
-
Can you post your airflow.cfg?
-
Yes, I just added a link to the cfg file. Thanks.
Tags: python airflow airflow-scheduler