【问题标题】:Scheduling thousands of tasks with Airflow使用 Airflow 调度数千个任务
【发布时间】:2020-06-19 09:44:32
【问题描述】:

我们正在考虑将 Airflow 用于需要每天对外部 API 进行数千次调用以下载外部数据的项目,其中每次调用可能需要几分钟时间。

我们正在考虑的一个选项是为每个不同的 API 调用创建一个任务,但这会导致数千个任务。在 UI 中渲染所有这些任务将具有挑战性。我们还担心调度程序,它可能会处理这么多任务。

另一种选择是只有几个并行的长时间运行的任务,然后在这些任务中实现我们自己的调度程序。我们可以在PythonOperator中添加一个自定义代码,它会查询数据库并决定接下来调用哪个API。

也许 Airflow 不太适合这样的用例,在 Airflow 之外实现这样的系统会更容易更好吗?有没有人有在 Airflow 中运行数千个任务的经验,并且可以阐明上述用例的优缺点?

【问题讨论】:

  • 这不是您问题的完整答案,但我认为如果您在 python 进程中实现调度程序,那么在气流中运行它并没有太多附加值,因为调度不会真正适用(假设我理解正确)
  • @qfwfq 这也是我的想法。除非,我们可以在 Airflow 中运行数千个任务而不会出现任何问题,在这种情况下,不需要单独的调度程序。
  • 因此,只要将数千个任务适当地分组到 DAG 中,我认为没有问题。您可能遇到的唯一问题是正在执行的这些任务的及时性。如果您有特定的 SLA 要求,您可能会让一些任务在其他任务之后排队一段时间。您是否有理由担心在 Kube 等可扩展的平台上运行它?

标签: airflow airflow-scheduler


【解决方案1】:

每次调用一个任务会杀死 Airflow,因为它仍然需要在每次心跳时检查每个任务的状态 - 即使任务(工作人员)的处理是分开的,例如在 K8s 上。

不确定您计划在哪里运行 Airflow,但如果在 GCP 上并且下载时间不超过 9 分钟,您可以使用以下方法:

task (PythonOperator) -> pubsub -> cloud function (to retrieve) -> pubsub -> function (to save result to backend).

可能不需要后一个函数,但我们(重新)使用通用且简单的“bigquery streamer”。

最后,您在下游 AF 任务 (PythonSensor) 中查询后端的结果数量,并与发布的请求数量进行比较。

当我们最大限度地提高并行性时,我们非常有效地处理了对我们托管在 GCP 上的第三方系统的 100K API 调用。 GCF 的好处是您可以调整架构以使用和并发,而不是配置 VM 或容器来运行任务。

【讨论】:

    猜你喜欢
    • 2020-03-14
    • 1970-01-01
    • 1970-01-01
    • 2012-10-06
    • 2015-05-02
    • 1970-01-01
    • 1970-01-01
    • 2023-02-19
    • 1970-01-01
    相关资源
    最近更新 更多