问题:我给你10个的url,你帮我去把10url的网址下载。
传统方式
# 传统串行方式 import requests import time urls = ['https://github.com/' for _ in range(10)] start = time.time() for url in urls: response = requests.get(url) # print(response) spend_time = time.time() - start print(spend_time) # 12.493084907531738
一、多进程和多线程实现并发
import time
from concurrent.futures import ProcessPoolExecutor
import requests
start = time.time()
def task(url):
response = requests.get(url)
return response
def done(future, *args, **kwargs):
response = future.result()
print(response.url)
if __name__ == '__main__':
url_list = ['https://www.douban.com/' for _ in range(100)]
with ProcessPoolExecutor(max_workers=10) as pool:
for url in url_list:
v = pool.submit(task, url)
v.add_done_callback(done)
print(time.time() - start)
# 11.862671136856079
花费时间 11.862671136856079秒
#########编写方式二#########
import requests
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor()
def task(url):
response = requests.get(url)
return response
def done(future,*args,**kwargs):
response = future.result()
print(response.url)
if __name__ == '__main__':
start = time.time()
with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
url_list = ['https://www.douban.com/' for _ in range(100)]
for url in url_list:
v = pool.submit(task,url)
v.add_done_callback(done)
print(time.time() - start)
# 8.904985904693604
花费时间 8.904985904693604秒
由于GIL限制,建议:IO密集的任务,用ThreadPoolExecutor;CPU密集任务,用ProcessPoolExcutor。
二、基于事件循环的异步IO
1. asyncio + aiohttp
import time
import asyncio
import aiohttp
async def start_request(session, url):
sem = asyncio.Semaphore(10, loop=loop)
async with sem:
print(f'make request to {url}')
with async_timeout.timeout(60):
async with session.get(url, verify_ssl=False) as response:
if response.status == 200:
print(response.status)
async def run(urls):
conn = aiohttp.TCPConnector(ssl=False,
limit=60, # 连接池在windows下不能太大, <500
use_dns_cache=True)
async with aiohttp.ClientSession(connector=conn, loop=loop) as session:
datas = await asyncio.gather(*[start_request(session, url) for url in urls], return_exceptions=True)
for ind, url in enumerate(urls):
if isinstance(datas[ind], Exception):
print(f"{ind}, {url}: 下载失败 请重新下载:")
if __name__ == '__main__':
start = time.time()
urls = (('http://www.baidu.com/') for _ in range(100))
loop = asyncio.get_event_loop()
loop.run_until_complete(run(urls))
print(time.time() - start)
# 1.4860568046569824
2. Twisted
import time
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import getPage
start = time.time()
def one_done(content, arg):
response = content.decode('utf-8')
# print(response)
print(arg)
def all_done(arg):
reactor.stop()
print(time.time() - start)
@defer.inlineCallbacks
def task(url):
res = getPage(bytes(url, encoding='utf8')) # 发送Http请求
res.addCallback(one_done, url)
yield res
url_list = ('http://www.cnblogs.com' for _ in range(100))
defer_list = [] # [特殊,特殊,特殊(已经向url发送请求)]
for url in url_list:
v = task(url)
defer_list.append(v)
d = defer.DeferredList(defer_list)
d.addBoth(all_done)
reactor.run() # 死循环
# 5.039534091949463
3. tornado
import time
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop
COUNT = 0
start = time.time()
def handle_response(response):
global COUNT
COUNT -= 1
if response.error:
print("Error:", response.error)
else:
# print(response.body)
print(response.request)
# 方法同twisted
# ioloop.IOLoop.current().stop()
if COUNT == 0:
ioloop.IOLoop.current().stop()
def func():
url_list = ['http://www.baidu.com' for _ in range(100)]
global COUNT
COUNT = len(url_list)
for url in url_list:
print(url)
http_client = AsyncHTTPClient()
http_client.fetch(HTTPRequest(url), handle_response)
if __name__ == '__main__':
ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start() # 死循环
print(time.time() - start)
# 3.0621743202209473
以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】
""" ########http请求本质,IO阻塞######## sk = socket.socket() #1.连接 sk.connect(('www.baidu.com',80,)) #阻塞 print('连接成功了') #2.连接成功后发送消息 sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n") #3.等待服务端响应 data = sk.recv(8096)#阻塞 print(data) #\r\n\r\n区分响应头和影响体 #关闭连接 sk.close() """ """ ########http请求本质,IO非阻塞######## sk = socket.socket() sk.setblocking(False) #1.连接 try: sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错 print('连接成功了') except BlockingIOError as e: print(e) #2.连接成功后发送消息 sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n") #3.等待服务端响应 data = sk.recv(8096)#阻塞 print(data) #\r\n\r\n区分响应头和影响体 #关闭连接 sk.close() """