【发布时间】:2018-05-08 06:57:37
【问题描述】:
如何在 Airflow 任务中使用 Django 模型?
根据 Airflow 官方文档,Airflow 提供了与数据库交互的钩子(如 MySqlHook / PostgresHook / 等),以后可以在 Operators 中使用这些钩子来执行行查询。附上核心代码片段:
从https://airflow.apache.org/_modules/mysql_hook.html复制
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
从https://airflow.apache.org/_modules/mysql_operator.html复制
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
我们可以看到,Hook 封装了连接配置,而 Operator 提供了执行自定义查询的能力。
问题:
使用不同的 ORM 来获取和处理数据库对象而不是原始 SQL 非常方便,原因如下:
- 在简单的情况下,ORM 可能是一个更方便的解决方案,请参阅ORM definitions。
- 假设已经建立了像 Django 这样的系统,其中定义了模型及其方法。每次这些模型的模式发生变化时,都需要重写气流原始 SQL 查询。 ORM 为使用此类模型提供了统一的界面。
由于某种原因,没有在 Airflow 任务中使用 ORM 的示例,就挂钩和操作符而言。根据Using Django database layer outside of Django? 问题,需要设置到数据库的连接配置,然后直接在ORM 中执行查询,但是在适当的钩子/运算符之外执行此操作会破坏Airflow principles。这就像使用 "python work_with_django_models.py" 命令调用 BashOperator。
最后,我们想要这个:
那么在这种情况下,最好的做法是什么?我们是否共享 Django ORM / 其他 ORM 的任何钩子 / 运算符?为了使以下代码真实(视为伪代码!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
而不是在原始 SQL 中实现此功能。
我认为这是一个非常重要的话题,因为在这种情况下,整个基于 ORM 的框架和流程都无法深入研究 Airflow。
提前致谢!
【问题讨论】:
-
我可以看到在 Airflow 中访问 Django 模型的便利性。但如果可能,我会主张在 Django 中保留该逻辑,然后通过可能通过
BashOperator调用的管理命令公开该逻辑。如果需要,它将更容易独立测试该代码并在 Airflow 之外运行它。但是根据您的用例,我可以看到这变得不必要的多毛。还有一个缺点是异常处理不是那么简单。
标签: django django-models orm airflow apache-airflow