我认为您对重新考虑任务如何写入数据库的想法是正确的。
谨慎的做法是让 Flask 路由将任务附加到队列中。
一旦任务在队列中,Flask 路由就可以返回,因此 Web 请求上下文将被关闭。
然后,您可以在后台运行另一个 python(非烧瓶)程序,该程序不断从该队列中读取数据,并从中弹出最新的任务,然后执行这些任务并将它们保存到数据库中。
这可以使用也许您已经使用 Celery 调用异步函数来完成?
这是一篇关于如何创建这样的 Celery 任务并将其与 Flask 路由一起使用的博文
https://blog.miguelgrinberg.com/post/using-celery-with-flask
你是对的,当 web 请求上下文关闭时,Flask-SQLAlchemy 会断开数据库会话,这会在烧瓶路由返回时发生。
为了防止 Flask-SQLAlchemy 关闭你的会话,你不应该将你的 session 从 Flask 路由传递到异步 Celery 函数。相反,您应该在 celery 异步函数中“创建一个新会话”(调用会话注册表)。
事实上,这是在 Web 应用程序中使用 SQLAlchemy 会话时的一般原则。您希望避免传递会话对象。 SQLAlchemy 通过使用注册表模式创建返回“线程本地”会话对象的会话注册表来提升此主体。这种“线程本地”会话的想法意味着,当您从会话注册表中获取会话时,您确保该会话仅在当前线程中使用。由于您最终要创建一个新线程来执行长时间运行的任务,因此您可以依靠 Session Registry 为您提供一个线程,该线程不会受到 Web 请求关闭的影响,因为 Web 请求正在另一个线程中处理.
Flask-SQLAlchemy 默认实现了这个 Session Registry。例如,您可能有一个路线和 celery 任务,如下所示:
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
app = Flask(__name__)
db = SQLAlchemy(app)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def do_stuff_that_takes_10_minutes(payload):
session = db.session # Gets a thread local session from the Session Registry that is different from the session obj you used in the route because this function will be executed in its own thread.
# some long running task here
return result
@app.route('/long_running_task', methods=['GET'])
def long_running_task():
session = db.session # Get a thread local session from the Session Registry that will close once this web request is complete
# use this session here and do stuff
# Call this function, which Celery will spawn a new thread to do
# Don't pass the above session object into this function, we want this
# session obj to stay only within this thread.
do_stuff_that_takes_10_minutes.delay()
return "Executing task, could take 10 minutes or more."