【问题标题】:How do I add a new dag to a running airflow service?如何向正在运行的气流服务添加新的 dag?
【发布时间】:2018-08-16 21:56:53
【问题描述】:

我有一个气流服务,它当前作为 web 服务器和调度程序的单独 docker 容器运行,两者都由 postgres 数据库支持。我在两个实例之间同步了 dags,并且在服务启动时适当地加载了 dags。但是,如果我在服务运行时将新的 dag 添加到 dag 文件夹(在两个容器上),则 dag 会加载到 dagbag 中,但会在缺少元数据的情况下显示在 web gui 中。我可以在每次更新后运行“airflow initdb”,但这感觉不对。调度程序和网络服务器是否有更好的方式与数据库同步?

【问题讨论】:

  • 你可以设置你的调度服务每隔几分钟重启一次,重启后它应该会获取新的 dag。只需使用airflow scheduler -r 300,这意味着调度程序每 300 秒退出一次,因此如果您将服务设置为始终重新启动调度程序,则每个新 dag 都应在
  • 我刚刚对此进行了测试,在调度程序服务重新启动后,新的 dags 会被正确拾取,您只需等待几秒钟并刷新 webUI/工作流有时 2-3 次,但它适用于我。
  • 如果我将调度程序设置为每 300 秒重新启动一次,我是否会错过任何可能在重新启动期间发生的计划任务?比如,如果我有一个 10:00:00 的计划任务,我在 9:59:59 重新启动并且需要两秒钟才能重新启动,它仍然知道执行这个吗?
  • 这取决于:如果你有 catchup=False,调度器将只运行最近的任务/dag,所以在不切实际的情况下,一个 dag 应该在 9:59:59 开始,一个在 10:00:00,只有 start_time 为 10:00:00 的那个会运行。要解决这个问题,您可以设置 catchup=True。否则,如果您只有 dag start 让我们说每小时一次,您应该看不到任何问题。
  • 您可以通过创建每 5 分钟运行一次的 dag 来轻松测试这一点,然后在 dag 第一次运行后停止调度程序 6 分钟,然后重新启动它。您应该会看到在调度程序未运行时计划启动的 dag 将启动。

标签: python airflow airflow-scheduler


【解决方案1】:

应自动获取 Dag 更新。如果他们没有被捡起,那通常是因为你所做的改变“破坏”了 dag。

要检查新任务是否确实被拾取,请在您的网络服务器上运行:

airflow list_tasks <dag_name> --tree

如果显示 Dag 未找到,则说明有错误。
如果它运行成功,那么它应该会显示您的所有任务,并且当您刷新它时,这些任务应该会在您的气流 ui 中被拾取。

如果那里没有显示新的/更新的任务,请检查您的网络服务器上的 dags 文件夹并确认代码确实正在更新。

【讨论】:

  • 上面应该说的是:airflow list_tasks MY_DAG_NAME --tree
【解决方案2】:

在airlfow.cfg中你可以改变参数来更新

# after how much time a new DAGs should be picked up from the filesystem
min_file_process_interval = 0
dag_dir_list_interval = 60

默认情况下,它将每 300 秒更新一次网络服务器中的代码。您可以根据需要更改。

【讨论】:

    猜你喜欢
    • 2019-01-08
    • 2017-09-06
    • 1970-01-01
    • 1970-01-01
    • 2022-11-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-22
    相关资源
    最近更新 更多