【发布时间】:2021-12-30 07:48:49
【问题描述】:
我们想在 Dag 中的 Dagtrigger 期间从 UI 读取 cli 输入传递到 dag。 我尝试了下面的代码,但它不起作用。在这里,我将输入传递为 {""kpi":"ID123"} 我想在我的函数 get_data_from_bq 中打印这个 ip 值
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.models import Variable
from google.cloud import bigquery
from airflow.configuration import conf
LOCATION = Variable.get("HDM_PROJECT_LOCATION")
PROJECT_ID = Variable.get("HDM_PROJECT_ID")
client = bigquery.Client()
kpi='{{ kpi}}'
# default arguments
default_dag_args = {
'start_date':days_ago(0),
'retries': 0,
'project_id': PROJECT_ID
}
# Setting airflow environment varriable,getting hdm_batch_details data and updating it
def get_data_from_bq(**kwargs):
print("op is:")
print(kpi)
#Dag Defination
with models.DAG(
'00_test_sql1',
schedule_interval=None,
default_args=default_dag_args) as dag:
v_run_sql_01 = PythonOperator(
task_id='Run_SQL',
provide_context=True,
python_callable=get_data_from_bq,
location=LOCATION,
use_legacy_sql=False)
v_run_sql_01
注意:我不想使用任何运算符来读取从 cli 传递的数据
【问题讨论】: