一、数据共享
1.进程间的通信应该尽量避免共享数据的方式
2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的。
虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此。
1 命令就是一个程序,按回车就会执行(这个只是在windows情况下) 2 tasklist 查看进程 3 tasklist | findstr pycharm #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了, 4 管道后面的findstr pycharm就接收了)
3.(IPC)进程之间的通信有两种实现方式:管道和队列
1 from multiprocessing import Manager,Process,Lock 2 def work(dic,mutex): 3 # mutex.acquire() 4 # dic[\'count\']-=1 5 # mutex.release() 6 # 也可以这样加锁 7 with mutex: 8 dic[\'count\'] -= 1 9 if __name__ == \'__main__\': 10 mutex = Lock() 11 m = Manager() #实现共享,由于字典是共享的字典,所以得加个锁 12 share_dic = m.dict({\'count\':100}) 13 p_l = [] 14 for i in range(100): 15 p = Process(target=work,args=(share_dic,mutex)) 16 p_l.append(p) #先添加进去 17 p.start() 18 for i in p_l: 19 i.join() 20 print(share_dic) 21 # 共享就意味着会有竞争, 22 23
二、进程池
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
- 很明显需要并发执行的任务通常要远大于核数
- 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
- 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
那么什么是进程池呢?进程池就是控制进程数目
1 ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
进程池的结构:
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
1.创建进程池
1 Pool([numprocess [,initializer [, initargs]]]):创建进程池2.参数介绍
1 numprocess:要创建的进程数,如果省略,将默认为cpu_count()的值,可os.cpu_count()查看 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
3.方法介绍
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行 2 func(*args,**kwargs),然后返回结果。 3 需要强调的是:此操作并不会在所有池工作进程中并执行func函数。 4 如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply() 5 函数或者使用p.apply_async() 6 7 8 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例, 9 callback是可调用对象,接收输入参数。当func的结果变为可用时, 10 将理解传递给callback。callback禁止执行任何阻塞操作, 11 否则将接收其他异步操作中的结果。 12 13 p.close():关闭进程池,防止进一步操作。禁止往进程池内在添加任务(需要注意的是一定要写在close()的上方) 14
1 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用应用1:
1 from multiprocessing import Pool 2 import os,time 3 def task(n): 4 print(\'[%s] is running\'%os.getpid()) 5 time.sleep(2) 6 print(\'[%s] is done\'%os.getpid()) 7 return n**2 8 if __name__ == \'__main__\': 9 # print(os.cpu_count()) #查看cpu个数 10 p = Pool(4) #最大四个进程 11 for i in range(1,7):#开7个任务 12 res = p.apply(task,args=(i,)) #同步的,等着一个运行完才执行另一个 13 print(\'本次任务的结束:%s\'%res) 14 p.close()#禁止往进程池内在添加任务 15 p.join() #在等进程池 16 print(\'主\')
1 # ---------------- 2 # 那么我们为什么要用进程池呢?这是因为进程池使用来控制进程数目的, 3 # 我们需要几个就开几个进程。如果不用进程池实现并发的话,会开很多的进程 4 # 如果你开的进程特别多,那么你的机器就会很卡,所以我们把进程控制好,用几个就 5 # 开几个,也不会太占用内存 6 from multiprocessing import Pool 7 import os,time 8 def walk(n): 9 print(\'task[%s] running...\'%os.getpid()) 10 time.sleep(3) 11 return n**2 12 if __name__ == \'__main__\': 13 p = Pool(4) 14 res_obj_l = [] 15 for i in range(10): 16 res = p.apply_async(walk,args=(i,)) 17 # print(res) #打印出来的是对象 18 res_obj_l.append(res) #那么现在拿到的是一个列表,怎么得到值呢?我们用个.get方法 19 p.close() #禁止往进程池里添加任务 20 p.join() 21 # print(res_obj_l) 22 print([obj.get() for obj in res_obj_l]) #这样就得到了 23
那么什么是同步,什么是异步呢?
同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去
异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。
什么是串行,什么是并行呢?
举例:能并排开几辆车的就可以说是“并行”,只能一辆一辆开的就属于“串行”了。很明显,并行的速度要比串行的快得多。(并行互不影响,串行的等着一个完了才能接着另一个)
应用2:
使用进程池维护固定数目的进程(以前客户端和服务端的改进
1 from socket import * 2 from multiprocessing import Pool 3 s = socket(AF_INET,SOCK_STREAM) 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用 5 s.bind((\'127.0.0.1\',8081)) 6 s.listen(5) 7 print(\'start running...\') 8 def talk(coon,addr): 9 while True: # 通信循环 10 try: 11 cmd = coon.recv(1024) 12 print(cmd.decode(\'utf-8\')) 13 if not cmd: break 14 coon.send(cmd.upper()) 15 print(\'发送的是%s\'%cmd.upper().decode(\'utf-8\')) 16 except Exception: 17 break 18 coon.close() 19 if __name__ == \'__main__\': 20 p = Pool(4) 21 while True:#链接循环 22 coon,addr = s.accept() 23 print(coon,addr) 24 p.apply_async(talk,args=(coon,addr)) 25 s.close() 26 #因为是循环,所以就不用p.join了 27 28
1 from socket import * 2 c = socket(AF_INET,SOCK_STREAM) 3 c.connect((\'127.0.0.1\',8081)) 4 while True: 5 cmd = input(\'>>:\').strip() 6 if not cmd:continue 7 c.send(cmd.encode(\'utf-8\')) 8 data = c.recv(1024) 9 print(\'接受的是%s\'%data.decode(\'utf-8\')) 10 c.close() 11
三、回调函数
1 回调函数什么时候用?(回调函数在爬虫中最常用) 2 造数据的非常耗时 3 处理数据的时候不耗时 4 5 你下载的地址如果完成了,就自动提醒让主进程解析 6 谁要是好了就通知解析函数去解析(回调函数的强大之处)
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
1 from multiprocessing import Pool 2 import requests 3 import os 4 import time 5 def get_page(url): 6 print(\'<%s> is getting [%s]\' %(os.getpid(),url)) 7 response = requests.get(url) #得到地址 8 time.sleep(2) 9 print(\'<%s> is done [%s]\'%(os.getpid(),url)) 10 return {\'url\':url,\'text\':response.text} 11 def parse_page(res): 12 \'\'\'解析函数\'\'\' 13 print(\'<%s> parse [%s]\'%(os.getpid(),res[\'url\'])) 14 with open(\'db.txt\',\'a\') as f: 15 parse_res = \'url:%s size:%s\n\' %(res[\'url\'],len(res[\'text\'])) 16 f.write(parse_res) 17 if __name__ == \'__main__\': 18 p = Pool(4) 19 urls = [ 20 \'https://www.baidu.com\', 21 \'http://www.openstack.org\', 22 \'https://www.python.org\', 23 \'https://help.github.com/\', 24 \'http://www.sina.com.cn/\' 25 ] 26 for url in urls: 27 obj = p.apply_async(get_page,args=(url,),callback=parse_page) 28 p.close() 29 p.join() 30 print(\'主\',os.getpid()) #都不用.get()方法了 31 32
如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数
1 from multiprocessing import Pool 2 import requests 3 import os 4 def get_page(url): 5 print(\'<%os> get [%s]\' %(os.getpid(),url)) 6 response = requests.get(url) #得到地址 response响应 7 return {\'url\':url,\'text\':response.text} 8 if __name__ == \'__main__\': 9 p = Pool(4) 10 urls = [ 11 \'https://www.baidu.com\', 12 \'http://www.openstack.org\', 13 \'https://www.python.org\', 14 \'https://help.github.com/\', 15 \'http://www.sina.com.cn/\' 16 ] 17 obj_l= [] 18 for url in urls: 19 obj = p.apply_async(get_page,args=(url,)) 20 obj_l.append(obj) 21 p.close() 22 p.join() 23 print([obj.get() for obj in obj_l]) 24 25
归类: 网络编程socket