【问题标题】:Why am I getting transient errors when trying to use DAG.get_dagrun() in Airflow/Google Composer?为什么在 Airflow/Google Composer 中尝试使用 DAG.get_dagrun() 时会出现暂时性错误?
【发布时间】:2021-08-04 17:07:59
【问题描述】:

一直在寻找访问 dag 运行配置 JSON 的方法,并根据那里的内容动态构建我的实际 DAG 和底层任务。

由于 Jinja 模板对我的使用有些限制,我选择使用“香草”python,使用函数来构建我的任务。

这一切的核心是能够访问配置 JSON,我在这里找到了如何访问:https://stackoverflow.com/a/68455786/5687904

但是,由于我使用的是 Airflow 1.10.12(Composer 1.13.3),因此我不得不使用旧的/不推荐使用的属性来编辑以上内容,所以我要做的是:

conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf

我让它在一个新的 DAG 中工作以进行测试,这里是剥离任何私有数据的最小工作示例:

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from dependencies.airflow_utils import (
    DBT_IMAGE
)
from dependencies.kube_secrets import (
    GIT_DATA_TESTS_PRIVATE_KEY
)
# Default arguments for the DAG
default_args = {
    "depends_on_past": False,
    "owner": "airflow",
    "retries": 0,
    "start_date": datetime(2021, 5, 7, 0, 0, 0),
    'dataflow_default_options': {
        'project': 'my-gcp_project',
        'region': 'europe-west1'
        }
}

# Create the DAG
dag = DAG("test_conf_strings2", default_args=default_args, schedule_interval=None)
# DBT task creation function
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
def dynamic_full_refresh_strings(conf, arguments):  
    if conf.get("full-refresh") and 'dbt snapshot' in arguments:
        return ' --vars "full-refresh: true"'
    elif conf.get("full-refresh"):
        return conf.get("full-refresh")
    else:
        return ""

def task_dbt_run(conf, name, arguments, **kwargs):
    return KubernetesPodOperator(
    image=DBT_IMAGE,
    task_id="dbt_run_{}".format(name),
    name="dbt_run_{}".format(name),
    secrets=[
        GIT_DATA_TESTS_PRIVATE_KEY,
    ],
    startup_timeout_seconds=540,
    arguments=[arguments + dynamic_full_refresh_strings(conf, arguments)],
    dag=dag,
    get_logs=True,
    image_pull_policy="Always",
    resources={"request_memory": "512Mi", "request_cpu": "250m"},
    retries=3,
    namespace="default",
    cmds=["/bin/bash", "-c"]
)

# DBT commands
dbt_bqtoscore = f"""
    {clone_repo_simplified_cmd} &&
    cd bigqueryprocessing/data &&
    dbt run --profiles-dir .dbt --models execution_engine_filter"""

# Create all tasks for the dag
dbt_run_bqtoscore = task_dbt_run(conf, "bqtoscore", dbt_bqtoscore)

# Task dependencies setting
dbt_run_bqtoscore

但是,当我尝试将此逻辑添加到我的主 DAG 时,我开始收到 'NoneType' object has no attribute 'get'

在像疯子一样检查一切并做了很多 diffchecker 之后,我确认没有区别。

为了确保我不会完全发疯,我什至复制了我的工作测试 DAG 并将名称更改为其他名称,以免与原始名称冲突。 我又遇到了错误,基本上是 1:1 的 dag 副本!

所以从错误来看,这里发生的事情是conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf 的相同代码在 dag 中产生不同的结果,唯一的区别是 dag 名称。 在我的工作测试中,我得到了我通过的正确 JSON,或者如果没有通过,则只是 {},因此没有错误。 但在出错的情况下,它是 None 导致问题。

有人知道这里可能会发生什么吗? 或者至少是我应该做哪些测试/调试来深入挖掘的想法?

【问题讨论】:

    标签: python airflow google-cloud-composer


    【解决方案1】:

    在主任务之前添加一个任务PythonOperator;它基本上计算 dynamic_full_refresh_strings 返回的内容,并将该信息从第一个任务传递到第二个任务(使用 x_com 推/拉或在 dag_run.conf 中设置或任何其他方式)

    【讨论】:

      猜你喜欢
      • 2020-11-21
      • 2013-03-07
      • 1970-01-01
      • 2023-02-05
      • 1970-01-01
      • 2015-01-15
      • 1970-01-01
      • 2021-12-24
      • 1970-01-01
      相关资源
      最近更新 更多