【发布时间】:2021-11-25 08:36:15
【问题描述】:
我们使用 Airflow 2.1.4 并在 Kubernetes 上运行。
我们为网络服务器、调度程序分离了 pod,并且我们正在使用 Kubernetes 执行器。
我们正在使用各种运算符,例如PythonOperator、KubernetesPodOperator 等。
我们的设置处理约 2000 个客户(企业),每个客户都有自己的 DAG。
我们的代码如下所示:
def get_customers():
logger.info("querying database to get all customers")
return sql_connection.query(SELECT id, name, offset FROM customers_table)
customers = get_customers()
for id, name, offset in customers:
dag = DAG(
dag_id=f"{id}-{name}",
schedule_interval=offset,
)
with dag:
first = PythonOperator(..)
second = KubernetesPodOperator(..)
third = SimpleHttpOperator(..)
first >> second >> third
globals()[id] = dag
上面的 sn-p 是我们已有的简化版本,但 DAG 中有几十个运算符(而不仅仅是三个)。
问题在于,对于每个 DAG 中的每个运算符,我们都会看到 querying database to get all customers 日志 - 这意味着我们查询数据库的方式超出了我们的预期。
数据库不经常更新,我们每天只能更新一次 DAG。 我知道 DAG 被保存在元数据数据库或其他东西中......
- 有没有办法只构建一次/通过调度程序而不是每个操作员都构建这些 DAG?
- 我们是否应该更改设计以支持我们的多租户需求?还有比这更好的选择吗?
在我们的例子中,约 60 名运营商 X 约 2,000 名客户 = 约 120,000 次数据库查询。
【问题讨论】: