【问题标题】:airflow not loading operator tasks from file other then DAG file气流未从 DAG 文件以外的文件加载操作员任务
【发布时间】:2019-11-05 13:14:36
【问题描述】:

通常我们在定义 DAG 的同一 python 文件中定义运算符(请参阅this 基本示例)。我也是这样做的。但是我的任务本身就很大,使用自定义运算符,所以我想要一个多态结构的 dag 项目,其中所有使用相同运算符的任务都在一个单独的文件中。为简单起见,让我举一个非常基本的例子。我有一个操作员x 有几个任务。这是我的项目结构;

main_directory
  ├──tasks
  |  ├──operator_x
  |  |   └──op_x.py
  |  ├──operator_y
  |  :   └──op_y.py
  |   
  └──dag.py 

op_x.py 有以下方法;

def prepare_task():
    from main_directory.dag import dag
    t2 = BashOperator(
        task_id='print_inner_date',
        bash_command='date',
        dag=dag)
    return t2

dag.py 包含以下代码;

from main_directory.tasks.operator_x import prepare_task

default_args = {
    'retries': 5,
    'retry_delay': dt.timedelta(minutes=5),
    'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
    'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
    'start_date': dt.datetime(2019, 5, 10)
}
dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = prepare_task()

现在,当我在气流环境中执行此操作并运行 airflow list_dags 时,我得到了名为 test_dag 的所需 dag,但是当我执行 airflow list_tasks -t test_dag 时,我只得到一个 ID 为 print_date 的任务,而不是定义的任务在 ID 为 print_inner_date 的子目录中。谁能帮我理解我错过了什么?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您的代码会创建循环导入。相反,请尝试以下操作:

    op_x.py 应该有:

    def prepare_task(dag):
        t2 = BashOperator(
            task_id='print_inner_date',
            bash_command='date',
            dag=dag)
        return t2
    

    dag.py:

    from main_directory.tasks.operator_x import prepare_task
    
    default_args = {
        'retries': 5,
        'retry_delay': dt.timedelta(minutes=5),
        'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
        'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
        'start_date': dt.datetime(2019, 5, 10)
    }
    dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))
    
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = prepare_task(dag=dag)
    

    还要确保main_directory 在您的PYTHONPATH 中。

    【讨论】:

      猜你喜欢
      • 2021-05-03
      • 2022-12-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多