老师的博客:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label14
Pipe
pipe是管道但是不是很推荐使用,因为有着不安全的危险,queue就相当于pipe加上lock,比较安全,但是的注意他们的close的时间,详见python3中的day38的笔记
下面是老师的代码
from multiprocessing import Lock,Pipe,Process def producer(con,pro,name,food): con.close() for i in range(100): f = '%s生产%s%s'%(name,food,i) print(f) pro.send(f) pro.send(None) pro.send(None) pro.send(None) pro.close() def consumer(con,pro,name,lock): pro.close() while True: lock.acquire() food = con.recv() lock.release() if food is None: con.close() break print('%s吃了%s' % (name, food)) if __name__ == '__main__': con,pro = Pipe() lock= Lock() p = Process(target=producer,args=(con,pro,'egon','泔水')) c1 = Process(target=consumer, args=(con, pro, 'alex',lock)) c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock)) c3 = Process(target=consumer, args=(con, pro, 'wusir',lock)) c1.start() c2.start() c3.start() p.start() con.close() pro.close() # pipe 数据不安全性 # IPC # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象 # 队列 进程之间数据安全的 # 管道 + 锁
manager
老师的代码
from multiprocessing import Manager,Process,Lock def main(dic,lock): dic['count'] -= 1 if __name__ == '__main__': m = Manager() l = Lock() dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_lst.append(p) for i in p_lst: i.join() print('主进程',dic)
运行几次后,你会发现得到的结果不一样,所以也存在数据不安全的现象,所以一般推荐使用queue比较安全,queue,pipe都是可是实现数据通信的。
数据池pool
参数介绍:
Pool([numprocess [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法 2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。 3 obj.ready():如果调用完成,返回True 4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 5 obj.wait([timeout]):等待结果变为可用。 6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
进程池的效率要高许多,详见老师的博客
import time from multiprocessing import Pool,Process def func(n): n=n for i in range(10): n+=i print(n) if __name__ == '__main__': start = time.time() pool = Pool(5) # 5个进程 pool.map(func,range(100)) # 100个任务 t1 = time.time() - start start = time.time() p_lst = [] for i in range(100):#100个任务 p = Process(target=func,args=(i,)) p_lst.append(p) p.start() for p in p_lst :p.join() t2 = time.time() - start print(t1,t2)