【问题标题】:Using dag_run variables in airflow Dag在气流 Dag 中使用 dag_run 变量
【发布时间】:2021-07-22 06:20:40
【问题描述】:

我正在尝试使用气流变量来确定是否执行任务。我已经尝试过了,但它不起作用:

if '{{ params.year }}' == '{{ params.message }}':
     run_this = DummyOperator (
                task_id = 'dummy_dag'
               )

我希望得到一些帮助以使其正常工作。还有没有更好的方法在气流中做这样的事情?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    我认为解决这个问题的一个好方法是使用BranchPythonOperator 根据提供的 DAG 参数动态分支。考虑这个例子:

    使用params 向DAG 提供参数(也可以从UI 完成),在此示例中:{"enabled": True}

    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    from airflow.operators.python import get_current_context, BranchPythonOperator
    
    @dag(
        default_args=default_args,
        schedule_interval=None,
        start_date=days_ago(1),
        catchup=False,
        tags=["example"],
        params={"enabled": True},
    )
    def branch_from_dag_params():
        def _print_enabled():
            context = get_current_context()
            enabled = context["params"].get("enabled", False)
            print(f"Task id: {context['ti'].task_id}")
            print(f"Enabled is: {enabled}")
    
        @task
        def task_a():
            _print_enabled()
    
        @task
        def task_b():
            _print_enabled()
    
    

    BranchPythonOperator 定义一个可调用对象,您将在其中执行条件并返回下一个要执行的任务。您可以从**kwargs 访问执行上下文变量。还请记住,此运算符应返回单个 task_idtask_ids 列表 以跟随下游。这些生成的任务应该总是直接在它的下游。

        def _get_task_run(ti, **kwargs):
            custom_param = kwargs["params"].get("enabled", False)
    
            if custom_param:
                return "task_a"
            else:
                return "task_b"
    
        branch_task = BranchPythonOperator(
            task_id="branch_task",
            python_callable=_get_task_run,
        )
        task_a_exec = task_a()
        task_b_exec = task_b()
        branch_task >> [task_a_exec, task_b_exec]
    
    

    结果是 task_a 被执行,而 task_b跳过

    AIRFLOW_CTX_DAG_OWNER=airflow
    AIRFLOW_CTX_DAG_ID=branch_from_dag_params
    AIRFLOW_CTX_TASK_ID=task_a
    Task id: task_a
    Enabled is: True
    

    如果这对你有用,请告诉我。

    Docs

    【讨论】:

    • 这有助于@NicoE。我并没有真正考虑过BranchPythonOperator。适合我的需要。所以可以通过lkwargscontext 访问参数,对吗?
    • @Kay 很高兴听到它奏效了!简短的回答是肯定的,它也可以从自定义运算符或任何执行上下文中访问。你可以找到另一个例子here
    • 哦,哇,这是一个非常好的。我有一个关于从自定义运算符访问参数的问题,该链接为我提供了有关如何处理它的想法。谢谢@NicoE
    • @kay 感谢您的反馈!如果您觉得它有帮助,非常感谢您为回复点赞。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多