【发布时间】:2020-10-06 11:21:23
【问题描述】:
我是 Airflow 的新手。我想使用气流操作符进行如下操作。
简单地说,我想从数据库表中读取一些数据,并根据该表中列的值执行不同的任务。
这是我用来获取数据的表。
+-----------+--------+
| task_name | status |
+-----------+--------+
| a | 1 |
| b | 2 |
| c | 4 |
| d | 3 |
| e | 4 |
+-----------+--------+
从上表中,我想选择 status=4 的行,并根据它们的任务名称运行相关的 jar 文件(对于运行 jar 文件,我计划使用 Bash Operator)。我想使用 Airflow 执行此任务。请注意,我使用的是 PostgreSQL。
这是我目前实现的代码。
from airflow.models import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta
from airflow import settings
#set the default attributes
default_args = {
'owner': 'Airflow',
'start_date': datetime(2020,10,4)
}
status_four_dag = DAG(
dag_id = 'status_check',
default_args = default_args,
schedule_interval = timedelta(seconds=5)
)
test=PostgresOperator(
task_id='check_status',
sql='''select * from table1 where status=4;''',
postgres_conn_id='test',
database='status',
dag=status_four_dag,
)
我卡在我想检查 task_name 并调用相关 BashOperators 的地方。
感谢您的支持。谢谢。
【问题讨论】:
标签: bash postgresql jar conditional-statements airflow