老师的博客:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label14

Pipe

pipe是管道但是不是很推荐使用,因为有着不安全的危险,queue就相当于pipe加上lock,比较安全,但是的注意他们的close的时间,详见python3中的day38的笔记

下面是老师的代码

from multiprocessing import Lock,Pipe,Process
def producer(con,pro,name,food):
    con.close()
    for i in range(100):
        f = '%s生产%s%s'%(name,food,i)
        print(f)
        pro.send(f)
    pro.send(None)
    pro.send(None)
    pro.send(None)
    pro.close()

def consumer(con,pro,name,lock):
    pro.close()
    while True:
            lock.acquire()
            food = con.recv()
            lock.release()
            if food is None:
                con.close()
                break
            print('%s吃了%s' % (name, food))
if __name__ == '__main__':
    con,pro = Pipe()
    lock= Lock()
    p = Process(target=producer,args=(con,pro,'egon','泔水'))
    c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
    c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
    c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
    c1.start()
    c2.start()
    c3.start()
    p.start()
    con.close()
    pro.close()
# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象

# 队列 进程之间数据安全的
# 管道 + 锁

 

manager

老师的代码

from multiprocessing import Manager,Process,Lock
def main(dic,lock):
    dic['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    l = Lock()
    dic=m.dict({'count':100})
    p_lst = []
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_lst.append(p)
    for i in p_lst: i.join()
    print('主进程',dic)

 

运行几次后,你会发现得到的结果不一样,所以也存在数据不安全的现象,所以一般推荐使用queue比较安全,queue,pipe都是可是实现数据通信的。

数据池pool

参数介绍:

Pool([numprocess [,initializer [, initargs]]]):创建进程池

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值

2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

3 initargs:是要传给initializer的参数组

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()
''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

 

1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
3 obj.ready():如果调用完成,返回True
4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
5 obj.wait([timeout]):等待结果变为可用。
6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

 

进程池的效率要高许多,详见老师的博客

import time
from multiprocessing import Pool,Process
def func(n):
    n=n
    for i in range(10):
        n+=i
    print(n)
if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)               # 5个进程
    pool.map(func,range(100))    # 100个任务
    t1 = time.time() - start

    start = time.time()
    p_lst = []
    for i in range(100):#100个任务
        p = Process(target=func,args=(i,))
        p_lst.append(p)
        p.start()
    for p in p_lst :p.join()
    t2 = time.time() - start
    print(t1,t2)
测试代码

相关文章: