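Someone else may run into the same problem, so here is what I worked out.
You need:
APScheduler  # does all the scheduling work
django-apscheduler  # only used to store the scheduler's jobs through the Django ORM (Postgres in my case)
First, make a singleton scheduler (built from APScheduler and django-apscheduler) so that you always have exactly one scheduler holding many jobs: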
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.jobstores import register_events


class MetaSingleton(type):
    # One instance per class, created on the first call and reused afterwards.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class SchedulerHandler(metaclass=MetaSingleton):
    scheduler = None

    def retrieve_scheduler(self):
        # Create and start the scheduler lazily, the first time it is requested.
        if self.scheduler is None:
            self.scheduler = BackgroundScheduler(timezone=utc)
            self.scheduler.add_jobstore(DjangoJobStore(), 'djangojobstore')
            self.scheduler.start()
            register_events(self.scheduler)
            self.scheduler.print_jobs(jobstore='djangojobstore')
        return self.scheduler
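To check that the metaclass really hands back one shared object, here is a minimal standalone sketch. `Handler` and `retrieve` are hypothetical stand-ins for `SchedulerHandler` and `retrieve_scheduler`, and a plain `object()` takes the place of the real scheduler so the snippet runs without Django or APScheduler.

```python
class MetaSingleton(type):
    """Metaclass that creates at most one instance per class."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Handler(metaclass=MetaSingleton):
    """Hypothetical stand-in for SchedulerHandler."""
    resource = None

    def retrieve(self):
        # Build the shared resource lazily, on first use only.
        if self.resource is None:
            self.resource = object()  # placeholder for BackgroundScheduler
        return self.resource


first = Handler().retrieve()
second = Handler().retrieve()
same_handler = Handler() is Handler()   # both calls yield one instance
same_resource = first is second         # so the resource is shared too
```

Keep in mind that the singleton is per process: if the server runs several worker processes, each one gets its own instance.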
In my urls.py I call this method so that the scheduler starts when the server starts:
scheduler = SchedulerHandler().retrieve_scheduler()
Now, whenever I need to add a job, I fetch the existing scheduler and call add_job on it:
def create_cron_job(cron_string, action, rule_id, job_id=''):
    # split() without arguments tolerates repeated whitespace between fields
    cron_values_list = cron_string.split()
    if len(cron_values_list) == 5:
        print(cron_string, action, rule_id)
        scheduler = SchedulerHandler().retrieve_scheduler()
        # I don't need to update jobs, so if one already exists I just delete it
        # and add a new one (you could modify the existing job instead).
        if job_id:
            delete_cron_job(job_id)
        job = scheduler.add_job(
            server_action_task,  # the function the scheduler will call (defined elsewhere)
            trigger='cron',
            jobstore='djangojobstore',
            minute=cron_values_list[0],
            hour=cron_values_list[1],
            day=cron_values_list[2],
            month=cron_values_list[3],
            day_of_week=cron_values_list[4],
            args=[cron_string, action, rule_id],  # arguments passed to server_action_task
        )
        return job.id
    # Not a five-field cron expression: no job is created.
    return None


def delete_cron_job(job_id):
    scheduler = SchedulerHandler().retrieve_scheduler()
    scheduler.remove_job(job_id)
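The field-by-field mapping above can also be pulled into a small helper. This is only a sketch: `cron_to_kwargs` and `CRON_FIELDS` are names I made up for illustration, and the helper checks the number of fields only, not whether each field is a valid cron expression.

```python
# Keyword names the cron trigger expects, in crontab field order.
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def cron_to_kwargs(cron_string):
    """Map a five-field cron string to keyword arguments, or return None."""
    values = cron_string.split()  # tolerates repeated whitespace
    if len(values) != len(CRON_FIELDS):
        return None
    return dict(zip(CRON_FIELDS, values))


kwargs = cron_to_kwargs("30 8 * * mon-fri")
invalid = cron_to_kwargs("30 8 *")  # too few fields, so None
```

The resulting dict could then be unpacked into the call, for example `scheduler.add_job(func, trigger='cron', **kwargs)`. APScheduler 3.x also provides `CronTrigger.from_crontab`, which may be worth a look if you would rather not split the string yourself.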
Finally, I call this function from somewhere in the project (in an APIView or a Serializer):
def create(self, validated_data):
    rule = Rules.objects.create(**validated_data)
    start_job_id = create_cron_job(rule.start_rule, 'start', rule.id)
    stop_job_id = create_cron_job(rule.stop_rule, 'stop', rule.id)
    rule.start_job_id = start_job_id
    rule.stop_job_id = stop_job_id
    rule.save()
    return rule
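Maybe this isn't the best solution, but it works.
Note: if you rely on cascade model deletion, you have to remove the jobs from the scheduler yourself. I did this by adding a pre_delete signal receiver for my Rules model: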
from django.db import models
from django.db.models.signals import pre_delete
from django.dispatch import receiver


class Rules(models.Model):
    server = models.ForeignKey(Servers, on_delete=models.CASCADE)
    start_rule = models.CharField(max_length=40)
    stop_rule = models.CharField(max_length=40)
    start_job_id = models.CharField(max_length=50, null=True)
    stop_job_id = models.CharField(max_length=50, null=True)
    rule_type = models.CharField(max_length=12, default='common')  # ENUM: common / individual


@receiver(pre_delete, sender=Rules)
def delete_rule(instance, **kwargs):
    # Remove both scheduler jobs before the rule itself is deleted.
    delete_cron_job(instance.start_job_id)
    delete_cron_job(instance.stop_job_id)
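One thing to watch: both job id fields are nullable, and APScheduler's `remove_job` raises `JobLookupError` when the job does not exist, so the receiver can fail for a rule whose jobs were never created or are already gone. Below is a sketch of a more tolerant delete helper. `safe_delete_cron_job` and `FakeScheduler` are hypothetical names, and the local `JobLookupError` class only stands in for `apscheduler.jobstores.base.JobLookupError` so that the snippet runs on its own.

```python
class JobLookupError(KeyError):
    """Stand-in for apscheduler.jobstores.base.JobLookupError."""


class FakeScheduler:
    """Hypothetical in-memory stand-in for the real scheduler."""

    def __init__(self, job_ids):
        self.job_ids = set(job_ids)

    def remove_job(self, job_id):
        if job_id not in self.job_ids:
            raise JobLookupError(job_id)
        self.job_ids.remove(job_id)


def safe_delete_cron_job(scheduler, job_id):
    """Remove a job if it exists; return True when something was removed."""
    if not job_id:  # the model field is nullable, so there may be no job
        return False
    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        return False  # job is already gone
    return True


scheduler = FakeScheduler(["abc"])
results = [
    safe_delete_cron_job(scheduler, "abc"),  # job exists and is removed
    safe_delete_cron_job(scheduler, "abc"),  # second attempt finds nothing
    safe_delete_cron_job(scheduler, None),   # empty id is skipped
]
```

In the real project you would import `JobLookupError` from APScheduler and pass in the scheduler returned by `SchedulerHandler().retrieve_scheduler()`.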