【问题标题】:Apache airflow with conditional statements带有条件语句的 Apache 气流
【发布时间】:2020-10-06 11:21:23
【问题描述】:

我是 Airflow 的新手。我想使用气流操作符进行如下操作。

简单地说,我想从数据库表中读取一些数据,并根据该表中列的值执行不同的任务。

这是我用来获取数据的表。

+-----------+--------+
| task_name | status |
+-----------+--------+
| a         |      1 |
| b         |      2 |
| c         |      4 |
| d         |      3 |
| e         |      4 |
+-----------+--------+

从上表中,我想选择 status=4 的行,并根据它们的任务名称运行相关的 jar 文件(对于运行 jar 文件,我计划使用 Bash Operator)。我想使用 Airflow 执行此任务。请注意,我使用的是 PostgreSQL。

这是我目前实现的代码。

from airflow.models import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta
from airflow import settings

#set the default attributes
default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2020,10,4)
}

status_four_dag = DAG(
    dag_id = 'status_check',
    default_args = default_args,
    schedule_interval = timedelta(seconds=5)
)

test=PostgresOperator(
    task_id='check_status',
    sql='''select * from table1 where status=4;''',
    postgres_conn_id='test',
    database='status',
    dag=status_four_dag,
)

我卡在我想检查 task_name 并调用相关 BashOperators 的地方。

感谢您的支持。谢谢。

【问题讨论】:

    标签: bash postgresql jar conditional-statements airflow


    【解决方案1】:

    XComs 用于在任务之间传递消息。将用于形成命令的 JAR 文件名和其他参数发送到 xcom 并在后续任务中使用。

    例如,

    check_status >> handle_status
    

    check_status - 从数据库检查状态并将 JAR 文件名和参数写入xcom

    handle_status - 从xcom 中提取 JAR 文件名和参数,形成命令并执行它

    示例代码:

    def check_status(**kwargs):
        if randint(1, 100) % 2 == 0:
            kwargs["ti"].xcom_push("jar_filename", "even.jar")
        else:
            kwargs["ti"].xcom_push("jar_filename", "odd.jar")
    
    
    with DAG(dag_id='new_example', default_args=default_args) as dag:
        t0 = PythonOperator(
            task_id="check_status",
            provide_context=True,
            python_callable=check_status
        )
    
        t1 = BashOperator(
            task_id="handle_status",
            bash_command="""
                jar_filename={{ ti.xcom_pull(task_ids='check_status', key='jar_filename') }}
                echo "java -jar ${jar_filename}"
            """
        )
    
        t0 >> t1
    

    【讨论】:

      猜你喜欢
      • 2021-11-13
      • 1970-01-01
      • 2021-06-19
      • 2019-06-12
      • 2020-08-18
      • 2017-12-02
      • 2017-03-06
      • 2017-07-08
      • 2020-05-24
      相关资源
      最近更新 更多