Django multiprocessing多进程操作数据库OperationalError报错详解
背景
使用django进行后台开发过程中,想利用multiprocessing开启多进程并发操作数据库,结果在执行子进程时,出现报错:django.db.utils.OperationalError: server closed the connection unexpectedly,抱着“春节前也不是那么忙,闲着也有点儿*疼”的想法,对该报错的产生原因进行分析。
测试
测试环境为linux red-hat 6.8系统 + python 3.6 + Django 1.11,后台数据库采用postgressql 9.5.5。
django数据库配置如下:
DATABASES = {
'datamigration': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'dbname',
'USER': 'dbuser',
'PASSWORD': 'passwd',
'HOST': 'IP', #隐去真实IP
'PORT': 'PORT', #隐去真实端口
},
}
测试model如下:
class DataMigrationTask(models.Model):
id_task = models.AutoField(primary_key=True)
service_number = models.CharField(max_length=-1, blank=True, null=True)
um = models.CharField(max_length=-1)
create_date = models.DateTimeField(auto_now_add=True,blank=True, null=True)
class Meta:
managed = False
db_table = 'data_migration_task'
app_label = 'datamigration'
测试view函数如下:
from multiprocessing import Process
import time
def test_select(query_times, process_num=0):
print('child process {} begin'.format(process_num))
while query_times <= 6:
test_value = list(DataMigrationTask.objects.filter(um='test'))
print('query_times={}, test_value={}, process_num={}'.format(query_times, test_value, process_num))
query_times += 1
time.sleep(1)
def test(request):
if request.method != 'POST':
status = list(DataMigrationTask.objects.filter(um='test'))
process_num = 1
main_wait_times = 1
p = Process(target=test_select, args=(1, process_num))
p.start()
while True:
if main_wait_times == 2:
print('the Loop times-out, break')
break
else:
print(status)
print('main process has wait {}s'.format(main_wait_times))
time.sleep(1)
main_wait_times += 1
print('test end')
为避免django的惰性查询(lazy QuerySets)特性影响,在这里使用调用model的filter方法后,使用list()函数,确保django对数据库进行实际访问(evaluate QuerySet)12
测试结果如下:
从报错截图来看,multiprocessing.Process启动的子进程与postgressql断开连接,无法通过ORM模型在数据库上执行操作,报出异常。在网上查找资料,有文章提到在django+mysql的架构中也曾出现类似错误3 ,进一步查找相关资料,有文章中这样描述:如果在某个django的进程里面用multiprocessing创建新的进程,则子进程会继承父进程的数据库连接socket,那么父子进程同时做数据库操作时会出错(数据库socket连接会抛出异常“数据库已不在”/“查询过程中出错”)4。
但问题果真如此吗?
要分析这个报错的根因,核心问题有二:
一、Django中的ORM模型是如何和数据库交互的
二、multiprocessing在创建子进程过程中,发生了什么
Django和数据库连接
在测试案例的settings文件中,采用HOST:IP方式配置数据库地址(这也是最常用的一种配置方式),因此,Django和数据库的连接,是通过 TCP sockets的方式建立的5 。
sockets是对TCP协议的一层API封装,发起数据库连接时,Django创建sockets并将它写入内存中,然后提出连接请求,当数据库监听到或者接收到连接请求时,就响应django的sockets请求,建立一个新的线程,记录该sockets的描述符;断开连接时,数据库关闭线程,并清除该sockets描述符,之后也就再不能用它进行连接6 。
子进程创建
由于操作系统不同,multiprocessing.Process子进程的创建方式也不同,对于Linux而言,子进程默认是以fork方式创建的。在解释fork创建子进程过程前,先解释几个概念:物理地址、逻辑地址和地址映射。
逻辑地址:CPU所生成的地址。CPU产生的逻辑地址被分为
(1)p(页号),它包含每个页在物理内存中的基址,用来作为页表的索引;
(2)d(页偏移),同基址相结合,用来确定送入内存设备的物理内存地址。
物理地址:内存单元所看到的地址。程序看不见真正的物理地址,它只关心逻辑地址,且认为进程的地址空间为0到max。物理地址范围从R+0到R+max,R为基地址。
地址映射:将程序地址空间中使用的逻辑地址变换成内存中的物理地址的过程。由内存管理单元(MMU)来完成。映射类型又分为
(1)静态映射,在程序装入主存时已经完成了逻辑地址到物理地址和变换,在程序执行期间不会再发生改变。
(2)动态映射,程序执行期间完成,其实现依赖于硬件地址变换机构,如基址寄存器。
Linux系统在Fork创建子进程时引入了“写时复制”技术,也就是只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。在创建子进程之后,两个进程用的是相同的物理空间(内存区),子进程的代码段、数据段、堆栈都是指向父进程的物理空间,也就是说,两者的虚拟空间不同,但其对应的物理空间是同一个。当父、子进程中有更改相应段的行为发生时,再为子进程相应的段分配物理空间。
具体过程是这样的:
fork子进程完全复制父进程的栈空间,也复制了页表,但没有复制物理页面,所以这时虚拟地址相同,物理地址也相同,但是会把父子共享的页面标记为“只读”(类似mmap的private的方式),如果父、子进程一直对这个页面是同一个页面,直到其中任何一个进程要对共享的页面执行“写操作”,这时内核会复制一个物理页面给这个进程使用,同时修改页表。而把原来的只读页面标记为“可写”,留给另外一个进程使用。这就是所谓的“写时复制”。7
根据上述分析,我们可以得到一个结论:
通过multiprocessing.Process在linux中创建的子进程,会复制到父进程内存中留存的sockets,也就是说,子进程向数据库发起操作,是基于父进程socket已经和数据库完成连接的情况下进行的。如果父进程连接断开,那么子进程将无法进行数据库操作。
测试代码分析
(1)主进程函数test接收到一个request请求,执行一个filter查询操作,在这个过程中和datamigration数据库建立TCP socket连接。
(2)创建一个子进程,调用函数test_select,由于主进程已经和数据库建立连接,因此该子进程在创建时,会复制主进程的socket,并基于此操作数据库。
(3)主进程执行结束,请求处理完成,由于CONN_MAX_AGE默认为0,django和数据库连接断开,数据库清除django的socket描述符
(4)子进程执行filter语句,读取socket(即主进程socket),向数据库发起操作请求,由于数据库已经清除该socket描述符,无法连接数据库,该操作请求执行失败,抛出异常:psycopg2.OperationalError: server closed the connection unexpectedly
验证测试
为保持变量可控,以下测试均在原测试代码基础上,修改所声明部分,未声明处与原测试代码相同
调整CONN_MAX_AGE参数
修改数据库配置如下:
DATABASES = {
'datamigration': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'dbname',
'USER': 'dbuser',
'PASSWORD': 'passwd',
'HOST': 'IP', #隐去真实IP
'PORT': 'PORT', #隐去真实端口
'CONN_MAX_AGE': 100, # 请求处理完成100s后断开与数据库连接
},
测试结果如下:
Django提供了持久连接这个特性,持久连接避免了在每个请求中重新建立与数据库的连接的开销。它们由CONN_MAX_AGE定义连接最大生命周期的参数控制 ,可以为每个数据库单独设置。
因此从测试结果中我们可以看到,虽然该次请求已处理完成,父进程结束,但由于CONN_MAX_AGE的存在,数据库连接并未断开,子进程依然能执行数据库查询操作8 。
调整父进程建立数据库连接时间
def test(request):
if request.method != 'POST':
process_num = 1
main_wait_times = 1
p = Process(target=test_select, args=(1, process_num))
p.start()
status = list(DataMigrationTask.objects.filter(um='test'))
# 先创建子进程,再执行数据库操作
while True:
if main_wait_times == 2:
print('the Loop times-out, break')
break
else:
print(status)
print('main process has wait {}s'.format(main_wait_times))
time.sleep(1)
main_wait_times += 1
print('test end')
测试结果如下:
由于子进程创建时,父进程未与数据库连接,因此子进程在进行数据库操作时,自己重新创建TCP socket连接,父子进程连接socket相互独立,因此子进程数据库操作不受父进程退出影响。
阻塞父进程,等待子进程执行
def test(request):
if request.method != 'POST':
status = list(DataMigrationTask.objects.filter(um='test'))
process_num = 1
main_wait_times = 1
p = Process(target=test_select, args=(1, process_num))
p.start()
while True:
if main_wait_times == 4:
# 父进程等待子进程执行3s
print('the Loop times-out, break')
break
else:
print(status)
print('main process has wait {}s'.format(main_wait_times))
time.sleep(1)
main_wait_times += 1
print('test end'
测试结果如下:
父进程等待子进程执行3s后退出,同时断开连接,子进程前3次数据库查询正常执行,第4次由于连接断开,出现报错
改进思路
通过验证测试,其实改进思路已经很明显了:
1、针对不同数据库,修改settings文件中的CONN_MAX_AGE参数,但如果设置不当,可能会造成数据库中残留连接数过多,不太建议。
2、采用join方法阻塞父进程,待子进程结束后再继续执行。
3、在不影响业务逻辑的前提下,将子进程创建放在父进程数据库操作之前
4、将子进程函数单独设置为一个请求函数,创建子进程方式由直接调用子进程函数,改为发起多个请求。
参考文章
-
https://docs.djangoproject.com/en/1.11/topics/db/queries/#querysets-are-lazy ↩︎
-
https://docs.djangoproject.com/en/1.11/ref/models/querysets/#django.db.models.query.QuerySet.extra ↩︎
-
https://blog.csdn.net/orangleliu/article/details/46919453 ↩︎
-
https://docs.djangoproject.com/en/1.11/ref/settings/#host ↩︎
-
https://blog.csdn.net/xy010902100449/article/details/44851453 ↩︎
-
https://docs.djangoproject.com/en/1.11/ref/databases/#persistent-connections ↩︎