【Posted】:2021-02-21 14:03:29
【Question】:
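I am trying to use the HDFS sensor operator in Airflow to trigger the next task once a file arrives at a given path. However, when I deploy the DAG, I get this error:
Broken DAG: [/usr/local/airflow/dags/test_sensor_dag.py] invalid syntax (client.py, line 1473)
Code: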
from datetime import datetime

from airflow import DAG
from airflow.operators.email_operator import EmailOperator  # was missing in the original
from airflow.sensors.hdfs_sensor import HdfsSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'provide_context': True,
    'start_date': datetime(2020, 3, 19, 0, 0),
    'email': ['hr@***.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'concurrency': 1
}

# run it daily at 6AM
schedule_interval = '00 6 * * *'
dag_name = 'test_sensor_dag'

dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=schedule_interval)

# Wait for the file to appear in HDFS. Do not call .poke() here:
# Airflow calls it at run time, and the variable must hold the operator
# itself so it can be wired into the DAG.
source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/test/file.csv',
    poke_interval=10,  # seconds between checks
    timeout=5,  # seconds before the sensor gives up
    dag=dag
)

success_notification = EmailOperator(
    to=['hr@***.com'],
    task_id='success_notification',
    subject='[Success:] test for {{ ds }}',
    html_content='Successfully ran the DAG',
    dag=dag)

# Send the email only after the sensor has succeeded.
success_notification.set_upstream(source_data_sensor)
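
The error message names client.py rather than the DAG file, which suggests the syntax error is raised while importing a dependency, not by the DAG code itself. The following is a minimal sketch of how one might confirm where such an error originates. The helper `locate_syntax_error` and the module `py2_only_client` are hypothetical names made up for illustration; the temporary module only stands in for a dependency written in Python 2 syntax and is not part of Airflow.

```python
import importlib
import os
import sys
import tempfile


def locate_syntax_error(module_name):
    """Import a module; return (file name, line number) of a SyntaxError, else None."""
    try:
        importlib.import_module(module_name)
    except SyntaxError as exc:
        return os.path.basename(exc.filename), exc.lineno
    return None


# Hypothetical stand-in for a Python 2-only dependency: the second line
# uses the Python 2 print statement, which Python 3 cannot parse.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "py2_only_client.py"), "w") as handle:
    handle.write("x = 1\nprint 'hello'\n")

sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()

result = locate_syntax_error("py2_only_client")
print(result)
```

In the environment from the question, one could try calling the same helper with `'airflow.sensors.hdfs_sensor'` from the Python interpreter that Airflow uses. If it reports client.py, the file belongs to a package imported by the sensor, and the problem is likely a mismatch between that package and the Python version rather than anything in the DAG definition.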
【Discussion】:
Tags: hdfs airflow-scheduler airflow