【问题标题】:Airflow DAGs recreation in each operator每个运营商中的 Airflow DAG 娱乐
【发布时间】:2021-11-25 08:36:15
【问题描述】:

我们使用 Airflow 2.1.4 并在 Kubernetes 上运行。

我们为网络服务器、调度程序分离了 pod,并且我们正在使用 Kubernetes 执行器

我们正在使用各种运算符,例如PythonOperatorKubernetesPodOperator 等。

我们的设置处理约 2000 个客户(企业),每个客户都有自己的 DAG。

我们的代码如下所示:

def get_customers():
    logger.info("querying database to get all customers")
    return sql_connection.query(SELECT id, name, offset FROM customers_table)


customers = get_customers()
for id, name, offset in customers:
    dag = DAG(
        dag_id=f"{id}-{name}",
        schedule_interval=offset,
    )
    with dag:
        first = PythonOperator(..)
        second = KubernetesPodOperator(..)
        third = SimpleHttpOperator(..)
        first >> second >> third

    globals()[id] = dag

上面的 sn-p 是我们已有的简化版本,但 DAG 中有几十个运算符(而不仅仅是三个)。

问题在于,对于每个 DAG 中的每个运算符,我们都会看到 querying database to get all customers 日志 - 这意味着我们查询数据库的方式超出了我们的预期。

数据库不经常更新,我们每天只能更新一次 DAG。 我知道 DAG 被保存在元数据数据库或其他东西中......

  • 有没有办法只构建一次/通过调度程序而不是每个操作员都构建这些 DAG?
  • 我们是否应该更改设计以支持我们的多租户需求?还有比这更好的选择吗?

在我们的例子中,约 60 名运营商 X 约 2,000 名客户 = 约 120,000 次数据库查询。

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    是的,这完全是意料之中的。 Airflow 会定期解析 DAG(默认为每 30 秒),因此随后会执行任何顶级代码(在解析文件期间执行的代码,而不是运算符的“执行”方法)。

    简单的答案(和最佳实践)是“不要在 DAG 的顶级代码中使用任何繁重的操作”。特别是不要使用数据库查询。但是,如果您想要一些更具体的答案以及如何处理它的可能方法,Airflow 文档中有关于最佳实践的专门章节:

    简而言之,提出了三种方式:

    1. 使用环境变量
    2. 通过外部脚本自动(定期)从数据库生成配置文件(例如 .json),并将其放在 DAG 旁边,并由 DAG 从那里读取 json 文件,而不是使用 sql 查询。
    3. 动态生成许多 DAG python 文件(例如使用 JINJA)还可以使用外部脚本自动并定期生成。

    您可以使用 2) 或 3) 来实现我相信的目标。

    【讨论】:

    • 感谢@JarekPotiuk。是否有“定期”脚本的最佳实践建议? pod 中的另一个容器与共享文件夹?调度程序 pod 中的 cronjob?还有其他想法吗?
    • 它不在 Airlfow 的推荐范围内,因为它在很大程度上取决于您的部署。作为系统,这确实不是“气流”问题,但它是相当外部的,您应该遵循组织中类似事物的最佳实践。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-30
    • 2022-01-09
    • 1970-01-01
    • 2018-08-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多