【问题标题】:How to execute multiple sql files in airflow using PostgresOperator?如何使用 PostgresOperator 在气流中执行多个 sql 文件?
【发布时间】:2022-08-03 05:55:46
【问题描述】:

我的 sql 文件夹中有多个 sql 文件。我不确定如何执行 DAG 中的所有 sql 文件?

  - dags
    - sql
      - dummy1.sql
      - dummy2.sql

对于单个文件,以下代码有效

sql_insert= PostgresOperator(task_id=\'sql_insert\',
                             postgres_conn_id=\'postgres_conn\',
                             sql=\'sql/dummy1.sql\')

    标签: python postgresql airflow airflow-2.x


    【解决方案1】:

    有清单

    sql_insert= PostgresOperator(task_id='sql_insert',
                                 postgres_conn_id='postgres_conn',
                                 sql=['sql/dummy1.sql', 'sql/dummy2.sql'])
    

    或者你可以让它动态

    import glob
    sql_insert= PostgresOperator(task_id='sql_insert',
                                 postgres_conn_id='postgres_conn',
                                 sql=glob.glob("sql/*.sql")]
    

    【讨论】:

    • 感谢你的回答。第一个解决方案有效,但动态解决方案无效。它显示任务成功但未插入数据
    【解决方案2】:

    添加到@Javier Lopez Tomas 的答案,为了动态地执行此操作,您必须使文件的路径相对于您在启动 DAG 时指定的 template_searchpathglob 返回匹配模式的文件的绝对路径。您可以增加以下内容:

    import glob
    sql_insert= PostgresOperator(task_id='sql_insert',
                                 postgres_conn_id='postgres_conn',
                                 sql=[x.split(template_searchpath)[1] for x in glob.glob("sql/*.sql")]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-03-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-14
      • 1970-01-01
      • 2021-12-13
      相关资源
      最近更新 更多