【问题标题】:Cloud composer import custom plugin to all existing dagsCloud Composer 将自定义插件导入到所有现有的 dag
【发布时间】:2020-08-24 15:53:52
【问题描述】:

我正在使用 Cloud Composer 来安排多个 DAG。这些 DAG 是使用 this method 动态构建的,并且使用自定义插件。

我想知道在添加/修改涉及所有 DAG 的插件时如何进行(假设它为每个 DAG 添加了一个新任务)?

这样做时我们是否需要暂停所有正在运行的 DAG?

到目前为止,我在添加/修改插件时所做的是:

  • 将插件上传到 Composer 集群的 plugins 存储桶中(使用 gcloud composer 命令)
  • 在 Airflow 配置中进行虚拟更新 -> 将虚拟值添加到 airflow.cfg(使用 gcloud composer 命令)

我这样做是为了强制 DAG 暂停,一旦更新完成,DAG 就会恢复,但会使用新插件和新任务(或者如果它不在这个 dagrun 中,那么它就是下一个)。没用吗?

如果你能帮忙,谢谢。

【问题讨论】:

    标签: airflow google-cloud-composer


    【解决方案1】:

    正如architecture diagram 中所述,您可以在其中查看 DAG 和插件代码的 Airflow 网络服务器在 Google 管理的租户项目中运行,而 Airflow 工作人员实际运行你的 DAG,插件代码直接在你的项目中。

    当 DAG/Plugin 放置在 Composer bucket 中时,Airflow 网络服务器(属于租户项目)验证代码并更新 Airflow 数据库中的任何新的调度更改。

    同时,Airflow 调度程序(在您的项目中)要求 Airflow 数据库运行下一个 DAG,并通知 Airflow 工作人员执行计划的工作。 Airflow 工作人员(在您的项目中)然后从 Composer 存储桶中获取 DAG/Plugin 代码并编译它们以运行该特定任务。

    因此,Airflow 网络服务器和 Airflow 工作人员在不同时间单独读取对 DAG/插件代码所做的任何更新。

    • 如果您在 Airflow 网络服务器中没有看到您的新代码,那么当工人在新任务运行时获取新代码时,仍应将其拾取。

    • 因此,您不必为工作人员重新启动 Composer 以获取更改。

    • 您不能强制工作人员在任务执行过程中抓取并重新编译新代码。

    两种方式刷新 Airflow Webserver 以查看插件代码在未更新时的变化:

    1. 通过Console 中的“AIRFLOW CONFIGURATIONS OVERRIDE”选项卡将[webserver] 部分中的reload_on_plugin_change 属性设置为True

    2. 或者,您可以通过“PYPI PACKAGES”控制台选项卡专门添加/删除/更新 PYPI 包。非 PYPI 包更改不会触发 Web 服务器重启。请注意,这还将启动整个 Composer 环境重新启动,这可能需要大约 20 分钟。

    【讨论】:

      猜你喜欢
      • 2021-12-05
      • 2021-12-27
      • 2020-11-02
      • 2020-06-23
      • 1970-01-01
      • 2021-09-28
      • 2020-11-30
      • 1970-01-01
      • 2020-11-25
      相关资源
      最近更新 更多