【问题标题】:Dynamic tasks in airflow based on an external file基于外部文件的气流中的动态任务
【发布时间】:2021-03-06 18:43:33
【问题描述】:

我正在从外部文件中读取元素列表并循环遍历元素以创建一系列任务。

例如,如果文件中有 2 个元素 - [A, B]。将有 2 个系列的任务:

A1 -> A2 ..
B1 -> B2 ...

这个读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 每天在读取 DAG 文件时调用它多次。我只想在 DAG 运行时调用它。

想知道此类用例是否已有模式?

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    根据您的要求,如果您要避免多次读取文件,但不介意多次从元数据数据库中读取,那么您可以更改您的方法以使用Variables作为迭代源动态创建任务。

    一个基本示例可能是在PythonOperator 中执行文件读取并设置您稍后将用于迭代的Variables(相同的可调用对象):

    sample_file.json:

    {
        "cities": [ "London", "Paris", "BA", "NY" ]
    }
    

    任务定义:

    from airflow.utils.dates import days_ago
    from airflow.models import Variable
    from airflow.utils.task_group import TaskGroup
    import json
    
    def _read_file():
        with open('dags/sample_file.json') as f:
            data = json.load(f)
            Variable.set(key='list_of_cities',
                         value=data['cities'], serialize_json=True)
            print('Loading Variable from file...')
    
    def _say_hello(city_name):
        print('hello from ' + city_name)
    
    with DAG('dynamic_tasks_from_var', schedule_interval='@once',
             start_date=days_ago(2),
             catchup=False) as dag:
    
        read_file = PythonOperator(
            task_id='read_file',
            python_callable=_read_file
        )
    
    

    然后您可以从该变量中读取并创建动态任务。 (设置default_var 很重要)。 TaskGroup 是可选的。

        # Top-level code
        updated_list = Variable.get('list_of_cities',
                                    default_var=['default_city'],
                                    deserialize_json=True)
        print(f'Updated LIST: {updated_list}')
    
        with TaskGroup('dynamic_tasks_group',
                       prefix_group_id=False,
                       ) as dynamic_tasks_group:
    
            for index, city in enumerate(updated_list):
                say_hello = PythonOperator(
                    task_id=f'say_hello_from_{city}',
                    python_callable=_say_hello,
                    op_kwargs={'city_name': city}
                )
    
    # DAG level dependencies
    read_file >> dynamic_tasks_group
    

    Scheduler 日志中,您只会发现:

    INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
    

    Dag 图表视图:

    使用这种方法,top-level code,因此被 Scheduler 连续读取,是对 Variable.get() 方法的调用。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(例如 article)。

    更新:

    • 对于 11-2021,这种方法被认为是一种“快速而肮脏”的解决方案。
    • 有效吗?是的,完全。是生产质量代码吗?没有。
    • 它有什么问题?每次调度程序解析文件时都会访问数据库,默认情况下每 30 秒访问一次,与您的 DAG 执行无关。有关 Airflow 最佳实践的完整详细信息,top-level code
    • 如何改进?考虑一下关于dynamic DAG generation 的任何推荐方式是否适用于您的需求。

    【讨论】:

    • 好答案!恕我直言,最好从文件中多次读取而不是从变量中读取。但这取决于 Dev 和他的要求
    • 哇,答案很详细!我正在考虑这种方法。通常,在任务之外调用变量也被认为是不好的。但是如果文件调用成本很高,它可能比读取文件更好。
    • 我发现了很多关于动态任务生成的文章和讨论,(大部分来自过时的版本),但总的来说,它们最终都提出了相同的两种方法:从文件中读取或从文件中读取一个变量,然后迭代并创建任务。我不是在谈论动态创建 DAG,我的意思是根据同一个 DAG 中先前任务的结果创建任务。我什至从 Airflow 的一位核心提交者那里找到了一个answer,建议采用这种方法。无论如何,如果有人知道实现这一目标的更好方法,请给我打电话!
    • @NicoE 这些方法有什么让你不满意的地方?你能想出更好的办法吗?
    • 嘿@MehdiLAMRANI,我刚刚更新了答案,包括提到 Airflow 文档的更新的最佳实践部分。那里进一步解释了这个主题,并提供了一些代码替代方案。
    猜你喜欢
    • 2020-04-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-16
    相关资源
    最近更新 更多