【问题标题】:read cli input without calling python operator在不调用 python 运算符的情况下读取 cli 输入
【发布时间】:2021-12-30 07:48:49
【问题描述】:

我们想在 Dag 中的 Dagtrigger 期间从 UI 读取 cli 输入传递到 dag。 我尝试了下面的代码,但它不起作用。在这里,我将输入传递为 {""kpi":"ID123"} 我想在我的函数 get_data_from_bq 中打印这个 ip 值

   from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.models import Variable
from google.cloud import bigquery
from airflow.configuration import conf

LOCATION          = Variable.get("HDM_PROJECT_LOCATION")
PROJECT_ID        = Variable.get("HDM_PROJECT_ID")
client = bigquery.Client()
kpi='{{ kpi}}'
# default arguments
default_dag_args = {
    'start_date':days_ago(0),
    'retries': 0,
    'project_id': PROJECT_ID
}

# Setting airflow environment varriable,getting hdm_batch_details data and updating it
def get_data_from_bq(**kwargs):
    print("op is:")
    print(kpi)

#Dag Defination
with models.DAG(
        '00_test_sql1',
        schedule_interval=None,
        default_args=default_dag_args) as dag:
        
        v_run_sql_01 = PythonOperator(
        task_id='Run_SQL',
        provide_context=True,
        python_callable=get_data_from_bq,
        location=LOCATION,
        use_legacy_sql=False)

v_run_sql_01

注意:我不想使用任何运算符来读取从 cli 传递的数据

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    注意:我不想使用任何运算符来读取从 cli 传递的数据

    这是不可能的。只有当有任务要运行时才会创建 Dag Run。

    你应该明白:

    • DAG + 其顶层代码 - 构建由 Tasks 组成的 DAG 结构

    • DAG Run -> 是 DAG 运行的单个实例,其中包含要执行的任务实例。 Dag Run 仅包含属于具有给定“dag run”的 DAG 运行的任务实例。

    您传递的配置是“dag_run.conf”而不是“dag.conf” - 这意味着它仅针对 DagRun 指定,仅对属于它的所有任务实例有效。

    只有任务实例可以访问dag_run.conf

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-02-02
      • 2021-10-31
      • 2016-08-14
      • 1970-01-01
      • 2011-06-26
      • 2013-06-20
      • 1970-01-01
      相关资源
      最近更新 更多