实例一:扒取猫眼电影TOP100 的信息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
扒取猫眼电影TOP100 的信息
"""
import re
import json
from multiprocessing import Pool
import requests
from requests.exceptions import RequestException
def get_page_content(url):
"""
获取页面源码
:param url:
:return:
"""
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
return None
except RequestException:
return None
def parse_html(html):
"""
使用正则表达式解析源码
:param html:
:return:
"""
pattern = re.compile(
\'<dd>.*?board-index.*?>(\d+)</i>.*?data-src="(.*?)".*?class="name"><a.*?>(.*?)</a>.*?class="score"><i.*?>(.*?)</i><i.*?>(.*?)</i>.*?</dd>\',
re.S)
items = re.findall(pattern, html)
for item in items:
yield {
\'index\': item[0],
\'image\': item[1],
\'title\': item[2],
\'score\': item[3] + item[4]
}
def write_to_file(dic):
"""
写入文件
:param dic:
:return:
"""
with open(\'result\', \'a\', encoding=\'utf-8\') as f:
f.write(json.dumps(dic, ensure_ascii=False) + "\n")
def main(offset):
"""
主函数
:param offset:
:return:
"""
url = \'http://maoyan.com/board/4?offset=\' + str(offset)
html = get_page_content(url)
for item in parse_html(html):
print(item)
write_to_file(item)
if __name__ == \'__main__\':
# 通过multiprocessing.Pool 创建多线程并发执行。
pool = Pool(4)
pool.map(main, [i * 10 for i in range(10)])
pool.close()
pool.join()
实例二:扒取头条网中的街拍图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
扒取头条网中的街拍图片
"""
import requests
import os
from hashlib import md5
from multiprocessing import Pool
def get_page(offset):
"""
搜索接口返回json结果
:param offset:
:return:
"""
url = \'http://www.toutiao.com/search_content\'
params = {
\'offset\': offset,
\'format\': \'json\',
\'keyword\': \'街拍\',
\'autoload\': \'true\',
\'count\': 20,
\'cur_tab\': 1,
\'from\': \'search_tab\'
}
try:
response = requests.get(url, params=params)
if response.status_code == 200:
return response.json()
except Exception as e:
print(e)
return None
def get_images(json):
"""
解析json,获取图片信息迭代器
:param json:
:return:
"""
if json.get("data"):
for item in json.get("data"):
title = item.get("title")
images = item.get("image_detail")
if images:
for image in images:
url = image.get("url")
yield {
\'title\': title,
\'image\': url
}
def download_image(item):
"""
保存图片到本地
:param item:
:return:
"""
url = item.get("image")
title = item.get("title")
if not os.path.exists(title):
try:
os.mkdir(title)
except Exception as e:
print(e)
file_path = os.path.join(title, md5(
item.get("image").encode("utf-8")).hexdigest() + ".jpg")
if not os.path.exists(file_path):
try:
response = requests.get(url)
if response.status_code == 200:
with open(file_path, \'wb\') as fp:
fp.write(response.content)
except Exception as e:
print(\'Failed to Save Image\')
print(e)
def main(offset):
"""
主要函数
:param offset:
:return:
"""
# step1.获取街拍搜索api的json返回
content = get_page(offset)
# step2.获取每条搜索记录的title与图片地址,并且遍历
for item in get_images(content):
print(item)
# step3.下载图片
download_image(item)
if __name__ == \'__main__\':
GROUP_START = 1
GROUP_END = 2
offsets = ([x * 20 for x in range(GROUP_START, GROUP_END + 1)])
# Make the Pool of workers
pool = Pool(4)
pool.map(main, offsets)
# close the pool and wait for the work to finish
pool.close()
pool.join()
实例三:扒取妹子图中的所有图片
#!/usr/bin/python
# coding=utf-8
"""
爬取妹子网图片到本地
"""
import os
import requests
from pyquery import PyQuery as pq
from multiprocessing import Pool
import time
class MeiZi:
def __init__(self):
self.index_url = "http://www.mzitu.com/"
self.headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36\',
\'Referer\': self.index_url
}
def get_document(self, url):
"""
获取指定页面的DOM文档,并使用pyquery解析
:param url:
:return:
"""
try:
response = requests.get(url)
if response.status_code == 200:
return pq(response.text)
else:
print("[页面访问失败]:status_code:%s,url:%s" % (
response.status_code, url))
except Exception as e:
print(e)
def index_page_range(self):
"""
获取首页翻页总数迭代器
:return:
"""
doc = self.get_document(self.index_url)
if doc:
page_count = doc(\'.nav-links a:nth-last-child(2)\').text()
if page_count:
return range(int(page_count))
def get_items(self, page):
"""
获取一级列表页中的主题页标题与主题页ID
:param html:
:return:
"""
doc = self.get_document(self.index_url + "page/" + str(page))
if doc:
items = doc(\'#pins li span a\').items()
if items:
for item in items:
yield {
\'item_url\': item.attr(\'href\'),
\'item_title\': item.text()
}
def save_item_details(self, item):
"""
保存二级专题页中的图片
:param detail_url:
:return:
"""
# 获取并解析二级专题页信息
item_url = item.get("item_url")
item_title = item.get("item_title")
doc = self.get_document(item_url)
if doc:
# 获取二级专题页中的翻页总数
page_count = doc(\'.pagenavi a:nth-last-child(2)\').text()
# 获取二级专题页中的图片地址
img_src = doc(\'.main-image img\').attr(\'src\')
# 保存第一页图片
self.save_image(item_title, img_src)
# 如果还有翻页,獲取保存分頁中的圖片
page = 1
while page < int(page_count):
page += 1
detail_url = item_url + "/" + str(page)
doc = self.get_document(detail_url)
if doc:
img_src = doc(\'.main-image img\').attr(\'src\')
self.save_image(item_title, img_src)
def save_image(self, title, img_src):
"""
保存图片img_src到本地,根据title命名文件夹
:param title:
:param img_src:
:return:
"""
root = "F:\\meizitu"
# 保存到的文件夾,如果不存在,則創建
save_dir = os.path.join(root, title)
if not os.path.exists(save_dir):
try:
os.makedirs(save_dir)
except Exception as e:
print("[創建文件夾失敗]:%s" % save_dir)
print(e)
# 如果圖片不存在,則下載並且保存圖片
pic_name = os.path.basename(img_src)
pic_path = os.path.join(save_dir, pic_name)
if not os.path.exists(pic_path):
try:
response = requests.get(img_src, headers=self.headers)
if response.status_code == 200:
with open(pic_path, \'wb\') as f:
f.write(response.content)
print(pic_path)
else:
print("[图片访问失败]status_code:%s,url:%s" % (
response.status_code, img_src))
except Exception as e:
print(e)
def main(self, page):
# 获取一级页面中的专题
items = self.get_items(page)
# 遍历一级专题,获取二级页面中的图片,保存图片
for item in items:
self.save_item_details(item)
if __name__ == \'__main__\':
start_time = time.time()
mz = MeiZi()
pool = Pool(10)
page_count = mz.index_page_range()
pool.map(mz.main, page_count)
pool.close()
pool.join()
print("times:", time.time() - start_time)
实例四:扒取1688商品宝贝的数据包
#!/usr/bin/python
# coding=utf-8
"""
扒取1688淘宝图片到本地
"""
import os
import requests
from pyquery import PyQuery as pq
import json
import re
def save_image(root, pic_name, img_src):
""" 保存图片到本地
:param root: 保存位置文件夹
:param pic_name:保存图片名称,如a.jpg
:param img_src:图片源地址
:return:
"""
# 保存到的文件夾,如果不存在,則創建
if not os.path.exists(root):
try:
os.makedirs(root)
except Exception as e:
print("[創建文件夾失敗]:%s" % root)
print(e)
# 如果圖片不存在,則下載並且保存圖片
pic_path = os.path.join(root, pic_name)
if not os.path.exists(pic_path):
try:
response = requests.get(img_src)
if response.status_code == 200:
with open(pic_path, \'wb\') as f:
f.write(response.content)
print("下载完成:", pic_path)
else:
print("[图片访问失败]status_code:%s,url:%s" % (
response.status_code, img_src))
except Exception as e:
print(e)
def main(root, detail_url, cookie):
""" 下载数据包主函数"""
header = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36\',
# 用户cookie
\'Accept-Encoding\': \'gzip, deflate, br\',
\'upgrade-insecure-requests\': \'1\',
\'cookie\': cookie,
}
resp = requests.get(url=detail_url, headers=header)
doc = pq(resp.text)
# 保存主图
images_show = doc(\'.content li.tab-trigger\')
if images_show:
loop = 1
for image in images_show:
src = json.loads(image.get(\'data-imgs\')).get(\'original\')
pic_type = os.path.splitext(src)[1]
save_image(root, \'主图_%s%s\' % (loop, pic_type), src)
loop += 1
# 保存详情图:
images_detail = doc(\'#desc-lazyload-container\')
detail = images_detail.attr(\'data-tfs-url\')
resp = requests.get(url=detail)
if resp.status_code == 200:
src_match = re.findall(\'<img.*?(https://.*?\.(jpg|png|jpeg|gif))\',
resp.text)
loop = 1
for src in src_match:
save_image(root, \'详情_%s.%s\' % (loop, src[1]), src[0])
loop += 1
if __name__ == \'__main__\':
# 设置图片保存的目录,注意每次都需要修改到不同的目录下,避免图片覆盖
root = \'F:\\ShopData\\777\'
# 准确填写 1688宝贝详情页地址
detail_url = \'https://detail.1688.com/offer/36413052665.html?spm=b26110380.sw1688.mof001.34.xEXARY\'
# 手工去登录1688网站后,将浏览器的cookie拷贝到这里
cookie = \'XXXXXXXXXXXX\'
# 开始执行扒取...
main(root, detail_url, cookie)
print("数据包下载完成!")
***微信扫一扫,关注“python测试开发圈”,了解更多测试教程!***