【发布时间】:2017-03-15 14:06:55
【问题描述】:
我需要重新启动 celery 守护程序,但我需要它告诉当前工作人员在他们的任务完成时关闭,然后在旧工作人员仍在关闭时启动一组新工作人员。
守护程序上当前的优雅选项会等待所有任务完成后再重新启动,这在您有长时间运行的作业时没有用。
请不要建议自动重新加载,因为它目前在 4.0.2 中没有记录。
【问题讨论】:
我需要重新启动 celery 守护程序,但我需要它告诉当前工作人员在他们的任务完成时关闭,然后在旧工作人员仍在关闭时启动一组新工作人员。
守护程序上当前的优雅选项会等待所有任务完成后再重新启动,这在您有长时间运行的作业时没有用。
请不要建议自动重新加载,因为它目前在 4.0.2 中没有记录。
【问题讨论】:
好吧,我最终做的是使用 supervisord 和 ansible 来管理这个。
[program:celery_worker]
# Max concurrent task you wish to run.
numprocs=5
process_name=%(program_name)s-%(process_num)s
directory=/opt/worker/main
# Ignore this unless you want to use virtualenvs.
environment=PATH="/opt/worker/main/bin:%(ENV_PATH)s"
command=/opt/worker/bin/celery worker -n worker%(process_num)s.%%h --app=python --time-limit=3600 -c 5 -Ofair -l debug --config=celery_config -E
stdout_logfile=/var/log/celery/%(program_name)s-%(process_num)s.log
user=worker_user
autostart=true
autorestart=true
startretries=99999
startsecs=10
stopsignal=TERM
stopwaitsecs=7200
killasgroup=false
您可以使用 supervisor 来停止/启动工作程序以加载新代码,但它会等待所有工作程序停止后再重新启动它们,这对于长时间运行的作业来说效果不佳。最好只使用 TERM MainProcesses,它会告诉工作人员停止接受工作并在完成后关闭。
ps aux | grep *celery.*MainProcess | awk '{print $2}' | xargs kill -TERM
Supervisor 会在它们死亡时重新启动它们。
当然,在不完全停止所有工作人员的情况下更新依赖项是非常不可能的,这为使用 docker 之类的东西提供了一个非常好的案例。 ;)
【讨论】:
在 Celery4 上,我必须修补基础 Task 类以使其工作。 Source
import signal
from celery import Celery, Task
from celery.utils.log import get_task_logger
logger = get_task_logger('my_logger')
class MyBaseTask(Task):
def __call__(self, *args, **kwargs):
signal.signal(signal.SIGTERM,
lambda signum, frame: logger.info('SIGTERM received,
wait till the task finished'))
return super().__call__(*args, **kwargs)
app = Celery('my_app')
app.Task = MyBaseTask
还有一个patch 可以防止在警告关闭时重新安排
【讨论】:
我们的任务可能会运行长达 48 小时。当我们发布新版本并将新版本部署到生产环境时,您所说的优雅重启非常常见。我们所做的只是向正在运行的工作人员发送 SIGTERM(关闭)信号,然后并行启动一组全新的工作人员。
【讨论】: