【问题标题】:Run airflow DAG for each file为每个文件运行气流 DAG
【发布时间】:2019-03-04 22:26:31
【问题描述】:

所以我在气流中有这个非常好的 DAG,它基本上在二进制文件上运行几个分析步骤(作为气流插件实现)。 DAG 由 ftp 传感器触发,该传感器仅检查 ftp 服务器上是否有新文件,然后启动整个工作流程。

所以目前的工作流程是这样的:DAG 按照定义触发 -> 传感器等待 ftp 上的新文件 -> 执行分析步骤 -> 工作流程结束。

我想要的是这样的:DAG 是 triggerts -> 传感器等待 ftp 上的新文件 -> 对于 ftp 上的每个文件,分析步骤单独执行 -> 每个工作流程单独结束。

如何为 ftp 服务器上的每个文件执行分析工作流,如果服务器上没有文件,只有一个传感器应该等待新文件? 例如,我不想每隔一秒左右启动一个 DAG,因为那时我有很多传感器只是在等待一个新文件。

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    使用 2 个 DAG 将感知步骤与分析步骤分开。

    DAG 1:

    传感器在 ftp 上等待新文件 -> 一旦新文件登陆,使用 TriggerDagRunOperator 触发 DAG 1 本身 -> 使用 TriggerDagRunOperator 触发 DAG 2

    DAG 2:

    对文件执行分析步骤

    【讨论】:

      猜你喜欢
      • 2021-05-03
      • 1970-01-01
      • 2022-11-02
      • 1970-01-01
      • 1970-01-01
      • 2019-01-08
      • 1970-01-01
      • 1970-01-01
      • 2019-08-02
      相关资源
      最近更新 更多