【问题标题】:Airflow set task instance status as skipped programmatically气流将任务实例状态设置为以编程方式跳过
【发布时间】:2022-02-22 22:56:39
【问题描述】:

我有一个循环来创建任务的列表。该列表的大小是静态的。

        for counter, account_id in enumerate(ACCOUNT_LIST):
            task_id = f"bash_task_{counter}"
            if account_id:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
            else:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
                trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
            trigger_task

我手动尝试过,但无法跳过任务:

        start = DummyOperator(task_id='start')
        task1 = DummyOperator(task_id='task_1')
        task2 = DummyOperator(task_id='task_2')
        task3 = DummyOperator(task_id='task_3')
        task4 = DummyOperator(task_id='task_4')

        start >> task1
        start >> task2

        try:
            start >> task3
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task3')
            
        try:
            start >> task4
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task4')

【问题讨论】:

  • @alltej 你把它弄糊涂了; AirflowSkipException 必须从您的操作员代码中提出(而不是像您在此处所做的那样在您的 DAG 定义代码中提出)。您在这里尝试做的事情尚不清楚;但也是不可能的(你不能在 DAG 定义期间标记任务的状态,因为它还没有运行)。要以预定义的方式跳过任务,您可以 [1] (更容易) 创建并有条件地将它们连接在一起 [2] 或使用BranchPythonOperator / ShortCircuitOperator
  • 你有代码示例吗?
  • 我试图避免使用 BranchOperator 因为它看起来太过分了

标签: airflow


【解决方案1】:

是的,你需要raise AirflowSkipException

from airflow.exceptions import AirflowSkipException

raise AirflowSkipException

欲了解更多信息,请参阅the source code

【讨论】:

  • 这不起作用。查看手动添加任务的更新问题
  • 您好 alltej... 您正在使用的代码不会被 Airflow 执行,因为您将其置于 DAG 级别。在执行task_instance 期间,raise AirflowSkipException 需要内联或作为执行代码的一部分。因此,它应该在您打算跳过的 Task 的 Operator 的 execute 方法中。
【解决方案2】:

每个 DAG 执行固定数量的任务。这真的很好,这也计划了您的系统应该处理多少最大并行任务,而不会降低下游系统的性能。此外,具有固定数量的任务使其在 Web UI 中可见,并让您指示它们是执行还是跳过。

在下面的代码中,我使用 None 项目初始化列表,然后根据从数据库返回的数据使用值更新列表。在python_callable函数中,检查account_id是否为None然后引发AirflowSkipException,否则执行函数。在 UI 中,任务是可见的,并指示是执行还是跳过(意味着没有account_id

    def execute(account_id):
        if account_id:
            print(f'************Executing task for account_id:{account_id}')
        else:
            raise AirflowSkipException

    def create_task(task_id, account_id):
        return PythonOperator(task_id=task_id,
                              python_callable=execute,
                              op_args=[account_id])


    list_from_dbhook = [1, 2, 3] # dummy list. Get records using DB Hook

    # Need to have some fix size. Need to allocate fix resources or # of tasks.
    # Having this fixed number of tasks will make this tasks to be visible in UI instead of being purely dynamic
    record_size_limit = 5 
    
    ACCOUNT_LIST = [None] * record_size_limit
    for index, account_id_val in enumerate(list_from_dbhook):
        ACCOUNT_LIST[index] = account_id_val

    for idx, acct_id in enumerate(ACCOUNT_LIST):
        task = create_task(f"task_{idx}", acct_id)
        task

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-22
    • 1970-01-01
    相关资源
    最近更新 更多