【发布时间】:2021-08-04 17:07:59
【问题描述】:
一直在寻找访问 dag 运行配置 JSON 的方法,并根据那里的内容动态构建我的实际 DAG 和底层任务。
由于 Jinja 模板对我的使用有些限制,我选择使用“香草”python,使用函数来构建我的任务。
这一切的核心是能够访问配置 JSON,我在这里找到了如何访问:https://stackoverflow.com/a/68455786/5687904
但是,由于我使用的是 Airflow 1.10.12(Composer 1.13.3),因此我不得不使用旧的/不推荐使用的属性来编辑以上内容,所以我要做的是:
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
我让它在一个新的 DAG 中工作以进行测试,这里是剥离任何私有数据的最小工作示例:
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from dependencies.airflow_utils import (
DBT_IMAGE
)
from dependencies.kube_secrets import (
GIT_DATA_TESTS_PRIVATE_KEY
)
# Default arguments for the DAG
default_args = {
"depends_on_past": False,
"owner": "airflow",
"retries": 0,
"start_date": datetime(2021, 5, 7, 0, 0, 0),
'dataflow_default_options': {
'project': 'my-gcp_project',
'region': 'europe-west1'
}
}
# Create the DAG
dag = DAG("test_conf_strings2", default_args=default_args, schedule_interval=None)
# DBT task creation function
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
def dynamic_full_refresh_strings(conf, arguments):
if conf.get("full-refresh") and 'dbt snapshot' in arguments:
return ' --vars "full-refresh: true"'
elif conf.get("full-refresh"):
return conf.get("full-refresh")
else:
return ""
def task_dbt_run(conf, name, arguments, **kwargs):
return KubernetesPodOperator(
image=DBT_IMAGE,
task_id="dbt_run_{}".format(name),
name="dbt_run_{}".format(name),
secrets=[
GIT_DATA_TESTS_PRIVATE_KEY,
],
startup_timeout_seconds=540,
arguments=[arguments + dynamic_full_refresh_strings(conf, arguments)],
dag=dag,
get_logs=True,
image_pull_policy="Always",
resources={"request_memory": "512Mi", "request_cpu": "250m"},
retries=3,
namespace="default",
cmds=["/bin/bash", "-c"]
)
# DBT commands
dbt_bqtoscore = f"""
{clone_repo_simplified_cmd} &&
cd bigqueryprocessing/data &&
dbt run --profiles-dir .dbt --models execution_engine_filter"""
# Create all tasks for the dag
dbt_run_bqtoscore = task_dbt_run(conf, "bqtoscore", dbt_bqtoscore)
# Task dependencies setting
dbt_run_bqtoscore
但是,当我尝试将此逻辑添加到我的主 DAG 时,我开始收到 'NoneType' object has no attribute 'get'。
在像疯子一样检查一切并做了很多 diffchecker 之后,我确认没有区别。
为了确保我不会完全发疯,我什至复制了我的工作测试 DAG 并将名称更改为其他名称,以免与原始名称冲突。 我又遇到了错误,基本上是 1:1 的 dag 副本!
所以从错误来看,这里发生的事情是conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf 的相同代码在 dag 中产生不同的结果,唯一的区别是 dag 名称。
在我的工作测试中,我得到了我通过的正确 JSON,或者如果没有通过,则只是 {},因此没有错误。
但在出错的情况下,它是 None 导致问题。
有人知道这里可能会发生什么吗? 或者至少是我应该做哪些测试/调试来深入挖掘的想法?
【问题讨论】:
标签: python airflow google-cloud-composer