lovesqcc

背景

有时,我期望能够自动批量下载收藏的一些图片或视频。在 MacOS 上, you-get 可用于下载, xargs 命令可以提供批量的功能。那么,需要能够自动抓取图片、视频等资源链接地址的小工具。

“批量下载网站图片的Python实用小工具(下)” 中,编写了一个可以用于抓取和下载图片资源的小工具。本文基于这个小工具,做一点改造,来实现资源链接地址的方便抓取。


设计

资源链接规则

要实现资源链接的抓取,首先要定义资源链接规则。常见的资源链接的标签有 a, img , video 。进一步,可以通过 id, class 来精确定位所需的资源。

资源链接参数应当尽可能使用友好。可以采用 img=jpg,png;class=*;id=xyz 来定义资源的规则。在内部,会转换成 [{"img":["jpg","png"], "class":["resLink"], "id":["xyz"]}] 的更灵活的 JSON 形式。这里是或关系,也就是可以同时抓取符合多个规则中任一个的资源地址。

很多用户很可能根本不知道资源链接规则如何定义。因此,这里可以提供一个默认选项。也就是不指定该规则的话,就默认抓取 img = png or jpg 以及 a 的 href 链接。

资源链接规则参数的形式转换可以查看 res.py 的 parseRulesParam 方法。


基础组件

需要能够抓取网页内容、定位网页元素的基础组件。为了提升速度,还需要一个并发组件。基础组件都放在包 common 下面。

  • 可以使用 requests 库来抓取网页内容,见 net.py 的 getHTMLContentFromUrl 方法。不过,有些网页是动态加载的,需要等待动态加载完成才能够抓取生成的内容。这种情况下,可以使用 selenium + chromedriver 来获取网页内容。见 net.py 的 getHTMLContentAsync 方法。可以基于这两个方法做一层策略包装,见 net.py 的 getHTMLContent 方法。

  • 可以使用 BeautifulSoup 来定位资源链接元素。见 res.py 的 findWantedLinks 方法。

  • 定义一个 IoTaskThreadPool 来并发抓取网页内容,亦可用于并发下载资源。见 multitasks.py 的 IoTaskThreadPool 类。

  • 使用装饰器来捕获异常。见 common.py 的 catchExc 包装器。


小技巧

在编写基础库时,如果需要一些配置项,用参数传递的方式会比较困难,或者导致代码不太简洁。此时,可以把函数包装成类,在类的实例化参数中传入。见 net.py 的 HTMLGrasper 类。


用法

运行前置条件

需要安装 Python3 环境及 bs4 , requests, selenium 包及 pip3, chromedriver 工具。自行网搜下哈。

brew install python3
sudo easy_install pip
pip3 install requests bs4 selenium   -i  https://pypi.doubanio.com/simple

安装问题:

  • chromedriver download 下载 chromedriver.zip 并解压后,将可执行的驱动程序复制到 /usr/local/bin/ 目录下,这样就不会报权限相关问题了。

命令使用

先使用如下命令获取资源,并写入到指定结果资源文件 reslinks.txt 中。


python3 tools/res.py -u https://space.bilibili.com/183260251/favlist -r \'class=*\'

然后使用如下命令来去重并下载资源。


grep \'pattern\' reslinks.txt | sort | uniq | xargs -I {} you-get {}

以上两个命令可以联合起来使用。


B 站视频

python3 tools/res.py -u \'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create\'
python3 tools/res.py -u \'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create\' -r \'class=*\' | grep \'video\' | sort | uniq | xargs -I {} you-get {} 

黑光图集

python3 tools/res.py -u \'http://tu.heiguang.com/works/show_167521.html\'
python3 tools/res.py -u \'http://tu.heiguang.com/works/show_167521.html\' -r \'img=jpg!c\' | sort | uniq | xargs -I {} you-get {}

源代码

包结构

具体可以下载工程: Pystudy Github。如果要修改 common 包下的方法,可以切换到 pystudy 目录下执行 sh install.sh 安装新修改后的包,然后再执行 res.py 脚本。

pystudy
   |-- common
            |-- __init.py__
            |-- common.py
            |-- multitasks.py
            |-- net.py
   |-- tools
           |-- res.py
   |-- install.sh
   |-- setup.py
   |-- __init.py__

res.py

#!/usr/bin/python3
#_*_encoding:utf-8_*_

import re
import sys
import json

import argparse
from bs4 import BeautifulSoup
from common.net import *
from common.multitasks import *

SaveResLinksFile = \'/Users/qinshu/joy/reslinks.txt\'
serverDomain = \'\'

def parseArgs():
    description = \'\'\'This program is used to batch download resources from specified urls.
                     eg. python3 res.py -u http://xxx.html -r \'img=jpg,png;class=resLink;id=xyz\'
                     will search resource links from network urls http://xxx.html  by specified rules
                     img = jpg or png OR class = resLink OR id = xyz [ multiple rules ]

                     python3 tools/res.py -u \'http://tu.heiguang.com/works/show_167480.html\' -r \'img=jpg!c\'
                     for <img src="xxx.jpg!c"/> 
                  \'\'\'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(\'-u\',\'--url\', nargs=\'+\', help=\'At least one html urls are required\', required=True)
    parser.add_argument(\'-r\',\'--rulepath\', nargs=1, help=\'rules to search resources. if not given, search a hrefs or img resources in given urls\', required=False)
    args = parser.parse_args()
    init_urls = args.url
    rulepath = args.rulepath
    return (init_urls, rulepath)

def getAbsLink(serverDomain, link):

    try:
        href = link.attrs[\'href\']
        if href.startswith(\'//\'):
            return \'https:\' + href
        if href.startswith(\'/\'):
            return serverDomain + href
        else:
            return href
    except:
        return \'\'

def getTrueResLink(reslink):
    global serverDomain
    try:
        href = reslink.attrs[\'src\']
        if href.startswith(\'//\'):
            return \'http:\' + href 
        if href.startswith(\'/\'):
            href = serverDomain + href
            return href
        pos = href.find(\'jpg@\')
        if pos == -1:
            return href
        return href[0: pos+3]
    except:
        return \'\'

def batchGetResTrueLink(resLinks):
    hrefs = map(getTrueResLink, resLinks)
    return filter(lambda x: x != \'\', hrefs)

resTags = set([\'img\', \'video\'])

def findWantedLinks(htmlcontent, rule):
    \'\'\'
       find html links or res links from html by rule.
       sub rules such as:
          (1) a link with id=[value1,value2,...]
          (2) a link with class=[value1,value2,...]
          (3) res with src=xxx.jpg|png|mp4|...
       a rule is map containing sub rule such as:
          { \'id\': [id1, id2, ..., idn] } or
          { \'class\': [c1, c2, ..., cn] } or
          { \'img\': [\'jpg\', \'png\', ... ]} or
          { \'video\': [\'mp4\', ...]}

    \'\'\'

    #print("html===\n"+htmlcontent+"\n===End")
    #print("rule===\n"+str(rule)+"\n===End")

    soup = BeautifulSoup(htmlcontent, "lxml")
    alinks = []
    reslinks = []

    for (key, values) in rule.items():
        if key == \'id\':
            for id in values:
                links = soup.find_all(\'a\', id=id)
                links = map(getTrueResLink, links)
                links = filter(lambda x: x != \'\', links)
                alinks.extend(links)
        elif key == \'class\':
            for cls in values:
                if cls == \'*\':
                    links = soup.find_all(\'a\')
                else:
                    links = soup.find_all(\'a\', class_=cls)
                links = map(lambda link: getAbsLink(serverDomain, link), links)
                links = filter(lambda x: validate(x), links)
                alinks.extend(links)
        elif key in resTags:
            for resSuffix in values:
                reslinks.extend(soup.find_all(key, src=re.compile(resSuffix)))

    allLinks = []
    allLinks.extend(alinks)
    allLinks.extend(batchGetResTrueLink(reslinks))
    return allLinks

def validate(link):

    validSuffix = [\'png\', \'jpg\', \'jpeg\', \'mp4\']

    for suf in validSuffix:
        if link.endswith(suf):
            return True
    if link == \'\':
        return False
    if link.endswith(\'html\'):
        return False
    if \'javascript\' in link:
        return False    
    return True    

def batchGetLinksByRule(htmlcontentList, rules):
    \'\'\'
       find all res links from html content list by rules
    \'\'\'

    links = []
    for htmlcontent in htmlcontentList:
        for rule in rules:
            links.extend(findWantedLinks(htmlcontent, rule))
    return links

def batchGetLinks(urls, rules):
    conf = {"async":1, "targetIdWhenAsync": "page-fav", "sleepWhenAsync": 10}
    grasper = HTMLGrasper(conf)
    htmlcontentList = grasper.batchGrapHtmlContents(urls)
    allLinks = batchGetLinksByRule(htmlcontentList, rules)
    with open(SaveResLinksFile, \'w\') as f:
        for link in allLinks:
            print(link)
            f.write(link + "\n")

def parseRulesParam(rulesParam):
    \'\'\'
       parse rules params to rules json
       eg. img=jpg,png;class=resLink;id=xyz to
           [{"img":["jpg","png"], "class":["resLink"], "id":["xyz"]}]
    \'\'\'
    defaultRules = [{\'img\': [\'jpg\',\'png\',\'jpeg\']},{"class":"*"}]
    if rulesParam:
        try:
            rules = []
            rulesStrArr = rulesParam[0].split(";")
            for ruleStr in rulesStrArr:
                ruleArr = ruleStr.split("=")
                key = ruleArr[0]
                value = ruleArr[1].split(",")
                rules.append({key: value})
            return rules
        except ValueError as e:
            print(\'Param Error: invalid rulepath %s %s\' % (rulepathjson, e))
            sys.exit(1)
    return defaultRules

def parseServerDomain(url):
    parts = url.split(\'/\', 3)
    return parts[0] + \'//\' + parts[2]

def testBatchGetLinks():
    urls = [\'http://dp.pconline.com.cn/list/all_t145.html\']
    rules = [{"img":["jpg"], "video":["mp4"]}]

    batchGetLinks(urls, rules)

if __name__ == \'__main__\':

    #testBatchGetLinks()

    (init_urls, rulesParam) = parseArgs()
    print(\'init urls: %s\' % "\n".join(init_urls))

    rulepath = parseRulesParam(rulesParam)
    serverDomain = parseServerDomain(init_urls[0])
    print(\'rulepath: %s\n serverDomain:%s\' % (rulepath, serverDomain))

    batchGetLinks(init_urls, rulepath)

common.py

import os

def createDir(dirName):
    if not os.path.exists(dirName):
        os.makedirs(dirName)

def catchExc(func):
    def _deco(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print ("error catch exception for %s (%s, %s): %s" % (func.__name__, str(*args), str(**kwargs), e))
            return None
    return _deco

multitasks.py

from multiprocessing import (cpu_count, Pool)
from multiprocessing.dummy import Pool as ThreadPool

ncpus = cpu_count()

def divideNParts(total, N):
    \'\'\'
       divide [0, total) into N parts:
        return [(0, total/N), (total/N, 2M/N), ((N-1)*total/N, total)]
    \'\'\'

    each = total / N
    parts = []
    for index in range(N):
        begin = index * each
        if index == N - 1:
            end = total
        else:
            end = begin + each
        parts.append((begin, end))
    return parts

class IoTaskThreadPool(object):
    \'\'\'
       thread pool for io operations
    \'\'\'
    def __init__(self, poolsize):
        self.ioPool = ThreadPool(poolsize)

    def exec(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        return self.ioPool.map(ioFunc, ioParams)

    def execAsync(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        self.ioPool.map_async(ioFunc, ioParams)

    def close(self):
        self.ioPool.close()

    def join(self):
        self.ioPool.join()

net.py

import requests
import time
from bs4 import BeautifulSoup
from common.common import catchExc
from common.multitasks import IoTaskThreadPool
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

delayForHttpReq = 0.5 # 500ms

class HTMLGrasper(object):

    def __init__(self, conf):
        \'\'\'
        抓取 HTML 网页内容时的配置项
          _async: 是否异步加载网页。 _async = 1 当网页内容是动态生成时,异步加载网页; 
          targetIdWhenAsync: 当 _async = 1 指定。
             由于此时会加载到很多噪音内容,需要指定 ID 来精确获取所需的内容部分
          sleepWhenAsync:  当 _async = 1 指定。
             异步加载网页时需要等待的秒数  
        \'\'\'
        self._async = conf.get(\'async\', 0)
        self.targetIdWhenAsync = conf.get(\'targetIdWhenAsync\', \'\')
        self.sleepWhenAsync = conf.get(\'sleepWhenAsync\', 10)

    def batchGrapHtmlContents(self, urls):
        \'\'\'
           batch get the html contents of urls
        \'\'\'
        grapHtmlPool = IoTaskThreadPool(20)
        return grapHtmlPool.exec(self.getHTMLContent, urls)

    def getHTMLContent(self, url):
        if self._async == 1:
            htmlContent = self.getHTMLContentAsync(url)

            if htmlContent is not None and htmlContent != \'\':
                html = \'<html><head></head><body>\' + htmlContent + \'</body></html>\'
                return html

        return self.getHTMLContentFromUrl(url)

    def getHTMLContentAsync(self, url):
        \'\'\'
           get html content from dynamic loaed html url
        \'\'\'

        chrome_options = Options()
        chrome_options.add_argument(\'--headless\')
        chrome_options.add_argument(\'--disable-gpu\')
        driver = webdriver.Chrome(chrome_options=chrome_options)
        driver.get(url)
        time.sleep(self.sleepWhenAsync)

        try:
            elem = driver.find_element_by_id(self.targetIdWhenAsync)
        except:
            elem = driver.find_element_by_xpath(\'/html/body\')

        return elem.get_attribute(\'innerHTML\')       

    def getHTMLContentFromUrl(self, url):
        \'\'\'
           get html content from html url
        \'\'\'
        r = requests.get(url)
        status = r.status_code
        if status != 200:
            return \'\'
        return r.text

setup.py

from distutils.core import setup

setup(
       name = "pystudy" ,
       version = "1.0" ,
       description = "Python Study" ,
       author = " shuqin " ,
       author_email = " shuqin_1984@163.com ",
       url = " https://github.com/shuqin/pystudy " ,
       license = " LGPL " ,
       packages = [\'common\']
       )

install.sh

python3 setup.py build
python3 setup.py sdist
python3 setup.py install

分类:

技术点:

相关文章: