guanfuchang

 

实例一:扒取猫眼电影TOP100 的信息

#!/usr/bin/env python 
# -*- coding: utf-8 -*-
"""
扒取猫眼电影TOP100 的信息
"""

import re
import json
from multiprocessing import Pool

import requests
from requests.exceptions import RequestException


def get_page_content(url):
    """
    获取页面源码
    :param url:
    :return:
    """
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        return None


def parse_html(html):
    """
    使用正则表达式解析源码
    :param html:
    :return:
    """
    pattern = re.compile(
        \'<dd>.*?board-index.*?>(\d+)</i>.*?data-src="(.*?)".*?class="name"><a.*?>(.*?)</a>.*?class="score"><i.*?>(.*?)</i><i.*?>(.*?)</i>.*?</dd>\',
        re.S)
    items = re.findall(pattern, html)
    for item in items:
        yield {
            \'index\': item[0],
            \'image\': item[1],
            \'title\': item[2],
            \'score\': item[3] + item[4]
        }


def write_to_file(dic):
    """
    写入文件
    :param dic:
    :return:
    """
    with open(\'result\', \'a\', encoding=\'utf-8\') as f:
        f.write(json.dumps(dic, ensure_ascii=False) + "\n")


def main(offset):
    """
    主函数
    :param offset:
    :return:
    """
    url = \'http://maoyan.com/board/4?offset=\' + str(offset)
    html = get_page_content(url)
    for item in parse_html(html):
        print(item)
        write_to_file(item)


if __name__ == \'__main__\':
    # 通过multiprocessing.Pool 创建多线程并发执行。
    pool = Pool(4)
    pool.map(main, [i * 10 for i in range(10)])
    pool.close()
    pool.join()

实例二:扒取头条网中的街拍图片

#!/usr/bin/env python 
# -*- coding: utf-8 -*-
"""
扒取头条网中的街拍图片
"""
import requests
import os
from hashlib import md5
from multiprocessing import Pool


def get_page(offset):
    """
    搜索接口返回json结果
    :param offset:
    :return:
    """
    url = \'http://www.toutiao.com/search_content\'
    params = {
        \'offset\': offset,
        \'format\': \'json\',
        \'keyword\': \'街拍\',
        \'autoload\': \'true\',
        \'count\': 20,
        \'cur_tab\': 1,
        \'from\': \'search_tab\'
    }
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            return response.json()
    except Exception as e:
        print(e)
        return None


def get_images(json):
    """
    解析json,获取图片信息迭代器
    :param json:
    :return:
    """
    if json.get("data"):
        for item in json.get("data"):
            title = item.get("title")
            images = item.get("image_detail")
            if images:
                for image in images:
                    url = image.get("url")
                    yield {
                        \'title\': title,
                        \'image\': url
                    }


def download_image(item):
    """
    保存图片到本地
    :param item:
    :return:
    """
    url = item.get("image")
    title = item.get("title")
    if not os.path.exists(title):
        try:
            os.mkdir(title)
        except Exception as e:
            print(e)
    file_path = os.path.join(title, md5(
        item.get("image").encode("utf-8")).hexdigest() + ".jpg")
    if not os.path.exists(file_path):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                with open(file_path, \'wb\') as fp:
                    fp.write(response.content)
        except Exception as e:
            print(\'Failed to Save Image\')
            print(e)


def main(offset):
    """
    主要函数
    :param offset:
    :return:
    """
    # step1.获取街拍搜索api的json返回
    content = get_page(offset)
    # step2.获取每条搜索记录的title与图片地址,并且遍历
    for item in get_images(content):
        print(item)
        # step3.下载图片
        download_image(item)


if __name__ == \'__main__\':
    GROUP_START = 1
    GROUP_END = 2
    offsets = ([x * 20 for x in range(GROUP_START, GROUP_END + 1)])
    # Make the Pool of workers
    pool = Pool(4)
    pool.map(main, offsets)
    # close the pool and wait for the work to finish
    pool.close()
    pool.join()

实例三:扒取妹子图中的所有图片

#!/usr/bin/python
# coding=utf-8

"""
爬取妹子网图片到本地
"""
import os
import requests
from pyquery import PyQuery as pq
from multiprocessing import Pool
import time


class MeiZi:
    def __init__(self):
        self.index_url = "http://www.mzitu.com/"
        self.headers = {
            \'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36\',
            \'Referer\': self.index_url
        }

    def get_document(self, url):
        """
        获取指定页面的DOM文档,并使用pyquery解析
        :param url:
        :return:
        """
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return pq(response.text)
            else:
                print("[页面访问失败]:status_code:%s,url:%s" % (
                    response.status_code, url))
        except Exception as e:
            print(e)

    def index_page_range(self):
        """
        获取首页翻页总数迭代器
        :return:
        """
        doc = self.get_document(self.index_url)
        if doc:
            page_count = doc(\'.nav-links a:nth-last-child(2)\').text()
            if page_count:
                return range(int(page_count))

    def get_items(self, page):
        """
        获取一级列表页中的主题页标题与主题页ID
        :param html:
        :return:
        """
        doc = self.get_document(self.index_url + "page/" + str(page))
        if doc:
            items = doc(\'#pins li span a\').items()
            if items:
                for item in items:
                    yield {
                        \'item_url\': item.attr(\'href\'),
                        \'item_title\': item.text()
                    }

    def save_item_details(self, item):
        """
       保存二级专题页中的图片
        :param detail_url:
        :return:
        """
        # 获取并解析二级专题页信息
        item_url = item.get("item_url")
        item_title = item.get("item_title")
        doc = self.get_document(item_url)
        if doc:
            # 获取二级专题页中的翻页总数
            page_count = doc(\'.pagenavi a:nth-last-child(2)\').text()
            # 获取二级专题页中的图片地址
            img_src = doc(\'.main-image img\').attr(\'src\')
            # 保存第一页图片
            self.save_image(item_title, img_src)
            # 如果还有翻页,獲取保存分頁中的圖片
            page = 1
            while page < int(page_count):
                page += 1
                detail_url = item_url + "/" + str(page)
                doc = self.get_document(detail_url)
                if doc:
                    img_src = doc(\'.main-image img\').attr(\'src\')
                    self.save_image(item_title, img_src)

    def save_image(self, title, img_src):
        """
        保存图片img_src到本地,根据title命名文件夹
        :param title:
        :param img_src:
        :return:
        """
        root = "F:\\meizitu"
        # 保存到的文件夾,如果不存在,則創建
        save_dir = os.path.join(root, title)
        if not os.path.exists(save_dir):
            try:
                os.makedirs(save_dir)
            except Exception as e:
                print("[創建文件夾失敗]:%s" % save_dir)
                print(e)
        # 如果圖片不存在,則下載並且保存圖片
        pic_name = os.path.basename(img_src)
        pic_path = os.path.join(save_dir, pic_name)
        if not os.path.exists(pic_path):
            try:
                response = requests.get(img_src, headers=self.headers)
                if response.status_code == 200:
                    with open(pic_path, \'wb\') as f:
                        f.write(response.content)
                        print(pic_path)
                else:
                    print("[图片访问失败]status_code:%s,url:%s" % (
                        response.status_code, img_src))
            except Exception as e:
                print(e)

    def main(self, page):
        # 获取一级页面中的专题
        items = self.get_items(page)
        # 遍历一级专题,获取二级页面中的图片,保存图片
        for item in items:
            self.save_item_details(item)


if __name__ == \'__main__\':
    start_time = time.time()
    mz = MeiZi()
    pool = Pool(10)
    page_count = mz.index_page_range()
    pool.map(mz.main, page_count)
    pool.close()
    pool.join()
    print("times:", time.time() - start_time)

 

实例四:扒取1688商品宝贝的数据包

#!/usr/bin/python
# coding=utf-8

"""
扒取1688淘宝图片到本地
"""
import os
import requests
from pyquery import PyQuery as pq
import json
import re


def save_image(root, pic_name, img_src):
    """ 保存图片到本地
    :param root: 保存位置文件夹
    :param pic_name:保存图片名称,如a.jpg
    :param img_src:图片源地址
    :return:
    """

    # 保存到的文件夾,如果不存在,則創建
    if not os.path.exists(root):
        try:
            os.makedirs(root)
        except Exception as e:
            print("[創建文件夾失敗]:%s" % root)
            print(e)
    # 如果圖片不存在,則下載並且保存圖片
    pic_path = os.path.join(root, pic_name)
    if not os.path.exists(pic_path):
        try:
            response = requests.get(img_src)
            if response.status_code == 200:
                with open(pic_path, \'wb\') as f:
                    f.write(response.content)
                    print("下载完成:", pic_path)
            else:
                print("[图片访问失败]status_code:%s,url:%s" % (
                    response.status_code, img_src))
        except Exception as e:
            print(e)


def main(root, detail_url, cookie):
    """ 下载数据包主函数"""
    header = {
        \'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36\',
        # 用户cookie
        \'Accept-Encoding\': \'gzip, deflate, br\',
        \'upgrade-insecure-requests\': \'1\',
        \'cookie\': cookie,
    }
    resp = requests.get(url=detail_url, headers=header)
    doc = pq(resp.text)
    # 保存主图
    images_show = doc(\'.content li.tab-trigger\')
    if images_show:
        loop = 1
        for image in images_show:
            src = json.loads(image.get(\'data-imgs\')).get(\'original\')
            pic_type = os.path.splitext(src)[1]
            save_image(root, \'主图_%s%s\' % (loop, pic_type), src)
            loop += 1
    # 保存详情图:
    images_detail = doc(\'#desc-lazyload-container\')
    detail = images_detail.attr(\'data-tfs-url\')
    resp = requests.get(url=detail)
    if resp.status_code == 200:
        src_match = re.findall(\'<img.*?(https://.*?\.(jpg|png|jpeg|gif))\',
                               resp.text)
        loop = 1
        for src in src_match:
            save_image(root, \'详情_%s.%s\' % (loop, src[1]), src[0])
            loop += 1


if __name__ == \'__main__\':
    # 设置图片保存的目录,注意每次都需要修改到不同的目录下,避免图片覆盖
    root = \'F:\\ShopData\\777\'
    # 准确填写 1688宝贝详情页地址
    detail_url = \'https://detail.1688.com/offer/36413052665.html?spm=b26110380.sw1688.mof001.34.xEXARY\'
    # 手工去登录1688网站后,将浏览器的cookie拷贝到这里
    cookie = \'XXXXXXXXXXXX\'
    # 开始执行扒取...
    main(root, detail_url, cookie)
    print("数据包下载完成!")

 

***微信扫一扫,关注“python测试开发圈”,了解更多测试教程!***

分类:

技术点:

相关文章: