【问题标题】:How to read dynamic argument airflow operator?如何阅读动态参数气流运算符?
【发布时间】:2019-12-16 01:38:24
【问题描述】:

我是 python 和气流 dag 的新手。 我正在关注答案部分中提到的以下链接和代码。
How to pass dynamic arguments Airflow operator?

我在读取 yaml 文件时遇到问题,在 yaml 文件中我有一些与配置相关的参数。

configs:
    cluster_name: "test-cluster"
    project_id: "t***********"
    zone: "europe-west1-c"
    num_workers: 2
    worker_machine_type: "n1-standard-1"
    master_machine_type: "n1-standard-1"

在 DAG 脚本中,我创建了一个将创建集群的任务,在执行此任务之前,我们需要传递给它的所有参数 default_args 参数,如集群名称、项目 ID 等。为了读取我创建的那些参数一种readYML方法。见下面的代码

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.contrib.operators import dataproc_operator

from airflow.models import Variable
import yaml

def readYML():
     print("inside readYML")
     global cfg
     file_name = "/home/airflow/gcs/data/cluster_config.yml"
     with open(file_name, 'r') as ymlfile:
          cfg = yaml.load(ymlfile)
     print(cfg['configs']['cluster_name'])

 # Default Arguments
 readYML()

 dag_name = Variable.get("dag_name")

  default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
     'start_date': datetime.now(),
     'email': ['airflow@example.com'],
     'email_on_failure': False,
     'email_on_retry': False,
     'retries': 1,
     'retry_delay': timedelta(minutes=5),
     #'cluster_name': cfg['configs']['cluster_name'],    
    }

    # Instantiate a DAG

    dag = DAG(dag_id='read_yml', default_args=default_args, 
    schedule_interval=timedelta(days=1))

    # Creating Tasks
    Task1 = DataprocClusterCreateOperator(
    task_id='create_cluster',
    dag=dag
    )

在这段代码中没有错误,当我在 GCP 作曲家环境中上传时,没有显示错误通知,但是这个 DAG 不可运行,没有运行按钮。

请参阅随附的屏幕截图。 我正在使用 python 3 和气流 composer-1.7.2-airflow-1.10.2 版本。

【问题讨论】:

  • 这通常表明您的代码有问题,气流无法解析它。如果你有气流环境设置,我建议你试试python your_dag.py 看看它是否给你任何错误信息。我已经可以看到的一件事是dag=DAG 应该是dag=dag
  • @Chengzhi 不,这里我复制粘贴出错了,实际上是创建了我在不同的python文件中创建的任务,所以我从那里复制了代码,反正这种问题我们可以在红色通知警报,您可以在上面的屏幕截图中看到,没有错误。所以这不是我已经检查过的问题
  • 您能否验证气流调度程序是否也在运行?
  • 是的,运行良好
  • 当然代码有错误,你的default_args = {行缩进一次空格到深。除此之外,YML](fdik.org/yml) 与 YAML 不同,并且至少自 2006 年 9 月以来,YAML 文件的推荐扩展名一直是 .yaml。因此,您应该始终在代码中使用 YAML 和 .yaml

标签: python python-3.x yaml airflow google-cloud-composer


【解决方案1】:

根据 Cloud Composer 文档中的 Data Stored in Cloud Storage 页面:

为避免网络服务器错误,请确保网络服务器解析 DAG(未运行)所需的数据在 dags/ 文件夹中可用。否则,网络服务器无法访问数据或加载 Airflow 网络界面。

您的 DAG 正在尝试打开 /home/airflow/gcs/data 下的 YAML 文件,该文件在网络服务器上不存在。将文件放在您的 GCS 存储桶中的 dags/ 文件夹下,调度程序、工作人员和 Web 服务器将可以访问该文件,并且 DAG 将在 Web UI 中运行。

【讨论】:

  • 访问任何配置文件的方法是什么,它不在dag文件夹中,甚至不在composer存储桶中。我们需要从其他存储桶访问配置文件。怎么办?
  • @BhageshArora 访问它的最佳方法是将您的 yaml 文件配置保存在 dags 文件夹中。您可以专门为此在 dags 文件夹中创建一个文件夹。我有同样的问题。上面威尔逊已经回答了这一点。默认情况下,Composer 不会在工作人员之间同步除 dags 文件夹之外的任何其他位置。这就是推荐使用 dags 文件夹的原因
猜你喜欢
  • 1970-01-01
  • 2019-07-10
  • 1970-01-01
  • 1970-01-01
  • 2019-05-13
  • 2023-01-07
  • 2023-03-21
  • 2023-03-15
  • 2021-12-10
相关资源
最近更新 更多