【问题标题】:Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers确保只有一个工作人员在运行多个工作人员的金字塔网络应用程序中启动 apscheduler 事件
【发布时间】:2013-04-17 06:48:52
【问题描述】:

我们有一个使用金字塔制作的网络应用程序,并通过 gunicorn+nginx 提供服务。它适用于 8 个工作线程/进程

我们需要工作,我们选择了调度程序。这是我们启动它的方式

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

问题是 gunicorn 的所有工作进程都选择了调度程序。我们尝试实现文件锁定,但它似乎不是一个足够好的解决方案。确保在任何给定时间只有一个工作进程选择预定事件并且没有其他线程在下一个JOB_INTERVAL 之前选择它的最佳方法是什么?

即使我们决定稍后切换到 apache2+modwsgi,该解决方案也需要使用 mod_wsgi。它需要与作为服务员的单进程开发服务器一起使用。

来自赏金赞助者的更新

我正面临着 OP 描述的相同问题,只是使用 Django 应用程序。如果原始问题,我很确定添加此细节不会有太大变化。出于这个原因,为了获得更多的知名度,我还用django标记了这个问题。

【问题讨论】:

  • 工作从哪里来?网络请求有时会添加新作业吗?
  • 没有。它只是一个监视资源并根据资源状态采取行动的作业。资源状态由请求修改。
  • 此外,调度程序作业被添加到应用程序的__init__.py
  • 哇,我在我的应用程序中做了完全相同的事情,只是我没有预见到这个问题,因为我只是和 Waitress 一起开发。密切关注这篇文章!

标签: django pyramid wsgi gunicorn apscheduler


【解决方案1】:

因为 Gunicorn 开始时有 8 个工作人员(在您的示例中),所以这 forks 应用程序 8 次分为 8 个进程。这 8 个进程是从 Master 进程派生出来的,该进程监控每个进程的状态并能够添加/删除工作人员。

每个进程都会获得您的 APScheduler 对象的副本,该对象最初是您的主进程的 APScheduler 的精确副本。这导致每个“nth”工作人员(进程)总共执行每个作业“n”次。

解决此问题的方法是使用以下选项运行 gunicorn:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

--preload 标志告诉 Gunicorn “在分叉工作进程之前加载应用程序”。通过这样做,每个工作人员“获得了应用程序的副本,该副本已由 Master 实例化,而不是实例化应用程序本身”。这意味着以下代码仅在 Master 进程中执行一次:

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

此外,我们需要将 jobstore 设置为 :memory: 以外的任何内容。这样,虽然每个 worker 都是自己独立的进程,但无法与其他 7,通过使用本地数据库(而不是内存),我们保证作业存储上 CRUD 操作的单点真实性。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = Scheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

最后,我们要使用 BackgroundScheduler,因为它实现了start()。当我们在 BackgroundScheduler 中调用start() 时,会在后台启动一个新线程,该线程负责调度/执行作业。这很重要,因为请记住在步骤 (1) 中,由于我们的 --preload 标志,我们只在 Master Gunicorn 进程中执行一次 start() 函数。根据定义,分叉的进程不会继承其父进程的线程,因此每个工作进程都不会运行 BackgroundScheduler 线程。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

由于所有这些,每个 Gunicorn worker 都有一个 APScheduler 被欺骗进入“STARTED”状态,但实际上并没有运行,因为它丢弃了它的父线程!每个实例还能够更新作业存储数据库,只是不执行任何作业!

查看flask-APScheduler 了解在网络服务器(如 Gunicorn)中运行 APScheduler 并为每个作业启用 CRUD 操作的快速方法。

【讨论】:

  • 答案确实有道理,我可以确认我的代码的.start() 部分在使用--preload 完成时只被调用一次,但有些任务确实被触发了两次(甚至认为我的worker 配置是 -w 4)
  • 如果你有一个 gunicorn python 配置文件,你可以创建一个名为 on_starting 的函数,它只会由 gunicorn 主进程执行,而不是由工作进程执行。您还可以在那里创建并启动调度程序。以github.com/mlsecproject/gglsbl-rest/blob/master/config.py 为例。
  • env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload 如果我有域名,应该是env/bin/gunicorn module_containing_app:app -b domain.example.com--workers 3 --preload 吗?另外,我没有文件夹env/bin/gunicorn
【解决方案2】:

我找到了一个适用于 Django 项目的修复程序,该项目有一个非常相似的问题。我只是在调度程序第一次启动时绑定一个 TCP 套接字并随后检查它。我认为以下代码只需稍作调整也可以为您工作。

import sys, socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print "!!!scheduler already started, DO NOTHING"
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print "scheduler started"

【讨论】:

  • 我不知道你更新了这个问题。我根本无法解决问题,我改用芹菜。你可能想考虑一下,因为我觉得一旦你开始了解芹菜,它会给你更多的力量。您可以选择使用 celery beat 运行计划作业。
  • 但是我接受你的回答是因为你似乎已经找到了解决方案,并且将来它可能会对人们有所帮助
  • @shortfellow 我更新了这个问题,因为在提供了赏金之后,我意识到只要稍加改动也可以很好地描述我的用例,而不会过多地改变你的问题。如果我的回答没有帮助,请不要接受。芹菜建议值得一看,谢谢。
  • 在我看来,Celery 和 APScheduler 之间的区别在于 APScheduler 在动态添加和删除 cron 类型的作业方面提供了更多的灵活性,而从 Celerybeat 添加和删除作业需要重新配置和重新启动:@ 987654321@ 与 pythonadventures.wordpress.com/2013/08/06/apscheduler-examples(参见 cron 样式)
  • @GeekSambhu 不知道抱歉
【解决方案3】:

简短的回答:你不能正确地做到这一点而没有后果。

我以 Gunicorn 为例,但它对于 uWSGI 基本相同。运行多个进程时有各种 hack,仅举几例:

  1. 使用--preload 选项
  2. 使用on_starting钩子启动APScheduler后台调度程序
  3. 使用when_ready钩子启动APScheduler后台调度程序

它们在一定程度上起作用,但可能会出现以下错误:

  1. worker 经常超时
  2. 没有作业时调度程序挂起https://github.com/agronholm/apscheduler/issues/305

APScheduler 设计为在单个进程中运行,它可以完全控制将作业添加到作业存储的过程。它使用threading.Eventwait()set() 方法进行协调。如果它们由不同的进程运行,则协调将不起作用。

可以在 Gunicorn 中以单个进程运行它。

  1. 只使用一个工作进程
  2. 使用post_worker_init钩子启动调度器,这将确保调度器只在工作进程而不是主进程中运行

作者还指出,多个进程共享作业存储量是不可能的。 https://apscheduler.readthedocs.io/en/stable/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes他还提供了一个使用RPyC的解决方案。

虽然使用 REST 接口包装 APScheduler 是完全可行的。您可能需要考虑将其作为一个独立的应用程序提供给一名工作人员。换句话说,如果您有其他端点,请将它们放在可以使用多个工作器的另一个应用程序中。

【讨论】:

    猜你喜欢
    • 2013-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-12
    • 2017-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多