获取单页源码
def get_one_page(page):
    '''获取单页源码'''
    import time
    import requests
    from requests.exceptions import RequestException
    try:
        url = "http://sh.ziroom.com/z/nl/z2.html?p=" + str(page)
        print('url', url)
        headers = {
            'Referer': 'http://sh.ziroom.com/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
        }
        # 创建请求对象
        res = requests.get(url, headers=headers)
        time.sleep(1)
        if res.status_code == 200:
            return res.text
        return 'status_code error'
    except RequestException:
        return 'RequestException error'
def get_page_index(offset, keyword):
    import requests
    from requests.exceptions import RequestException
    from urllib.parse import urlencode
    data = {
        'offset': offset,
        'format': 'json',
        'keyword': keyword,
        'autoload': 'true',
        'count': 20,
        'cur_tab': 1
    }
    url = 'https://www.toutiao.com/search_content/?' + urlencode(data)
    print(url)
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        print('error')
        return None
import requests

class RequestSpider(object):  # 定义类
    def __init__(self):
        url = 'https://www.baidu.com'
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36'
        }
        self.response = requests.get(url, headers=headers)

    def run(self):
        data = self.response.content
        # 1. 获取请求头
        request_headers = self.response.request.headers
        # 2. 获取响应头
        response_headers = self.response.headers
        # 3. 响应状态码
        code = self.response.status_code
        # 4. 请求的cookie
        request_cookie = self.response.request._cookies
        print(request_cookie)
        # 5. 响应的cookie
        response_cookie = self.response.cookies
        print(response_cookie)
        print(data)

RequestSpider().run()
解析
import requests #xpath处理单页:
from lxml import etree
# 获取页面源码数据
url = \'https://bj.58.com/changping/ershoufang/?utm_source=sem-baidu-pc&spm=105916147073.26840108910\'
headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
page_text = requests.get(url=url, headers=headers).text
# print(page_text)
# 实例化etree对象且将页面源码数据加载到该对象中
tree = etree.HTML(page_text)
li_list = tree.xpath(\'//ul[@class="house-list-wrap"]/li\')
all_data_list = []
for li in li_list:
title = li.xpath(\'.//div[@class="list-info"]/h2/a/text()\')[0]
detail_url = li.xpath(\'.//div[@class="list-info"]/h2/a/@href\')[0]
if not \'https:\' in detail_url:
detail_url = \'https:\' + detail_url
price = li.xpath(\'.//div[@class="price"]/p//text()\')
price = \'\'.join(price)
# 对详情页发起请求,获取页面数据
detail_page_text = requests.get(url=detail_url, headers=headers).text
tree = etree.HTML(detail_page_text)
desc = tree.xpath(\'//div[@class="general-item-wrap"]//text()\')
desc = \'\'.join(desc).strip(\' \n \b \t\')
dic = {
\'title\': title,
\'price\': price,
\'desc\': desc
}
all_data_list.append(dic)
print(all_data_list)
import requests # xpath处理多页
import os
from lxml import etree
import random
headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\',
\'Connection\': \'close\'
}
start_page = 1
end_page = 5
if not os.path.exists(\'jianli\'):
os.mkdir(\'jianli\')
url = \'http://sc.chinaz.com/jianli/free_%d.html\'
#翻页
for page in range(start_page, end_page + 1):
if page == 1:
new_url = \'http://sc.chinaz.com/jianli/free.html\'
else:
new_url = format(url % page)
#获取detail url
response = requests.get(url=new_url, headers=headers)
response.encoding = \'utf-8\'
page_text = response.text
tree = etree.HTML(page_text)
div_list = tree.xpath(\'//div[@id="container"]/div\')
# tree.xpath(\'//div[@class="bottom"]/ul/li | //div[@class="bottom"]/div/li\')
for div in div_list:
detail_url = div.xpath(\'./a/@href\')[0]
name = div.xpath(\'./a/img/@alt\')[0]
#获取目标url
detail_page_text = requests.get(url=detail_url, headers=headers).text
tree = etree.HTML(detail_page_text)
download_url_list = tree.xpath(\'//div[@class="clearfix mt20 downlist"]/ul/li/a/@href\')
download_url = random.choice(download_url_list)
#请求 获取文件
jianli_data = requests.get(url=download_url, headers=headers).content
file_path = \'jianli/\' + name + \'.rar\'
with open(file_path, \'wb\') as fp:
fp.write(jianli_data)
print(file_path + \'下载成功\')
def parse_one_page(sourcehtml):#解析单页源码XPATH
\'\'\'解析单页源码\'\'\'
from lxml import etree
contentTree = etree.HTML(sourcehtml) #解析源代码
results = contentTree.xpath(\'//ul[@id="houseList"]/li\') #利用XPath提取相应内容
# xpath 语法 1. 节点 /
# 2. 跨节点: //
# 3. 精确的标签: //a[@属性="属性值"]
# 4. 标签包裹的内容 text()
# 5. 属性:@href
# xpath--s数据类型---list
for result in results:
title = result.xpath("./div/h3/a/text()")[0][5:] if len(result.xpath("./div/h3/a/text()")[0]) > 0 else ""
area = " ".join(result.xpath("./div/div/p[1]/span/text()")).replace(" ", "", 1) # 使用join方法将列表中的内容以" "字符连接
nearby = result.xpath("./div/div/p[2]/span/text()")[0].strip() if len(result.xpath("./div/div/p[2]/span/text()"))>0 else ""
data = {
"title": title,
"area": area,
"nearby": nearby
}
print(data)
save_to_mongodb(data)
# yield {"pages": pages}
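下面是一个演示上述 XPath 语法要点的最小示例(HTML 片段是假设的示意数据,仅用于说明 / 、// 、@属性 和 text() 的用法):
from lxml import etree
demo_html = '''
<ul id="houseList">
<li><div><h3><a href="/room/1">[整租] 示例房源一</a></h3></div></li>
<li><div><h3><a href="/room/2">[整租] 示例房源二</a></h3></div></li>
</ul>
'''
demo_tree = etree.HTML(demo_html)
print(demo_tree.xpath('//ul[@id="houseList"]/li//a/text()'))  # 标签包裹的文本
print(demo_tree.xpath('//ul[@id="houseList"]/li//a/@href'))   # 属性值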
def parse_one_page(html):#解析单页源码RE
'''解析单页源码'''
import re
pattern = re.compile(r'<dd>.*?board-index.*?>(\d+)</i>.*?name"><a.*?>(.*?)</a>.*?star">(.*?)</p>.*?releasetime'
+ r'.*?>(.*?)</p>.*?score.*?integer">(.*?)</i>.*?>(.*?)</i>.*?</dd>', re.S)
items = re.findall(pattern, html)
#循环提取信息
for item in items:
yield {
\'rank\' :item[0],
\'name\':item[1],
# \'actor\':item[2].strip()[3:] if len(item[2])>3 else \'\', #判断是否大于3个字符
# \'time\' :item[3].strip()[5:] if len(item[3])>5 else \'\',
\'actor\':item[2].strip()[3:],
\'time\' :item[3].strip()[5:15],
\'score\':item[4] + item[5]
}
teacher_pat=\'class="lec-name">(.*?)<\'
teacher=re.compile(teacher_pat,re.S).findall(data)
if len(teacher)>0:
teacher=teacher[0]
else:
teacher=None
def parse_page_index(html): #解析单页源码JSON
import json
data = json.loads(html)
print(data)
if data and \'data\' in data.keys():
for item in data.get(\'data\'):
yield item.get(\'article_url\')
def parse_page_detail(html): #解析单页源码BeautifulSoup
import re
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'lxml')
title=soup.select(\'title\')[0].get_text()
print(title)
images_pattern=re.compile(\'gallery = (.*?);\', re.S)
result=re.search(images_pattern,html)
if result:
print(result.group(1))
def parse_items(who_sells=1):
urls=get_link(who_sells)
for url in urls:
wb_data=requests.get(url,headers=headers).text
soup=BeautifulSoup(wb_data,\'lxml\')
# print(soup)
data={
\'title\':soup.title.text,
\'price\':soup.select(\'span.infocard__container__item__main__text--price\')[0].text.replace(\'\t\',\'\').replace(\'\r\',\'\').replace(\'\n\',\'\').replace(\' \',\'\') if soup.find_all(\'span\',\'infocard__container__item__main__text--price\') else None,
# \'area\':list(soup.select(\'div.infocard__container__item__main a\')[0].stripped_strings),
\'area\':[soup.select(\'div.infocard__container__item__main a\')[0].text,soup.select(\'div.infocard__container__item__main a\')[1].text],
\'data\':soup.select(\'.detail-title__info__text\')[0].text,
# \'cate\':\'个人\' if who_sells ==0 else \'商家\',
# \'view\'
}
print(data)
write_to_file(data)
def get_attractions(url,data=None):
wb_data = requests.get(url)
time.sleep(4)
soup = BeautifulSoup(wb_data.text,\'lxml\')
titles = soup.select(\'div.property_title > a[target="_blank"]\')
imgs = soup.select(\'img[width="160"]\')
cates = soup.select(\'div.p13n_reasoning_v2\')
if data == None:
for title,img,cate in zip(titles,imgs,cates):
data = {
\'title\' :title.get_text(),
\'img\' :img.get(\'src\'),
\'cate\' :list(cate.stripped_strings),
}
print(data)
def get_pages(): #得到总页数
"""得到总页数"""
page = 1
html = get_one_page(page)
contentTree = etree.HTML(html)
pages = int(contentTree.xpath(\'//div[@class="pages"]/span[2]/text()\')[0].strip("共页"))
return pages
post提取api
import requests # 爬取药监总局的相关信息
headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
start_page = int(input(\'start page:\'))
end_page = int(input(\'end page:\'))
#
url = \'http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsList\'
id_list = []
# 指定爬取的页码范围
for page in range(start_page, end_page + 1):
data = {
\'on\': \'true\',
\'page\': str(page),
\'pageSize\': \'15\',
\'productName\': \'\',
\'conditionType\': \'1\',
\'applyname\': \'\',
\'applysn\': \'\'
}
first_dic = requests.post(url=url, headers=headers, data=data).json()
for d in first_dic[\'list\']:
id_list.append(d[\'ID\'])
detail_url = \'http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsById\'
for id in id_list:
data = {
\'id\': id
}
detail_dic = requests.post(url=detail_url, headers=headers, data=data).json()
print(detail_dic)
import requests #kfc餐厅位置信息
url = \'http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword\'
# city = input(\'enter a city name:\')
city = \'上海\'
data = {
"cname": "",
"pid": "",
"keyword":city,
"pageIndex": "1",
"pageSize": "20",
}
headers = {
\'User-Agent\':\'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
response = requests.post(url=url,data=data,headers=headers)
items=response.json()[\'Table1\']
for item in items:
print(item)
Ajax:
通用
import json
import requests
from lxml import etree
for i in range(1,5):
# url = \'http://product.dangdang.com/index.php?r=comment/list&productId=25340451&pageIndex=1\'
url = \'http://product.dangdang.com/index.php?r=comment/list&productId=25340451&categoryPath=01.07.07.04.00.00&mainProductId=25340451&mediumId=0&pageIndex={}\'.format(i)
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
response = requests.get(url,
headers=header,
timeout=5
)
result = json.loads(response.text)
comment_html = result[\'data\'][\'list\'][\'html\']
#
tree = etree.HTML(comment_html)
#
comments = tree.xpath(\'//div[@class="items_right"]\')
for item in comments:
comment_time = item.xpath(\'./div[contains(@class,"starline")]/span/text()\')[0]
comment_content = item.xpath(\'./div[contains(@class,"describe_detail")]/span/text()\')[0]
print(comment_time)
print(comment_content)
url+count num抓取简书博客总阅读量
import requests
import json
import re
from lxml import etree
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
def get_all_article_links():
links_list = []
for i in range(1,22):
url = \'https://www.jianshu.com/u/130f76596b02?order_by=shared_at&page={}\'.format(i)
response = requests.get(url,
headers=header,
timeout=5
)
tree = etree.HTML(response.text)
article_links = tree.xpath(\'//div[@class="content"]/a[@class="title"]/@href\')
for item in article_links:
article_link = \'https://www.jianshu.com\' + item
links_list.append(article_link)
return links_list
def get_read_num():
num_list = []
links_list = get_all_article_links()
for url in set(links_list):
response = requests.get(url,
headers=header,
timeout=5
)
content = response.text
read_num_pattern = re.compile(r\'"views_count":\d+\')
read_num = int(read_num_pattern.findall(content)[0].split(\':\')[-1])
print(read_num)
num_list.append(read_num)
return num_list
# total_read = 0
# for num in get_read_num():
# total_read += num
read_num_list = get_read_num()
print(\'总阅读量 =\', sum(read_num_list))
import requests # for while 循环退出
import time, random
from pyquery import PyQuery as pq
class Get_page():
def __init__(self):
# ajax 请求url
self.ajax_url = \'https://www.wandoujia.com/wdjweb/api/category/more\'
def get_page(self, page, cate_code, child_cate_code):
params = {
\'catId\': cate_code,
\'subCatId\': child_cate_code,
\'page\': page,
}
response = requests.get(self.ajax_url, params=params)
# print(response.json())
content = response.json()[\'data\'][\'content\'] # 提取json中的html页面数据
# print(content)
return content
def parse_page(self, content):
# 解析网页内容
contents = pq(content)(\'.card\').items()
data = []
for content in contents:
data1 = {
\'app_name\': content(\'.name\').text(),
\'install\': content(\'.install-count\').text(),
\'volume\': content(\'.meta span:last-child\').text(),
\'comment\': content(\'.comment\').text(),
}
data.append(data1)
if data:
# 写入MongoDB
self.write_to_mongodb(data)
if __name__ == \'__main__\':
# 实例化数据提取类
wandou_page = Get_page()
cate_code = 5029 # 影音播放大类别编号
child_cate_code = 716 # 视频小类别编号
for page in range(2, 100):
print(\'*\' * 50)
print(\'正在爬取:第 %s 页\' % page)
content = wandou_page.get_page(page, cate_code, child_cate_code)
# 添加循环判断,如果content 为空表示此页已经下载完成了,break 跳出循环
if not content == \'\':
wandou_page.parse_page(content)
time.sleep(random.randint(3, 6))
else:
print(\'该类别已下载完最后一页\')
break
#========================================
page = 2 # 设置爬取起始页数
while True:
print(\'*\' * 50)
print(\'正在爬取:第 %s 页\' % page)
content = wandou_page.get_page(page, cate_code, child_cate_code)
if not content == \'\':
wandou_page.parse_page(content)
page += 1
time.sleep(random.randint(3, 6))
else:
print(\'该类别已下载完最后一页\')
break
存储到DB中
def save_to_mongodb(result):  # 保存到mongo中
    """存储到MongoDB中"""
    import pymongo
    # 创建数据库连接对象, 即连接到本地
    client = pymongo.MongoClient(host="localhost")
    # 指定数据库,这里指定ziroom和表名
    db = client.iroomz
    db_table = db.roominfo
    try:
        # 插入到数据库
        if db_table.insert(result):
            print("抓取成功", result)
        # 也可以按条件去重更新,例如(链家项目里的写法):
        # db['lianjia'].update({'house_info': item['house_info']}, {'$set': item}, True)
    except Exception as reason:
        print("抓取失败", reason)
from pymongo import MongoClient #将一串无头数据保存到mongo中
import datetime
client=MongoClient(\'mongodb://root:123@localhost:27017\')
table=client[\'db1\'][\'emp\']
# table.drop()
l=[
(\'kermit\',\'male\',18,\'20170301\',\'老男孩驻沙河办事处外交大使\',7300.33,401,1), #以下是教学部
(\'alex\',\'male\',78,\'20150302\',\'teacher\',1000000.31,401,1),
(\'wupeiqi\',\'male\',81,\'20130305\',\'teacher\',8300,401,1),
(\'yuanhao\',\'male\',73,\'20140701\',\'teacher\',3500,401,1),
(\'liwenzhou\',\'male\',28,\'20121101\',\'teacher\',2100,401,1),
(\'jingliyang\',\'female\',18,\'20110211\',\'teacher\',9000,401,1),
(\'jinxin\',\'male\',18,\'19000301\',\'teacher\',30000,401,1),
(\'成龙\',\'male\',48,\'20101111\',\'teacher\',10000,401,1),
(\'歪歪\',\'female\',48,\'20150311\',\'sale\',3000.13,402,2),#以下是销售部门
(\'丫丫\',\'female\',38,\'20101101\',\'sale\',2000.35,402,2),
(\'丁丁\',\'female\',18,\'20110312\',\'sale\',1000.37,402,2),
(\'星星\',\'female\',18,\'20160513\',\'sale\',3000.29,402,2),
(\'格格\',\'female\',28,\'20170127\',\'sale\',4000.33,402,2),
(\'张野\',\'male\',28,\'20160311\',\'operation\',10000.13,403,3), #以下是运营部门
(\'程咬金\',\'male\',18,\'19970312\',\'operation\',20000,403,3),
(\'程咬银\',\'female\',18,\'20130311\',\'operation\',19000,403,3),
(\'程咬铜\',\'male\',18,\'20150411\',\'operation\',18000,403,3),
(\'程咬铁\',\'female\',18,\'20140512\',\'operation\',17000,403,3)
]
for n,item in enumerate(l):
d={
"_id":n,
\'name\':item[0],
\'sex\':item[1],
\'age\':item[2],
\'hire_date\':datetime.datetime.strptime(item[3],\'%Y%m%d\'),
\'post\':item[4],
\'salary\':item[5]
}
table.save(d)
def save_to_mysql(title, teacher, price):  # 存储到mysql
    import pymysql
    conn = pymysql.connect(
        host='localhost', user='root', password="123",
        database='0207insertMysql', port=3306
    )
    # 用参数化查询代替手动拼接SQL字符串(原写法会把 str(title) 这样的字面量直接写进库里)
    cursor = conn.cursor()
    cursor.execute("INSERT INTO tianshan(name,teacher,price) VALUES(%s,%s,%s)",
                   (str(title), str(teacher), str(price)))
    conn.commit()
django中设置联合唯一
class MyModel(models.Model):
field1 = models.CharField(max_length=50)
field2 = models.CharField(max_length=50)
class Meta:
unique_together = (\'field1\', \'field2\',)
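配合这个联合唯一约束做入库去重时,重复记录会在插入时抛出 IntegrityError,下面是一个示意写法(MyModel 和字段名沿用上面的定义,仅作演示):
from django.db import IntegrityError

def save_item(value1, value2):
    # 假设 field1 + field2 就是去重键,遇到重复数据直接跳过
    try:
        MyModel.objects.create(field1=value1, field2=value2)
    except IntegrityError:
        print('重复数据,跳过:', value1, value2)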
def write_to_file(content): #存储到本地txt
import json
with open(\'result.txt\',\'at\',encoding=\'utf-8\') as f:
#利用json.dumps()将字典序列化,并将ensure_ascii设置为False,从而显示中文.+换行
f.write(json.dumps(content,ensure_ascii=False) +\'\n\')
def store(ret):
with open("douban.text", "a", encoding="utf8") as f:
for item in ret:
#f.write(" ".join(item) + "\n")
f.write(str(item)+ "\n")
def write_to_csvField(fieldnames):  # 存储到本地csv
    '''写入csv表头'''
    import csv
    with open("MovieResult.csv", 'a', encoding='gb18030', newline='') as f:
        # 将字段名传给DictWriter来初始化一个字典写入对象
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        # 调用writeheader方法写入字段名
        writer.writeheader()
def write_to_csvRows(content, fieldnames):
    '''写入csv内容'''
    import csv
    with open("MovieResult.csv", 'a', encoding='gb18030', newline='') as f:
        # 将字段名传给DictWriter来初始化一个字典写入对象
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writerows(content)
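这两个函数配合使用:先写一次表头,再批量写入行。下面是一个调用示意(字段名和示例数据都是假设的,需要与待写入字典的键保持一致):
fieldnames = ['rank', 'name', 'actor', 'time', 'score']
write_to_csvField(fieldnames)   # 只在开始时写一次表头
rows = [{'rank': '1', 'name': '霸王别姬', 'actor': '张国荣', 'time': '1993-01-01', 'score': '9.6'}]
write_to_csvRows(rows, fieldnames)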
def save_video(video_url):  # 保存视频到本地
    import uuid
    import requests
    try:
        res = requests.get(video_url)
        with open(r'movies\%s.mp4' % uuid.uuid4(), 'wb') as f:
            f.write(res.content)
            f.flush()
        print('%s done ...' % video_url)
    except Exception as e:
        print('%s failed: %s' % (video_url, e))
大文件的爬取保存
当使用requests的get下载大文件/数据时,建议使用stream模式。
当把get函数的stream参数设置成False时,它会立即开始下载文件并放到内存中,如果文件过大,有可能导致内存不足。
当把get函数的stream参数设置成True时,它不会立即开始下载,当你使用iter_content或iter_lines遍历内容或访问内容属性时才开始下载。需要注意一点:文件没有下载之前,它也需要保持连接。
iter_content:一块一块的遍历要下载的内容
iter_lines:一行一行的遍历要下载的内容
使用上面两个函数下载大文件可以防止占用过多的内存,因为每次只下载小部分数据。
示例代码:
r = requests.get(url_file, stream=True)
with open("file_path", "wb") as f:
    # chunk_size指定每次写入的大小,这里每次只写512字节
    for chunk in r.iter_content(chunk_size=512):
        if chunk:
            f.write(chunk)
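如果下载的是文本类内容,也可以用 iter_lines 按行遍历;下面是一个示意写法(url_file 假设指向文本资源):
with requests.get(url_file, stream=True) as r:
    with open("file_path.txt", "w", encoding="utf-8") as f:
        # iter_lines按行遍历响应内容,同样避免一次性读入内存
        for line in r.iter_lines(decode_unicode=True):
            if line:
                f.write(line + "\n")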
开启爬虫
def task():
pages = get_pages()
print(\'总共\',pages)
for page in range(1,int(pages)+1):
html = get_one_page(page)
if html:
parse_one_page(html)
def task(offset):
url = "http://maoyan.com/board/4?offset={0}".format(offset)
html = get_one_page(url)
rows = []
for item in parse_one_page(html):
# write_to_textfile(item)
rows.append(item)
# 写入csv内容
write_to_csvRows(rows,fieldnames)
selenium
通用
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
option = webdriver.ChromeOptions()
# option.add_argument(\'headless\')
# driver不加载图片,失效中
prefs = {"profile.managed_default_content_settings.images": 2}
option.add_experimental_option("prefs",prefs)
# 要换成适应自己操作系统的chromedriver
driver = webdriver.Chrome(
executable_path="F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe",
chrome_options=option
)
# PhantomJS
# browser = webdriver.PhantomJS()
#全屏
driver.maximize_window()
# 打开网站
url = \'https://www.baidu.com\'
driver.get(url)
# 打印当前页面标题
print(driver.title)
# 在搜索框中输入文字
timeout = 5
search_content = WebDriverWait(driver, timeout).until(
lambda d: d.find_element_by_xpath(\'//input[@id="kw"]\')
# EC.presence_of_element_located((By.XPATH, \'//input[@id="kw"]\'))
)
search_content.clear()
search_content.send_keys(\'python\')
import time
time.sleep(3)
# 模拟点击
search_button = WebDriverWait(driver, timeout).until(
lambda d: d.find_element_by_xpath(\'//input[@id="su"]\'))
search_button.click()
# 获取浏览器alert(如有弹窗;selenium的写法是driver.switch_to.alert)
# driver.switch_to.alert.accept()
# driver.switch_to.alert.text
#截图保存
a=driver.get_screenshot_as_file("D:/Python35/test.jpg")
# 鼠标下拉到底多次
for i in range(3):
driver.execute_script(\'window.scrollTo(0, document.body.scrollHeight)\')
time.sleep(3)
# 下拉一定距离
#driver.execute_script(\'window.scrollTo(0, 5000)\')
# 保存网页
data=driver.page_source
print(len(data))
fh=open(\'baidu.html\',\'w\',encoding=\'utf-8\')
fh.write(str(data))
fh.close()
# 打印搜索结果
search_results = WebDriverWait(driver, timeout).until(
# lambda d: d.find_elements_by_xpath(\'//h3[@class="t c-title-en"] | //h3[@class="t"]\')
lambda e: e.find_elements_by_xpath(\'//h3[contains(@class,"t")]/a[1]\')
)
# print(search_results)
for item in search_results:
print(item.text)
print(item.get_attribute(\'href\'))
driver.close()
切换页面+lxml 利用搜狗搜索接口抓取微信公众号
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
option = webdriver.ChromeOptions()
# option.add_argument(\'headless\')
driver = webdriver.Chrome(
executable_path=\'F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe\',
chrome_options=option
)
url = \'http://weixin.sogou.com/weixin?type=1&s_from=input&query=python_shequ\'
driver.get(url)
print(driver.title)
timeout = 5
link = WebDriverWait(driver, timeout).until(
lambda d: d.find_element_by_link_text(\'Python爱好者社区\'))
link.click()
import time
time.sleep(1)
# 切换页面
window_handles = driver.window_handles
driver.switch_to.window(window_handles[-1])
print(driver.title)
article_links = WebDriverWait(driver, timeout).until(
lambda d: d.find_elements_by_xpath(\'//h4[@class="weui_media_title"]\'))
article_link_list = []
for item in article_links:
article_link = \'https://mp.weixin.qq.com\' + item.get_attribute(\'hrefs\')
# print(article_link)
article_link_list.append(article_link)
print(article_link_list)
# first_article_link = article_link_list[0]
import requests
from lxml import etree
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
for i in range(0,len(article_link_list)):
response = requests.get(article_link_list[i],
headers=header,
timeout=5
)
tree = etree.HTML(response.text)
title = tree.xpath(\'//h2[@id="activity-name"]/text()\')[0].strip()
content = tree.xpath(\'//div[@id="js_content"]//text()\')
content = \'\'.join(content).strip()
print(title)
print(content)
淘宝selenium
import re
import pymysql
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pyquery import PyQuery as pq
browser = webdriver.Chrome()
def get_one_page(name):
\'\'\'获取单个页面\'\'\'
print("-----------------------------------------------获取第一页-------------------------------------------------------")
try:
browser.get("https://www.taobao.com")
input = WebDriverWait(browser,10).until(
EC.presence_of_element_located((By.CSS_SELECTOR,"#q")))
input.send_keys(name)
button = WebDriverWait(browser,10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR,"#J_TSearchForm > div.search-button > button")))
button.click()
pages = WebDriverWait(browser,10).until(
EC.presence_of_element_located((By.CSS_SELECTOR,"#mainsrp-pager > div > div > div > div.total")))
print("----即将解析第一页信息----")
get_info(name)
print("----第一页信息解析完成----")
return pages.text
except TimeoutException:
return get_one_page(name)
def get_next_page(page,name):
"""获取下一页"""
print("---------------------------------------------------正在获取第{0}页----------------------------------------".format(page))
try:
input = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > input")))
input.send_keys(page)
button = WebDriverWait(browser, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > span.btn.J_Submit")))
button.click()
WebDriverWait(browser,10).until(
EC.text_to_be_present_in_element((By.CSS_SELECTOR,"#mainsrp-pager > div > div > div > ul > li.item.active > span"),str(page)))
print("-----即将解析第{0}页信息-----".format(page))
get_info(name)
print("-----第{0}页信息解析完成-----".format(page))
except TimeoutException:
return get_next_page(page,name)
def get_info(name):
"""获取详情"""
WebDriverWait(browser,20).until(EC.presence_of_element_located((
By.CSS_SELECTOR,"#mainsrp-itemlist .items .item")))
text = browser.page_source
html = pq(text)
items = html(\'#mainsrp-itemlist .items .item\').items()
for item in items:
data = []
image = item.find(".pic .img").attr("data-src")
price = item.find(".price").text().strip().replace("\n","")
deal = item.find(".deal-cnt").text()[:-2]
title = item.find(".title").text().strip()
shop = item.find(".shop").text().strip()
location = item.find(".location").text()
data.append([shop, location, title, price, deal, image])
for dt in data:
save_to_mysql(dt,name)
def save_to_mysql(data,name):
"""存储到数据库"""
db= pymysql.connect(host = "localhost",user = "root",password = "123",port = 3306, db = "taobaoman",charset = "utf8")
cursor = db.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS {0}(shop VARCHAR(20),location VARCHAR(10),title VARCHAR(255),price VARCHAR(20),deal VARCHAR(20), image VARCHAR(255))".format(name))
sql = "INSERT INTO {0} values(%s,%s,%s,%s,%s,%s)".format(name)
try:
if cursor.execute(sql,data):
db.commit()
print("********已入库**********")
except:
print("#########入库失败#########")
db.rollback()
db.close()
def main(name):
pages = get_one_page(name)
pages = int(re.compile("(\d+)").findall(pages)[0])
for page in range(1,pages+1):
get_next_page(page,name)
if __name__ == \'__main__\':
name = "男装"
main(name)
验证码
图形验证码ydm
import json
import time
import requests
class YDMHttp:
apiurl = \'http://api.yundama.com/api.php\'
username = \'\'
password = \'\'
appid = \'\'
appkey = \'\'
def __init__(self, username, password, appid, appkey):
self.username = username
self.password = password
self.appid = str(appid)
self.appkey = appkey
def request(self, fields, files=[]):
response = self.post_url(self.apiurl, fields, files)
response = json.loads(response)
return response
def balance(self):
data = {\'method\': \'balance\', \'username\': self.username, \'password\': self.password, \'appid\': self.appid,
\'appkey\': self.appkey}
response = self.request(data)
if response:
if response[\'ret\'] and response[\'ret\'] < 0:
return response[\'ret\']
else:
return response[\'balance\']
else:
return -9001
def login(self):
data = {\'method\': \'login\', \'username\': self.username, \'password\': self.password, \'appid\': self.appid,
\'appkey\': self.appkey}
response = self.request(data)
if response:
if response[\'ret\'] and response[\'ret\'] < 0:
return response[\'ret\']
else:
return response[\'uid\']
else:
return -9001
def upload(self, filename, codetype, timeout):
data = {\'method\': \'upload\', \'username\': self.username, \'password\': self.password, \'appid\': self.appid,
\'appkey\': self.appkey, \'codetype\': str(codetype), \'timeout\': str(timeout)}
file = {\'file\': filename}
response = self.request(data, file)
if response:
if response[\'ret\'] and response[\'ret\'] < 0:
return response[\'ret\']
else:
return response[\'cid\']
else:
return -9001
def result(self, cid):
data = {\'method\': \'result\', \'username\': self.username, \'password\': self.password, \'appid\': self.appid,
\'appkey\': self.appkey, \'cid\': str(cid)}
response = self.request(data)
return response and response[\'text\'] or \'\'
def decode(self, filename, codetype, timeout):
cid = self.upload(filename, codetype, timeout)
if cid > 0:
for i in range(0, timeout):
result = self.result(cid)
if result != \'\':
return cid, result
else:
time.sleep(1)
return -3003, \'\'
else:
return cid, \'\'
def report(self, cid):
data = {\'method\': \'report\', \'username\': self.username, \'password\': self.password, \'appid\': self.appid,
\'appkey\': self.appkey, \'cid\': str(cid), \'flag\': \'0\'}
response = self.request(data)
if response:
return response[\'ret\']
else:
return -9001
def post_url(self, url, fields, files=[]):
for key in files:
files[key] = open(files[key], \'rb\')
res = requests.post(url, files=files, data=fields)
return res.text
def use_ydm(filename):
username = \'a34955311\' # 用户名
password = \'a58591063\' # 密码
app_id = 7047 # 软件ID
app_key = \'d5133ed80d3e9f0e0e3e3f90143ec5d8\' # 软件密钥
code_type = 5000 # 验证码类型
timeout = 60 # 超时时间,秒
yundama = YDMHttp(username, password, app_id, app_key) # 初始化
balance = yundama.balance() # 查询余额
print(\'您的题分余额为{}\'.format(balance))
cid, result = yundama.decode(filename, code_type, timeout) # 开始识别
print(\'识别结果为{}\'.format(result))
return result
if __name__ == \'__main__\':
filename = \'captcha.jpg\'
use_ydm(filename)
点触验证码chaojiying
import requests
from hashlib import md5
class Chaojiying_Client(object):
def __init__(self, username, password, soft_id):
self.username = username
self.password = md5(password.encode(\'utf-8\')).hexdigest()
self.soft_id = soft_id
self.base_params = {
\'user\': self.username,
\'pass2\': self.password,
\'softid\': self.soft_id,
}
self.headers = {
\'Connection\': \'Keep-Alive\',
\'User-Agent\': \'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)\',
}
def PostPic(self, im, codetype):
"""
im: 图片字节
codetype: 题目类型 参考 http://www.chaojiying.com/price.html
"""
params = {
\'codetype\': codetype,
}
params.update(self.base_params)
files = {\'userfile\': (\'ccc.jpg\', im)}
r = requests.post(\'http://upload.chaojiying.net/Upload/Processing.php\', data=params, files=files, headers=self.headers)
return r.json()
def ReportError(self, im_id):
"""
im_id:报错题目的图片ID
"""
params = {
\'id\': im_id,
}
params.update(self.base_params)
r = requests.post(\'http://upload.chaojiying.net/Upload/ReportError.php\', data=params, headers=self.headers)
return r.json()
if __name__ == \'__main__\':
chaojiying = Chaojiying_Client(\'超级鹰用户名\', \'超级鹰用户名的密码\', \'96001\')
im = open(\'a.jpg\', \'rb\').read()
print(chaojiying.PostPic(im, 1902))
spider.py
import time
from io import BytesIO
from PIL import Image
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from chaojiying2 import Chaojiying_Client
EMAIL = \'waratte\'
PASSWORD = \'nani0704\'
CHAOJIYING_USERNAME = \'a349553119\'
CHAOJIYING_PASSWORD = \'a58591063\'
CHAOJIYING_SOFT_ID = 896781 # 自己申请的ID
CHAOJIYING_KIND = 9102 # 要验证的点出验证码的类型
class cracktouclick(object):
def __init__(self):
self.url = \'https://kyfw.12306.cn/otn/resources/login.html\'
self.browser = webdriver.Chrome(\'F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe\')
self.wait = WebDriverWait(self.browser, 20)
self.email = EMAIL
self.password = PASSWORD
self.chaojiying = Chaojiying_Client(CHAOJIYING_USERNAME, CHAOJIYING_PASSWORD, CHAOJIYING_SOFT_ID)
def __del__(self):
self.browser.close()
def open(self): # 打开需要验证的网页
self.browser.maximize_window()
self.browser.get(self.url)
account = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, \'login-hd-account\')))
account.click()
email = self.wait.until(EC.presence_of_element_located((By.ID, \'J-userName\'))) # 先识别输入框
password = self.wait.until(EC.presence_of_element_located((By.ID, \'J-password\'))) # 先识别输入框
email.send_keys(self.email)
password.send_keys(self.password) # 键入内容
def get_touclick_button(self): # 获取验证码的按钮
button = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, \'lgcode-refresh\')))
return button
def get_touclick_element(self): # 该函数的作用是获取跳出的验证图片
element = self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, \'imgCode\'))) # 获取图片
return element
def get_position(self): # 获取字段的位置,即验证码的位置
element = self.get_touclick_element()
time.sleep(2)
location = element.location
size = element.size
top, bottom, left, right = location[\'y\'], location[\'y\'] + size[\'height\'], location[\'x\'], location[\'x\'] + size[
\'width\']
return (top, bottom, left, right)
def get_screenshot(self): # 获取验证码的截图
screenshot = self.browser.get_screenshot_as_png()
screenshot = Image.open(BytesIO(screenshot))
return screenshot
def get_touclick_image(self, name=\'captcha.png\'): # 获取验证码图片
top, bottom, left, right = self.get_position()
print(\'验证码位置:\', top, bottom, left, right)
screenshot = self.get_screenshot()
captcha = screenshot.crop((left, top, right, bottom))
captcha.save(name)
return captcha
"""
接下来要做的就是将超级鹰识别的文字的位置(以字符串的形式返回)进行解析,然后模拟点击
形式为\'pic_str\':\'132,127|56,77\'
"""
def get_points(self, captcha_result): # 里面的参数为上述的识别结果,函数返回的是转化处理后的结果
groups = captcha_result.get(\'pic_str\').split(\'|\') # 返回的是分割后的列表
locations = [[int(number) for number in group.split(\',\')] for group in groups]
return locations
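# 返回值示意(假设超级鹰返回的识别结果里 'pic_str' 为 '132,127|56,77'):
# get_points(result) 会得到 [[132, 127], [56, 77]],即每个待点击文字的 x,y 坐标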
def touch_click_words(self, locations): # 下面就是模拟点击图片中文字的位置\'
for location in locations:
print(location)
ActionChains(self.browser).move_to_element_with_offset(self.get_touclick_element(), location[0],
location[1]).click().perform()
time.sleep(1)
def touch_click_verify(self): # 每点击字段,就会出现一个圆圈,这个就是验证按钮
button = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, \'login-btn\'))) # 先识别这个按钮是否出现
button.click()
def login(self): # 当验证成功之后,就可以点击登陆按钮实现登陆了
submit = self.wait.until(EC.element_to_be_clickable((By.ID, \'_submit\'))) # submit实际就是按钮的定位
submit.click()
time.sleep(10)
print(\'登录成功\')
def crack(self): # 程序开启入口
self.open() # 进入登陆界面
button = self.get_touclick_button() # 开始识别验证按钮
button.click() # 点击该按钮
image = self.get_touclick_image()
"""
接下来就要用超级鹰Chaojiying类里的post_pic方法(参数为需要传入识别的图片对象,以及该类验证码的代号
在充值提分后,可以查询到
"""
bytes_array = BytesIO()
image.save(bytes_array, format=\'PNG\')
result = self.chaojiying.PostPic(bytes_array.getvalue(), CHAOJIYING_KIND)
print(result) # 返回的是json 类型的
locations = self.get_points(result) # 获得解析后的结果
self.touch_click_words(locations)
self.touch_click_verify()
success = self.wait.until(EC.text_to_be_present_in_element((By.CLASS_NAME, \'touclick-hod-note\'), \'验证成功\'))
print(success)
if not success:
self.crack()
else:
self.login()
if __name__ == \'__main__\':
crack = cracktouclick() # 实例化
crack.crack()
jquery滑动
from selenium import webdriver #模拟滑动
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import UnexpectedAlertPresentException
from time import sleep
driver = webdriver.Chrome(executable_path=\'F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe\')
driver.get("https://www.helloweba.com/demo/2017/unlock/")
dragger = driver.find_elements_by_class_name("slide-to-unlock-handle")[0]
action = ActionChains(driver)
action.click_and_hold(dragger).perform() #鼠标左键按下不放
for index in range(200):
import random
try:
action.move_by_offset(random.randint(1,5), 0).perform() #平行移动鼠标
except UnexpectedAlertPresentException:
break
action.reset_actions()
sleep(0.005) #等待停顿时间
ActionChains(driver).release().perform()
from selenium import webdriver #切换到iframeResult子标签里 模拟滑动
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By # 按照什么方式查找,By.ID,By.CSS_SELECTOR
from selenium.webdriver.common.keys import Keys # 键盘按键操作
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait # 等待页面加载某些元素
import time
option = webdriver.ChromeOptions()
# option.add_argument(\'headless\')
driver = webdriver.Chrome(executable_path=\'F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe\', chrome_options=option)
driver.get(\'http://www.runoob.com/try/try.php?filename=jqueryui-api-droppable\')
wait=WebDriverWait(driver,3)
# driver.implicitly_wait(3) # 使用隐式等待
try:
driver.switch_to.frame(\'iframeResult\') ##切换到iframeResult子标签里
sourse=driver.find_element_by_id(\'draggable\') #源
target=driver.find_element_by_id(\'droppable\') #目标
#方式一:基于同一个动作链串行执行
# actions=ActionChains(driver) #拿到动作链对象
# actions.drag_and_drop(sourse,target) #把动作放到动作链中,准备串行执行,水平执行
# actions.perform()
#方式二:不同的动作链,每次移动的位移都不同
ActionChains(driver).click_and_hold(sourse).perform() #选中源,不松手
distance=target.location[\'x\']-sourse.location[\'x\'] #source.location: 源的坐标 target.location[\'x\']
track=0 #走过的距离
while track < distance: # 走过的距离小于总距离
ActionChains(driver).move_by_offset(xoffset=2,yoffset=0).perform() #每次移动两个单位
track+=2 #加2
ActionChains(driver).release().perform()
time.sleep(10)
finally:
driver.close()
极验滑动
from selenium.webdriver.support import expected_conditions as EC #极验官网
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from PIL import Image
from io import BytesIO
import time
BORDER = 6
class CrackGeetest():
def __init__(self):
self.url = \'https://www.geetest.com/type/\'
self.browser = webdriver.Chrome()
self.wait = WebDriverWait(self.browser, 10)
def open(self):
\'\'\'
打开网页
:return None
\'\'\'
self.browser.get(self.url)
time.sleep(1)
def close(self):
\'\'\'
关闭网页
:return None
\'\'\'
self.browser.close()
self.browser.quit()
def change_to_slide(self):
\'\'\'
切换为滑动认证
:return 滑动选项对象
\'\'\'
huadong = self.wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, \'.products-content ul > li:nth-child(2)\'))
)
time.sleep(1)
return huadong
def get_geetest_button(self):
\'\'\'
获取初始认证按钮
:return 按钮对象
\'\'\'
button = self.wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, \'.geetest_radar_tip\'))
)
return button
def wait_pic(self):
\'\'\'
等待验证图片加载完成
:return None
\'\'\'
self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, \'.geetest_popup_wrap\'))
)
def get_screenshot(self):
"""
获取网页截图
:return: 截图对象
"""
screenshot = self.browser.get_screenshot_as_png()
screenshot = Image.open(BytesIO(screenshot))
return screenshot
def get_position(self):
\'\'\'
获取验证码位置
:return: 位置元组
\'\'\'
img = self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, \'geetest_canvas_img\')))
time.sleep(2)
location = img.location
size = img.size
top, bottom = location[\'y\'], location[\'y\'] + size[\'height\']
left, right = location[\'x\'], location[\'x\'] + size[\'width\']
return (top, bottom, left, right)
def get_slider(self):
\'\'\'
获取滑块
:return: 滑块对象
\'\'\'
slider = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, \'geetest_slider_button\')))
return slider
def get_geetest_image(self, name=\'captcha.png\'):
\'\'\'
获取验证码图片
:return: 图片对象
\'\'\'
top, bottom, left, right = self.get_position()
print(\'验证码位置\', top, bottom, left, right)
screenshot = self.get_screenshot()
captcha = screenshot.crop((left, top, right, bottom))
captcha.save(name)
return captcha
def delete_style(self):
\'\'\'
执行js脚本,获取无滑块图
:return None
\'\'\'
js = \'document.querySelectorAll("canvas")[2].style=""\'
self.browser.execute_script(js)
def is_pixel_equal(self, img1, img2, x, y):
\'\'\'
判断两个像素是否相同
:param img1: 不带缺口图片
:param img2: 带缺口图
:param x: 位置x
:param y: 位置y
:return: 像素是否相同
\'\'\'
# 取两个图片的像素点
pix1 = img1.load()[x, y]
pix2 = img2.load()[x, y]
threshold = 60
if abs(pix1[0] - pix2[0]) < threshold \
and abs(pix1[1] - pix2[1]) < threshold \
and abs(pix1[2] - pix2[2]) < threshold:
return True
else:
return False
def get_gap(self, img1, img2):
\'\'\'
获取缺口偏移量
:param img1: 不带缺口图片
:param img2: 带缺口图
:return 缺口位置
\'\'\'
left = 60
for i in range(left, img1.size[0]):
for j in range(img1.size[1]):
if not self.is_pixel_equal(img1, img2, i, j):
left = i
return left
return left
def get_track(self, distance):
\'\'\'
根据偏移量获取移动轨迹
:param distance: 偏移量
:return: 移动轨迹
\'\'\'
# 移动轨迹
track = []
# 当前位移
current = 0
# 减速阈值
mid = distance * 3 / 5
# 计算间隔
t = 0.2
# 初速度
v = 0
# 滑超过过一段距离
distance += 14
while current < distance:
if current < mid:
# 加速度为正
a = 2
else:
# 加速度为负
a = -1.5
# 初速度 v0
v0 = v
# 当前速度 v
v = v0 + a * t
# 移动距离 move-->x
move = v0 * t + 1 / 2 * a * t * t
# 当前位移
current += move
# 加入轨迹
track.append(round(move))
return track
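# 用法示意(假设缺口偏移量为 100 像素):
# get_track(100) 返回一串先加速后减速的位移序列,总和略大于 100 + 14,
# 多走的部分由 move_to_gap 里的 back_tracks 往回修正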
def shake_mouse(self):
\'\'\'
模拟人手释放鼠标时的抖动
:return: None
\'\'\'
ActionChains(self.browser).move_by_offset(xoffset=-3, yoffset=0).perform()
ActionChains(self.browser).move_by_offset(xoffset=2, yoffset=0).perform()
def move_to_gap(self, slider, tracks):
\'\'\'
拖动滑块到缺口处
:param slider: 滑块
:param tracks: 轨迹
:return
\'\'\'
back_tracks = [-1, -1, -2, -2, -3, -2, -2, -1, -1]
ActionChains(self.browser).click_and_hold(slider).perform()
# 正向
for x in tracks:
ActionChains(self.browser).move_by_offset(xoffset=x, yoffset=0).perform()
# 逆向
for x in back_tracks:
ActionChains(self.browser).move_by_offset(xoffset=x, yoffset=0).perform()
# 模拟抖动
self.shake_mouse()
time.sleep(0.5)
ActionChains(self.browser).release().perform()
def crack(self):
try:
# 打开网页
self.open()
# 转换验证方式,点击认证按钮
s_button = self.change_to_slide()
s_button.click()
g_button = self.get_geetest_button()
g_button.click()
# 确认图片加载完成
self.wait_pic()
# 获取滑块
slider = self.get_slider()
# 获取带缺口的验证码图片
image1 = self.get_geetest_image(\'captcha1.png\')
self.delete_style()
image2 = self.get_geetest_image(\'captcha2.png\')
gap = self.get_gap(image1, image2)
print(\'缺口位置\', gap)
gap -= BORDER
track = self.get_track(gap)
self.move_to_gap(slider, track)
success = self.wait.until(
EC.text_to_be_present_in_element((By.CLASS_NAME, \'geetest_success_radar_tip_content\'), \'验证成功\')
)
print(success)
time.sleep(5)
self.close()
except:
print(\'Failed-Retry\')
self.crack()
if __name__ == \'__main__\':
crack = CrackGeetest()
crack.crack()
from selenium import webdriver #京东极限登录
from selenium.webdriver.common.action_chains import ActionChains
import datetime,random
import time
import cv2
import numpy as np
import urllib.request as request
driver = webdriver.Chrome(executable_path="F:\GoogleChrome_58.0.3029.110_x86_PortableSoft\App\Google Chrome\chromedriver.exe",)
# http://gate.jd.com/InitCart.aspx?pid=4993737&pcount=1&ptype=1
# 获取图形验证的图片,并滑动滑块实现滑块验证处理
def get_image_position(flag):
# 获取滑块图片的下载地址
try:
image1 = driver.find_element_by_class_name(\'JDJRV-smallimg\').find_element_by_xpath(\'img\').get_attribute(\'src\')
except BaseException:
flag= True
return flag
# 获取背景大图图片的下载地址
image2 = driver.find_element_by_class_name(\'JDJRV-bigimg\').find_element_by_xpath(\'img\').get_attribute(\'src\')
print("image1:", image1)
print("image2:", image2)
if image1 is None or image2 is None:
return
if driver.find_element_by_class_name(\'JDJRV-smallimg\').is_displayed() is False:
return
image1_name = \'slide_block.png\' # 滑块图片名
image2_name = \'slide_bkg.png\' # 背景大图名
# 下载滑块图片并存储到本地
request.urlretrieve(image1, image1_name)
# 下载背景大图并存储到本地
request.urlretrieve(image2, image2_name)
# 获取图片,并灰化
block = cv2.imread(image1_name, 0)
template = cv2.imread(image2_name, 0)
# 二值化之后的图片名称
block_name = \'block.jpg\'
template_name = \'template.jpg\'
# 将二值化后的图片进行保存
cv2.imwrite(template_name, template)
cv2.imwrite(block_name, block)
block = cv2.imread(block_name)
block = cv2.cvtColor(block, cv2.COLOR_BGR2GRAY)
block = abs(255 - block)
cv2.imwrite(block_name, block)
block = cv2.imread(block_name)
template = cv2.imread(template_name)
# 获取偏移量
result = cv2.matchTemplate(block, template, cv2.TM_CCOEFF_NORMED) # 查找block图片在template中的匹配位置,result是一个矩阵,返回每个点的匹配结果
# print(\'偏移量\',result)
x, y = np.unravel_index(result.argmax(), result.shape)
print(\'xy:\',x,y)
# 获取滑块
element = driver.find_element_by_class_name(\'JDJRV-slide-btn\')
# 滑动滑块
ActionChains(driver).click_and_hold(on_element=element).perform()
print("x方向的偏移", int(y * 0.4 + 18), \'x:\', x, \'y:\', y)
ActionChains(driver).move_to_element_with_offset(to_element=element, xoffset=y, yoffset=0).perform()
time.sleep(1)
ActionChains(driver).release(on_element=element).perform()
time.sleep(3)
def login(username, password):
driver.get("https://passport.jd.com/new/login.aspx")
time.sleep(3)
driver.find_element_by_link_text("账户登录").click()
driver.find_element_by_name("loginname").send_keys(username)
driver.find_element_by_name("nloginpwd").send_keys(password)
driver.find_element_by_id("loginsubmit").click()
while True:
time.sleep(random.randint(1,3))
a = get_image_position(True)
if a:
break
time.sleep(random.randint(1,3))
driver.get("https://cart.jd.com/cart.action")
time.sleep(random.randint(1,3))
# driver.find_element_by_id("toggle-checkboxes_down").click()
# driver.find_element_by_link_text("去结算").click()
# time.sleep(random.randint(1,3))
# driver.find_element_by_id("order-submit").click()
now = datetime.datetime.now()
#now_time = now.strftime(\'%Y-%m-%d %H:%M:%S\')
print(now.strftime(\'%Y-%m-%d %H:%M:%S\'))
print(\'login success, you can ou up!\')
def buy_on_time(buytime):
while True:
now = datetime.datetime.now()
if now.strftime(\'%Y-%m-%d %H:%M:%S\') == buytime:
driver.find_element_by_id(\'order-submit\').click()
time.sleep(3)
print(now.strftime(\'%Y-%m-%d %H:%M:%S\'))
print(\'purchase success\')
time.sleep(0.5)
if __name__ == \'__main__\':
login(\'18916826131\', \'aa58591063\')
auto login
通用1 先拿到cookie,直接用cookie登录
import requests
from lxml import etree
str = \'mfw_uuid=5b9a4ecc-a01c-a3c7-4c79-e5720323acc4; uva=s%3A286%3A%22a%3A4%3A%7Bs%3A13%3A%22host_pre_time%22%3Bs%3A10%3A%222018-09-13%22%3Bs%3A2%3A%22lt%22%3Bi%3A1536839374%3Bs%3A10%3A%22last_refer%22%3Bs%3A159%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%3Furl%3D_zw5SJnLcpOBzcaWI-zxfT3zLFutByjvPJkOPZ0cIZXsqsm6OTvcOK5Rrp_454Mhn79QhyiI3-8oXVU6Jypbeq%26wd%3D%26eqid%3Dcb5428d400045c5b000000045b9a4ec7%22%3Bs%3A5%3A%22rhost%22%3Bs%3A13%3A%22www.baidu.com%22%3B%7D%22%3B; __mfwurd=a%3A3%3A%7Bs%3A6%3A%22f_time%22%3Bi%3A1536839374%3Bs%3A9%3A%22f_rdomain%22%3Bs%3A13%3A%22www.baidu.com%22%3Bs%3A6%3A%22f_host%22%3Bs%3A3%3A%22www%22%3B%7D; __mfwuuid=5b9a4ecc-a01c-a3c7-4c79-e5720323acc4; UM_distinctid=165d2c3e1799a-02f264b51995c7-454c092b-15f900-165d2c3e17a10f; oad_n=a%3A3%3A%7Bs%3A3%3A%22oid%22%3Bi%3A1029%3Bs%3A2%3A%22dm%22%3Bs%3A20%3A%22passport.mafengwo.cn%22%3Bs%3A2%3A%22ft%22%3Bs%3A19%3A%222019-03-04+08%3A38%3A15%22%3B%7D; __mfwlv=1551659960; __mfwvn=4; all_ad=1; CNZZDATA30065558=cnzz_eid%3D1570340712-1551659522-null%26ntime%3D1551659522; _r=qq; _rp=a%3A2%3A%7Bs%3A1%3A%22p%22%3Bs%3A12%3A%22mail.qq.com%2F%22%3Bs%3A1%3A%22t%22%3Bi%3A1551660052%3B%7D; uol_throttle=93133587; PHPSESSID=l5b5oogta6evalsa3rgtcc7bn0; __mfwlt=1551660679\'
str_list = str.split(\';\')
#转换cookie
cookies = {}
for item in str_list:
# print(item)
key = item.split(\'=\')[0].strip()
value = item.split(\'=\')[1].strip()
cookies[key] = value
print(cookies)
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
logined_url = \'http://www.mafengwo.cn/friend/index/follow?uid=70360114\'
response = requests.get(logined_url, headers=header, cookies=cookies)
print(response.status_code)
tree = etree.HTML(response.text)
friends = tree.xpath(\'//div[@class="name"]/a/text()\')
print(friends)
通用2 session.post浏览 #马蜂窝
import requests
from lxml import etree
session = requests.Session()
# phone_number = input(\'电话\')
phone_number = \'349553119@qq.com\'
# password = input(\'密码\')
password = \'a58591063\'
data = {\'passport\': phone_number, \'password\': password}
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
r = session.post("https://passport.mafengwo.cn/login/", headers=header, data=data)
print(r.status_code)
# print(r.text)
logined_url = \'http://www.mafengwo.cn/friend/index/follow?uid=70360114\'
response = session.get(logined_url, headers=header)
print(response.status_code)
# print(response.text)
tree = etree.HTML(response.text)
friends = tree.xpath(\'//div[@class="name"]/a/text()\')
print(friends)
通用3 先获取cookie和token再登录获取真实的cookie #github
import requests,re
from lxml import etree
from bs4 import BeautifulSoup
#先获取token和c1
session = requests.Session()
i1 = session.get(\'https://github.com/login\')
contentTree = etree.HTML(i1.text) #解析源代码
authenticity_token = contentTree.xpath(\'//input/@value\')[1]
print(authenticity_token)
c1 = i1.cookies.get_dict()
print(c1)
# 携带authenticity_token和用户名密码等信息,发送用户验证
form_data = {
"authenticity_token": authenticity_token,
"utf8": "",
"commit": "Sign in",
"login": "dujun31@vip.qq.com",
\'password\': \'a58591063\'
}
i2 = requests.post(\'https://github.com/session\', data=form_data, cookies=c1)
c2 = i2.cookies.get_dict()
print(c2)
print(i2.status_code)
# 用c2 请求网页
i3 = requests.get(\'https://github.com/settings/profile\', cookies=c2)
# print(i3.text)
contentTree = etree.HTML(i3.text) #解析源代码
name = contentTree.xpath(\'//input[@class="form-control"]/@value\')[0]
print(name)
模拟登录豆瓣_验证码
from urllib.request import urlretrieve
import requests
from bs4 import BeautifulSoup
from os import remove
try:
import cookielib
except:
import http.cookiejar as cookielib
try:
from PIL import Image
except:
pass
url = \'https://accounts.douban.com/login\'
datas = {\'source\': \'index_nav\',
\'remember\': \'on\'}
headers = {\'Referer\': \'https://www.douban.com/\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36\'
\' (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36\',
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8\',
\'Accept-Language\': \'zh-CN,zh;q=0.8\'}
# 尝试使用cookie信息
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename=\'cookies\')
try:
session.cookies.load(ignore_discard=True)
except:
print("Cookies未能加载")
#cookies加载不成功,则输入账号密码信息
# datas[\'form_email\'] = input(\'Please input your account:\')
datas[\'form_email\'] = \'13482632585\'
# datas[\'form_password\'] = input(\'Please input your password:\')
datas[\'form_password\'] = \'a58591063\'
def get_captcha():
\'\'\'
获取验证码及其ID
\'\'\'
r = requests.post(url, data=datas, headers=headers)
page = r.text
# print(r.text)
soup = BeautifulSoup(page, "html.parser")
# 利用bs4获得验证码图片地址
try:
img_src = soup.find(\'img\', {\'id\': \'captcha_image\'}).get(\'src\')
urlretrieve(img_src, \'captcha.jpg\')
im = Image.open(\'captcha.jpg\')
im.show()
im.close()
except:
print(\'到本地目录打开captcha.jpg获取验证码\')
finally:
captcha = input(\'please input the captcha:\')
remove(\'captcha.jpg\')
captcha_id = soup.find(
\'input\', {\'type\': \'hidden\', \'name\': \'captcha-id\'}).get(\'value\')
return captcha, captcha_id
def isLogin():
\'\'\'
通过查看用户个人账户信息来判断是否已经登录
\'\'\'
url = "https://www.douban.com/accounts/"
login_code = session.get(url, headers=headers,
allow_redirects=False).status_code
if login_code == 200:
return True
else:
return False
def login():
captcha, captcha_id = get_captcha()
# 增加表数据
datas[\'captcha-solution\'] = captcha
datas[\'captcha-id\'] = captcha_id
login_page = session.post(url, data=datas, headers=headers)
page = login_page.text
soup = BeautifulSoup(page, "html.parser")
result = soup.findAll(\'div\', attrs={\'class\': \'title\'})
#进入豆瓣登陆后页面,打印热门内容
for item in result:
print(item.find(\'a\').get_text())
# 保存 cookies 到文件,
# 下次可以使用 cookie 直接登录,不需要输入账号和密码
session.cookies.save()
if __name__ == \'__main__\':
if isLogin():
print(\'Login successfully\')
else:
login()
模拟登录微博_(用户名base64加密,然后预登陆获取一些参数,密码加密,验证码,session.post)
import time
import base64
import rsa
import binascii
import requests
import re
import random
try:
from PIL import Image
except:
pass
try:
from urllib.parse import quote_plus
except:
from urllib import quote_plus
\'\'\'
如果没有开启登录保护,不用输入验证码就可以登录
如果开启登录保护,需要输入验证码
\'\'\'
# 构造 Request headers
agent = \'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0\'
headers = {
\'User-Agent\': agent
}
session = requests.session()
# 访问 初始页面带上 cookie
index_url = "http://weibo.com/login.php"
try:
session.get(index_url, headers=headers, timeout=2)
except:
session.get(index_url, headers=headers)
try:
input = raw_input
except:
pass
def get_su(username):
"""
对 email 地址和手机号码 先 javascript 中 encodeURIComponent
对应 Python 3 中的是 urllib.parse.quote_plus
然后在 base64 加密后decode
"""
username_quote = quote_plus(username)
username_base64 = base64.b64encode(username_quote.encode("utf-8"))
return username_base64.decode("utf-8")
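# 用法示意(假设的测试账号,仅演示编码过程):
# get_su('test@example.com')
# 先 quote_plus 得到 'test%40example.com',再 base64 编码为 'dGVzdCU0MGV4YW1wbGUuY29t'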
# 预登陆获得 servertime, nonce, pubkey, rsakv
def get_server_data(su):
pre_url = "http://login.sina.com.cn/sso/prelogin.php?entry=weibo&callback=sinaSSOController.preloginCallBack&su="
pre_url = pre_url + su + "&rsakt=mod&checkpin=1&client=ssologin.js(v1.4.18)&_="
pre_url = pre_url + str(int(time.time() * 1000))
pre_data_res = session.get(pre_url, headers=headers)
sever_data = eval(pre_data_res.content.decode("utf-8").replace("sinaSSOController.preloginCallBack", \'\'))
return sever_data
# print(sever_data)
def get_password(password, servertime, nonce, pubkey):
rsaPublickey = int(pubkey, 16)
key = rsa.PublicKey(rsaPublickey, 65537) # 创建公钥
message = str(servertime) + \'\t\' + str(nonce) + \'\n\' + str(password) # 拼接明文js加密文件中得到
message = message.encode("utf-8")
passwd = rsa.encrypt(message, key) # 加密
passwd = binascii.b2a_hex(passwd) # 将加密信息转换为16进制。
return passwd
def get_cha(pcid):
cha_url = "http://login.sina.com.cn/cgi/pin.php?r="
cha_url = cha_url + str(int(random.random() * 100000000)) + "&s=0&p="
cha_url = cha_url + pcid
cha_page = session.get(cha_url, headers=headers)
with open("cha.jpg", \'wb\') as f:
f.write(cha_page.content)
f.close()
try:
im = Image.open("cha.jpg")
im.show()
im.close()
except:
print(u"请到当前目录下,找到验证码后输入")
def login(username, password):
# su 是加密后的用户名
su = get_su(username)
sever_data = get_server_data(su)
servertime = sever_data["servertime"]
nonce = sever_data[\'nonce\']
rsakv = sever_data["rsakv"]
pubkey = sever_data["pubkey"]
showpin = sever_data["showpin"]
password_secret = get_password(password, servertime, nonce, pubkey)
postdata = {
\'entry\': \'weibo\',
\'gateway\': \'1\',
\'from\': \'\',
\'savestate\': \'7\',
\'useticket\': \'1\',
\'pagerefer\': "http://login.sina.com.cn/sso/logout.php?entry=miniblog&r=http%3A%2F%2Fweibo.com%2Flogout.php%3Fbackurl",
\'vsnf\': \'1\',
\'su\': su,
\'service\': \'miniblog\',
\'servertime\': servertime,
\'nonce\': nonce,
\'pwencode\': \'rsa2\',
\'rsakv\': rsakv,
\'sp\': password_secret,
\'sr\': \'1366*768\',
\'encoding\': \'UTF-8\',
\'prelt\': \'115\',
\'url\': \'http://weibo.com/ajaxlogin.php?framelogin=1&callback=parent.sinaSSOController.feedBackUrlCallBack\',
\'returntype\': \'META\'
}
login_url = \'http://login.sina.com.cn/sso/login.php?client=ssologin.js(v1.4.18)\'
if showpin == 0:
login_page = session.post(login_url, data=postdata, headers=headers)
else:
pcid = sever_data["pcid"]
get_cha(pcid)
postdata[\'door\'] = input(u"请输入验证码")
login_page = session.post(login_url, data=postdata, headers=headers)
login_loop = (login_page.content.decode("GBK"))
# print(login_loop)
pa = r\'location\.replace\([\\'"](.*?)[\\'"]\)\'
loop_url = re.findall(pa, login_loop)[0]
# print(loop_url)
# 此出还可以加上一个是否登录成功的判断,下次改进的时候写上
login_index = session.get(loop_url, headers=headers)
uuid = login_index.text
uuid_pa = r\'"uniqueid":"(.*?)"\'
uuid_res = re.findall(uuid_pa, uuid, re.S)[0]
web_weibo_url = "http://weibo.com/%s/profile?topnav=1&wvr=6&is_all=1" % uuid_res
weibo_page = session.get(web_weibo_url, headers=headers)
weibo_pa = r\'<title>(.*?)</title>\'
# print(weibo_page.content.decode("utf-8"))
userID = re.findall(weibo_pa, weibo_page.content.decode("utf-8", \'ignore\'), re.S)[0]
print(u"欢迎你 %s, 登陆成功" % userID)
if __name__ == "__main__":
# username = input(u\'用户名:\')
username = \'烦心雨\'
# password = input(u\'密码:\')
password = \'58591063\'
login(username, password)
from selenium import webdriver #selenium登录微信手动扫码保存cookie 到json文件
import time
browser = webdriver.Chrome()
url=\'https://weixin.sogou.com/\'
browser.get(url)
# browser.maximize_window()
time.sleep(1)
try:
browser.find_element_by_xpath(\'//*[@id="loginBtn"]\').click()
browser.delete_all_cookies()
time.sleep(10)
dictCookies=browser.get_cookies()
import json
jsonCookies = json.dumps(dictCookies)
# 登录完成后,将cookie保存到本地文件
with open(\'cookies111.json\', \'w\') as f:
f.write(jsonCookies)
except Exception as e:
pass
import json #用RequestsCookieJar读取json文件的cookie
from requests.cookies import RequestsCookieJar
# 读取cookie
with open(\'cookies.json\', \'r\', encoding=\'utf-8\') as f:
listCookies = json.loads(f.read())
jar = RequestsCookieJar()
for cookie in listCookies:
jar.set(cookie[\'name\'], cookie[\'value\'])
ck=dict(jar)
print(ck)
Obtaining and keeping cookies
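Both snippets below call use_ydm(), a helper that is not shown in these notes; it presumably wraps a captcha-recognition service (such as YunDaMa) and returns the characters in the image. A hypothetical stand-in so the snippets can be tested by hand:
def use_ydm(filename):
    # placeholder: a real implementation would upload the image to a
    # captcha-recognition platform and return the recognized text
    print('open %s and read the captcha' % filename)
    return input('captcha: ')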
import requests #获取验证码图片ydm,session.post登录保持cookie
from lxml import etree
from urllib import request
headers = {
\'User-Agent\':\'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
session = requests.Session()
url = \'https://so.gushiwen.org/user/login.aspx?from=http://so.gushiwen.org/user/collect.aspx\'
page_text = session.get(url=url,headers=headers).text
tree = etree.HTML(page_text)
code_src = "https://so.gushiwen.org"+tree.xpath(\'//*[@id="imgCode"]/@src\')[0]
# request.urlretrieve(url=code_src,filename=\'./gushiwen.jpg\')
img_data = session.get(url=code_src,headers=headers).content
with open(\'./gushiwen.jpg\',\'wb\') as fp:
fp.write(img_data)
code_text = use_ydm(\'./gushiwen.jpg\')
print(code_text)
__VIEWSTATE = tree.xpath(\'//*[@id="__VIEWSTATE"]/@value\')[0]
__VIEWSTATEGENERATOR = tree.xpath(\'//*[@id="__VIEWSTATEGENERATOR"]/@value\')[0]
login_url = \'https://so.gushiwen.org/user/login.aspx?from=http%3a%2f%2fso.gushiwen.org%2fuser%2fcollect.aspx\'
data = {
"__VIEWSTATE":__VIEWSTATE,
"__VIEWSTATEGENERATOR":__VIEWSTATEGENERATOR,
"from":"http://so.gushiwen.org/user/collect.aspx",
"email":"www.zhangbowudi@qq.com",
"pwd":"bobo328410948",
"code":code_text,
"denglu":"登录"
}
page_text = session.post(url=login_url,headers=headers,data=data).text
with open(\'./gushiwen.html\',\'w\',encoding=\'utf-8\') as fp:
fp.write(page_text)
import requests #获取人人网验证码图片ydm,session.post登录保持cookie
from lxml import etree
from urllib import request
#获取一个session对象
session = requests.Session()
#session对象和requests作用几乎一样,都可以进行请求的发送,并且请求发送的方式也是一致的,
#session进行请求的发送,如果会产生cookie的话,则cookie会自动被存储到session对象中
#1.获取验证码图片:
url = \'http://www.renren.com/\'
headers = {
\'User-Agent\':\'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
page_text = requests.get(url=url,headers=headers).text
#将验证码图片解析出来且进行持久化存储
tree = etree.HTML(page_text)
code_img_src = tree.xpath(\'//*[@id="verifyPic_login"]/@src\')[0]
request.urlretrieve(url=code_img_src,filename=\'./code.jpg\')
code = use_ydm(filename=\'./code.jpg\') #调取验证码接口
# #模拟登录
login_url = \'http://www.renren.com/ajaxLogin/login?1=1&uniqueTimestamp=201903101254\'
data = {
"email":"www.zhangbowudi@qq.com",
"icode":code,
"origURL":"http://www.renren.com/home",
"domain":"renren.com",
"key_id":"1",
"captcha_type":"web_login",
"password":"26db441b7bfd4e4d11a60c1263b5289ebdb3eb9922e7cd8666db02ad9614e087",
"rkey":"b5fef57ad0b3934e38e66ad5da5c19a8",
"f":"http%3A%2F%2Fwww.renren.com%2F289676607",
}
#进行登录,当登录成功之后,可以获取cookie
#cookie就会被存储到session中
response = session.post(url=login_url,headers=headers,data=data)
#对登录成功后对应的当前用户的个人详情页进行请求发送
detail_url = \'http://www.renren.com/289676607/profile\'
#该次请求使用的是session对象,该请求已经携带了cookie
page_text = session.get(url=detail_url,headers=headers).text
with open(\'./renren.html\',\'w\',encoding=\'utf-8\') as fp:
fp.write(page_text)
print(\'over\')
Upvoting on the Chouti hot list: first GET the homepage to obtain a cookie, then POST the login while carrying that cookie so the site authorizes it, and afterwards keep using the same cookie
Step 1: view the homepage with requests.get to obtain the cookie
r1=requests.get(
url=\'https://dig.chouti.com/\',
headers={
\'user-agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36\'
})
print(r1.cookies.get_dict())
Step 2: submit the phone number and password to log in
r2=requests.post(
url=\'https://dig.chouti.com/login\',
headers={
\'user-agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36\'
},
data={
\'phone\': \'8618916826131\',
\'password\': \'a123456\',
\'oneMonth\': 1
},
cookies=r1.cookies.get_dict())
print(r2.text)
Step 3: upvote a post
r3=requests.post(
url=\'https://dig.chouti.com/link/vote?linksId=23783561\',
headers={
\'user-agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36\'
},
cookies=r1.cookies.get_dict())
print(r3.text)
Appium:
from appium import webdriver #36氪新闻 滑动抓取
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
server = \'http://localhost:4723/wd/hub\'
desired_caps = {
"platformName": "Android",
"deviceName": "Redmi_Note_3",
"appPackage": "com.android36kr.app",
"appActivity": ".ui.MainActivity"
}
print(\'111\')
driver = webdriver.Remote(server, desired_caps)
print(\'222\')
wait = WebDriverWait(driver,30)
print(\'333\')
kuaixun = wait.until(EC.presence_of_element_located((By.XPATH, \'/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.FrameLayout/android.widget.FrameLayout[1]/android.view.ViewGroup/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.HorizontalScrollView/android.widget.LinearLayout/android.widget.TextView[3]\')))
kuaixun.click()
FLICK_START_X = 300
FLICK_START_Y = 300
FLICK_DISTANCE = 700
while True:
driver.swipe(FLICK_START_X, FLICK_START_Y + FLICK_DISTANCE, FLICK_START_X, FLICK_START_Y)
time.sleep(2)
from appium import webdriver #考研帮登录及模拟滑动
import time
from selenium.webdriver.support.ui import WebDriverWait
#需要安装客户端的包
#pip3 install Appium-Python-Client
cap = {
"platformName": "Android",
"deviceName": "bb43c0d0",
# "deviceName": "127.0.0.1:62001",
"platformVersion": "6.0.1",
# "platformVersion": "5.1.1",
"appPackage": "com.tal.kaoyan",
"appActivity": "ui.activity.SplashActivity",
"noReset": True
}
driver = webdriver.Remote("http://localhost:4723/wd/hub",cap)
# while True:
# time.sleep(2)
# driver.tap([(535, 940), (540, 950)], 1500)
# time.sleep(2)
def get_size():
x = driver.get_window_size()[\'width\']
y = driver.get_window_size()[\'height\']
return(x,y)
try:
#是否跳过
if WebDriverWait(driver,3).until(lambda x:x.find_element_by_xpath("//android.widget.TextView[@resource-id=\'com.tal.kaoyan:id/tv_skip\']")):
driver.find_element_by_xpath("//android.widget.TextView[@resource-id=\'com.tal.kaoyan:id/tv_skip\']").click()
except:
pass
try:
#登录
# if WebDriverWait(driver,3).until(lambda x:x.find_element_by_xpath("//android.widget.EditText[@resource-id=\'com.tal.kaoyan:id/login_email_edittext\']")):
if WebDriverWait(driver,3).until(lambda x:x.find_element_by_xpath("//android.widget.EditText[@resource-id=\'com.tal.kaoyan:id/login_email_edittext\']")):
driver.find_element_by_xpath("//android.widget.EditText[@resource-id=\'com.tal.kaoyan:id/login_email_edittext\']").send_keys("a349553119")
# driver.find_element_by_xpath("//android.widget.EditText[@resource-id=\'com.tal.kaoyan:id/login_email_edittext\']").send_keys("a349553119")
driver.find_element_by_xpath("//android.widget.EditText[@resource-id=\'com.tal.kaoyan:id/login_password_edittext\']").send_keys("a123456")
driver.find_element_by_xpath("//android.widget.Button[@resource-id=\'com.tal.kaoyan:id/login_login_btn\']").click()
except:
pass
try:
#隐私协议
if WebDriverWait(driver,3).until(lambda x:x.find_element_by_xpath("//android.widget.TextView[@resource-id=\'com.tal.kaoyan:id/tv_title\']")):
driver.find_element_by_xpath("//android.widget.TextView[@resource-id=\'com.tal.kaoyan:id/tv_agree\']").click()
driver.find_element_by_xpath("//android.support.v7.widget.RecyclerView[@resource-id=\'com.tal.kaoyan:id/date_fix\']/android.widget.RelativeLayout[3]").click()
except:
pass
#点击研讯
if WebDriverWait(driver,3).until(lambda x:x.find_element_by_xpath("//android.support.v7.widget.RecyclerView[@resource-id=\'com.tal.kaoyan:id/date_fix\']/android.widget.RelativeLayout[3]/android.widget.LinearLayout[1]/android.widget.ImageView[1]")):
driver.find_element_by_xpath("//android.support.v7.widget.RecyclerView[@resource-id=\'com.tal.kaoyan:id/date_fix\']/android.widget.RelativeLayout[3]/android.widget.LinearLayout[1]/android.widget.ImageView[1]").click()
l = get_size()
x1 = int(l[0]*0.5) #滑动位置
y1 = int(l[1]*0.75) #从哪滑动
y2 = int(l[1]*0.25) #滑到哪个位置
#滑动操作
while True:
driver.swipe(x1,y1,x1,y2)
time.sleep(0.5)
from appium import webdriver #模拟登录微信,模拟滑动,抓取朋友圈信息
import os
from appium.webdriver.common.touch_action import TouchAction
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pymongo import MongoClient
from time import sleep
from processor import Processor
from config import *
class Moments():
def __init__(self):
"""
初始化
"""
# 驱动配置
self.desired_caps = {
\'platformName\': PLATFORM,
\'deviceName\': DEVICE_NAME,
\'appPackage\': APP_PACKAGE,
\'appActivity\': APP_ACTIVITY
}
self.driver = webdriver.Remote(DRIVER_SERVER, self.desired_caps)
self.wait = WebDriverWait(self.driver, TIMEOUT)
self.client = MongoClient(MONGO_URL)
self.db = self.client[MONGO_DB]
self.collection = self.db[MONGO_COLLECTION]
# 处理器
self.processor = Processor()
def login(self):
"""
登录微信
:return:
"""
#权限
try:
if self.wait.until(EC.presence_of_element_located((By.ID, \'android:id/button1\'))):
self.wait.until(EC.presence_of_element_located((By.ID, \'android:id/button1\'))).click()
except:pass
# 登录按钮
login = self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/e4g\')))
login.click()
# 手机输入
phone = self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/kh\')))
phone.set_text(USERNAME)
# 下一步
next = self.wait.until(EC.element_to_be_clickable((By.ID, \'com.tencent.mm:id/axt\')))
next.click()
# 密码
password = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//android.widget.LinearLayout[@resource-id=\'com.tencent.mm:id/d_t\']/android.widget.EditText[1]")))
# password = self.wait.until(EC.presence_of_element_located((By.XPATH, \'//*[@resource-id="com.tencent.mm:id/h2"][1]\')))
password.set_text(PASSWORD)
# 提交
submit = self.wait.until(EC.element_to_be_clickable((By.ID, \'com.tencent.mm:id/axt\')))
submit.click()
# 通讯录
try:
if self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/az9\'))):
self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/az9\'))).click()
except:
pass
# 字体
try:
if self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/az9\'))):
self.wait.until(EC.presence_of_element_located((By.ID, \'com.tencent.mm:id/az9\'))).click()
except:
pass
def enter(self):
"""
进入朋友圈
:return:
"""
# 选项卡
# tab = self.wait.until(EC.presence_of_element_located((By.XPATH, \'//*[@resource-id="com.tencent.mm:id/bw3"][3]\')))
tab = self.wait.until(EC.presence_of_element_located((By.XPATH, "//android.widget.TextView[@text=\'发现\']")))
tab.click()
# 朋友圈
moments = self.wait.until(EC.presence_of_element_located((By.XPATH, "//android.widget.TextView[@text=\'朋友圈\']")))
moments.click()
def crawl(self):
"""
爬取
:return:
"""
print(\'开始打印朋友圈\')
while True:
# 当前页面显示的所有状态
items = self.wait.until(
EC.presence_of_all_elements_located(
(By.ID, \'com.tencent.mm:id/ej9\')))
# 上滑
self.driver.swipe(FLICK_START_X, FLICK_START_Y + FLICK_DISTANCE, FLICK_START_X, FLICK_START_Y)
# 遍历每条状态
for item in items:
try:
# 昵称
nickname = item.find_element_by_id(\'com.tencent.mm:id/b5o\').get_attribute(\'text\')
# 正文
content = item.find_element_by_id(\'com.tencent.mm:id/ejc\').get_attribute(\'text\')
# 日期
pubdate = item.find_element_by_id(\'com.tencent.mm:id/eec\').get_attribute(\'text\')
# 处理日期
date = self.processor.handlerdate(pubdate)
print(nickname, content, date)
item = {
\'nickname\': nickname,
\'content\': content,
\'date\': date,
}
# 插入MongoDB
self.collection.update({\'nickname\': nickname, \'content\': content}, {\'$set\': item}, True)
sleep(SCROLL_SLEEP_TIME)
except NoSuchElementException:
pass
def main(self):
"""
入口
:return:
"""
# 登录
self.login()
# 进入朋友圈
self.enter()
# 爬取
self.crawl()
if __name__ == \'__main__\':
moments = Moments()
moments.main()
mitmproxy
import json #36ke新闻
def response(flow):
url = \'https://36kr.com/lapi/info-flow/newsflash_columns/newsflashes\'
if flow.request.url.startswith(url):
data_list = json.loads(flow.response.text)[\'data\'][\'items\']
for data in data_list:
# print(data)
title = data[\'title\']
content = data[\'description\']
print(title)
print(content)
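To run the response hook above, load the script with mitmproxy's mitmdump runner, for example: mitmdump -s 36kr_flow.py (the file name here is illustrative), and set the phone or emulator's HTTP proxy to the machine running mitmproxy so the app's traffic passes through the hook.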
High performance
from concurrent.futures import ThreadPoolExecutor #xiaohua(开启线程池)
import requests
import re
pool = ThreadPoolExecutor(50)
# 爬虫三部曲
# 一 发送请求
def get_page(url):
print(\'%s GET start ...\' % url)
index_res = requests.get(url)
return index_res.text
# 二 解析数据
# 解析主页
def parse_index(index_page):
# 拿到主页的返回结果
res = index_page.result()
detail_urls = re.findall(\'<div class="items">.*?href="(.*?)"\', res, re.S)
# print(detail_urls)
for detail_url in detail_urls:
if not detail_url.startswith(\'http\'):
detail_url = \'http://www.xiaohuar.com\' + detail_url
pool.submit(get_page, detail_url).add_done_callback(parse_detail)
# yield detail_url
# 解析详情页
def parse_detail(detail_page):
res = detail_page.result()
video_urls = re.findall(\'id="media".*?src="(.*?)"\', res, re.S)
if video_urls:
video_urls = video_urls[0]
if video_urls.endswith(\'.mp4\'):
print(video_urls)
pool.submit(save_video, video_urls)
# print(video_urls)
# 三 保存数据
import uuid
def save_video(video_url):
try:
res = requests.get(video_url)
with open(r\'movies\%s.mp4\' % uuid.uuid4(), \'wb\') as f:
f.write(res.content)
f.flush()
print(\'%s done ...\' % video_url)
except Exception:
pass
if __name__ == \'__main__\':
base_url = \'http://www.xiaohuar.com/list-3-{}.html\'
for line in range(1):
print(\'第几页{}\'.format(line))
index_url = base_url.format(line)
pool.submit(get_page, index_url).add_done_callback(parse_index)
import requests #多进程爬取梨视频
from lxml import etree
import re
import random
from multiprocessing.dummy import Pool
pool = Pool(5)
headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\'
}
url = \'https://www.pearvideo.com/category_8\'
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
li_list = tree.xpath(\'//div[@id="listvideoList"]/ul/li\')
video_url_list = [] # 存放的是视频对应的url
#获取detail url
for li in li_list:
detail_page_url = \'https://www.pearvideo.com/\' + li.xpath(\'.//a/@href\')[0]
#获取video url
detail_page_text = requests.get(url=detail_page_url, headers=headers).text
video_url = re.findall(\'srcUrl="(.*?)",\', detail_page_text, re.S)[0]
# 耗时操作可以使用并发机制
video_url_list.append(video_url)
# 并发下载视频
downloadVideo = lambda link: requests.get(url=link, headers=headers).content
# map返回的列表中存储的就是下载完毕的视频二进制的数据值
video_data_list = pool.map(downloadVideo, video_url_list)
#保存视频
def saveVideo(data):
name = str(random.randint(0,10000))+\'.mp4\'
with open(name,\'wb\') as fp:
fp.write(data)
print(name+\'下载成功\')
pool.map(saveVideo, video_data_list)
pool.close()
pool.join()
# Event loop + callbacks (driving generators) + epoll (I/O multiplexing)
# asyncio is Python's complete solution for asynchronous I/O programming
# tornado, gevent, twisted (scrapy, django channels)
# tornado (implements a web server); django + flask (uwsgi, gunicorn + nginx)
# tornado can be deployed directly, or behind nginx (nginx + tornado)
aiohttp
import aiohttp #aiohttp
import asyncio
import ssl
async def fetch(session, url):
async with session.get(
url,
ssl=ssl.SSLContext()
) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, \'http://www.baidu.com\')
# print(html)
print(len(html))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio # 通过gather方法
async def a(t):
print(\'-->\', t)
await asyncio.sleep(0.5)
print(\'<--\', t)
return t * 10
def main():
futs = [a(t) for t in range(6)]
print(futs)
ret = asyncio.gather(*futs)
print(ret)
loop = asyncio.get_event_loop()
ret1 = loop.run_until_complete(ret)
print(ret1)
main()
import asyncio # 通过create_task()方法
async def a(t):
print(\'-->\', t)
await asyncio.sleep(0.5)
print(\'<--\', t)
return t * 10
async def b():
# loop = asyncio.get_event_loop()
cnt = 0
while 1:
cnt += 1
cor = a(cnt) # coroutine
resp = loop.create_task(cor)
await asyncio.sleep(0.1)
# print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(b())
import asyncio #获取协程的返回值
import time
from functools import partial
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "bobby"
def callback(url, future):
print(url)
print("send email to bobby")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
# task.add_done_callback(callback) #回调函数没有参数的情况下
task.add_done_callback(partial(callback, "http://www.imooc.com")) #回调函数有参数的情况下用partial(),from functools import partial
loop.run_until_complete(task)
print(task.result())
import asyncio #gather和wait的区别,gather更加high-level
import time
# async def get_html(url):
# print("start get url")
# await asyncio.sleep(2)
# print("end get url")
#
# if __name__ == "__main__":
# start_time = time.time()
# loop = asyncio.get_event_loop()
# tasks = [get_html("http://www.imooc.com") for i in range(10)]
# # loop.run_until_complete(asyncio.wait(tasks))
# # loop.run_until_complete(asyncio.gather(*tasks))
# # print(time.time()-start_time)
#
# #gather和wait的区别
# #gather更加high-level
# group1 = [get_html("http://projectsedu.com") for i in range(2)]
# group2 = [get_html("http://www.imooc.com") for i in range(2)]
# group1 = asyncio.gather(*group1)
# group2 = asyncio.gather(*group2)
# # group2.cancel()
# loop.run_until_complete(asyncio.gather(group1, group2))
# print(time.time() - start_time)
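Since the comparison above is entirely commented out, here is a minimal runnable sketch of the same point; the URLs are only labels and nothing is actually downloaded.
import asyncio

async def get_html(url):
    await asyncio.sleep(0.5)
    return url

async def compare():
    urls = ["http://projectsedu.com", "http://www.imooc.com"]
    # gather takes coroutines directly, keeps the input order and returns the results
    results = await asyncio.gather(*[get_html(u) for u in urls])
    print(results)
    # wait takes tasks/futures and returns the (done, pending) sets
    done, pending = await asyncio.wait([asyncio.ensure_future(get_html(u)) for u in urls])
    print([t.result() for t in done])

asyncio.get_event_loop().run_until_complete(compare())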
from aiohttp_socks import SocksConnector # socks5 proxy requests, two approaches
import aiohttp
import async_timeout # needed for the timeout context used below
async def get_html(url):
# 方法一:
connector = SocksConnector.from_url(\'socks5://localhost:1080\', rdns=True)
async with aiohttp.ClientSession(connector=connector) as session:
# 方法二:
# async with aiohttp.ClientSession() as session:
# session.proxies = {\'http\': \'socks5h://127.0.0.1:1080\',
# \'https\': \'socks5h://127.0.0.1:1080\'}
headers = {\'content-type\': \'image/gif\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36\'
}
cookies = {\'cookies_are\': \'working\'}
# proxy = "http://127.0.0.1:1080"
async with async_timeout.timeout(10): # cap the request at 10 s
# async with sess.get(url, proxy="http://54.222.232.0:3128") as res:
async with session.get(url,headers=headers,cookies=cookies, verify_ssl=False) as res:
text = await res.text()
print(text)
import asyncio #使用多线程:在携程中集成阻塞io
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
def get_url(url):
#通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
#建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client.setblocking(False)
client.connect((host, 80)) #阻塞不会消耗cpu
#不停的询问连接是否建立好, 需要while循环不停的去检查状态
#做计算任务或者再次发起其他的连接请求
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
data = b""
while True:
d = client.recv(1024)
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
task = loop.run_in_executor(executor, get_url, url)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time()-start_time))
import re #复杂性金融数据抓取,Queue+ProxyConnector
import ssl
import csv
import json
import time
import random
import asyncio
import aiohttp
import requests
from lxml import etree
from asyncio.queues import Queue
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest
class Common():
task_queue = Queue()
result_queue = Queue()
market_cap_all = 0
currency_rate = 0
# 线上内网
socks5_address_prod = [
\'socks5://10.1.100.253:1235\',
\'socks5://10.1.100.51:1235\',
\'socks5://10.1.100.70:1235\',
\'socks5://10.1.100.205:1235\',
\'socks5://10.1.100.73:1235\'
]
# 办公网
socks5_address_dev = [
\'socks5://18.208.81.123:1235\',
\'socks5://34.197.217.25:1235\',
\'socks5://52.20.255.43:1235\',
\'socks5://34.237.163.87:1235\',
\'socks5://18.208.81.123:1235\',
\'socks5://52.0.114.155:1235\'
]
DEPLOY_MODE = "dev"
async def session_get(session,url,socks):
auth = Socks5Auth(login=\'...\', password=\'...\')
headers = {\'User-Agent\': \'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)\'}
timeout = aiohttp.ClientTimeout(total=20)
response = await session.get(
url,
proxy=socks,
proxy_auth=auth,
timeout=timeout,
headers=headers,
ssl=ssl.SSLContext()
)
return await response.text(), response.status
async def download(url):
connector = ProxyConnector()
if DEPLOY_MODE == "dev":
socks = None
else: # any non-dev deployment uses a production proxy, so socks is always defined
socks = random.choice(socks5_address_prod)
async with aiohttp.ClientSession(
connector = connector,
request_class = ProxyClientRequest
) as session:
ret, status = await session_get(session, url, socks)
if \'window.location.href\' in ret and len(ret) < 1000:
url = ret.split("window.location.href=\'")[1].split("\'")[0]
ret, status = await session_get(session, url, socks)
return ret, status
async def parse_html(cid, url, response):
coin_info = {}
coin_value = {}
coin_info[\'url\'] = url
coin_info[\'cid\'] = cid
coin_info[\'time\'] = int(time.time())
tree = etree.HTML(response)
try:
price_usd = tree.xpath(
\'//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()\'
)[0].strip().replace(\'$\', \'\')
if \'?\' not in price_usd:
coin_value[\'price\'] = float(price_usd)
except:
pass
try:
updown = tree.xpath(
\'//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()\'
)[0].strip().replace(\'%\', \'\')
coin_value[\'updown\'] = float(updown)
except:
pass
try:
volume_24h_rmb = tree.xpath(
\'//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()\'
)[0].strip().replace(\'¥\', \'\').replace(\',\', \'\')
coin_value[\'volume_24h\'] = int(float(volume_24h_rmb) / Common.currency_rate)
except:
pass
try:
circulating_supply = tree.xpath(
\'//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()\'
)[0].strip().replace(\',\', \'\')
if \'?\' not in circulating_supply:
circulating_supply = re.match(r\'^(\d+)(\w+)$\', circulating_supply).group(1)
coin_value[\'circulating_supply\'] = int(circulating_supply)
except:
pass
try:
if coin_value[\'price\'] and coin_value[\'circulating_supply\']:
market_cap = coin_value[\'price\'] * coin_value[\'circulating_supply\']
coin_value[\'market_cap\'] = market_cap
except:
pass
try:
if coin_value[\'market_cap\']:
global_share = coin_value[\'market_cap\'] / Common.market_cap_all
if global_share < 0.001:
coin_value[\'global_share\'] = \'<0.1%\'
else:
coin_value[\'global_share\'] = str((global_share * 100).__round__(2)) + \'%\'
except:
pass
try:
circulation_rate = tree.xpath(
\'//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()\'
)[0].strip()
if \'?\' not in circulation_rate:
coin_value[\'circulation_rate\'] = circulation_rate
except:
pass
try:
turnover_rate = tree.xpath(
\'//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()\'
)[1].strip()
if \'?\' not in turnover_rate:
coin_value[\'turnover_rate\'] = turnover_rate
except:
pass
try:
issue_time = tree.xpath(
\'//div[@class="infoList"]/div[1]/div[1]/span[2]/text()\'
)[0].strip()
if issue_time != \'-\':
coin_value[\'issue_time\'] = issue_time
except:
pass
try:
exchange_num = tree.xpath(
\'//div[@class="infoList"]/div[3]/div[1]/span[2]/text()\'
)[0].strip().replace(\'家\', \'\')
coin_value[\'exchange_num\'] = int(exchange_num)
except:
pass
try:
total_circulation = tree.xpath(
\'//div[@class="infoList"]/div[2]/div[2]/span[2]/text()\'
)[0].strip().replace(\',\', \'\')
coin_value[\'total_circulation\'] = int(total_circulation)
except:
pass
coin_info[\'value\'] = coin_value
return coin_info
async def down_and_parse_task(queue):
while 1:
try:
cid, url = queue.get_nowait()[:2]
except:
return
for retry_cnt in range(3):
try:
html, status = await download(url)
if status != 200:
html, status = await download(url)
if \'访问控制拒绝了你的请求\' in html:
html, status = await download(url)
html_parse_result = await parse_html(cid, url, html)
print(html_parse_result)
await Common.result_queue.put(html_parse_result)
break
except:
await asyncio.sleep(0.2)
continue
async def push(data):
url = \'http://127.0.0.1:8000/aaa\'
error = None
for retry_cnt in range(3):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
url,
data=json.dumps(data)
) as response:
pass
response.raise_for_status()
except Exception as e:
await asyncio.sleep(0.2)
print(e)
async def speed_monitor():
while Common.task_queue.qsize() != 0:
old_queue_len = Common.task_queue.qsize()
await asyncio.sleep(5)
new_queue_count = Common.task_queue.qsize()
print(\'=================\')
print(\'speed = \', (old_queue_len - new_queue_count) / 5)
async def monitor_finish():
while len(asyncio.Task.all_tasks()) > 3:
await asyncio.sleep(1)
await asyncio.sleep(5)
raise SystemExit()
async def push_results():
temp_q = []
while 1:
try:
await asyncio.sleep(3)
for _ in range(Common.result_queue.qsize()):
temp_q.append(await Common.result_queue.get())
if len(temp_q) > 0:
await push(temp_q)
temp_q.clear()
except:
import traceback
print(traceback.format_exc())
async def get_marketcap():
url = \'https://dncapi.feixiaohao.com/api/home/global?webp=0\'
response = requests.get(url)
response_json = json.loads(response.text)
marketcap = response_json[\'data\'][\'marketcapvol\']
Common.market_cap_all = int(marketcap) #总市值
async def get_currency_rate():
url_rate = \'https://dncapi.feixiaohao.com/api/coin/web-rate/\'
response = requests.get(url_rate)
currency_rate = json.loads(response.text)[\'data\'][11][\'cny\']
Common.currency_rate = currency_rate #汇率
# overall crawl time limit: exit after 280 s, keeping under the 300 s budget
async def time_limit():
await asyncio.sleep(280)
raise SystemExit()
async def main():
# loop = asyncio.get_event_loop()
csv_reader = csv.reader(open(\'feixiaohao_mapping_data.csv\', encoding=\'utf-8\'))
for row in csv_reader:
try:
if row[1].startswith(\'https\'):
await Common.task_queue.put(row)
except:
pass
print(Common.task_queue)
await get_marketcap()
print(\'总市值\', Common.market_cap_all)
await get_currency_rate()
print(\'汇率\', Common.currency_rate)
for _ in range(10):
loop.create_task(down_and_parse_task(Common.task_queue))
loop.create_task(monitor_finish())
loop.create_task(speed_monitor())
loop.create_task(push_results())
loop.create_task(time_limit())
if __name__ == \'__main__\':
loop= asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
import asyncio # Jianshu total read count: main, request, parse, get_all_article_links, queue put/get
import re
import aiohttp
import requests
import ssl
from lxml import etree
from asyncio.queues import Queue
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest
# Use asyncio and aiohttp to fetch the blog's total read count (hint: first collect each article's link from the author's list pages)
# https://www.jianshu.com/u/130f76596b02
class Common():
task_queue = Queue()
result_queue = Queue()
result_queue_1 = []
async def session_get(session,url,socks):
auth = Socks5Auth(login=\'...\', password=\'...\')
headers = {\'User-Agent\': \'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)\'}
timeout = aiohttp.ClientTimeout(total=20)
response = await session.get(
url,
proxy=socks,
proxy_auth=auth,
timeout=timeout,
headers=headers,
ssl=ssl.SSLContext()
)
return await response.text(), response.status
async def download(url):
connector = ProxyConnector()
socks = None
async with aiohttp.ClientSession(
connector = connector,
request_class = ProxyClientRequest
) as session:
ret, status = await session_get(session, url, socks)
if \'window.location.href\' in ret and len(ret) < 1000:
url = ret.split("window.location.href=\'")[1].split("\'")[0]
ret, status = await session_get(session, url, socks)
return ret, status
async def parse_html(content):
read_num_pattern = re.compile(r\'"views_count":\d+\')
read_num = int(read_num_pattern.findall(content)[0].split(\':\')[-1])
return read_num
def get_all_article_links():
links_list = []
for i in range(1,21):
url = \'https://www.jianshu.com/u/130f76596b02?order_by=shared_at&page={}\'.format(i)
header = {
\'Accept\': \'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \'
\'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\'
}
response = requests.get(url,
headers=header,
timeout=5
)
tree = etree.HTML(response.text)
article_links = tree.xpath(\'//div[@class="content"]/a[@class="title"]/@href\')
for item in article_links:
article_link = \'https://www.jianshu.com\' + item
links_list.append(article_link)
print(article_link)
return links_list
async def down_and_parse_task(queue):
while 1:
try:
url = queue.get_nowait()
except:
return
error = None
for retry_cnt in range(3):
try:
html, status = await download(url)
if status != 200:
html, status = await download(url)
read_num = await parse_html(html)
print(read_num)
# await Common.result_queue.put(read_num)
Common.result_queue_1.append(read_num)
break
except Exception as e:
error = e
await asyncio.sleep(0.2)
continue
else:
raise error
async def count_sum():
while 1:
try:
print(Common.result_queue_1)
print(\'总阅读量 = \',sum(Common.result_queue_1))
await asyncio.sleep(3)
except:
pass
async def main():
all_links = get_all_article_links()
for item in set(all_links):
await Common.task_queue.put(item)
for _ in range(10):
loop.create_task(down_and_parse_task(Common.task_queue))
loop.create_task(count_sum())
if __name__ == \'__main__\':
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
import aiohttp #爬取美图保存本地,面向对象,do,getURLs,getHTMLText,getImageURLs,download_img,save_img,
import asyncio
from pyquery import PyQuery as PQ
import os
from time import time
class Spider(object):
def __init__(self,n=10):
self.headers = {
\'user-agent\': \'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\'
}
path=\'./download/mzitu\'
if not os.path.exists(path):
os.mkdir(path)
self.path=path
else:
self.path=path
self.n=n
self.num=1
async def getURLs(self,n:int):
url,urls=\'http://www.mzitu.com/page/%d/\'%(n),[]
try:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
response=await session.get(url=url,headers=self.headers,timeout=60)
text=await response.text()
pq=PQ(text)
for inf in pq(\'[id="pins"]\')(\'li\'):
inf=PQ(inf)
dic={
\'url\':str(inf(\'a\').attr(\'href\')),
\'title\':str(inf(\'img\').attr(\'alt\'))
}
urls.append(dic)
except Exception as e:
print(e.args)
finally:
await session.close()
return urls
async def getHTMLText(self,url:str):
text=""
try:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
async with session.get(url=url,headers=self.headers,timeout=60) as r:
text=await r.text()
except Exception as e:
print(e.args)
finally:
await session.close()
return text
async def save_img(self,url:str):
content=""
try:
async with aiohttp.ClientSession() as session:
response=await session.get(url=url,headers=self.headers,timeout=60)
content=await response.read()
except Exception as e:
print(e.args)
finally:
await session.close()
return content
async def getImageURLs(self,dic:dict,n=10):
urls=[]
try:
for page in range(1,n+1):
url=dic[\'url\']+\'/%d\'%(page)
text=await self.getHTMLText(url)
pq=PQ(text)
inf=pq(\'[class="main-image"]\')(\'img\')
_dic={
\'id\':self.num,
\'headers\':{
\'Referer\': dic[\'url\'],
\'User-Agent\': \'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\'
},
\'url\':str(inf.attr(\'src\')),
\'title\':str(dic[\'title\'])
}
self.num+=1
urls.append(_dic)
except Exception as e:
print(e.args)
finally:
return urls
async def download_img(self,dic:dict):
filename=str(int(dic[\'id\']/10)+1 if dic[\'id\']%10!=0 else int(dic[\'id\']/10))+dic[\'url\'].split(\'/\')[-1]
try:
if os.path.exists(self.path+\'/\'+filename):
print(\'第%d张图片下载失败,图片已存在\'%(dic[\'id\']))
else:
content = await self.save_img(dic[\'url\'])
with open(self.path+\'/\'+filename,\'wb\') as f:
f.write(content)
f.close()
print(\'成功下载第%d张图片\'%(dic[\'id\']))
except Exception as e:
print(e.args,1)
finally:
pass
async def do(self,n:int):
try:
urls = await self.getURLs(n)
for url in urls:
infs=await self.getImageURLs(url)
for inf in infs:
self.headers=inf[\'headers\']
await self.download_img(inf)
except Exception as e:
print(e.args)
finally:
pass
def run(self):
try:
tasks = [asyncio.ensure_future(self.do(n)) for n in range(1,self.n+1)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
except Exception as e:
print(e.args)
finally:
pass
def main():
start=time()
spider=Spider()
spider.run()
end=time()
print(\'耗时:\',end-start,\'s\')
if __name__ == \'__main__\':
main()
import asyncio #weibo美图保存本地,面向对象
import aiohttp
import os
from time import time
class Spider(object):
def __init__(self,uid:str):
self.uid=uid
self.page = 1
self.num=1
self.url=\'https://m.weibo.cn/api/container/getIndex\'
self.headers={
\'Accept\': \'application/json, text/plain, */*\',
\'MWeibo-Pwa\': \'1\',
\'User-Agent\': \'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\',
\'X-Requested-With\': \'XMLHttpRequest\'
}
path=\'./download\'
if not os.path.exists(path):
os.mkdir(path)
os.chdir(path)
else:
os.chdir(path)
path=\'./weibo\'
if not os.path.exists(path):
os.mkdir(path)
os.chdir(path)
else:
os.chdir(path)
path=\'./\'+self.uid
if not os.path.exists(path):
os.mkdir(path)
os.chdir(path)
else:
os.chdir(path)
self.path=os.getcwd()
for i in range(3):
os.chdir(\'../\')
async def getURLs(self):
urls=[]
try:
self.params = {
\'uid\': self.uid,
\'luicode\': \'10000011\',
\'lfid\': \'230413\' + self.uid + \'_-_WEIBO_SECOND_PROFILE_WEIBO\',
\'containerid\': \'107603\' + self.uid,
\'page\': self.page
}
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
response=await session.get(url=self.url,params=self.params,headers=self.headers,timeout=60)
data=await response.json()
if \'msg\' in data.keys():
urls=[]
else:
for card in data[\'data\'][\'cards\']:
if card[\'card_type\'] == 9:
if \'pics\' in card[\'mblog\'].keys():
for pic in card[\'mblog\'][\'pics\']:
urls.append(pic[\'large\'][\'url\'])
else:
continue
else:
continue
except Exception as e:
print(e.args)
finally:
await session.close()
return urls
async def save_img(self,url:str):
content=""
try:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
response=await session.get(url=url,headers=self.headers,timeout=60)
content=await response.read()
except Exception as e:
print(e.args)
finally:
await session.close()
return content
async def download_img(self,url:str):
filename=self.uid+\'-\'+str(self.num)+\'.jpg\'
try:
if os.path.exists(self.path+\'/\'+filename):
print(\'%s:第%d张图片下载失败,文件已存在\'%(self.uid,self.num))
else:
content=await self.save_img(url)
with open(self.path+\'/\'+filename,\'wb\') as f:
f.write(content)
f.close()
print(\'%s:成功下载第%d张图片,文件名:%s\'%(self.uid,self.num,filename))
self.num+=1
except Exception as e:
print(e.args)
finally:
pass
async def do(self):
try:
while True:
urls=await self.getURLs()
if urls==[]:
break
self.page+=1
for url in urls:
await self.download_img(url)
print(\'总共下载%d张图片\' % (self.num))
except Exception as e:
print(e.args)
finally:
pass
def run(self):
try:
tasks = [asyncio.ensure_future(self.do())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
except Exception as e:
print(e.args)
finally:
pass
def main():
while True:
uid=input(\'uid:\')
start=time()
spider=Spider(uid)
spider.run()
end=time()
print(\'总共耗时%fs\'%(end-start))
t=input(\'继续?(Y/N)\')
if t in [\'Y\',\'y\']:
pass
elif t in [\'N\',\'n\']:
break
else:
print(\'输入有误!将退出!\')
break
if __name__ == \'__main__\':
main()
import aiohttp #异步方式爬取当当畅销书的图书信息,字段放入list,用pandas保存csv
import time
import asyncio
import pandas as pd
from bs4 import BeautifulSoup
# table表格用于储存书本信息
table = []
# 获取网页(文本信息)
async def fetch(session, url):
async with session.get(url) as response:
return await response.text(encoding=\'gb18030\')
# 解析网页
async def parser(html):
# 利用BeautifulSoup将获取到的文本解析成HTML
soup = BeautifulSoup(html, "lxml")
# 获取网页中的畅销书信息
book_list = soup.find(\'ul\', class_="bang_list clearfix bang_list_mode")(\'li\')
for book in book_list:
info = book.find_all(\'div\')
# 获取每本畅销书的排名,名称,评论数,作者,出版社
rank = info[0].text[0:-1]
name = info[2].text
comments = info[3].text.split(\'条\')[0]
author = info[4].text
date_and_publisher = info[5].text.split()
publisher = date_and_publisher[1] if len(date_and_publisher) >= 2 else \'\'
# 将每本畅销书的上述信息加入到table中
table.append([rank, name, comments, author, publisher])
# 处理网页
async def download(url):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url)
await parser(html)
# 全部网页
urls = [\'http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-%d\' % i for i in range(1, 26)]
# 统计该爬虫的消耗时间
print(\'#\' * 50)
t1 = time.time() # 开始时间
# 利用asyncio模块进行异步IO处理
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(download(url)) for url in urls]
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)
# 将table转化为pandas中的DataFrame并保存为CSV格式的文件
df = pd.DataFrame(table, columns=[\'rank\', \'name\', \'comments\', \'author\', \'publisher\'])
df.to_csv(\'dangdangyibu.csv\', index=False)
t2 = time.time() # 结束时间
print(\'使用aiohttp,总共耗时:%s\' % (t2 - t1))
print(\'#\' * 50)
import asyncio #异步爬取链家(list队列,set去重),先循环放入urls_list,consumer,urls_list!=0,pop,fetch, extract_links,目的url放入links_detail_list...list=0时,等待,pop队列,handle_elements,fetch,add,extract_elements,save_to_database
import re
import aiohttp
from pyquery import PyQuery
import aiomysql
from lxml import etree
pool = \'\'
sem = asyncio.Semaphore(5) #用来控制并发数,不指定会全速运行
stop = False
headers = {
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36\'
}
MAX_PAGE = 2
TABLE_NAME = \'data\' #数据表名
city = \'zh\' #城市简写
url = \'https://{}.lianjia.com/ershoufang/pg{}/\' #url地址拼接
urls = [] #所有页的url列表
links_detail = set() #爬取中的详情页链接的集合
crawled_links_detail = set() #爬取完成的链接集合,方便去重
async def fetch(url, session):
\'\'\'
aiohttp获取网页源码
\'\'\'
async with sem:
await asyncio.sleep(1)
try:
async with session.get(url, headers=headers, verify_ssl=False) as resp:
if resp.status in [200, 201]:
data = await resp.text()
return data
except Exception as e:
print(e)
def extract_links(source):
\'\'\'
提取出详情页的链接
\'\'\'
pq = PyQuery(source)
for link in pq.items("a"):
_url = link.attr("href")
if _url and re.match(r'https://.*?/\d+.html', _url) and '{}.lianjia.com'.format(city) in _url: # str.find() returns -1 (truthy) when absent, so test membership instead
links_detail.add(_url)
print(links_detail)
def extract_elements(source):
\'\'\'
提取出详情页里面的详情内容
\'\'\'
try:
dom = etree.HTML(source)
id = dom.xpath(\'//link[@rel="canonical"]/@href\')[0]
title = dom.xpath(\'//title/text()\')[0]
price = dom.xpath(\'//span[@class="unitPriceValue"]/text()\')[0]
information = dict(re.compile(\'<li><span class="label">(.*?)</span>(.*?)</li>\').findall(source))
information.update(title=title, price=price, url=id)
print(information)
asyncio.ensure_future(save_to_database(information, pool=pool))
except Exception as e:
print(\'解析详情页出错!\')
pass
async def save_to_database(information, pool):
\'\'\'
使用异步IO方式保存数据到mysql中
注:如果不存在数据表,则创建对应的表
\'\'\'
COLstr = \'\' # 列的字段
ROWstr = \'\' # 行字段
ColumnStyle = \' VARCHAR(255)\'
for key in information.keys():
COLstr = COLstr + \' \' + key + ColumnStyle + \',\'
ROWstr = (ROWstr + \'"%s"\' + \',\') % (information[key])
# 异步IO方式插入数据库
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute("SELECT * FROM %s" % (TABLE_NAME))
await cur.execute("INSERT INTO %s VALUES (%s)"%(TABLE_NAME, ROWstr[:-1]))
print(\'插入数据成功\')
except aiomysql.Error as e:
await cur.execute("CREATE TABLE %s (%s)" % (TABLE_NAME, COLstr[:-1]))
await cur.execute("INSERT INTO %s VALUES (%s)" % (TABLE_NAME, ROWstr[:-1]))
except aiomysql.Error as e:
print(\'mysql error %d: %s\' % (e.args[0], e.args[1]))
async def handle_elements(link, session):
\'\'\'
获取详情页的内容并解析
\'\'\'
print(\'开始获取: {}\'.format(link))
source = await fetch(link, session)
#添加到已爬取的集合中
crawled_links_detail.add(link)
extract_elements(source)
async def consumer():
\'\'\'
消耗未爬取的链接
\'\'\'
async with aiohttp.ClientSession() as session:
while not stop:
if len(urls) != 0:
_url = urls.pop()
source = await fetch(_url, session)
print(_url)
extract_links(source)
if len(links_detail) == 0:
print(\'目前没有待爬取的链接\')
await asyncio.sleep(2)
continue
link = links_detail.pop()
if link not in crawled_links_detail:
asyncio.ensure_future(handle_elements(link, session))
# def stoploop(loop):
# loop.stop()
async def main(loop):
global pool
pool = await aiomysql.create_pool(host=\'127.0.0.1\', port=3306,
user=\'root\', password=\'123\',
db=\'aiomysql_lianjia\', loop=loop, charset=\'utf8\',
autocommit=True)
for i in range(1, MAX_PAGE):
urls.append(url.format(city, str(i)))
print(\'爬取总页数:{} 任务开始...\'.format(str(MAX_PAGE)))
asyncio.ensure_future(consumer())
if __name__ == \'__main__\':
# main()
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
# loop.call_soon(stoploop, loop)
loop.run_forever()
User-Agent pool
uapools=[
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14393",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.22 Safari/537.36 SE 2.X MetaSr 1.0",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)",
"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
"Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0;",
"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
"Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
"Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
"Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
"Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
"Mozilla/5.0 (iPad; U; CPU OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
"Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1",
"MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1",
"Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10",
"Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13",
"Mozilla/5.0 (BlackBerry; U; BlackBerry 9800; en) AppleWebKit/534.1+ (KHTML, like Gecko) Version/6.0.0.337 Mobile Safari/534.1+",
"Mozilla/5.0 (hp-tablet; Linux; hpwOS/3.0.0; U; en-US) AppleWebKit/534.6 (KHTML, like Gecko) wOSBrowser/233.70 Safari/534.6 TouchPad/1.0",
"Mozilla/5.0 (SymbianOS/9.4; Series60/5.0 NokiaN97-1/20.0.019; Profile/MIDP-2.1 Configuration/CLDC-1.1) AppleWebKit/525 (KHTML, like Gecko) BrowserNG/7.1.18124",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows Phone OS 7.5; Trident/5.0; IEMobile/9.0; HTC; Titan)",
"UCWEB7.0.2.37/28/999",
"NOKIA5700/ UCWEB7.0.2.37/28/999",
"Openwave/ UCWEB7.0.2.37/28/999",
"Mozilla/4.0 (compatible; MSIE 6.0; ) Opera/UCWEB7.0.2.37/28/999",
]
import random
def ua(uapools):
thisua=random.choice(uapools)
print(thisua)
if __name__ == \'__main__\':
ua(uapools)
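A small sketch of actually using the pool above: pick a random User-Agent for each request (httpbin is used here only as a test endpoint).
import random
import requests

def get_with_random_ua(url):
    headers = {'User-Agent': random.choice(uapools)}
    return requests.get(url, headers=headers, timeout=10)

print(get_with_random_ua('http://httpbin.org/user-agent').text)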
Cookie pool
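The notes have no example under this heading; the following is only a minimal sketch of the idea, assuming the cookies of several logged-in accounts have already been saved as JSON files (as in the selenium example above) and are rotated between requests. The file list is illustrative.
import json
import random
import requests

cookie_files = ['cookies111.json']  # one file per logged-in account

def load_cookies(path):
    with open(path, 'r', encoding='utf-8') as f:
        return {c['name']: c['value'] for c in json.loads(f.read())}

def get_with_random_cookie(url):
    cookies = load_cookies(random.choice(cookie_files))
    return requests.get(url, cookies=cookies, timeout=10)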
Proxies
import requests #查询当前ip
url = \'http://icanhazip.com\'
try:
response = requests.get(url) #不使用代理
# print(response.status_code)
if response.status_code == 200:
print(response.text)
except requests.ConnectionError as e:
print(e.args)
Fixed proxy IPs
import random
proxies = [
# {\'http\':\'socks5://127.0.0.1:1080\'},
{\'http\':\'127.0.0.1:1080\'},
{\'https\':\'127.0.0.1:1080\'}
]
proxies = random.choice(proxies)
print(proxies)
url = \'http://icanhazip.com\'
try:
response = requests.get(url,proxies=proxies) #使用代理
print(response.status_code)
if response.status_code == 200:
print(response.text)
except requests.ConnectionError as e:
print(e.args)
import requests # Abuyun dynamic proxy tunnel
# 待测试目标网页
targetUrl = "http://icanhazip.com"
def get_proxies():
# 代理服务器
proxyHost = "http-dyn.abuyun.com"
proxyPort = "9020"
# 代理隧道验证信息
proxyUser = "HS77K12Q77V4G9MD"
proxyPass = "4131FFDFCE27F104"
proxyMeta = "http://%(user)s:%(pass)s@%(host)s:%(port)s" % {
"host": proxyHost,
"port": proxyPort,
"user": proxyUser,
"pass": proxyPass,
}
proxies = {
"http": proxyMeta,
"https": proxyMeta,
}
for i in range(1, 6):
resp = requests.get(targetUrl, proxies=proxies)
print(resp.status_code)
print(\'第%s次请求的IP为:%s\' % (i, resp.text))
get_proxies()
Python databases
import pymongo # 增删改查,安装 pymongo pip install pymongo
try:
# 1.链接mongod的服务
mongo_py = pymongo.MongoClient()
# 2.库和表的名字; 有数据会自动建库建表
# 数据库
# db = mongo_py[\'six\']
# 表 集合
# collection = db[\'stu\']
# collection = mongo_py[\'six\'][\'stu\']
collection = mongo_py.six.stu
# 3.插入数据
one = {"name": "张三", "age": 50}
two_many = [
{"name": "小三", "age": 50},
{"name": "李四", "age": 30},
{"name": "王五", "age": 20},
{"name": "小刘", "age": 15}
]
# collection.insert_one(one)
# collection.insert_many(two_many)
# collection.insert()
# 删除数据
# collection.delete_one({"age": 15})
# collection.delete_many({"age": 50})
# 修改数据
# collection.update({"age": 20}, {"$set": {"name": "小王"}})
# collection.update_many({"name": "xiaowang"}, {"$set": {"age": 100}})
#查询
result = collection.find({"age":100})
result = collection.find_one({\'age\':100})
# for i in result:
# print(i)
print(result)
except Exception as e:
print(e)
finally:
# 关闭数据库
mongo_py.close()
import pymongo #pymongo从txt导入数据库, 查找
# 连接mongo
client=pymongo.MongoClient(\'localhost\',27017)
# 创建数据库
walden=client[\'walden\']
# 创建数据表
sheet_tab=walden[\'sheet_tab\']
# 打开文件并导入
path=\'walden.txt\'
with open(path,\'r\') as f:
lines=f.readlines()
for index,line in enumerate(lines):
data={
\'index\':index,
\'line\':line,
\'words\':len(line.split())
}
# print(data)
# 导入数据库
sheet_tab.insert_one(data)
# find the documents in mongo whose word count is less than 5
for item in sheet_tab.find({\'words\':{\'$lt\':5}}):
print(item)
from pymongo import MongoClient #pymongo提取图片
from gridfs import *
client=MongoClient(\'localhost\',27017)
db=client.image
#给予girdfs模块来写出,其中collection为上一步生成的,我不知道怎么该名称。实际上是由fs.flies和fs.chunks组成
gridFS = GridFS(db, collection="fs")
count=0
for grid_out in gridFS.find():
count+=1
print(count)
data = grid_out.read() # 获取图片数据
outf = open(\'%s.png\' %count,\'wb\')#创建文件
outf.write(data) # 存储图片
print(\'ok\')
outf.close()
from pymongo import MongoClient # 存储到mongodb
from gridfs import *
import os
#链接mongodb
client=MongoClient(\'localhost\',27017)
#取得对应的collection
db=client.image
#本地硬盘上的图片目录
dirs = r'H:\PYTHON-DUJUN\day999-spider\db\mongodb' # raw string so the backslashes stay literal
#列出目录下的所有图片
files = os.listdir(dirs)
#遍历图片目录集合
for file in files:
#图片的全路径
filesname = dirs + \'\\\' + file
#分割,为了存储图片文件的格式和名称
f = file.split(\'.\')
#类似于创建文件
datatmp = open(filesname, \'rb\')
#创建写入流
imgput = GridFS(db)
#将数据写入,文件类型和名称通过前面的分割得到
insertimg=imgput.put(datatmp,content_type=f[1],filename=f[0])
datatmp.close()
print("保存成功")
Deduplicated inserts (upsert)
for i in data2:
mongo_collection.update_one(i,{\'$set\':i},upsert=True)
for i in data2: #这个方法比较老了,不建议使用
mongo_collection.update(i, i, upsert=True)
Inserting many documents: insert_many
def save_file(data):
content = json.loads(data.T.to_json()).values()
if mongo_collection.insert_many(content):
print(\'存储到 mongondb 成功\')
import pymysql #pymysql的增删改查
try:
# 1.链接 数据库 链接对象 connection()
conn = pymysql.Connect(
host="localhost",
port=3306,
db=\'animal\',
user=\'root\',
passwd="mysql",
charset=\'utf8\'
)
# 2. 创建 游标对象 cursor()
cur = conn.cursor()
# 增加一条数据 科目表--GO语言
# insert_sub = \'insert into subjects values(0,"GO语言")\'
# result = cur.execute(insert_sub)
# 修改
# update_sub = \'update subjects set title="区块链" where id=7\'
# result = cur.execute(update_sub)
# 删除
# delete_sub = \'delete from stu where id=8\'
# result = cur.execute(delete_sub)
delete_sub = \'select * from subjects where id=1\'
cur.execute(delete_sub)
# result = cur.fetchall()
result = cur.fetchone()
print(result)
# for res in result:
#
# print(result)
# 提交事务
conn.commit()
# 关闭游标
cur.close()
# 关闭链接
conn.close()
except Exception as e:
print(e)
import redis #redis 连接池
pool = redis.ConnectionPool(host=\'129.28.74.174\', port=6379,password=\'123456dj\',max_connections=1000)
conn = redis.Redis(connection_pool=pool)
# conn.set(\'x1\',\'wanghuaqiang\',ex=5)
# val = conn.get(\'x1\')
# print(val)
conn.lpush(\'k2\',1213)
res=conn.keys()
print(res)
import redis #redis增删改车, 安装 pip install redis
# 1.链接数据库 key--value
client = redis.StrictRedis(host=\'127.0.0.1\', port=6379)
# 2.设置key
key = \'pyone\'
# 3.string 增加
result = client.set(key, "1")
# 4.删 1, 0
# result = client.delete(key)
# 5.改
result = client.set(key,\'2\')
# 6.查--bytes
result = client.get(key)
# 查看所有的键
result = client.keys()
print(result)
Crawler status monitoring:
listenspider.py
\'\'\'
mysql> create table spider(id int(32) auto_increment primary key,name varchar(32) unique,isstart varchar(32),time1 varchar(100),stop varchar(32),ip1 varchar(32),ip2 varchar(32));
\'\'\'
import urllib.request
import re
def getip2():
try:
r=urllib.request.urlopen(\'http://httpbin.org/ip\').read().decode("utf-8","ignore")
ip=re.compile(\'"origin": "(.*?),\',re.S).findall(r)
if(len(ip)==0):
ip="0.0.0.0"
else:
ip=ip[0]
return ip
except Exception as err:
r=urllib.request.urlopen(\'http://ip.chinaz.com/getip.aspx\').read().decode("utf-8","ignore")
ip=re.compile("ip:\'(.*?)\'",re.S).findall(r)
if(len(ip)==0):
ip="0.0.0.0"
else:
ip=ip[0]
return ip
def listen(conn,name,step):
import socket
hostname=socket.getfqdn(socket.gethostname())
ip1=socket.gethostbyname(hostname)
ip2=getip2()
import datetime
thistime=datetime.datetime.now()
if(int(step)==0):
\'\'\'启动爬虫\'\'\'
isstart=1
stop=0
try:
sql="insert into spider(name,isstart,time1,stop,ip1,ip2) values(\'"+str(name)+"\',\'"+str(isstart)+"\',\'"+str(thistime)+"\',\'"+str(stop)+"\',\'"+str(ip1)+"\',\'"+str(ip2)+"\')"
conn.query(sql)
conn.commit()
except Exception as err:
pass
elif(int(step)==1):
\'\'\'更新爬虫状态\'\'\'
sql="update spider set time1=\'"+str(thistime)+"\' where name=\'"+str(name)+"\'"
try:
conn.query(sql)
conn.commit()
except Exception as err:
pass
elif(int(step)==2):
\'\'\'停止爬虫\'\'\'
sql="update spider set stop=\'1\' where name=\'"+str(name)+"\'"
try:
conn.query(sql)
conn.commit()
except Exception as err:
pass
import pymysql #糗事百科爬虫-监听改造演示
from listenspider import *
import urllib.request
import re
name="糗事百科爬虫"
mconn=pymysql.connect(host="127.0.0.1",user="root",passwd="123",db="listenspider")
listen(mconn,name,"0")
headers=("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.22 Safari/537.36 SE 2.X MetaSr 1.0")
opener=urllib.request.build_opener()
opener.addheaders=[headers]
urllib.request.install_opener(opener)
for i in range(0,7):
try:
if(i%3):
listen(mconn,name,"1")
thisurl="http://www.qiushibaike.com/8hr/page/"+str(i+1)+"/"
data=urllib.request.urlopen(thisurl).read().decode("utf-8","ignore")
pat = \'class="recmd-content".*?>(.*?)</a>\'
rst=re.compile(pat,re.S).findall(data)
for j in range(0,len(rst)):
print(rst[j])
print("-------")
except Exception as err:
pass
listen(mconn,name,"2")
Day-to-day workflow
1. A requirement comes in: write a crawler.
2. Get blocked by the site's anti-crawling measures.
3. Work around the anti-crawling measures.
4. Get blocked by the next round of countermeasures.
5. Work around those as well.
6. Keep debugging and tuning the crawler... and just when you think it is fine, continue.
7. Rack your brain to improve crawling efficiency.
8. Refactor modules because the requirements changed.
9. Refactor modules because the page structure changed.
10. Get blocked again, and the cycle repeats.
Required skills:
1. Know how to search effectively and be able to read robots.txt.
2. Use requests.Session().
3. Use headless selenium to get through the various verification steps and collect cookies.
4. Use mitmproxy and similar supporting middleware.
5. Optimize the crawling rules and algorithms.
6. Have the professional restraint of time.sleep(x); combine pymouse with selenium to disguise the crawler.
7. Know how to use ready-made demos: some data needs require no code at all, and a tool such as Hawk or an existing demo found online can solve them.
8. Be familiar with the mainstream crawler frameworks (not listed one by one here).
Common crawler use cases:
Search engines
Price-comparison sites
Backing up web-page data
Task queues
When the crawling job is large, running it as a single program from start to finish is not appropriate:
If it hits an error halfway and stops, do you start over from the beginning? That is not reasonable.
How do you know where the program failed? Tasks should not affect one another.
If you have two machines, how do you divide the work?
So we need a task queue. Its job is to put every page we plan to crawl into the queue; workers then take tasks out and execute them one by one. If a task fails, record it and move on to the next, so each worker can keep working through the queue. This also adds scalability: hundreds of millions of tasks in the queue are no problem, and you can add workers as needed, just like adding another pair of chopsticks at the dinner table. A minimal sketch of this pattern is given below.
Commonly used task queues include Kafka, Beanstalkd, and Celery.
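The sketch below shows the worker pattern under the assumption that a local Redis list is used as the queue; the queue names, the URL field and the crawl_one() helper are illustrative and not from the original notes.
import json
import redis
import requests

r = redis.StrictRedis(host='127.0.0.1', port=6379)

def enqueue(urls):
    # the producer puts every planned URL into the queue
    for url in urls:
        r.lpush('crawl:tasks', json.dumps({'url': url}))

def crawl_one(url):
    # placeholder for the real fetch/parse logic
    return requests.get(url, timeout=10).status_code

def worker():
    # a worker pops one task at a time; a failed task is recorded
    # in a separate list and the worker moves on to the next one
    while True:
        raw = r.rpop('crawl:tasks')
        if raw is None:
            break
        task = json.loads(raw)
        try:
            crawl_one(task['url'])
        except Exception as e:
            r.lpush('crawl:failed', json.dumps({'url': task['url'], 'error': str(e)}))

if __name__ == '__main__':
    enqueue(['http://httpbin.org/get?page=%d' % i for i in range(10)])
    worker()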
Operations and maintenance
- monitoring data growth
- the success rate of crawler runs
- the exceptions thrown
There is a lot to say on this topic; in real work, operations takes roughly as much time as development, sometimes more, and maintaining crawlers that are already running is heavy work. As experience grows you learn to make your crawlers easier to maintain, for example by adding a logging system and statistics on data volume. Separating crawler engineers from operations is also not very reasonable, because when a crawler stops working the cause may be that the target page changed its structure, a problem in your own system, an anti-crawling measure you failed to notice during development that only showed up after going live, or the target site detecting the crawler and banning you. So crawler development generally has to cover operations as well.
Here are a few approaches to crawler operations:
First, monitor data growth. For a focused crawler (one that targets a single site) this is fairly easy: after a while you get a feel for how much data each site adds, and you just check regularly that the growth trend looks normal (for example in Grafana). For non-focused crawlers the growth is less stable and generally depends on the machine's network condition, how often the sites update, and so on (I have less experience here).
Then watch the success rate of crawler runs. Controlling the crawler through a task queue, as mentioned above, decouples the system and brings many benefits; one of them is that every run can be logged. For each crawl task you can record the execution time, status, target url, and any exception into a logging system (for example Kibana), and a visualization then makes the failure rate easy to see; a small sketch of such a per-task log record is given at the end of this section.
Collect the exceptions the crawler throws. Almost every project uses error-log collection (for example Sentry). One thing to note here: ignore the normal, expected exceptions (such as connection errors and lock conflicts), otherwise you will be drowned in them.
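A minimal sketch of such a per-task log record, assuming the record is simply appended to a local file; in practice it would be shipped to Kibana, Sentry or a similar system, and the file name and fields here are illustrative.
import json
import time
import logging
import requests

logging.basicConfig(filename='crawler_tasks.log', level=logging.INFO, format='%(message)s')

def crawl_with_log(url):
    record = {'url': url, 'start': time.time(), 'status': None, 'error': None}
    try:
        resp = requests.get(url, timeout=10)
        record['status'] = resp.status_code
    except Exception as e:
        record['error'] = repr(e)
    finally:
        record['elapsed'] = round(time.time() - record['start'], 3)
        logging.info(json.dumps(record))
    return record

if __name__ == '__main__':
    print(crawl_with_log('http://httpbin.org/get'))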