【发布时间】:2019-11-05 13:14:36
【问题描述】:
通常我们在定义 DAG 的同一 python 文件中定义运算符(请参阅this 基本示例)。我也是这样做的。但是我的任务本身就很大,使用自定义运算符,所以我想要一个多态结构的 dag 项目,其中所有使用相同运算符的任务都在一个单独的文件中。为简单起见,让我举一个非常基本的例子。我有一个操作员x 有几个任务。这是我的项目结构;
main_directory
├──tasks
| ├──operator_x
| | └──op_x.py
| ├──operator_y
| : └──op_y.py
|
└──dag.py
op_x.py 有以下方法;
def prepare_task():
from main_directory.dag import dag
t2 = BashOperator(
task_id='print_inner_date',
bash_command='date',
dag=dag)
return t2
dag.py 包含以下代码;
from main_directory.tasks.operator_x import prepare_task
default_args = {
'retries': 5,
'retry_delay': dt.timedelta(minutes=5),
'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
'start_date': dt.datetime(2019, 5, 10)
}
dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = prepare_task()
现在,当我在气流环境中执行此操作并运行 airflow list_dags 时,我得到了名为 test_dag 的所需 dag,但是当我执行 airflow list_tasks -t test_dag 时,我只得到一个 ID 为 print_date 的任务,而不是定义的任务在 ID 为 print_inner_date 的子目录中。谁能帮我理解我错过了什么?
【问题讨论】:
标签: airflow