[Posted]: 2020-02-07 17:41:51
[Question]:
I currently have Flask code like this in app.py:
from services.celery_maker import make_celery
from flask import Flask
from datetime import timedelta

template_dir = os.path.abspath('./build/')
app = Flask(__name__, template_folder=template_dir, static_folder=os.path.abspath("./build/static"))
app.config['ERROR_404_HELP'] = False
app.config['SECRET_KEY'] = config.get("DEFAULT", "SECRET_KEY")
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=5)
app.config.update(
    CELERY_BROKER_URL='redis://127.0.0.1:6379/0',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0',
)
app.config['CELERYBEAT_SCHEDULE'] = {
    # Executes every minute
    'periodic_task-every-minute': {
        'task': 'periodic_task',
        'schedule': timedelta(seconds=30)
    }
}

@app.route('/')
def view():
    return "Hello, Flask is up and running!"

@celery.task(name="periodic_task")
def periodic_task():
    print('Hi! from periodic_task')
    logger.info("Hello! from periodic task")

if __name__ == "__main__":
    app.run(debug=True)
I put the Celery factory in a separate file to avoid import-related errors:
services/celery_maker.py
from celery import Celery

def make_celery(app_name=__name__):
    backend = "redis://localhost:6379/0"
    broker = backend.replace("0", "1")
    return Celery(app_name, backend=backend, broker=broker)

celery = make_celery()
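A side note on make_celery: backend.replace("0", "1") rewrites every "0" in the URL, so it only yields the intended broker URL here because this particular URL contains a single zero. A minimal sketch of a narrower approach, assuming the database number is the URL path (the helper name with_redis_db is hypothetical and not part of the question's code):

```python
from urllib.parse import urlsplit


def with_redis_db(url: str, db: int) -> str:
    """Return url with its Redis database number (the URL path) set to db."""
    # Hypothetical helper: only the path component is changed, so zeros in
    # the host or port are left untouched.
    return urlsplit(url)._replace(path=f"/{db}").geturl()


backend = "redis://localhost:6379/0"
broker = with_redis_db(backend, 1)  # same server, database 1
```

With a URL such as redis://10.0.0.5:6380/0, the plain str.replace call would also alter the host and the port, while the helper changes only the trailing database number.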
The Celery worker sees my task, but the task never runs. I don't know what is going on.
[Comments]:
Tags: python flask cron celery celerybeat