一、queue
二、线程
        基本使用
        线程锁
        自定义线程池
        
        生产者消费者模型(队列)
三、进程
        基本使用
        进程锁
        进程数据共享
            默认数据不共享
            queues
            array
            Manager.dict
        进程池
    
    PS:
        IO密集型-多线程
        计算密集型 - 多进程
    
四、协程
        原理:利用一个线程,分解一个线程成为多个“微线程”==》程序级别
        greenlet
        gevent
        pip3 install gevent

 

一、queue

1.1 queue用法

# 先进先出队列

# put放数据,是否阻塞,阻塞时的超时事件

# get取数据(默认阻塞),是否阻塞,阻塞时的超时事件

# 队列的最大长度:queue.Queue(2) 里面的数字

# qsize()真实个数

# maxsize 最大支持的个数

# join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞

 

import queue
q = queue.Queue(2)  # q = queue.Queue()如果没有参数的话,就是可以放无限多的数据。
print(q.empty())    # 返回队列是否为空,空则为True,此处为True
q.put(11)
q.put(22)
print(q.empty())    # 此处为False
print(q.qsize())    # 返回队列中现在有多少元素
# q.put(22)
# q.put(33,block=False)   # 如果队列最大能放2个元素,这时候放了第三个,默认是阻塞的,block=False,如果就会报错:queue.Full
# q.put(33,block=True,timeout=2)  # 设置为阻塞,如果timeout设置的时间之内,还没有人来取,则就会报错:queue.Full
print(q.get())
print(q.get())
print(q.get(timeout=2))   # 队列里的数据已经取完了,如果再取就会阻塞,这里timeout时间2秒,就是等待2秒,队列里还没有数据就报错:queue.Empty

 

1.2 queue.join

# join:实际上意味着等到队列为空,再执行别的操作,否则就一直阻塞,不是说get取完了,就不阻塞了,而是每次get之后,

# 要执行:task_done 告诉一声已经取过了,等队列为空,join才不阻塞。

 

下面的程序是阻塞的

q = queue.Queue(5)

q.put(123)
q.put(456)
q.get()
# q.task_done()
q.get()
# q.task_done()  # 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
q.join()

 

下面的程序是不阻塞的:

q = queue.Queue(5)

q.put(123)
q.put(456)
q.get()
q.task_done()
q.get()
q.task_done()  # 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
q.join()

 

1.3 其他队列

import queue

# queue.Queue,先进先出队列

# queue.LifoQueue,后进先出队列

# queue.PriorityQueue,优先级队列

# queue.deque,双向对队

 

# queue.Queue(2) 先进先出队列

# put放数据,是否阻塞,阻塞时的超时事件

# get取数据(默认阻塞),是否阻塞,阻塞时的超时事件

# qsize()真实个数

# maxsize 最大支持的个数

# join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞

 

import queue
# q = queue.Queue(2) 先进先出队列
# q = queue.LifoQueue() 后进先出队列
# q = queue.PriorityQueue() 优先级队列
# q = queue.deque()   双向队列

q = queue.LifoQueue()
q.put(123)
q.put(456)
# 打印;456
print(q.get())

# 优先级最小的拿出来
# 如果优先级一样,则是谁先放,就先取出谁
q = queue.PriorityQueue()
q.put((1,'alex1'))
q.put((1,'alex2'))
q.put((1,'alex3'))
q.put((3,'alex3'))
# (1, 'alex1')
print(q.get())


q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
# deque([456, 123, 333])
print(q)
# 打印:456
print(q[0])
q.pop()     # 从右边删除
# deque([456, 123])
print(q)
q.popleft() # 从左边删除

 

python的队列是在内存里创建的,python的进程退出了,则队列也清空了。

 

 

 

二、生产者消费者模型(队列)

1生产者消费者模型的作用:

1解决阻塞

2就是解耦,修改生产者,不会影响消费者,反之亦然。

 

2)在生产环境,用生产者消费者模型,就可以解决:

1、处理瞬时并发的请求问题。瞬时的连接数就不会占满。所以服务器就不会挂了。

2、客户端提交一个请求,不用等待处理完毕,可以在页面上做别的事情。

 

 

2.1)如果不用队列存数据,服务端通过多线程来处理数据:

用户往队列存数据,服务器从队列里取数据。

没有队列的话,就跟最大连接数有关系,每个服务器就有最大连接数。

客户端要获取服务器放回,服务器要查、修改数据库或修改文件,要2分钟,那客户端就要挂起链接2分钟,2万个连接一半都要挂起,服务器就崩溃了。

 

如果没有队列,第一个用户发来请求,连上服务器,占用连接,等待2分钟。

第二个人来也要占用2分钟。

web服务器

如果要处理并发,有10万并发,如果:一台机器接收一个连接,需要10万个机器,等待2分钟就处理完了。

 

2.2)把请求放在队列的好处

用户发来请求,把请求放到队列里,可以让连接马上断开,不会阻塞,就不占用服务器的连接数了。如果看到订单处理了没,就要打开另外一个页面,查看请求是否处理。

 

服务器查询处理任务的时候,每个才花2分钟,服务器耗时是没有减少的。

但是这样做,客户端就不会持续的占用连接了。那瞬时的连接数就不会占满。所以服务器就不会挂了。

但是后台要处理10万个请求,也需要50台服务器。并不会减少服务器数量。

这样就能处理瞬时并发的请求问题。

 

服务器只是处理请求,是修改数据库的值,不是告诉客户端。而是客户端再发来请求,查询数据库已经修改的内容。

提交订单之后,把这个订单扔给队列,程序返回“正在处理”,就不等待了,然后断开这个连接,你可以在页面里做别的事情,不用一直等待订单处理完。这样就不影响服务器的最大连接数。在页面帮你发起一个alax请求,url,不停的请求(可能是定时器),我的订单成功没有,我的订单成功没有,如果订单成功了,就自动返回页面:订单成功

如果不用队列的话,一个请求就占用一个服务器,等待的人特别多,等待连接的个数太多了。服务器就挂掉了。

队列就没有最大个数限制,把请求发给队列了,然后http链接就断开了,就不用等待了。

12306买票的时候,下次再来请求的时候,就会告诉你,前面排了几个人。

 

3python queue的特点:

 

pythonqueue是内存级别的。rabbitmq可以把队列发到别的服务器上处理。

所以python里的queue不能持久化,但是rabbitmq可以持久化。

queue.Queue()这样写,队列就没有最大个数限制。queue.Queue(5)就是说队列里最多能放5个值

 

4)生产者消费者代码示例:

 

import time,random
import queue,threading
q = queue.Queue()


def Producer(name):
    count =0
    while True:
        time.sleep(random.randrange(3))
        if q.qsize()<3:         # 只要盘子里小于3个包子,厨师就开始做包子
            q.put(count)
            print("Producer %s has produced %s baozi.." %(name,count))
            count += 1

def Consumer(name):
    count =0
    while True:
        time.sleep(random.randrange(4))
        if not q.empty():       # 只要盘子里有包子,顾客就要吃。
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name,data))
        else:           # 盘子里没有包子
            print("---no baozi anymore----")
        count+=1

p1 = threading.Thread(target=Producer,args=('A',))
c1 = threading.Thread(target=Consumer,args=('B',))
c2 = threading.Thread(target=Consumer,args=('C',))
p1.start()
c1.start()
c2.start()

'''
当你设计复杂程序的时候,就可以用生产者消费者模型,来松耦合你的代码,也可以减少阻塞。
'''

 

三、线程锁

3.1 LockRLock

Lock只能锁一次,RLock可以递归多层,Lock不支持多层锁嵌套,我们一般用RLOCK

 

import threading
import time
NUM = 10

def func(lock):
    global NUM
    # 上锁
    lock.acquire()
    lock.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    # 开锁
    lock.release()
    lock.release()

# Lock = threading.Lock()   # 不支持嵌套锁,一般不用
RLock = threading.RLock()   # 一般用RLock,支持嵌套锁。

for i in range(10):
    t = threading.Thread(target=func,args=(RLock,))
    t.start()

'''
死锁:
就是你也抢资源,我也抢资源,谁也抢不走就是死锁。
如果是python,就是Lock,弄成嵌套锁,不支持,则变成死锁。
解决办法:
用RLock,支持嵌套锁
'''

 

3.2 信号量 BoundedSemaphore

如果用线程锁,一次只允许一个进入,如果用信号量可以允许同时多少个一起进入。

每次5个线程同时执行,可能就会同时修改一个值。

 

 

import threading
import time
NUM = 10

def func(i,lock):
    global NUM
    # 上锁
    lock.acquire()  # 总共30个 一次执行5个 25个,依次类推:20,15。。。
    NUM -= 1
    time.sleep(2)
    print('NUM:',str(NUM),'i:',i)
    # 开锁
    lock.release()

# Lock = threading.Lock()   # 不支持嵌套锁,一般不用
# RLock = threading.RLock()   # 一般用RLock,支持嵌套锁。
lock = threading.BoundedSemaphore(5)   # 参数是每次执行几个线程

for i in range(30):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()

 

'''
打印:
NUM: 5 i: 2
NUM: 4 i: 0
NUM: 4 i: 4
NUM: 2 i: 3
NUM: 1 i: 1
NUM: 0 i: 6
NUM: 0 i: 5
NUM: -2 i: 7
NUM: -2 i: 8
NUM: -4 i: 9
NUM: -5 i: 10
NUM: -6 i: 11
NUM: -7 i: 12
NUM: -8 i: 13
NUM: -9 i: 14
NUM: -10 i: 15
NUM: -10 i: 16
NUM: -10 i: 18
NUM: -10 i: 17
NUM: -10 i: 19
'''
打印

相关文章: