并发编程理论
理论居多,实际应用代码简单。
1、操作系统发展史
计算机五大组成部分
- 控制器
- 运算器
- 存储器
- 输入设备
- 输出设备
"""
1)用户独占全机。
不会出现因资源已被其他用户占用而等待的现象,但资源的利用率低。
2)CPU 等待手工操作。CPU的利用不充分。
"""
1.1 批处理系统
加载在计算机上的一个系统软件,在它的控制下,计算机能够自动地、成批地处理一个或多个用户的作业(这作业包括程序、数据和命令)。
联机批处理
"""
即作业的输入/输出由CPU来处理。
在主机与输入机之间增加一个存储设备——磁带。
"""
脱机批处理
"""
即输入/输出脱离主机控制。
添加了高速磁带部分,相当于现在的CPU,输入机则相当于现在的硬盘。
"""
2、多道技术
单核实现并发的效果,但是单核不能实现并行的效果,节省了多个程序运行的总耗时。
- 并发
看起来像同时运行 - 并行
真正意义上的同时执行
"""
单道技术:串行
多道技术:在执行任务A的时候控制硬盘将任务B读取到内存,此时还是单核情况。
"""
-
空间上的复用
- 多个程序共用一套计算机硬件
-
时间上的复用
-
如:洗衣服30s、做饭50秒、烧水30秒。
-
单道耗时110秒,多道则为50秒(切换节省时间)
-
"""
切换(CPU)分为两种情况
1、当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限。
(作用:提高CPU利用率,不影响当前程序的执行效率)
2、当一个程序长时间占用CPU的时候,操作系统也会剥夺该程序的CPU执行权限。
(作用:降低了程序的执行效率[原本时间+切换时间])
"""
3、进程理论
- 程序
- 是指令和数据的集合,静态的概念,是“死的”。
-
进程
- 表示程序正在运行的过程,是“活的”。
同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。
进程调度
-
先来先服务调度算法
"""对长作业有利,对短作业无利""" -
短作业优先调度算法
"""对短作业有利,对长作业无利""" -
时间片轮转法
""" 时间片:将固定的时间切分成N多份,每一份都表示一个时间片 越往下,任务的执行时间越长,任务的执行优先级越低, 当第一个队列中出现了新的任务, 那么CPU会立刻停止当前执行的任务, 去执行新添加进来第一层的队列中的任务 """
进程运行的三状态图
只要程序想要运行,就必须经过就绪态。
-
同步和异步(★★★★★)
"""描述的是任务的提交方式""" 同步:任务提交之后,原地等待任务的结果,等待过程中不做任何事。 异步:任务提交之后,不原地等待任务的结果,直接继续执行下一个任务。 (任务的返回结果由异步回调机制自动处理) -
阻塞和非阻塞(★★★★★)
"""描述的程序的运行状态""" 阻塞:阻塞态 非阻塞:就绪态、运行态 理想状态:应该让代码永远处于就绪态和运行态之间切换。
最高效的组合:异步非阻塞。
4、开启进程的两种方式(★★★★★)
- 导入Process模块
"""第一种"""
from multiprocessing import Process
import time
def task(name):
print(\'%s is running...\' % name)
time.sleep(3)
print(\'%s is over!\' % name)
if __name__ == \'__main__\':
"""创建一个对象并实例化"""
p = Process(target=task, args=(\'爸爸\',)) # target传入任务名,args传入参数 容器类型无论里面有几个元素,一定要用逗号隔开
p.start() # 开启进程 操作系统创建一个进程并执行 异步
print(\'主进程...\')
"""
Windows 操作系统下,创建进程一定要在main内创建
因为Windows下创建进程类似于模块导入的方式
会从上往下依次执行代码
Linux则是直接将代码完整的拷贝一份
"""
- 类的继承
"""第二种:类的继承"""
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self):
print(\'Hello...\')
time.sleep(3)
print(\'over...\')
if __name__ == \'__main__\':
p = MyProcess() # 实例化
p.start()
print(\'主进程...\')
创建进程就是在内存中申请一块内存空间,将需要运行的代码丢进去。
"""
一个进程对应在内存中就是一块独立的内存空间
进程与进程之间数据默认情况下是无法交互的,
若想交互,则需要借助第三方模块。
"""
5、进程对象的join方法(★★★)
join是让主进程的代码等待子进程代码运行结束之后,再继续运行。
不影响其他子进程的运行。
from multiprocessing import Process
import time
def task(name, n):
print(\'%s is running...\' % name)
time.sleep(n)
print(\'%s is over!\' % name)
if __name__ == \'__main__\':
p1 = Process(target=task, args=(\'爸爸\', 1))
p2 = Process(target=task, args=(\'爷爷\', 2))
p3 = Process(target=task, args=(\'太爷爷\', 3))
start_time = time.time()
p1.start()
p2.start()
p3.start()
p1.join() # 等待子进程运行结束后才运行主进程
p2.join() # 等待子进程运行结束后才运行主进程
p3.join() # 等待子进程运行结束后才运行主进程
print(\'主进程...\', time.time() - start_time)
"""
爷爷 is running...
太爷爷 is running...
爸爸 is over!
爷爷 is over!
太爷爷 is over!
主进程... 3.1058456897735596
"""
-
错误示范
start_time = time.time() for i in range(1,4): p = Process(target=task, args=(\'子进程%s\' % i, i)) p.start() p.join() print(\'主进程\', time.time() - start_time) # 总耗时6s多 -
正确示范
start_time = time.time() p_list = [] for i in range(1,4): p = Process(target=task, args=(\'子进程%s\' % i, i)) p.start() p_list.append(p) for p in p_list: p.join() print(\'主进程\', time.time() - start_time) # 总耗时3s多
6、进程之间数据相互隔离
from multiprocessing import Process
money = 100
def task():
global money # 局部修改全局
money = 666 # 修改的是子进程中的变量值
print(\'子进程\', money)
if __name__ == \'__main__\':
p = Process(target=task)
p.start()
p.join() # 等待子进程执行完毕
print(\'主进程\', money)
"""
子进程 666
主进程 100
"""
7、进程对象的其他方法
"""
一台计算机上运行着很多进程,那么计算机是如何进行区分和管理?
计算机会给每个运行的进程分配一个PID号
如何查看?
Windows: cmd ---> tasklist 即可查看
MAC: 终端 ---> ps aux
"""
from multiprocessing import Process, current_process
current_process().pid # 查看当前进程的进程号
import os
os.getpid() # 获取当前进程PID号
os.getppid() # 获取父进程的PID号
from multiprocessing import Process, current_process
import time
import os
def task():
# print(\'%s is running...\' % current_process().pid) # 查看当前进程的进程号
print(\'%s is running...\' % os.getpid())
time.sleep(3)
if __name__ == \'__main__\':
p = Process(target=task)
p.start()
# p.join()
p.terminate() # 杀死当前进程
print(p.is_alive()) # 查看当前进程是否存活 前面没有加join()方法的时候输出True 加了输出False
"""
is_alive()
一般情况下会默认将存储布尔值的变量名
和返回的结果为布尔值的方法名都起成以is_开头
"""
# print(\'主...\', current_process().pid)
print(\'主...\', os.getpid())
print(\'主主...\', os.getppid())
8、僵尸进程与孤儿进程
- 僵尸进程
"""
死了但是没有死透,
当你开设了子进程之后,该进程死后不会立刻释放占用的进程号,
要让父进程能够查看到它开设的子进程的一些基本信息,占用的PID号,运行时间等。
所有的进程都会步入僵尸进程
父进程不会死,并且会无限制的创建子进程且子进程也不会结束。
回收子进程占用的PID号
父进程等待子进程结束
父进程调用join()方法
"""
- 孤儿进程
"""
子进程存活,父进程意外死亡。
操作系统会开设一个通道用于专门管理孤儿进程回收相关资源
"""
from multiprocessing import Process
import time
def run():
print(\'Hello World!\')
time.sleep(3)
print(\'get out!\')
if __name__ == \'__main__\':
p = Process(target=run)
p.start()
print(\'主\')
"""
主
Hello World!
get out!
"""
9、守护进程
会随着主进程的结束而结束
- 主进程创建守护进程
- 守护进程会在主进程代码执行结束后就终止
- 守护进程内无法再开启子进程,否则抛出异常:
- AssertionError: daemonic processes are not allowed to have children
"""进程之间是互相独立的,主进程代码运行结束,守护进程随即终止"""
from multiprocessing import Process
import time
def task(name):
print(\'%s正在活着...\' % name)
time.sleep(3)
print(\'%s正在死亡...\' % name)
if __name__ == \'__main__\':
p = Process(target=task, args=(\'Jack\', ))
# p = Process(target=task, kwargs={\'name\':\'jack\'})
p.daemon = True
p.start()
# p.daemon = True # AssertionError: process has already started 需要放在start()方法之前才不会报错
print(\'皇帝驾崩...\')
"""
皇帝驾崩...
Process finished with exit code 0
"""
- 如何开启守护进程?
"""在start()方法前开启daemon方法即可"""
p.daemon = True
p.start()
10、互斥锁(★★★★★)
-
未加锁处理
多个进程操作同一份数据的时候,会出现数据错乱的问题,
针对上述问题,解决方案就是加锁处理,将并发变成串行,牺牲效率但是保证了数据的安全性。
from multiprocessing import Process
import time
import random
import json
# 查票
def search(i):
# 文件操作读取票数
with open(\'./data\', \'r\', encoding=\'utf-8\') as f:
dic = json.load(f) # load加文件句柄
print(\'用户%s查询余票:%s\' % (i, dic.get(\'ticket_num\')))
# 字典取值推荐采用get方法 采用[]取值若不存在值的话会报错 给字典赋键值对的时候采用[]
# 买票 1、先查 2、再买
def buy(i):
# 先查票 获取余票
with open(\'./data\', \'r\', encoding=\'utf-8\') as f:
dic = json.load(f)
# 模拟网络延迟 1-3s
time.sleep(random.randint(1, 3))
# 判断当前是否有票
if dic.get(\'ticket_num\') > 0:
# 修改数据库 买票
dic[\'ticket_num\'] -= 1
# 写入数据库
with open(\'./data\', \'w\', encoding=\'utf-8\') as f:
json.dump(dic, f)
print(\'用户%s买票成功!\' % i)
else:
print(\'用户%s买票失败!\')
def run(i):
search(i)
buy(i)
if __name__ == \'__main__\':
for i in range(1, 6):
p = Process(target=run, args=(i, ))
p.start()
"""
用户1查询余票:1
用户5查询余票:1
用户4查询余票:1
用户3查询余票:1
用户2查询余票:1
用户4买票成功!
用户2买票成功!
用户5买票成功!
用户1买票成功!
用户3买票成功!
Process finished with exit code 0
"""
-
加锁处理
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全
from multiprocessing import Process, Lock
import time
import random
import json
def search(i):
with open(\'./data\', \'r\', encoding=\'utf-8\') as f:
dic = json.load(f)
print(\'用户%s查询余票:%s\' % (i, dic.get(\'ticket_num\')))
def buy(i):
with open(\'./data\', \'r\', encoding=\'utf-8\') as f:
dic = json.load(f)
time.sleep(random.randint(1, 3))
if dic.get(\'ticket_num\') > 0:
dic[\'ticket_num\'] -= 1
with open(\'./data\', \'w\', encoding=\'utf-8\') as f:
json.dump(dic, f)
print(\'用户%s买票成功!\' % i)
else:
print(\'用户%s买票失败!\' % i)
def run(i, mutex):
search(i)
"""给买票环节加锁处理"""
mutex.acquire() # 抢锁
buy(i)
mutex.release() # 释放锁
if __name__ == \'__main__\':
mutex = Lock()
for i in range(1, 6):
p = Process(target=run, args=(i, mutex))
p.start()
"""
用户5查询余票:1
用户1查询余票:1
用户3查询余票:1
用户4查询余票:1
用户2查询余票:1
用户5买票成功!
用户1买票失败!
用户3买票失败!
用户4买票失败!
用户2买票失败!
Process finished with exit code 0
"""
注意:
1、锁不要轻易的使用,容易造成死锁现象。
2、锁只在处理数据的部分添加,用于保证数据的安全(即争抢数据环节)。
11、队列介绍(★★★★★)
- 队列
- 先进先出 FIFO
- 堆栈
- 先进后出 FILO
- Queue模块
from multiprocessing import Queue
# 创建一个队列
q = Queue(5) # 括号内可以传数字,表示生成的队列最大存放的数据量
# 往队列中存数据
q.put(1) # put() 放数据
q.put(1)
print(q.full()) # 判断当前队列是否满了 False
print(q.empty()) # 判断当前队列是否为空
q.put(1)
q.put(1)
q.put(1)
# q.put(1) # 当队列的数据放满之后 如果还有数据需要继续放入 程序会发生阻塞
print(q.full()) # 判断当前队列是否满了 True
# 从队列中取数据
v1 = q.get() # 队列中如果没有数据了 get方法会原地阻塞
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
try:
v6 = q.get(timeout=3) # 队列中如果没有数据了 get方法会原地阻塞
print(v6)
except Exception:
print(\'一滴都不剩!\')
print(v1)
"""
q.full()
q.empty()
q.get_nowait()
在多进程下是不精确的
"""
12、进程间通信IPC机制
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
- 消息队列
from multiprocessing import Queue, Process
"""
研究思路:
1、主进程跟子进程借助于队列通信
2、子进程跟子进程借助于队列通信
"""
def producer(q):
q.put(\'我是康康,很高兴认识你!\')
# print(\'帅哥!你好!\')
def customer(q):
print(q.get())
if __name__ == \'__main__\':
q = Queue()
p = Process(target=producer, args=(q,))
p1 = Process(target=customer, args=(q,))
p.start()
p1.start()
# print(\'主进程:\', q.get())
"""借助队列实现进程相互间的通信!"""
"""
我是康康,很高兴认识你!
Process finished with exit code 0
"""
13、生产者消费者模型
-
生产者(消息队列)
生产/制造东西
-
消费者
消费/处理东西
该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
- Queue模块实现
from multiprocessing import Process, Queue
import time
import random
def consumer(name, q):
# 消费者能全部吃掉
while True:
food = q.get() # 一旦q中无数据 就会阻塞在这个位置
# 判断当前是否有结束的标识
if not food:
break
time.sleep(random.randint(1, 3)) # 模拟吃的时间
print(\'%s 吃了 %s\' % (name, food))
def producer(name, food, q):
for i in range(5):
data = \'%s 生产了%s %s\' % (name, food, i)
time.sleep(random.randint(1, 3)) # 模拟延迟
print(data)
q.put(data) # 将数据放入队列中
if __name__ == \'__main__\':
q = Queue()
# 生产者
p1 = Process(target=producer, args=(\'厨师1号\', \'馒头\', q))
p2 = Process(target=producer, args=(\'厨师2号\', \'饺子\', q))
# 消费者
c1 = Process(target=consumer, args=(\'消费者1号\', q))
c2 = Process(target=consumer, args=(\'消费者2号\', q))
p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()
# 等待生产者生产完毕 往队列中添加特定的结束符号
q.put(None) # 在所有生产者生产的数据的末尾
q.put(None) # None的个数取决于消费者的个数
- JoinableQueue模块实现
from multiprocessing import Process, JoinableQueue
import time
import random
def consumer(name, q):
# 消费者能全部吃掉
while True:
food = q.get() # 一旦q中无数据 就会阻塞在这个位置
time.sleep(random.randint(1, 3)) # 模拟吃的时间
print(\'%s 吃了 %s\' % (name, food))
q.task_done() # 告诉队列已经从内部取出一个数据并处理完毕
def producer(name, food, q):
for i in range(5):
data = \'%s 生产了%s %s\' % (name, food, i)
time.sleep(random.randint(1, 3)) # 模拟延迟
print(data)
q.put(data) # 将数据放入队列中
if __name__ == \'__main__\':
q = JoinableQueue()
# 生产者
p1 = Process(target=producer, args=(\'厨师1号\', \'馒头\', q))
p2 = Process(target=producer, args=(\'厨师2号\', \'饺子\', q))
# 消费者
c1 = Process(target=consumer, args=(\'消费者1号\', q))
c2 = Process(target=consumer, args=(\'消费者2号\', q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
q.join() # 等待队列中所有的数据被取完再往下执行代码
"""
JoinableQueue() 每当你往该队列中存入数据的时候, 内部会有一个计数器+1
当调用task_done()方法的时候 计数器-1
q.join() 当计数器为0的时候,才往后运行
只要q.join()执行完毕 说明消费者已经处理完数据 消费者就没有存在的必要 即可用到守护进程
"""
14、线程相关知识点
- 进程
资源单位,是资源分配的最小单位。
- 线程
执行单位,是CPU调度的最小单位。
"""
同一进程下的多个线程的数据是共享的
每一个进程下中至少有一个线程
"""
15、开启线程的两种方式
- Thread模块直接传参调用
from threading import Thread # 导入线程相关模块
import time
def tasK(name):
print(\'%s is running...\' % name)
time.sleep(1)
print(\'%s is over!\' % name)
if __name__ == \'__main__\':
t = Thread(target=tasK, args=(\'菜鸟\',))
t.start()
print(\'主进程...\')
"""
菜鸟 is running...
主进程...
菜鸟 is over!
Process finished with exit code 0
"""
- 类的继承(继承Thread类,重写run()方法)
from threading import Thread # 导入线程相关模块
import time
class MyThread(Thread):
def __init__(self, name):
"""即重写了别人的方法,又不知道方法里有什么东西,那么我们用super()直接调用父类的方法"""
super().__init__() # 调用父类的方法
self._name = name
# 重写run()方法
def run(self):
print(\'%s is running...\' % self._name)
time.sleep(1)
print(\'%s is over!\')
t = MyThread(\'菜鸟\')
t.start()
print(\'主进程..\')
"""
菜鸟 is running...
主进程..
%s is over!
Process finished with exit code 0
"""
16、TCP服务端实现并发的效果
- 服务端
1、固定的HOST和PORT,即固定的ADDR
2、不间断提供服务
3、支持并发
from socket import *
from threading import Thread
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
server = socket() # 源码中默认TCP协议
server.bind(ADDR)
server.listen(5)
print(ADDR, \'正在运行中...\')
def recv_data(conn, addr):
"""将服务的代码单独封装为一个函数"""
while True:
try:
data = conn.recv(BUFSIZE)
if not data:
break
print(data.decode(\'utf-8\'), \'from:\', addr)
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
if __name__ == \'__main__\':
while True:
conn, addr = server.accept()
t = Thread(target=recv_data, args=(conn, addr))
t.start()
- 客户端
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
server = socket()
server.connect(ADDR)
while True:
msg = input("Please input msg>>>").strip()
if not msg:
continue
server.send(msg.encode(\'utf-8\'))
data = server.recv(BUFSIZE)
print(data.decode(\'utf-8\'), \'from:\', ADDR)
17、线程对象的join方法
主线程等待子线程执行完毕后再结束。
- 示例代码
from threading import Thread
import time
def task(name):
print(\'%s is running...\' % name)
time.sleep(1)
print(\'%s is over!\')
if __name__ == \'__main__\':
t = Thread(target=task, args=(\'菜鸟\', ))
t.start()
t.join()
print(\'主线程...\')
"""
菜鸟 is running...
%s is over!
主线程...
Process finished with exit code 0
"""
18、线程间数据共享
同一进程下的多个线程数据共享
- 修改全局变量的值案例
from threading import Thread
money = 100
def task():
global money
money = 200
print(\'子线程:\', money)
if __name__ == \'__main__\':
t = Thread(target=task)
t.start()
t.join()
print(\'主进程\', money)
"""
子线程: 200
主进程 200
Process finished with exit code 0
"""
19、线程对象属性及其他方法
currentThread(): 返回当前的线程变量。
enumerate(): 返回一个包含正在运行的线程的list
activeCount(): 返回正在运行的线程数量
currentThread().name:获取线程的名称
from threading import Thread
import os
def tasK():
print(\'Hello World!\', os.getpid())
if __name__ == \'__main__\':
t = Thread(target=tasK)
t.start()
print(\'主线程...\', os.getpid())
# 证明是在同一个进程下,其进程号(PID)是一样的
"""
Hello World! 2488
主进程... 2488
Process finished with exit code 0
"""
20、守护线程
运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕。
"""
主线程运行结束之后不会立刻结束,会等待所有其他非守护线程结束才会结束,因为主线程的结束意味着所在的进程的结束。
"""
# -*- coding: utf-8 -*-
from threading import Thread
import time
def task(name):
print(\'%s is running...\' % name)
time.sleep(1)
print(\'%s is over!\' % name)
if __name__ == \'__main__\':
t = Thread(target=task, args=(\'爸爸\', ))
t.daemon = True
t.start()
print(\'主线程...\')
"""
爸爸 is running...
主线程...
Process finished with exit code 0
"""
- 具有迷惑性的案例
# -*- coding: utf-8 -*-
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print(\'over123\')
def func():
print(456)
time.sleep(3)
print(\'over456\')
if __name__ == \'__main__\':
t1 = Thread(target=foo)
t2 = Thread(target=func)
t1.daemon = True
t1.start()
t2.start()
print(\'主....\')
"""
123
456
主....
over123
over456
Process finished with exit code 0
"""
21、线程互斥锁
- 不加锁
并发执行,速度快,数据不安全。
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import time
money = 100
def task():
global money
tmp = money
time.sleep(0.1)
money = tmp - 1
if __name__ == \'__main__\':
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
[t.join() for t in t_list]
print(money)
# 加锁前:结果为99
- 加锁后
将并发变成串行数据,保证了数据的安全性。
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import time
mutex = Lock()
money = 100
def task():
global money
mutex.acquire()
tmp = money
time.sleep(0.1)
money = tmp - 1
mutex.release()
if __name__ == \'__main__\':
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
[t.join() for t in t_list]
print(money)
# 加锁前:结果为99 加锁后:结果为0 由并发变成了串行
22、GIL全局解释器锁
GIL(Global Interpret Lock)是CPython解释器的特点。
同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。
在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行,即同一个进程下的多个线程无法利用多核优势。
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
-
垃圾回收机制
- 引用计数(变量值被变量名关联的次数)
- 标记清除(解决容器对象的循环引用问题)
- 分代回收(基于引用计数的回收机制,采用“空间换时间”策略)
GIL是解释型语言的通病,而编译型的语言则没有,如C、C++。
-
GIL(★★★★★)
- GIL不是Python的特点而是CPython解释器的特点。
- GIL是保证解释器级别的数据的安全。
- 同一个进程下多个线程无法同时执行(即无法利用多核优势)。
- 针对不同的数据需要加不同的锁进行处理。
- 解释型语言的通病:同一进程下的多个线程无法利用多核优势。
- GIL与Lock
- GIL与普通互斥锁的区别
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import time
mutex = Lock() # 生成一把锁
money = 100
def task():
global money
# with mutex:
# """上下文管理,自动调用mutex.acquire()和mutex.release()"""
mutex.acquire() # 自己的互斥锁
tmp = money
time.sleep(0.1) # 只要进入IO,GIL会自动释放
money = tmp - 1
mutex.release()
if __name__ == \'__main__\':
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
"""
100个线程开启之后,要先去抢GIL,再去抢互斥锁,
此时,我进入IO,GIL自动释放,
但是我手里还有一个自己的互斥锁,
其他线程抢到了GIL但是没有抢到互斥锁,
所以,最后GIL还是回到了我自己的手上,然后去操作数据
"""
23、多进程与多线程的实际应用场景
- 单核
"""
四个任务:
IO密集型:一个进程开启多个线程
计算密集型:一个进程开启四个线程
"""
- 多核
意味着能有多个核并行完成计算,故多核提升的计算性能。
"""
四个任务:
IO密集型:多线程
计算密集型:多进程
"""
- 计算密集型案例
from multiprocessing import Process
from threading import Thread
import time
import os
def calculate():
result = 1
for i in range(1, 100000):
result *= i
if __name__ == \'__main__\':
l = []
print(os.cpu_count()) # 获取当前机器CPU个数
start_time = time.time()
for i in range(os.cpu_count()):
p = Process(target=calculate) # 4.084869623184204
t = Thread(target=calculate)
# p.start()
# l.append(p)
t.start() # 18.552288055419922
l.append(t)
# for p in l:
# p.join()
for t in l:
t.join()
print(\'time: \', time.time() - start_time)
- I/O密集型案例
多进程主要将时间耗费在进程创建的开销上,而多线程则主要节省了进程创建的时间开销。
# -*- coding: utf-8 -*-
from multiprocessing import Process
from threading import Thread
import time
def io():
time.sleep(3)
if __name__ == \'__main__\':
l = []
start_time = time.time()
for i in range(400):
p = Process(target=io) # 25.05035924911499
# p.start()
# l.append(p)
t = Thread(target=io) # 3.0440149307250977
t.start()
l.append(t)
# for p in l:
# p.join()
for t in l:
t.join()
print(\'time: \', time.time() - start_time)
小结
"""
通常是在多进程下开设多线程,
这样的既可以利用多核优势也能节省资源的消耗。
"""
24、死锁与递归锁(了解)
- 死锁
指两个或两个以上的
进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。
抢锁必须释放锁,但在实际操作中也极其容易发生死锁现象,即整个程序卡死、发生阻塞。
线程1先抢到A锁,其他线程等待,接着线程1又抢到B锁,未释放锁,其他线程继续等待...
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def __init__(self):
super().__init__()
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(\'%s get A\' % self.name) # self.name 获取当前线程名
mutexB.acquire()
print(\'%s get B\' % self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print(\'%s get B\' % self.name) # self.name 获取当前线程名
time.sleep(2)
mutexA.acquire()
print(\'%s get A\' % self.name)
mutexA.release()
mutexB.release()
if __name__ == \'__main__\':
for i in range(10):
t = MyThread()
t.start()
# 发生死锁...
"""
Thread-1 get A
Thread-1 get B
Thread-1 get B
Thread-2 get A
Process finished with exit code -1
"""
- 递归锁
可以被连续的acquire和release,但是只能被第一个抢到锁的执行上述操作。
"""
内部有一个计数器,即counter
每acquire一次, counter += 1
每release一次, counter -= 1
只要counter != 0,其他人就无法抢到这把锁。
不会发生死锁现象
"""
RLock模块演示代码
# -*- coding: utf-8 -*-
from threading import Thread, RLock
import time
mutexA = mutexB = RLock() # 两个变量同时指向一个内存地址
class MyThread(Thread):
def __init__(self):
super().__init__()
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire() # A ---> counter += 1 ---> 1
print(\'%s get A\' % self.name) # self.name 获取当前线程名
mutexB.acquire() # B ---> counter += 1 ---> 2
print(\'%s get B\' % self.name)
mutexB.release() # B ---> counter -= 1 ---> 1
mutexA.release() # A ---> counter -= 1 ---> 0
def func2(self):
mutexB.acquire() # B ---> counter += 1 ---> 1
print(\'%s get B\' % self.name) # self.name 获取当前线程名
time.sleep(2) # 其他线程继续等待
mutexA.acquire() # A ---> counter += 1 ---> 2
print(\'%s get A\' % self.name)
mutexA.release() # A ---> counter -= 1 ---> 1
mutexB.release() # B ---> counter -= 1 ---> 0
if __name__ == \'__main__\':
for i in range(10):
t = MyThread()
t.start()
# 递归锁
"""
Thread-1 get A
Thread-1 get B
Thread-1 get B
Thread-1 get A
Thread-2 get A
Thread-2 get B
Thread-2 get B
Thread-2 get A
...
Process finished with exit code 0
"""
25、信号量Semaphore(了解)
信号量在不同的阶段可能对应不同的技术点,在并发编程中指的是锁。
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
"""
如果将互斥锁比喻成一个厕所(家庭厕所,只有一个坑位),
那么信号量就是多个厕所(公共厕所,多个坑位)。
"""
演示代码
# -*- coding: utf-8 -*-
from threading import Thread, Semaphore
import time
import random
sm = Semaphore(5) # 创建5个坑位
def task(name):
sm.acquire()
print(\'%s 正在蹲...\' % name)
time.sleep(random.randint(1, 5))
# time.sleep(3)
sm.release()
if __name__ == \'__main__\':
for i in range(20):
t = Thread(target=task, args=(\'%s号\' % (i+1), ))
t.start()
"""
刚开始有5把锁可以抢,5个线程就能抢到,
当其中一个线程释放锁,
接下来就立即有新的线程抢到这把锁
"""
26、Event事件(了解)
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于信号的发送。
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度;
event.clear():恢复event的状态值为False。
案例:等红绿灯事件
# -*- coding: utf-8 -*-
from threading import Thread, Event
import time
event = Event() # 生成一个红绿灯
def light():
print(\'The red light is on.\')
time.sleep(3)
print(\'The green light is on.\')
# 提示等红灯的可以通过
event.set()
def car(name):
print(\'%s 车正在等待红灯\' % name)
event.wait() # 等待别人发信号
print(\'%s 车加油门走了...\' % name)
if __name__ == \'__main__\':
t = Thread(target=light)
t.start()
for i in range(3):
t = Thread(target=car, args=(\'%s\' % i, ))
t.start()
"""
The red light is on.
0 车正在等待红灯
1 车正在等待红灯
2 车正在等待红灯
The green light is on.
1 车加油门走了...
2 车加油门走了...0 车加油门走了...
Process finished with exit code 0
"""
27、线程q(了解)
同一个进程下的多个线程的数据是共享的,在同一个进程下使用队列的原因是队列是管道+栈,使用队列,保证的数据的安全性。
- 先进先出q(FIFO)
q = queue.Queue(3)
q.put(1) # 放数据
q.get() # 取数据
q.get_nowait()
q.get(timeout=3)
q.full() # 是否满
q.empty() # 是否空
- 后进先出q(LIFO)
q = queue.LifoQueue(3) # last in first out
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
- 优先级q(给数据设置优先级)
q = queue.PriorityQueue(4)
q.put((10, \'111\')) # put((优先级, 放入的内容))数字越小优先级越高
q.put((100, \'222\'))
q.put((0, \'333\'))
q.put((-2, \'666\'))
print(q.get()) # (-2, \'666\')
28、进程池与线程池(★★★★★)
开设进程和线程都需要消耗资源,所以不可能做到无限制的开设多线程的多进程。
池的概念
池是用来保证计算机硬件安全的情况下
最大限度的利用计算机,降低了程序的运行的效率,但是保证了计算机硬件的安全,从而使写的程序能够正常运行。
- 进程池
# -*- coding: utf-8 -*-
from concurrent.futures import ProcessPoolExecutor
import time
import os
def task(n):
print(n, os.getpid())
time.sleep(2)
return n * n
def call_back(n):
print(\'call_back>>>:\', n.result())
if __name__ == \'__main__\':
# 进程号一样
pool = ProcessPoolExecutor(5) # 默认CPU个数的进程:self._max_workers = os.cpu_count() or 1
start_time = time.time()
for i in range(20):
res = pool.submit(task, i).add_done_callback(call_back) # 异步回调机制
"""
任务的提交方式:
同步:任务提交之后,原地等待任务的返回结果,期间不做任何事
异步:任务提交之后,不等待任务的返回结果,继续执行下一个任务
"""
print(\'time:\', time.time() - start_time) # time: 8.396687507629395
- 线程池
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(8) # 默认开启CPU个数*5的线程数
def task(n):
print(n)
time.sleep(2)
return n*n
start_time = time.time()
t_list = []
for i in range(20):
# pool.submit(task, i)
res = pool.submit(task, i) # 异步提交
# print(res.result()) # 返回值是一个对象 result()方法 返回异步回调的结果---> 由并发变串行
# res.result()拿到的是异步提交的任务返回的结果 同步提交 join()方法
t_list.append(res)
# 等待线程池中的所有任务执行完毕后再继续往下执行
pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕
for res in t_list:
print(\'>>>:\', res.result()) # 有序
"""
任务的提交方式:
同步:任务提交之后,原地等待任务的返回结果,期间不做任何事
异步:任务提交之后,不等待任务的返回结果,继续执行下一个任务
"""
print(\'time:\', time.time() - start_time) # time: 6.030665397644043
小结(★★★★★)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
res = pool.submit(task, i).add_done_callback(call_back) # 异步回调机制
29、协程(了解)
"""
进程:资源单位
线程:执行单位
协程:单线程下实现并发的效果
并发=切换+保存状态
"""
协程
在代码层面上检测所有的IO操作,一旦有IO,就在代码级别完成切换,从而提升程序的运行效率。
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
# -*- coding: utf-8 -*-
import time
# 串行执行计算密集型任务
def func1():
for i in range(1000000):
i + 1
def func2():
for i in range(1000000):
i + 1
start_time1 = time.time()
func1()
func2()
print(time.time() - start_time1) # 0.053926944732666016
# 切换+yield
def func3():
while True:
1000000 + 1
yield
def func4():
g = func3() # 初始化生成器
for i in range(1000000):
i + 1
next(g)
start_time2 = time.time()
func4()
print(time.time() - start_time2) # 0.07108616828918457
gevent模块
gevent模块本身无法检测常见的IO操作,
需要在使用的时候额外导入from gevent import monkey;monkey.patch_all()
# -*- coding: utf-8 -*-
import time
from gevent import spawn
from gevent import monkey;monkey.patch_all()
def heng():
print(\'哼...\')
time.sleep(2)
print(\'哼...\')
def ha():
print(\'哈...\')
time.sleep(3)
print(\'哈...\')
start_time = time.time()
# heng()
# ha()
# print(time.time() - start_time) # 串行下:5.015585899353027
g1 = spawn(heng)
g2 = spawn(ha)
g1.join()
g2.join()
print(time.time() - start_time)
# 单线程下实现并发:3.017923593521118
30、协程实现TCP服务端并发
- 服务端
# -*- coding: utf-8 -*-
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
def communication(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server(ip, port):
server = socket.socket()
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
spawn(communication, conn)
if __name__ == \'__main__\':
g1 = spawn(server, \'127.0.0.1\', 8080)
g1.join()
- 客户端
# -*- coding: utf-8 -*-
from threading import Thread, current_thread
import socket
def x_client():
client = socket.socket()
client.connect((\'127.0.0.1\',8080))
n = 0
while True:
msg = \'%s say hello %s\'%(current_thread().name,n)
n += 1
client.send(msg.encode(\'utf-8\'))
data = client.recv(1024)
print(data.decode(\'utf-8\'))
if __name__ == \'__main__\':
for i in range(100):
t = Thread(target=x_client)
t.start()
小结
可以通过多进程下开设多线程,多线程下开设协程,从而使程序的执行效率提升。
并发编程理论(IO模型)
IO模型简介
这里描述的IO模型都是针对
网络IO。
同步IO、异步IO、阻塞IO、非阻塞IO、信号驱动IO(实际情况中不常用)
"""
- 等待数据的准备(wait data)
- 从内核拷贝数据到进程中(copy data)
"""
- 常见的网络阻塞状态
"""
- accept()
- recv()
- recvfrom()
send()虽然也有IO行为,但是不再考虑范围之内。
"""
阻塞IO模型
- 服务端
# -*- coding: utf-8 -*-
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
server = socket()
server.bind(ADDR)
server.listen(5)
while True:
conn, addr = server.accept()
while True:
try:
data = conn.recv(BUFSIZE)
if not data:
break
print(data.decode(\'utf-8\'))
conn.send(data.upper())
except ConnectionResetError as e:
break
conn.close()
在服务端开启多进程/多线程实现并发,并没有解决IO阻塞,实际上,该等待的地方还是得等,没用规避掉,只不过是多个人等待彼此互不干扰而已。
- 客户端
# -*- coding: utf-8 -*-
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
client = socket()
client.connect(ADDR)
while True:
client.send(b\'Hello World\')
data = client.recv(BUFSIZE)
if not data:
break
print(data.decode(\'utf-8\'))
非阻塞IO模型
实际应用中不会考虑使用非阻塞IO模型,因为CPU占有率太高,过于浪费资源。
提交后无论是否有数据都会立刻获得结果,即:在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
- 服务端(单线程下实现并发)
# -*- coding: utf-8 -*-
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
server = socket()
server.bind(ADDR)
server.listen(5)
server.setblocking(False) # 将所有得网络阻塞变为非阻塞
# server.accept() # 拿不到conn,addr会报错
# 报错信息:BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。
conn_list = [] # 存储链接对象
non_data_conn_list = [] # 存储将删除的链接对象
while True:
try:
conn, addr = server.accept()
conn_list.append(conn)
except BlockingIOError as e:
print(\'列表得长度:\', len(conn_list))
# print(\'此处没其他请求,则可以做其他事...\')
for conn in conn_list:
try:
data = conn.recv(BUFSIZE)
if not data:
conn.close() # 客户端断开链接
non_data_conn_list.append(conn) # 将即将删除的对象添加到新的列表中
continue
conn.send(data.upper())
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
non_data_conn_list.append(conn)
for conn in non_data_conn_list:
conn_list.remove(conn)
non_data_conn_list.clear()
- 客户端
# -*- coding: utf-8 -*-
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
client = socket()
client.connect(ADDR)
while True:
msg = input(\'msg>>>>\').strip()
if not msg:
continue
client.send(msg.encode(\'utf-8\'))
data = client.recv(BUFSIZE)
if not data:
break
print(data.decode(\'utf-8\'))
- 存在的问题
- 循环调用
recv()频繁的占用CPU不进行操作,大幅度推高CPU占用率,对于低配主机,容易出现卡机的情况。 -
整体吞吐量低,任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。 -
recv()起到的作用多是检测“操作是否完成”。
IO多路复用
操作系统提供的监管机制能够协助监管socket对象和conn对象,并且可以监管多个对象,只要有人触发就立即返回可执行的对象。
当监管的对象只有一个的时候,IO多路复用的效率不如阻塞IO模型,但是IO多路复用可以一次性监管很多对象。
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
select的优势在于可以处理多个连接,不适用于单个连接
- 服务端
# -*- coding: utf-8 -*-
from socket import *
import select
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
server = socket()
server.bind(ADDR)
server.listen(5)
server.setblocking(False)
read_list = [server]
while True:
r_list, w_list, x_list = select.select(read_list, [], []) # 监测server
"""
帮你监管对象,一旦有请求,就返回一个监管对象
"""
# res = select.select(read_list, [], [])
# print(res) # 阻塞在select()的监管机制内 拿到的就是server对象
# print(server)
for i in r_list:
"""针对不同的对象进行不同的处理操作"""
if i is server:
# 此时执行accept()不会发生阻塞
conn, addr = i.accept()
# 也应该添加到待监管的队列中
read_list.append(conn)
else:
try:
data = i.recv(BUFSIZE)
if not data:
i.close()
# 将无效的监管对象移除
read_list.remove(i)
continue
print(data.decode(\'utf-8\'))
i.send(data.upper())
except ConnectionResetError:
i.close()
read_list.remove(i)
相比其他模型,使用
select()的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
- 客户端
# -*- coding: utf-8 -*-
from socket import *
HOST = \'127.0.0.1\'
PORT = 8080
BUFSIZE = 1024
ADDR = (HOST, PORT)
client = socket()
client.connect(ADDR)
while True:
# msg = input(\'msg>>>>\').strip()
msg = \'hahaha\'
if not msg:
continue
client.send(msg.encode(\'utf-8\'))
data = client.recv(BUFSIZE)
if not data:
break
print(data.decode(\'utf-8\'))
小结
监管机制有很多,select机制(Windows、Linux)、poll机制(Linux)、epoll机制(Linux),poll和select机制都可以监管多个对象,但是poll能监管的对象更多,当监管的对象特别多的时候,select和poll机制可能会出现及其大的延时相应。epoll会给每一个监管对象绑定一个回调机制,一旦有相应,回调机制会立刻发起提醒。
异步IO模型
异步IO模型是所有模型中效率最高的,也是使用最广泛的模型。
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,回调机制会立刻返回,所以不会对用户进程产生任何block。
然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
相关的模块和框架
asyncio模块,sanic、tronado、twisted异步框架。
- asyncio模块
# -*- coding: utf-8 -*-
import threading
import asyncio
# @asyncio.coroutine
async def hello():
print(\'Hello world %s\' % threading.current_thread()) # 获取线程名
await asyncio.sleep(1)
print(\'Hello world %s \' % threading.current_thread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
"""
单线程实现并发效果
Hello world <_MainThread(MainThread, started 15216)>
Hello world <_MainThread(MainThread, started 15216)>
Hello world <_MainThread(MainThread, started 15216)>
Hello world <_MainThread(MainThread, started 15216)>
"""