【问题标题】:How to use Dynamic Task Mapping with TaskGroups如何将动态任务映射与任务组一起使用
【发布时间】:2022-11-14 07:58:17
【问题描述】:

在我的实际 DAG 中,我需要首先获取一个 ID 列表,然后为每个 ID 运行一组任务。

我已经使用动态任务映射将列表传递给单个任务或操作员以使其处理列表,但我们也可以使用任务组来执行此操作吗?

如果我能弄清楚如何在 TaskGroup 级别传递一个变量值,以便它在所有子任务中使用该值,那么我应该能够满足我的要求。

下面应该让你知道我在找什么,只需要帮助让它工作。

from airflow import DAG, XComArg
from datetime import datetime
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator


with DAG(
    'dtm_tg_test',
    schedule_interval = None,
    start_date = datetime(2022, 1, 1)
) as dag:

    def getList():
        return [ "Hello", "World" ]

    def printText(text):
        print(text)

    get_list = PythonOperator(
                        task_id = "get_list",
                        python_callable = getList,
                        dag = dag
                    )

    with TaskGroup.partial(
                            group_id = "task_group"
    ).expand(
        list = XComArg(get_list)
    ) as task_group:
        print_text = PythonOperator(
                            task_id = "print_output",
                            python_callable = printText,
                            op_kwargs = { "text": list }
                            dag = dag
                        )
        
        print_again = PythonOperator(
                            task_id = "print_output",
                            python_callable = printText,
                            op_kwargs = { "text": list }
                            dag = dag
                        )

        print_text >> print_again

    get_list >> task_group

【问题讨论】:

    标签: airflow dynamic-programming


    【解决方案1】:

    您可以通过以下示例实现它:

    list_ids = ['45', '48']
    
    @task_group()
    def parent_group(list_ids: List[str]) -> List[TaskGroup]:
        return list(map(build_group_for_id, list_ids))
    
    def build_group_for_id(current_id: str) -> TaskGroup:
        with TaskGroup(group_id=f'group_for_id_{current_id}') as group:
            print_text = PythonOperator(
                                task_id = f"print_output_{current_id}",
                                python_callable = printText,
                                op_kwargs = { "text": current_id }
                                dag = dag
                            )
            
            print_again = PythonOperator(
                                task_id = f"print_output_other_{current_id}",
                                python_callable = printText,
                                op_kwargs = { "text":  current_id}
                                dag = dag
    
            print_text >> print_again
    
        return group
    
    with airflow.DAG(
            "my_dag", default_args=args, schedule_interval=None,
    ) as dag:
    
       DummyOperator(task_id='start_dag') >> parent_group(list_ids())
    
    

    一些解释:

    • 我创建了一个父 taskGroup,名为 parent_group
    • 此父组采用 ID 列表
    • 我添加一个循环并为每个 ID 创建一个 TaskGroup 包含你的 2 Aiflow 任务(打印操作员)
    • 对于与父 ID 相关的 TaskGroupTaskGroup ID 是从它构建的,以便在 DAG 中是唯一的
    • 对于TaskGroup 中的打印操作符,我再次通过当前父 ID 生成任务 ID

    【讨论】:

      猜你喜欢
      • 2016-02-04
      • 2020-08-08
      • 1970-01-01
      • 2021-04-12
      • 2015-10-19
      • 2012-02-16
      • 2011-01-08
      • 1970-01-01
      • 2020-05-15
      相关资源
      最近更新 更多