本文介绍异步MySQL异步驱动aiomysql的使用
1,安装异步模块
如果没有模块则先使用pip安装模块
|
1
2
|
pip3 install asynciopip3 install aiomysql |
2,创建MySQL数据库连接池
和同步方式不一样的是使用异步不能直接创建数据库连接conn,需要先创建一个数据库连接池对象__pool通过这个数据库连接池对象来创建数据库连接
数据库配置信息和介绍pymysql同步使用的数据库是一样的
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import asyncio,aiomysql,time
# 数据库配置dictdb_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建数据库连接池协程函数async def create_pool(**kw):
global __pool
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db']
)
loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8> |
3,创建执行sql语句的协程函数
因为是异步模块,只能在事件循环中通过await关键字调用,使用需要创建执行sql语句的协程函数
在协程函数内使用全局上一步创建的连接池对象来创建连接conn和浮标对象cur,通过浮标对象来执行sql语句,执行方法和pymysql模块的执行方法是一样的
|
1
2
3
4
5
6
|
cursor.execute(sql,args)sql # 需要执行的sql语句例如'select * from table_name'
args # 替换sql语句的格式化字符串,即sql语句可以使用%s代表一个字符串,然后在args中使用对应的变量或参数替换,args为一个list或元组,即是一个有序的序列需要和sql中的%s一一对应
# 例如sql='select * from table_name where id=%s' args=['12345']# 相当于使用args中的参数替换sql中的%s# select * from table_name where id='12345' |
下面分别创建两个协程函数select execute一个用来执行搜索操作,一个用来执行insert,update,delete等修改操作
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# 执行select函数async def select(sql,args,size=None):
with await __pool as conn:
cur = await conn.cursor(aiomysql.DictCursor)
await cur.execute(sql.replace('?','?s'),args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
await cur.close()
return rs
# 执行insert update delete函数async def execute(sql,args):
with await __pool as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?','%s'),args)
affetced = cur.rowcount
await conn.commit()
await cur.close()
except BaseException as e:
raise
return affetced
|
4,实践执行sql语句
实践执行sql语句前我们首先在本机创建一个数据库和对应的表用于测试
数据库对应的主机,用户名,密码,库名,表名如下
|
1
2
3
4
5
|
host: localhostuser: www-data
password: www-data
db:awesometable_name: users |
创建表名的sql语句如下,需要在数据库中创建好对应的表
|
1
2
3
4
5
6
7
8
9
10
11
12
|
CREATE TABLE `users` ( `id` varchar(50) NOT NULL,
`email` varchar(50) NOT NULL,
`passwd` varchar(50) NOT NULL,
`admin` tinyint(1) NOT NULL,
`name` varchar(50) NOT NULL,
`image` varchar(500) NOT NULL,
`created_at` double NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_email` (`email`),
KEY `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
|
创建好的表对应的结构如下
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
mysql> desc users;+------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| id | varchar(50) | NO | PRI | NULL | |
| email | varchar(50) | NO | UNI | NULL | |
| passwd | varchar(50) | NO | | NULL | |
| admin | tinyint(1) | NO | | NULL | |
| name | varchar(50) | NO | | NULL | |
| image | varchar(500) | NO | | NULL | |
| created_at | double | NO | MUL | NULL | |+------------+--------------+------+-----+---------+-------+
7 rows in set (2.68 sec)
|
①执行insert操作
|
1
2
3
4
5
6
7
8
|
# insert startimport time
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']
async def insert():
await execute(sql,args)
loop.run_until_complete(insert())# insert end |
执行方式和pymysql没有区别,不同的是需要在事件循环中使用关键字await调用
执行完毕在MySQL中查看插入的数据
|
1
2
3
4
5
6
7
|
mysql> select * from users;
+--------+-------------+----------+-------+------+-------------+------------------+
| id | email | passwd | admin | name | image | created_at |
+--------+-------------+----------+-------+------+-------------+------------------+
| 111111 | test@qq.com | password | 1 | test | about:blank | 1637738541.48629 |
+--------+-------------+----------+-------+------+-------------+------------------+
1 row in set (0.00 sec)
|
②执行update操作
直接在loop事件循环中执行execute协程函数也可以
|
1
2
3
4
5
6
|
# update startimport time
sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'
args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']
loop.run_until_complete(execute(sql,args))# update end |
执行以后把email和name都修改了
③执行delete操作
|
1
2
3
4
5
|
# delete startsql = 'delete from `users` where `id`=?'
args = ['111111']
loop.run_until_complete(execute(sql,args))# delete end |
同样根据关键字id指定的值删除了这条数据
④执行selete操作
在执行select操作前我们保证数据库里面至少有一条数据
|
1
2
3
4
5
6
|
# select startsql = 'select * from users'
args = []
rs = loop.run_until_complete(select(sql,args))
print(rs)
# select end |
这里直接执行搜索的协程函数select根据函数的定义返回的是所有结果的list,元素是查询结果的字典
输出为
|
1
|
[{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637739212.74493}]
|
如果结果有多个则使用list的下标取出
补充
同步模块pymysql和异步模块aiomysql执行速度对比
假如我们需要往数据库插入20000条数据,我们分别使用同步模式和异步模式
首先删除数据库所有测试数据
|
1
|
delete from users;
|
同步的代码
d:/learn-python3/学习脚本/pymysql/use_pymysql.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
import pymysql
db_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建连接,相当于把字典内的键值对传递# 相当于执行pymysql.connect(host='localhost',user='www-data',password='www-data',db='awesome')conn = pymysql.connect(**db_config)
# 创建游标cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from users'
args = []
# 执行查询返回结果数量# 执行查询rs=cursor.execute(sql,args)
# 获取查询结果# 获取查询的第一条结果,返回一个dict,dict元素是查询对应的键值对# 如果查询结果有多条则执行一次,游标移动到下一条数据,在执行一次又返回一条数据# print(cursor.fetchone())# print(cursor.fetchone())# print(cursor.fetchall())# print(cursor.fetchmany())# {'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}# 获取查询的所有结果,返回一个list,list元素是dict,dict元素是查询对应的键值对# print(cursor.fetchall())# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]# 获取查询的前几条结果,返回一个dict,dict元素是查询对应的键值对# print(cursor.fetchmany(1))# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]# 执行修改操作import time
# # insert startsql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test1@qq.com','password',1,'test','about:blank',time.time(),'1111121']
# 使用replace 把'?'替换成'%s'# rs = cursor.execute(sql.replace('?','%s'),args)# print(cursor.rowcount)# conn.commit()# print(rs)# insert end# update start# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # print(cursor.execute(sql.replace('?','%s'),args))# conn.commit()# update end# delete start# sql = 'delete from `users` where `id`=?'# args = ['111111'] # print(cursor.execute(sql.replace('?','%s'),args))# conn.commit()# delete end# 写成函数调用,函数内部使用了数据库连接对象conn# 可以先定义成全局变量globaldef select(sql,args,size=None):
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql.replace('?','%s'),args or ())
if size:
rs = cursor.fetchmany(size)
else:
rs = cursor.fetchall()
cursor.close
# logging.info('rows returned: %s' % len(rs))
return rs
def execute(sql,args):
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql.replace('?','%s'),args)
# rowcount方法把影响函数返回
rs = cursor.rowcount
cursor.close()
conn.commit()
except:
raise
return rs
start_time = time.time()
for n in range(20000):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
execute(sql,args)
end_time = time.time()
# 打印开始和结束时间的差print(end_time - start_time)
|
我们使用一个循环20000次往数据库插入数据
执行,插入数据比较多需要等待一段时间输出
|
1
2
|
D:\learn-python3\函数式编程>C:/Python37/python.exe d:/learn-python3/学习脚本/pymysql/use_pymysql.py
77.46903562545776 |
可以在数据库查询到这20000条数据,而且这个表的字段created_at存储了创建这条数据的时间戳,我们可以看到,id越往后的时间戳越往后,说明数据是同步按顺序一一插入的
我们按照字段created_at排序查询
下面我们删除所有数据使用异步方式插入
异步的代码如下
d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import asyncio,aiomysql,time
# 数据库配置dictdb_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建数据库连接池协程函数async def create_pool(**kw):
global __pool
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db']
)
loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8># 执行select函数async def select(sql,args,size=None):
with await __pool as conn:
cur = await conn.cursor(aiomysql.DictCursor)
await cur.execute(sql.replace('?','?s'),args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
await cur.close()
return rs
# 执行insert update delete函数async def execute(sql,args):
with await __pool as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?','%s'),args)
affetced = cur.rowcount
await conn.commit()
await cur.close()
except BaseException as e:
raise
return affetced
# insert start# import time# sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'# args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']# async def insert():# await execute(sql,args)# loop.run_until_complete(insert())# insert end# update start# import time# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']# loop.run_until_complete(execute(sql,args))# update end# delete start# sql = 'delete from `users` where `id`=?'# args = ['111111'] # loop.run_until_complete(execute(sql,args))# delete end# select start# sql = 'select * from users'# args = []# rs = loop.run_until_complete(select(sql,args))# print(rs)# select endasync def insert1():
for n in range(10000):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
await execute(sql,args)
async def insert2():
for n in range(10001,20001):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
await execute(sql,args)
async def main():
# 需要组合成一个事件才会异步执行即在执行insert1的时候同步执行insert2
await asyncio.gather(insert1(),insert2())
start_time = time.time()
loop.run_until_complete(main())end_time = time.time()
print(end_time - start_time)
|
这里我们定义了两个协程函数,分别用来执行前10000个数据和后10000个数据的插入,在main()把这两个协程函数组合成一个事件循环
等待一段时间后执行输出如下,忽略这个warning,可以看到执行时间明显比同步时间短
|
1
2
3
|
d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py:42: DeprecationWarning: with await pool as conn deprecated, useasync with pool.acquire() as conn instead
with await __pool as conn:
39.794615507125854 |
我们去数据库查询一下数据也可以看到id从0开始和id从10001开始几乎是同时插入的
本文介绍异步MySQL异步驱动aiomysql的使用
1,安装异步模块
如果没有模块则先使用pip安装模块
|
1
2
|
pip3 install asynciopip3 install aiomysql |
2,创建MySQL数据库连接池
和同步方式不一样的是使用异步不能直接创建数据库连接conn,需要先创建一个数据库连接池对象__pool通过这个数据库连接池对象来创建数据库连接
数据库配置信息和介绍pymysql同步使用的数据库是一样的
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import asyncio,aiomysql,time
# 数据库配置dictdb_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建数据库连接池协程函数async def create_pool(**kw):
global __pool
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db']
)
loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8> |
3,创建执行sql语句的协程函数
因为是异步模块,只能在事件循环中通过await关键字调用,使用需要创建执行sql语句的协程函数
在协程函数内使用全局上一步创建的连接池对象来创建连接conn和浮标对象cur,通过浮标对象来执行sql语句,执行方法和pymysql模块的执行方法是一样的
|
1
2
3
4
5
6
|
cursor.execute(sql,args)sql # 需要执行的sql语句例如'select * from table_name'
args # 替换sql语句的格式化字符串,即sql语句可以使用%s代表一个字符串,然后在args中使用对应的变量或参数替换,args为一个list或元组,即是一个有序的序列需要和sql中的%s一一对应
# 例如sql='select * from table_name where id=%s' args=['12345']# 相当于使用args中的参数替换sql中的%s# select * from table_name where id='12345' |
下面分别创建两个协程函数select execute一个用来执行搜索操作,一个用来执行insert,update,delete等修改操作
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# 执行select函数async def select(sql,args,size=None):
with await __pool as conn:
cur = await conn.cursor(aiomysql.DictCursor)
await cur.execute(sql.replace('?','?s'),args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
await cur.close()
return rs
# 执行insert update delete函数async def execute(sql,args):
with await __pool as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?','%s'),args)
affetced = cur.rowcount
await conn.commit()
await cur.close()
except BaseException as e:
raise
return affetced
|
4,实践执行sql语句
实践执行sql语句前我们首先在本机创建一个数据库和对应的表用于测试
数据库对应的主机,用户名,密码,库名,表名如下
|
1
2
3
4
5
|
host: localhostuser: www-data
password: www-data
db:awesometable_name: users |
创建表名的sql语句如下,需要在数据库中创建好对应的表
|
1
2
3
4
5
6
7
8
9
10
11
12
|
CREATE TABLE `users` ( `id` varchar(50) NOT NULL,
`email` varchar(50) NOT NULL,
`passwd` varchar(50) NOT NULL,
`admin` tinyint(1) NOT NULL,
`name` varchar(50) NOT NULL,
`image` varchar(500) NOT NULL,
`created_at` double NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_email` (`email`),
KEY `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
|
创建好的表对应的结构如下
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
mysql> desc users;+------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| id | varchar(50) | NO | PRI | NULL | |
| email | varchar(50) | NO | UNI | NULL | |
| passwd | varchar(50) | NO | | NULL | |
| admin | tinyint(1) | NO | | NULL | |
| name | varchar(50) | NO | | NULL | |
| image | varchar(500) | NO | | NULL | |
| created_at | double | NO | MUL | NULL | |+------------+--------------+------+-----+---------+-------+
7 rows in set (2.68 sec)
|
①执行insert操作
|
1
2
3
4
5
6
7
8
|
# insert startimport time
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']
async def insert():
await execute(sql,args)
loop.run_until_complete(insert())# insert end |
执行方式和pymysql没有区别,不同的是需要在事件循环中使用关键字await调用
执行完毕在MySQL中查看插入的数据
|
1
2
3
4
5
6
7
|
mysql> select * from users;
+--------+-------------+----------+-------+------+-------------+------------------+
| id | email | passwd | admin | name | image | created_at |
+--------+-------------+----------+-------+------+-------------+------------------+
| 111111 | test@qq.com | password | 1 | test | about:blank | 1637738541.48629 |
+--------+-------------+----------+-------+------+-------------+------------------+
1 row in set (0.00 sec)
|
②执行update操作
直接在loop事件循环中执行execute协程函数也可以
|
1
2
3
4
5
6
|
# update startimport time
sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'
args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']
loop.run_until_complete(execute(sql,args))# update end |
执行以后把email和name都修改了
③执行delete操作
|
1
2
3
4
5
|
# delete startsql = 'delete from `users` where `id`=?'
args = ['111111']
loop.run_until_complete(execute(sql,args))# delete end |
同样根据关键字id指定的值删除了这条数据
④执行selete操作
在执行select操作前我们保证数据库里面至少有一条数据
|
1
2
3
4
5
6
|
# select startsql = 'select * from users'
args = []
rs = loop.run_until_complete(select(sql,args))
print(rs)
# select end |
这里直接执行搜索的协程函数select根据函数的定义返回的是所有结果的list,元素是查询结果的字典
输出为
|
1
|
[{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637739212.74493}]
|
如果结果有多个则使用list的下标取出
补充
同步模块pymysql和异步模块aiomysql执行速度对比
假如我们需要往数据库插入20000条数据,我们分别使用同步模式和异步模式
首先删除数据库所有测试数据
|
1
|
delete from users;
|
同步的代码
d:/learn-python3/学习脚本/pymysql/use_pymysql.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
import pymysql
db_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建连接,相当于把字典内的键值对传递# 相当于执行pymysql.connect(host='localhost',user='www-data',password='www-data',db='awesome')conn = pymysql.connect(**db_config)
# 创建游标cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from users'
args = []
# 执行查询返回结果数量# 执行查询rs=cursor.execute(sql,args)
# 获取查询结果# 获取查询的第一条结果,返回一个dict,dict元素是查询对应的键值对# 如果查询结果有多条则执行一次,游标移动到下一条数据,在执行一次又返回一条数据# print(cursor.fetchone())# print(cursor.fetchone())# print(cursor.fetchall())# print(cursor.fetchmany())# {'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}# 获取查询的所有结果,返回一个list,list元素是dict,dict元素是查询对应的键值对# print(cursor.fetchall())# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]# 获取查询的前几条结果,返回一个dict,dict元素是查询对应的键值对# print(cursor.fetchmany(1))# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]# 执行修改操作import time
# # insert startsql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test1@qq.com','password',1,'test','about:blank',time.time(),'1111121']
# 使用replace 把'?'替换成'%s'# rs = cursor.execute(sql.replace('?','%s'),args)# print(cursor.rowcount)# conn.commit()# print(rs)# insert end# update start# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # print(cursor.execute(sql.replace('?','%s'),args))# conn.commit()# update end# delete start# sql = 'delete from `users` where `id`=?'# args = ['111111'] # print(cursor.execute(sql.replace('?','%s'),args))# conn.commit()# delete end# 写成函数调用,函数内部使用了数据库连接对象conn# 可以先定义成全局变量globaldef select(sql,args,size=None):
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql.replace('?','%s'),args or ())
if size:
rs = cursor.fetchmany(size)
else:
rs = cursor.fetchall()
cursor.close
# logging.info('rows returned: %s' % len(rs))
return rs
def execute(sql,args):
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql.replace('?','%s'),args)
# rowcount方法把影响函数返回
rs = cursor.rowcount
cursor.close()
conn.commit()
except:
raise
return rs
start_time = time.time()
for n in range(20000):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
execute(sql,args)
end_time = time.time()
# 打印开始和结束时间的差print(end_time - start_time)
|
我们使用一个循环20000次往数据库插入数据
执行,插入数据比较多需要等待一段时间输出
|
1
2
|
D:\learn-python3\函数式编程>C:/Python37/python.exe d:/learn-python3/学习脚本/pymysql/use_pymysql.py
77.46903562545776 |
可以在数据库查询到这20000条数据,而且这个表的字段created_at存储了创建这条数据的时间戳,我们可以看到,id越往后的时间戳越往后,说明数据是同步按顺序一一插入的
我们按照字段created_at排序查询
下面我们删除所有数据使用异步方式插入
异步的代码如下
d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import asyncio,aiomysql,time
# 数据库配置dictdb_config = {
'host': 'localhost',
'user': 'www-data',
'password': 'www-data',
'db': 'awesome'
}# 创建数据库连接池协程函数async def create_pool(**kw):
global __pool
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db']
)
loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8># 执行select函数async def select(sql,args,size=None):
with await __pool as conn:
cur = await conn.cursor(aiomysql.DictCursor)
await cur.execute(sql.replace('?','?s'),args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
await cur.close()
return rs
# 执行insert update delete函数async def execute(sql,args):
with await __pool as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?','%s'),args)
affetced = cur.rowcount
await conn.commit()
await cur.close()
except BaseException as e:
raise
return affetced
# insert start# import time# sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'# args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']# async def insert():# await execute(sql,args)# loop.run_until_complete(insert())# insert end# update start# import time# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']# loop.run_until_complete(execute(sql,args))# update end# delete start# sql = 'delete from `users` where `id`=?'# args = ['111111'] # loop.run_until_complete(execute(sql,args))# delete end# select start# sql = 'select * from users'# args = []# rs = loop.run_until_complete(select(sql,args))# print(rs)# select endasync def insert1():
for n in range(10000):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
await execute(sql,args)
async def insert2():
for n in range(10001,20001):
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
email = 'test%s@qq.com' %n
args = [email,'password',1,'test','about:blank',time.time(),n]
await execute(sql,args)
async def main():
# 需要组合成一个事件才会异步执行即在执行insert1的时候同步执行insert2
await asyncio.gather(insert1(),insert2())
start_time = time.time()
loop.run_until_complete(main())end_time = time.time()
print(end_time - start_time)
|
这里我们定义了两个协程函数,分别用来执行前10000个数据和后10000个数据的插入,在main()把这两个协程函数组合成一个事件循环
等待一段时间后执行输出如下,忽略这个warning,可以看到执行时间明显比同步时间短
|
1
2
3
|
d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py:42: DeprecationWarning: with await pool as conn deprecated, useasync with pool.acquire() as conn instead
with await __pool as conn:
39.794615507125854 |
我们去数据库查询一下数据也可以看到id从0开始和id从10001开始几乎是同时插入的