如前文所述,Arbiter是gunicorn master进程的核心。Arbiter主要负责管理worker进程,包括启动、监控、杀掉Worker进程;同时,Arbiter在某些信号发生的时候还可以热更新(reload)App应用,或者在线升级gunicorn。Arbiter的核心代码在一个文件里面,代码量也不大,源码在此:https://github.com/benoitc/gunicorn。
Arbiter主要有以下方法:
setup:
处理配置项,最重要的是worker数量和worker工作模型
init_signal:
注册信号处理函数
handle_xxx:
各个信号具体的处理函数
kill_worker,kill_workers:
向worker进程发信号
spawn_worker, spawn_workers:
fork出新的worker进程
murder_workers:
杀掉一段时间内未响应的worker进程
manage_workers:
根据配置文件的worker数量,以及当前active的worker数量,决定是要fork还是kill worker进程
reexec:
接收到信号SIGUSR2调用,在线升级gunicorn
reload:
接收到信号SIGHUP调用,会根据新的配置新启动worker进程,并杀掉之前的worker进程
sleep:
在没有信号处理的时候,利用select的timeout进行sleep,可被唤醒
wakeup:
通过向管道写消息,唤醒进程
run:
主循环
Arbiter真正被其他代码(Application)调用的函数只有__init__和run方法,在一句代码里:
Arbiter(self).run()
上面代码中的self即为Application实例,其中__init__调用setup进行配置项设置。下面是run方法伪代码
def run() self.init_signal() self.LISTENERS = create_sockets(self.cfg, self.log) self.manage_workers() while True: if no signal in SIG_QUEUE self.sleep() else: handle_signal()
关于fork子进程
fork子进程的代码在 spawn_worker, 源码如下:
1 def spawn_worker(self): 2 self.worker_age += 1 3 worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, 4 self.app, self.timeout / 2.0, 5 self.cfg, self.log) 6 self.cfg.pre_fork(self, worker) 7 pid = os.fork() 8 if pid != 0: 9 self.WORKERS[pid] = worker 10 return pid 11 12 # Process Child 13 worker_pid = os.getpid() 14 try: 15 util._setproctitle("worker [%s]" % self.proc_name) 16 self.log.info("Booting worker with pid: %s", worker_pid) 17 self.cfg.post_fork(self, worker) 18 worker.init_process() 19 sys.exit(0) 20 except SystemExit: 21 raise 22 except AppImportError as e: 23 self.log.debug("Exception while loading the application", 24 exc_info=True) 25 print("%s" % e, file=sys.stderr) 26 sys.stderr.flush() 27 sys.exit(self.APP_LOAD_ERROR) 28 except: 29 self.log.exception("Exception in worker process"), 30 if not worker.booted: 31 sys.exit(self.WORKER_BOOT_ERROR) 32 sys.exit(-1) 33 finally: 34 self.log.info("Worker exiting (pid: %s)", worker_pid) 35 try: 36 worker.tmp.close() 37 self.cfg.worker_exit(self, worker) 38 except: 39 self.log.warning("Exception during worker exit:\n%s", 40 traceback.format_exc())