进程间通信
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
进程队列queue
不同于线程queue,进程queue的生成是用multiprocessing模块生成的。
在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间。
示例1:
1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]) 4 print(q.qsize()) 5 6 q=multiprocessing.Queue() #全局定义一个q进程队列,在产生子进程时候会在子进程里生成,可以指定最大数,限制队列长度 7 if __name__ == '__main__': 8 p=multiprocessing.Process(target=foo,args=()) #因为名称空间不同,子进程的主线程创建的q队列,主进程get不到,所以会阻塞住 9 p.start() 10 # foo() #主进程执行一下函数就可以访问到了 11 print(q.get())
示例2:
1 import multiprocessing 2 3 def foo(): 4 q.put([11,'hello',True]) 5 print(q.qsize()) 6 7 if __name__ == '__main__': 8 q = multiprocessing.Queue() #主进程创建一个q进程队列 9 p=multiprocessing.Process(target=foo,args=()) #因为名称空间不同,子进程的主线程找不到q队列,所以会报错提示没有q 10 p.start() 11 print(q.get())
示例3:
1 import multiprocessing
2
3 def foo(argument): #定义函数处理进程队列
4 argument.put([11,'hello',True])
5 print(argument.qsize())
6 q = multiprocessing.Queue() #全局定义一个进程队列
7 print('test')
8
9 if __name__ == '__main__':
10 x = multiprocessing.Queue() #主进程定义一个进程队列
11 p=multiprocessing.Process(target=foo,args=(x,)) #主进程把值传给子进程就可以处理了
12 p.start()
13 print(x.get())
14 # foo(q)
15 # print(q.get())
常用方法
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
其他方法
q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
另一个创建进程队列的类
http://www.cnblogs.com/zero527/p/7211909.html
管道pipe
管道就是管道,就像生活中的管道,两头都能进能出
默认管道是全双工的,如果创建管道的时候映射成False,左边只能用于接收,右边只能用于发送,类似于单行道
最简单的管道双向通信示例:
1 import multiprocessing
2
3 def foo(sk):
4 sk.send('hello world')
5 print(sk.recv())
6
7 if __name__ == '__main__':
8 conn1,conn2=multiprocessing.Pipe() #开辟两个口,都是能进能出,括号中如果False即单向通信
9 p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数
10 p.start()
11 print(conn2.recv()) #主进程使用conn口接收
12 conn2.send('hi son') #主进程使用conn口发送
常用方法
conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
注意:send()和recv()方法使用pickle模块对对象进行序列化
其他方法
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法,不用的时候两边都要close conn1.fileno():返回连接使用的整数文件描述符 conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 c1=Process(target=consumer,args=((left,right),'c1')) 25 c1.start() 26 seq=(i for i in range(10)) 27 producer(seq,(left,right)) 28 right.close() 29 left.close() 30 c1.join() 31 print('主进程') 32 33 生产者消费者关闭某端点