Without using a framework, write a relatively full-featured crawler that covers the basics: monitoring a site for updates, proxy IPs, a throttle (rate limiter), link extraction, and retrying failed downloads.
The code is as follows:
Main function:
def run(self):
    while not self.crawler_queue.empty():
        url_str = self.crawler_queue.get()
        print("url_str is ::::::{}".format(url_str))
        # check the rules in robots.txt
        if self.rp.can_fetch(self.headers["User-Agent"], url_str):
            self.throttle.wait_url(url_str)
            depth = self.visited[url_str]
            if depth < MAX_DEP:
                # download the page
                html_content = self.download(url_str)
                # store the page
                if html_content is not None:
                    self.save_result(html_content, url_str)
                    # self.mcache[url_str] = html_content
                    # save_url(html_content, url_str)
                else:
                    continue
                # extract every link from the page
                url_list = extractor_url_lists(html_content.decode("utf8"))
                # keep only the links we want to crawl
                filter_urls = [link for link in url_list if re.search(self.link_regex, link)]
                for url in filter_urls:
                    # turn the link into an absolute URL
                    real_url = self.nomalize(url)
                    # skip links that have already been crawled
                    if real_url not in self.visited:
                        # print("link is ::::::", real_url)
                        self.visited[real_url] = depth + 1
                        self.crawler_queue.put(real_url)
        else:
            print("robots.txt forbids downloading", url_str)
Throttle (rate limiter):
import time
from datetime import datetime
from urllib.parse import urlparse


class Throttle(object):
    def __init__(self, delay):
        # map each domain to the time it was last downloaded
        self.domains = {}
        # minimum delay (in seconds) between two requests to the same domain
        self.delay = delay

    def wait_url(self, url_str):
        """
        Sleep if the domain of url_str was downloaded less than `delay` seconds ago.
        :param url_str: URL about to be downloaded
        :return:
        """
        domain_url = urlparse(url_str).netloc  # the domain (netloc) part of the URL
        last_accessed = self.domains.get(domain_url)  # when this domain was last downloaded
        if self.delay > 0 and last_accessed is not None:
            # time elapsed since the last download for this domain: if the required
            # delay has not passed yet, sleep for the remaining time, otherwise
            # continue downloading immediately
            sleep_interval = self.delay - (datetime.now() - last_accessed).total_seconds()
            if sleep_interval > 0:
                print(sleep_interval)
                time.sleep(sleep_interval)
        self.domains[domain_url] = datetime.now()  # record the current time for this domain
        print(self.domains[domain_url])
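A quick usage example of the throttle: with a 3-second delay, a second request to the same domain sleeps for roughly the remaining time, while a different domain goes through immediately (the URLs are placeholders):

throttle = Throttle(3)
throttle.wait_url("http://example.com/page1")   # first hit on the domain, no sleep
throttle.wait_url("http://example.com/page2")   # same domain, sleeps about 3 seconds
throttle.wait_url("http://other.example.org/")  # different domain, no sleep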
Random proxy:
import random
import requests


class RandomProxy(object):
    def __init__(self):
        self.proxies = []
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36"
        }

    def crawl_proxies(self):
        """
        Collect proxies (hard-coded here; in practice they could be scraped from a proxy site).
        :return:
        """
        self.proxies.append("59.110.48.236:3128")
        self.proxies.append("115.48.45.58:80")
        self.proxies.append("110.52.8.198:53281")

    def verify_proxies(self):
        """
        Check each proxy and drop the ones that do not work.
        :return:
        """
        invalid_ip = []
        for ip_str in self.proxies:
            proxies = {"http": ip_str}
            try:
                r = requests.get("http://www.baidu.com", proxies=proxies,
                                 headers=self.headers, timeout=5)
                if r.status_code != 200:
                    invalid_ip.append(ip_str)
            except requests.RequestException:
                # an unreachable proxy raises an exception instead of returning a status code
                invalid_ip.append(ip_str)
        for remove_ip in invalid_ip:
            self.proxies.remove(remove_ip)

    def get_one_proxy(self):
        return random.choice(self.proxies)
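The intended flow is to fill the pool, weed out the dead proxies, and then let the downloader pick one at random for each request; a hypothetical wiring (the target URL is a placeholder) could look like this:

random_proxy = RandomProxy()
random_proxy.crawl_proxies()    # fill the proxy pool
random_proxy.verify_proxies()   # drop the proxies that do not respond
proxies = {"http": random_proxy.get_one_proxy()}
# requests.get("http://example.com/", proxies=proxies, headers=random_proxy.headers)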
Retry download (the @retry decorator comes from the third-party retrying library, i.e. from retrying import retry):
@retry(stop_max_attempt_number=3)
def retry_download(self, url_str, data, method, proxies):
    """
    Retry-download method: the decorator repeats the request up to three times.
    :param url_str:
    :param data:
    :param method:
    :param proxies:
    :return:
    """
    if method == "POST":
        result = requests.post(url_str, data=data, headers=self.headers, proxies=proxies)
    else:
        result = requests.get(url_str, headers=self.headers, timeout=3, proxies=proxies)
    # the assert raises AssertionError when the status code is not 200,
    # which makes the @retry decorator attempt the download again
    assert result.status_code == 200
    return result.content
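run() expects self.download() to return None when a page cannot be fetched. A minimal sketch of how download() might wrap retry_download is shown below; the self.random_proxy attribute and the broad exception handling are assumptions for illustration, and the full listing may differ:

def download(self, url_str, data=None, method="GET"):
    # pick a random proxy and let retry_download try up to three times;
    # return None instead of raising so run() can simply skip the URL
    # (self.random_proxy is an assumed attribute holding a RandomProxy instance)
    proxies = {"http": self.random_proxy.get_one_proxy()}
    try:
        return self.retry_download(url_str, data, method, proxies)
    except Exception:
        return None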
Full code: