【发布时间】:2019-12-16 01:38:24
【问题描述】:
我是 python 和气流 dag 的新手。
我正在关注答案部分中提到的以下链接和代码。
How to pass dynamic arguments Airflow operator?
我在读取 yaml 文件时遇到问题,在 yaml 文件中我有一些与配置相关的参数。
configs:
cluster_name: "test-cluster"
project_id: "t***********"
zone: "europe-west1-c"
num_workers: 2
worker_machine_type: "n1-standard-1"
master_machine_type: "n1-standard-1"
在 DAG 脚本中,我创建了一个将创建集群的任务,在执行此任务之前,我们需要传递给它的所有参数 default_args 参数,如集群名称、项目 ID 等。为了读取我创建的那些参数一种readYML方法。见下面的代码
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.contrib.operators import dataproc_operator
from airflow.models import Variable
import yaml
def readYML():
print("inside readYML")
global cfg
file_name = "/home/airflow/gcs/data/cluster_config.yml"
with open(file_name, 'r') as ymlfile:
cfg = yaml.load(ymlfile)
print(cfg['configs']['cluster_name'])
# Default Arguments
readYML()
dag_name = Variable.get("dag_name")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
#'cluster_name': cfg['configs']['cluster_name'],
}
# Instantiate a DAG
dag = DAG(dag_id='read_yml', default_args=default_args,
schedule_interval=timedelta(days=1))
# Creating Tasks
Task1 = DataprocClusterCreateOperator(
task_id='create_cluster',
dag=dag
)
在这段代码中没有错误,当我在 GCP 作曲家环境中上传时,没有显示错误通知,但是这个 DAG 不可运行,没有运行按钮。
请参阅随附的屏幕截图。 我正在使用 python 3 和气流 composer-1.7.2-airflow-1.10.2 版本。
【问题讨论】:
-
这通常表明您的代码有问题,气流无法解析它。如果你有气流环境设置,我建议你试试
python your_dag.py看看它是否给你任何错误信息。我已经可以看到的一件事是dag=DAG应该是dag=dag -
@Chengzhi 不,这里我复制粘贴出错了,实际上是创建了我在不同的python文件中创建的任务,所以我从那里复制了代码,反正这种问题我们可以在红色通知警报,您可以在上面的屏幕截图中看到,没有错误。所以这不是我已经检查过的问题
-
您能否验证气流调度程序是否也在运行?
-
是的,运行良好
-
当然代码有错误,你的
default_args = {行缩进一次空格到深。除此之外,YML](fdik.org/yml) 与 YAML 不同,并且至少自 2006 年 9 月以来,YAML 文件的推荐扩展名一直是.yaml。因此,您应该始终在代码中使用 YAML 和.yaml。
标签: python python-3.x yaml airflow google-cloud-composer