# URL Manager
```python
# URL manager: keeps the sets of un-crawled and crawled URLs and can
# persist them to disk so a run can be resumed.
import pickle
import hashlib


class UrlManager():
    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')  # un-crawled URLs
        self.old_urls = self.load_progress('old_urls.txt')  # crawled URLs (stored as shortened MD5 digests)

    def has_new_url(self):
        '''Return True if there are still un-crawled URLs.'''
        return self.new_url_size() != 0

    def get_new_url(self):
        '''Pop one un-crawled URL and record its digest as crawled.'''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode('utf-8'))  # md5 needs bytes in Python 3
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self, url):
        '''Add a single URL to the un-crawled set if it has not been seen before.'''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        '''Add a collection of URLs to the un-crawled set.'''
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        '''Size of the un-crawled URL set.'''
        return len(self.new_urls)

    def old_url_size(self):
        '''Size of the crawled URL set.'''
        return len(self.old_urls)

    def save_progress(self, path, data):
        '''Pickle a set to the given file path.'''
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        '''Load a pickled set from disk; return an empty set if the file is missing or invalid.'''
        print('[+] Loading progress from file: %s' % path)
        try:
            with open(path, 'rb') as f:
                return pickle.load(f)
        except (IOError, EOFError, pickle.PickleError):
            print('[!] File missing or invalid, starting a new set: %s' % path)
            return set()
```
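For a quick sanity check, the URL manager can be exercised on its own. The snippet below is a minimal, illustrative sketch: the seed URL is the same one the control node uses later, `new_urls.txt` / `old_urls.txt` are the default progress files, and the module is assumed to be saved as `URLManager.py` (the name the control node's import suggests).

```python
# Illustrative, standalone exercise of UrlManager (not part of the crawler itself).
from URLManager import UrlManager

manager = UrlManager()
manager.add_new_url('http://baike.baidu.com/view/284853.htm')   # seed URL
print(manager.has_new_url())                   # True: one un-crawled URL

url = manager.get_new_url()                    # pops the URL, stores its shortened MD5 in old_urls
manager.add_new_url(url)                       # ignored: the digest is already in old_urls
print(manager.new_url_size(), manager.old_url_size())   # 0 1

# Persist both sets so the next run can resume via load_progress().
manager.save_progress('new_urls.txt', manager.new_urls)
manager.save_progress('old_urls.txt', manager.old_urls)
```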
# Spider Worker

```python
# Spider (worker) node: pulls URLs from the control node's task queue,
# downloads and parses each page, and pushes the result back.
from multiprocessing.managers import BaseManager

from HTML_downloader import HtmlDownloader
from HTML_parser import HtmlParser


class Spiderwork():
    def __init__(self):
        # Register the queue accessors exposed by the control node, then connect to it.
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        server_addr = '127.0.0.1'
        print('connect to %s ....' % server_addr)
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike'.encode('utf-8'))
        self.m.connect()
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finished..')

    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
                    if url == 'end':
                        print('The control node told the spider node to stop.')
                        # Propagate the stop notice through the result queue.
                        self.result.put({'new_urls': 'end', 'data': 'end'})
                        return
                    print('Spider node is parsing: %s' % url)
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    # Send back the extracted links (a set) and the parsed data (a dict).
                    self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError:
                print('Lost the connection to the control node.')
                return
            except Exception as e:
                print(e)
                print('crawl failed')


if __name__ == '__main__':
    spider = Spiderwork()
    spider.crawl()
```
# HTML Parser

```python
# HTML parser: extracts follow-up URLs and the entry's title/summary from a page.
import re
from urllib import parse

from bs4 import BeautifulSoup


class HtmlParser():
    def parser(self, page_url, html_cont):
        '''Parse the page content and extract new URLs and data.

        :param page_url: URL of the downloaded page
        :param html_cont: downloaded page content
        '''
        if page_url is None or html_cont is None:
            return None, None
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)
        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        '''Extract the set of new URLs from <a> tags whose href matches /item/...'''
        new_urls = set()
        links = soup.find_all('a', href=re.compile(r'/item/.'))
        for link in links:
            new_url = link['href']
            # Join the relative href with the page URL to get a full address.
            new_full_url = parse.urljoin(page_url, new_url)
            new_urls.add(new_full_url)
        return new_urls

    def _get_new_data(self, page_url, soup):
        '''Extract the useful data: page URL, entry title and summary.'''
        data = {}
        data['url'] = page_url
        title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title'] = title.text
        summary = soup.find('div', class_='lemma-summary')
        # .text collects all text contained inside the tag.
        data['summary'] = summary.text
        return data
```
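To see what the parser produces without downloading anything, it can be fed a small hand-written fragment shaped like the markup it looks for. The fragment and its contents below are made up purely for illustration:

```python
from HTML_parser import HtmlParser

# A made-up fragment containing the three things the parser looks for:
# the title <dd>/<h1>, the summary <div>, and an /item/ link.
sample_html = '''
<dd class="lemmaWgt-lemmaTitle-title"><h1>Python</h1></dd>
<div class="lemma-summary">Python is a programming language.</div>
<a href="/item/Guido">Guido</a>
'''

parser = HtmlParser()
new_urls, data = parser.parser('http://baike.baidu.com/item/Python', sample_html)
print(new_urls)   # {'http://baike.baidu.com/item/Guido'} -- relative href joined with the page URL
print(data)       # {'url': 'http://baike.baidu.com/item/Python', 'title': 'Python', 'summary': '...'}
```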
# HTML Downloader

```python
# HTML downloader: fetches a page with requests and returns its text.
import requests


class HtmlDownloader():
    def download(self, url):
        if url is None:
            return None
        # A browser-like User-Agent so the request is not rejected outright.
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            r.encoding = 'utf-8'
            return r.text
        return None
```
# Data Output

```python
# Data store: buffers parsed entries and writes them into a timestamped HTML table.
import codecs
import time


class DataOutput():
    def __init__(self):
        self.filepath = 'baike_%s.html' % time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        '''Buffer one parsed entry; flush to disk once more than 10 are pending.'''
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas) > 10:
            self.output_html(self.filepath)

    def output_head(self, path):
        '''Write the opening HTML tags.'''
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write('<html>')
        fout.write('<body>')
        fout.write('<table>')
        fout.close()

    def output_html(self, path):
        '''Append the buffered rows to the HTML file and clear the buffer.'''
        fout = codecs.open(path, 'a', encoding='utf-8')
        for data in self.datas:
            fout.write('<tr>')
            fout.write('<td>%s</td>' % data['url'])
            fout.write('<td>%s</td>' % data['title'])
            fout.write('<td>%s</td>' % data['summary'])
            fout.write('</tr>')
        self.datas = []
        fout.close()

    def output_end(self, path):
        '''Flush any remaining rows and write the closing HTML tags.'''
        self.output_html(path)
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write('</table>')
        fout.write('</body>')
        fout.write('</html>')
        fout.close()
```
# Control Scheduler

```python
# Control (scheduler) node: manages URLs, distributes tasks to spider nodes
# through a BaseManager server, collects results and stores them.
import time
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue

from URLManager import UrlManager
from Data_store import DataOutput


class NodeManager():
    def start_Manager(self, url_q, result_q):
        '''Create a distributed manager that exposes the task and result queues.

        :param url_q: task (URL) queue
        :param result_q: result queue
        '''
        BaseManager.register('get_task_queue', callable=lambda: url_q)
        BaseManager.register('get_result_queue', callable=lambda: result_q)
        manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))
        return manager

    def url_manager_proc(self, url_q, conn_q, root_url):
        '''Feed un-crawled URLs to the task queue and absorb newly found URLs.'''
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while url_manager.has_new_url():
                # Take a new URL from the URL manager and hand it to a worker.
                new_url = url_manager.get_new_url()
                url_q.put(new_url)
                print('old_url=', url_manager.old_url_size())
                # Stop after 2000 links have been crawled and save the set state.
                if url_manager.old_url_size() > 2000:
                    url_q.put('end')
                    print('Control node issued the stop notice')
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            try:
                if not conn_q.empty():
                    urls = conn_q.get()
                    url_manager.add_new_urls(urls)
            except BaseException:
                time.sleep(0.1)

    def result_solve_proc(self, result_q, conn_q, store_q):
        '''Split worker results into new URLs (for the URL manager) and data (for storage).'''
        while True:
            try:
                if not result_q.empty():
                    content = result_q.get(True)
                    if content['new_urls'] == 'end':
                        print('Result-handling process received the stop notice and is exiting')
                        store_q.put('end')
                        return
                    conn_q.put(content['new_urls'])   # new_urls is a set of URLs
                    store_q.put(content['data'])      # data is a dict of parsed fields
                else:
                    time.sleep(0.1)
            except BaseException:
                time.sleep(0.1)

    def store_proc(self, store_q):
        '''Write parsed data to the HTML output file.'''
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data == 'end':
                    print('Store process received the stop notice and is exiting')
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                time.sleep(0.1)


if __name__ == '__main__':
    # Initialise the four queues; multiprocessing.Queue is used so the three
    # management processes can actually share them.
    url_q = Queue()
    result_q = Queue()
    store_q = Queue()
    conn_q = Queue()
    # Create the distributed manager.
    node = NodeManager()
    manager = node.start_Manager(url_q, result_q)
    # Create the URL-management, result-handling and data-storage processes.
    url_manager_proc = Process(target=node.url_manager_proc,
                               args=(url_q, conn_q, 'http://baike.baidu.com/view/284853.htm'))
    result_solve_proc = Process(target=node.result_solve_proc,
                                args=(result_q, conn_q, store_q))
    store_proc = Process(target=node.store_proc, args=(store_q,))
    # Start the three processes, then serve the queues to the spider nodes.
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
```
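Because the two queues are published through `BaseManager`, any process that knows the address and the authkey can attach to them, which is exactly what the spider worker above does; in a truly distributed run, `server_addr` in `Spiderwork` must be changed from `127.0.0.1` to the control node's address. As a purely illustrative sketch (not part of the project), a small monitor script could connect the same way and check whether work is pending; the `127.0.0.1` address assumes it runs on the control machine:

```python
from multiprocessing.managers import BaseManager

# Register the same accessor names the control node exposes, then connect as a client.
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')

m = BaseManager(address=('127.0.0.1', 8001), authkey='baike'.encode('utf-8'))
m.connect()

task_q = m.get_task_queue()
result_q = m.get_result_queue()
print('tasks waiting for a spider node?', not task_q.empty())
print('results waiting for the control node?', not result_q.empty())
```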