bai2018

目的意义

爬虫应该能够快速高效的完成数据爬取和分析任务。使用多个进程协同完成一个任务,提高了数据爬取的效率。

以百度百科的一条为起点,抓取百度百科2000左右词条数据。

说明

参阅模仿了:https://book.douban.com/subject/27061630/。

作者说是简单的分布式爬虫(hh),在书中有详细的说明和注解。

这里只是补漏和梳理。

因为进程传递参数的问题,搞了几天还是放弃了在WIndows上跑,换用了Linux。

又因为各种各样的问题,弃用CentOS(它确实是安全可靠的,但是...我不会装QQ,输入法等),换用了软件容易安装的Ubuntu。然后才装了Eclipse等各种软件后,才开始多进程的调试。

构造

主节点和从节点的方案实现信息爬取。结构应该让各个节点高效工作。

从节点:

爬虫爬取速度受到网络延时的影响和网页信息解析的影响比较严重,所以使用多个从节点用来专门负责下载网页信息,解析网页信息。

则分为三个文件,爬取文件,下载网页文件,解析网页文件。

爬取文件接收来自主节点发送来的网页地址。然后调用下载网页文件并完成解析,将处理好的数据发送给主节点。

主节点:

主节点负责发送给从节点网页地址,并接收来自从节点的解析后的网页信息,将网页信息存储下来。

主节点任务分为分发网址,接收从节点的信息,存储网页三部分。在代码里,他建立了三个进程,来分别实现。

主节点任务中,存储信息,定义一套存储信息的方法。分发网址,定义一套分发网址过程中可能用到的方法。主文件中,设立三个函数,建立三个进程。

主节点设计

主节点的三个任务,分成三个进程,三个进程(分发网址,数据接收,数据存储),做一个类。

数据接收与分发网址,需要分布式进程。分布式进程需要使用队列Queue。这里一定是multiprocessing中的导入的队列。网址分发、数据接收分别使用一个队列。

注册,设定地址,秘钥,完成初始化过程,将url_q,result_q分别注册到网络中。

然后设立分发任务,传递队列给分发任务函数。分发任务使用url_q队列完成数据的发送。使用conn_q接收了新的网址,并进行存储,再次分发到url_q上。

数据接收任务,完成了数据的接收过程,接收以后需要及时将数据存储,在这里使用了两个队列conn_q,放置接收数据中的地址信息,store_q,放置接收数据中的网页信息。

数据存储任务,接收数据接收任务中的store_q队列信息,及时写入到磁盘中。

所有涉及到的文件如下:

NodeManager.py

import time
#import sys
#sys.path.append(\'/home\')#if needed ,add path as package
from UrlManager import UrlManager
from multiprocessing import Process,Queue
from multiprocessing.managers import BaseManager
from DataOutput import DataOutput

class NodeManager():
    def start_manager(self,url_q,result_q):
        BaseManager.register(\'get_task_queue\', callable=lambda:url_q)
        BaseManager.register(\'get_result_queue\',callable=lambda:result_q)
        manager=BaseManager(address=(\'127.0.0.1\',8001),authkey=\'baike\'.encode(\'utf-8\'))
        return manager
    
    def url_manager_proc(self,url_q,conn_q,root_url):
        #send url to queue and receive new urls for storing to object
        url_manager=UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while(url_manager.has_new_url()):
                new_url=url_manager.get_new_url()
                url_q.put(new_url)
                print(\'old url size:\'+str(url_manager.old_url_size()))
                if(url_manager.old_url_size()>2000):
                    url_q.put(\'end\')
                    url_manager.save_process(\'new_urls.txt\',url_manager.new_urls)
                    url_manager.save_process(\'old_urls.txt\',url_manager.old_urls)
                    print(\'finish url_manager_proc\')
                    return
            try:
                urls=conn_q.get()
                url_manager.add_new_urls(urls)
                print(\'get:\'+urls)
            except Exception:
                time.sleep(0.1)
        
    
    def result_solve_proc(self,result_q,conn_q,store_q):
        while True:
            if not result_q.empty():
                content=result_q.get(True)
                if content[\'new_urls\']==\'end\':
                    print(\'finish result_solve_proc\')
                    store_q.put(\'end\')
                    return
                conn_q.put(content["new_urls"])
                store_q.put(content["data"])
            else:
                time.sleep(0.1)
    
    def store_proc(self,store_q):
        output=DataOutput()
        while True:
            if not store_q.empty():
                data=store_q.get()
                if data ==\'end\':
                    print(\'finish store_proc\')
                    output.output_end(output.path)
                    return
                output.store_data(data)



if __name__==\'__main__\':
    url_q=Queue()#send url to workers
    result_q=Queue()#receive url\'s analytical data from works
    store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract
    conn_q=Queue()#urls which is fresh are used for storing to object for further extract
    nodeObject=NodeManager()
    manager=nodeObject.start_manager(url_q,result_q)
    
    root_url=\'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin\'
    url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,))
    result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,))
    store=Process(target=nodeObject.store_proc,args=(store_q,))
    url_manager.start()
    result_solve.start()
    store.start()
    manager.get_server().serve_forever()

 UrlManager.py

import hashlib
import pickle
class UrlManager():
    def __init__(self):
        self.old_urls=self.load_process(\'new_urls.txt\')
        self.new_urls=self.load_process(\'old_urls.txt\')
        pass
    
    def has_new_url(self):
        return self.new_url_size()!=0
    
    def new_url_size(self):
        return len(self.new_urls)
    
    def old_url_size(self):
        return len(self.old_urls)
    
    def get_new_url(self):
        new_url=self.new_urls.pop()
        m=hashlib.md5()
        m.update(new_url.encode("utf8"))
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url
    
    def add_new_url(self,url):
        if url is None:
            return
        m=hashlib.md5()
        m.update(url.encode(\'utf-8\'))       
        url_md5=m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
    
    def add_new_urls(self,urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)
        pass
    
    def save_process(self,path,data):
        with open(path,\'wb\') as f:
            pickle.dump(data,f)
    
    def load_process(self,path):
        print(\'loading..\')
        try:
            with open(path,\'rb\') as f:
                tmp=pickle.load(f)
                return tmp
        except:
            print(\'loading error maybe loading file not exist and will create it:\'+path)
        newSet=set()
        self.save_process(path, newSet)
        return newSet

 DataOutput.py

import codecs
from os.path import os
class DataOutput(object):
    def __init__(self):
        self.path=\'baike.html\'
        self.output_head(self.path)
        self.datas=[]
    
    def store_data(self,data):
        if data is None:
            return
        self.datas.append(data)
        self.output_html(self.path,data)
    
    def output_head(self,path):
        if os.path.exists(path):
            return
        fout=codecs.open(\'baike.html\', \'w\', encoding=\'utf-8\')
        fout.write("<html>")
        fout.write("<head><meta charset=\'urf-8\'></head>")
        fout.write("<body>")
        fout.write("<table border=\'1\' width=1800  style=\'word-break:break-all;word-wrap:break-word;\'>")
        fout.write("<tr>")
        fout.write("<td width=\'20\'>序号</td>")
        fout.write("<td width=\'300\'>URL</td>")
        fout.write("<td width=\'100\'>标题</td>")
        fout.write("<td width=\'1200\'>释义</td>")
        fout.write("</tr>")   
        fout.close()
        
    def output_end(self,path):
        fout=codecs.open(path, \'a\', encoding=\'utf-8\')
        fout.write("</table>")  
        fout.write("</body>")      
        fout.write("</html>")
        fout.close()       
        
    def output_html(self,path,data):
        fout=codecs.open(path, \'a\', encoding=\'utf-8\')    
        fout.write("<tr>")
        fout.write("<td>%s</td>"%str(len(self.datas)))
        fout.write("<td><a href=%s>%s</a></td>"%(data[\'url\'],data[\'url\']))
        fout.write("<td>%s</td>"%data[\'title\'])
        fout.write("<td>%s</td>"%data[\'summary\'])
        fout.write("</tr>")
        fout.close()

从节点设计

从节点首先是连接到指定地址并验证秘钥。连接后获取url_q、result_q。

从url_q中获取发来的地址,调用HTML下载器下载数据,调动HTML解析器解析数据,然后把结果放到result_q队列上。

代码如下

SpiderWork.py

from multiprocessing.managers import BaseManager
from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser
class SpiderWork():
    def __init__(self):
        BaseManager.register(\'get_task_queue\')
        BaseManager.register(\'get_result_queue\')
        server_addr=\'127.0.0.1\'
        print(\'connect\'+server_addr)
        self.m=BaseManager(address=(server_addr,8001),authkey=\'baike\'.encode(\'utf-8\'))
        self.m.connect()
        self.task=self.m.get_task_queue()
        self.result=self.m.get_result_queue()
        print(self.task)
        self.downloader=HtmlDownloader()
        self.parser=HtmlParser()
        print(\'initial finish\')
    
    def crawl(self):
        while (True):
            try:
                if not self.task.empty():
                    url=self.task.get()
                    if url == \'end\':
                        print(\'stop spider1\')
                        self.result.put({\'new_urls\':\'end\',\'data\':\'end\'})
                        return
                    print(\'working:\'+url)#url
                    content=self.downloader.download(url)
                    new_urls,data=self.parser.parser(url,content)
                    self.result.put({"new_urls":new_urls,"data":data})
            except Exception as e:
                print(e,url)
                
if __name__=="__main__":
    spider=SpiderWork()
    spider.crawl()

 HtmlDownloader.py

import requests
import chardet
class HtmlDownloader(object):
    def download(self,url):
        if url is None:
            return None
        user_agent=\'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0\'
        headers={\'User-Agent\':user_agent}
        r=requests.get(url,headers=headers)
        if r.status_code is 200:
            r.encoding=chardet.detect(r.content)[\'encoding\']
            return r.text
        return None

 HtmlParser.py

import re
from urllib import parse
from bs4 import BeautifulSoup
class HtmlParser(object):
    def parser(self,page_url,html_cont):
        if page_url is None or html_cont is None:
            return
        
        soup=BeautifulSoup(html_cont,\'lxml\')
        
        new_urls=self.getNewUrls(page_url,soup)
        
        new_data=self.getNewData(page_url,soup)
        return new_urls,new_data
    
    def getNewUrls(self,page_url,soup):
        new_urls=set()
        links=soup.find_all(\'a\',href=re.compile(r\'/item/.*\'))
        for link in links:
            new_url=link[\'href\']
            new_full_url=parse.urljoin(page_url,new_url)
            new_urls.add(new_full_url)
        return new_urls

    def getNewData(self,page_url,soup):
        data={}
        data[\'url\']=page_url
        title = soup.find(\'dd\',class_=\'lemmaWgt-lemmaTitle-title\').find(\'h1\')
        data[\'title\']=title.get_text()
        summary = soup.find(\'div\',class_=\'lemma-summary\')
        #获取到tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回
        data[\'summary\']=summary.get_text()
        return data

结果

建立.sh文件如下:

#!/bin/bash
rm -rf log/*
rm -rf baike.html
rm -rf new_urls.txt
rm -rf old_urls.txt
python3 control/NodeManager.py &> log/control.log & for ((i=1;i<=10;i++)) do python3 spider/SpiderWork.py &>log/spider$i.log & done

启动主节点,然后启动10个从节点。将它们所产生的日志信息记录到log/下,并都是在后台运行的进程。

两分钟左右,完成约1900条的数据获取

 

可能用到的命令:

kill -9 $(ps aux | grep python | awk \'{print $2}\')

!kill

可能用到的软件:

Eclipse的pydev进程调试。

最后

这代码里面真的有好多的细节文件,序列化操作与存储,md5的压缩方案等,都是值得思考的。

 

分类:

技术点:

相关文章: