Initial settings
proxypool.setting
# Redis host
REDIS_HOST = '127.0.0.1'
# Redis port
REDIS_PORT = 6379
# Redis password; use None if there is none
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'
# Proxy score: the maximum is 100, the minimum is 0, and the initial score is 10
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
# Valid status codes
VALID_STATUS_CODES = [200, 302]
# Upper limit of the proxy pool size
POOL_UPPER_THRESHOLD = 50000
# Test cycle (seconds)
TESTER_CYCLE = 20
# Fetch cycle (seconds)
GETTER_CYCLE = 300
# Test URL; ideally test against the very site you intend to crawl. Baidu is used here.
TEST_URL = 'http://www.baidu.com'
# API settings: host and port
API_HOST = '0.0.0.0'
API_PORT = 5555
# Switches
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
# Maximum batch size per test round
BATCH_TEST_SIZE = 10
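Before wiring up the rest of the modules, it can help to confirm that these settings actually reach Redis. A minimal connectivity check, assuming a local Redis server and the redis-py package:

import redis
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD

# Build a client from the settings above and ping the server
client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT,
                           password=REDIS_PASSWORD, decode_responses=True)
print(client.ping())  # True means the connection settings are correct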
Exceptions
proxypool.error
class PoolEmptyError(Exception):
    def __init__(self):
        # Exception is the common base class for all non-exit exceptions.
        Exception.__init__(self)

    def __str__(self):
        # repr() returns the canonical string representation of the object.
        return repr('The proxy pool has been exhausted')
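Callers can treat an exhausted pool as a recoverable condition by catching this exception. A minimal sketch (RedisClient.random(), which raises it, is defined in the storage module below):

from proxypool.error import PoolEmptyError
from proxypool.db import RedisClient

client = RedisClient()
try:
    proxy = client.random()
except PoolEmptyError as e:
    print(e)  # 'The proxy pool has been exhausted'
    proxy = None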
Storage module
proxypool.db
import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY
from proxypool.setting import MAX_SCORE, MIN_SCORE, INITIAL_SCORE
from random import choice
import re

class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the client
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        # Create the database connection object
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy and set its score to the initial score
        :param proxy: proxy
        :param score: score
        :return: result of the add operation
        """
        # zscore() returns the score of member "proxy" in the sorted set REDIS_KEY.
        # zadd() adds member/score pairs to the sorted set REDIS_KEY
        # (redis-py >= 3.0 expects a {member: score} mapping).
        if not re.match(r'\d+\.\d+\.\d+\.\d+:\d+', proxy):
            print('Proxy is malformed, discarding:', proxy)
            return
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """
        Get a random usable proxy: try the top-score proxies first, then fall back to
        the highest-ranked ones; raise an exception if the pool is empty
        :return: a random proxy
        """
        # zrangebyscore() returns members of the sorted set REDIS_KEY whose score lies
        # in the range MAX_SCORE ~ MAX_SCORE, i.e. only the proxies scored 100.
        # A proxy's reliability is reflected by its score: the higher the score, the more stable the proxy.
        # First look for proxies scored 100; if there are none, fall back to the 100
        # highest-ranked proxies (zrevrange); if the set is still empty,
        # raise proxypool.error.PoolEmptyError.
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Decrease a proxy's score by one point; remove it once the score is no longer above the minimum
        :param proxy: proxy
        :return: the proxy's new score
        """
        # zscore() returns the score of member "proxy" in the sorted set REDIS_KEY.
        # zincrby() increments the score of "proxy" in REDIS_KEY by the given amount
        # (redis-py >= 3.0 uses the argument order: name, amount, value).
        # zrem() removes member "proxy" from the sorted set REDIS_KEY.
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decreased by 1')
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('Proxy', proxy, 'current score', score, 'removed')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy exists
        :param proxy: proxy
        :return: whether it exists
        """
        # zscore() returns the score of member "proxy", or None if it is absent.
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE
        :param proxy: proxy
        :return: result of the operation
        """
        print('Proxy', proxy, 'is usable, setting score to', MAX_SCORE)
        # zadd() adds member/score pairs to the sorted set REDIS_KEY.
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """
        Get the pool size
        :return: number of proxies
        """
        # zcard() returns the number of members in the sorted set REDIS_KEY.
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get all proxies
        :return: list of all proxies
        """
        # zrangebyscore() fetches the proxies scored MIN_SCORE ~ MAX_SCORE, i.e. all of them.
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """
        Get a batch of proxies
        :param start: start index
        :param stop: stop index
        :return: list of proxies
        """
        # zrevrange() fetches the proxies ranked start ~ (stop - 1), highest score first.
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)

if __name__ == '__main__':
    conn = RedisClient()
    result = conn.batch(680, 688)
    print(result)
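A quick round trip through the client, as a sketch; it assumes a local Redis instance and writes a throwaway entry into the real REDIS_KEY sorted set:

from proxypool.db import RedisClient

client = RedisClient()
client.add('127.0.0.1:8080')                 # stored with INITIAL_SCORE (10)
print(client.exists('127.0.0.1:8080'))       # True
client.max('127.0.0.1:8080')                 # promote to MAX_SCORE (100)
client.decrease('127.0.0.1:8080')            # drop the score by one point
client.db.zrem('proxies', '127.0.0.1:8080')  # remove the test entry again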
Fetching pages
proxypool.utils
import requests
from requests.exceptions import ConnectionError

base_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
    'Accept-Encoding': 'gzip, deflate, sdch',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
}

def get_page(url, options=None):
    """
    Fetch a page
    :param url: page URL
    :param options: extra request headers
    :return: page text, or None on failure
    """
    # Avoid a mutable default argument: merge any extra headers into the base headers
    headers = dict(base_headers, **(options or {}))
    print('Fetching', url)
    try:
        response = requests.get(url, headers=headers)
        print('Fetched', url, response.status_code)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        print('Failed to fetch', url)
        return None
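Extra headers passed through options are merged on top of base_headers, so a crawler can add site-specific fields without repeating the defaults. A short usage sketch (the URL is just a placeholder):

from proxypool.utils import get_page

html = get_page('http://www.example.com', options={'Referer': 'http://www.example.com'})
if html:
    print(html[:100])  # first 100 characters of the page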
Acquisition module
Every proxy-fetching method is uniformly named with a crawl prefix
proxypool.crawler
import re
from .utils import get_page
from pyquery import PyQuery as pq

class ProxyMetaclass(type):
    """
    ProxyMetaclass is set as the metaclass of the Crawler class. The metaclass
    implements __new__(), which takes a fixed set of parameters; the fourth,
    attrs, holds the class attributes. Iterating over attrs works just like
    iterating over a dict: the keys are the method names. Each name is checked
    for the crawl prefix, and matching names are appended to the __CrawlFunc__
    attribute. This collects all crawl-prefixed methods into a single attribute,
    so the full list of crawl methods can be retrieved dynamically.
    """
    def __new__(cls, name, bases, attrs):
        """
        __new__() is similar to __init__():
        __new__ is responsible for creating the object,
        while __init__ is responsible for initializing it.
        """
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if k.startswith('crawl_'):
                attrs['__CrawlFunc__'].append(k)
                count = count + 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)

class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        """
        Invoke one crawl method by name and collect the proxies it yields
        :param callback: name of the crawl method to call
        :return: the proxies it yields, combined into a list
        """
        proxies = []
        # Look up the crawl method by name and invoke it
        for proxy in getattr(self, callback)():
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        Fetch proxies from "66ip"
        :param page_count: number of pages to crawl
        :return: proxies
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)  # fetch the page source
            if html:
                doc = pq(html)  # parse the page source
                # Select the table inside the node with class "containerbox", then every
                # <tr> whose index is greater than 0 (:gt(0)), i.e. skip the header row.
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    # Select the <td> that is the first child of its parent (the IP column).
                    ip = tr.find('td:nth-child(1)').text()
                    # Select the <td> that is the second child of its parent (the port column).
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_ip3366(self):
        """
        Fetch proxies from "ip3366"
        :return: an iterator of proxies
        """
        for page in range(1, 4):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
            html = get_page(start_url)  # fetch the page source
            if html:
                # \s* matches the whitespace (including newlines) between the tags
                ip_address = re.compile(r'<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                for address, port in re_ip_address:
                    result = address + ':' + port
                    yield result.replace(' ', '')  # strip any spaces from the result

    def crawl_kuaidaili(self):
        """
        Fetch proxies from "kuaidaili"
        :return: an iterator of proxies
        """
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)  # fetch the page source
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)  # proxy addresses
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)  # proxy ports
                for address, port in zip(re_ip_address, re_port):
                    # zip() pairs up the corresponding elements as tuples and returns a list of them
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        """
        Fetch proxies from "xicidaili"
        :return: an iterator of proxies
        """
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)  # fetch the page source
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)  # proxy addresses
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(tr)  # proxy ports
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        """
        Fetch proxies from "iphai"
        :return: an iterator of proxies
        """
        start_url = 'http://www.iphai.com/'
        html = get_page(start_url)  # fetch the page source
        if html:
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            trs = find_tr.findall(html)  # extract the rows that carry the proxy details
            for s in range(1, len(trs)):
                find_ip = re.compile(r'<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                re_ip_address = find_ip.findall(trs[s])  # proxy addresses
                find_port = re.compile(r'<td>\s+(\d+)\s+</td>', re.S)
                re_port = find_port.findall(trs[s])  # proxy ports
                for address, port in zip(re_ip_address, re_port):
                    # zip() pairs up the corresponding elements as tuples and returns a list of them
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_data5u(self):
        """
        Fetch proxies from "data5u"
        :return: an iterator of proxies
        """
        start_url = 'http://www.data5u.com'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = get_page(start_url, options=headers)  # fetch the page source
        if html:
            ip_address = re.compile(r'<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)  # list of (address, port) tuples
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')
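The effect of ProxyMetaclass is easiest to see interactively: the class-level attributes it builds are exactly what the Getter below relies on. A short sketch:

from proxypool.crawler import Crawler

crawler = Crawler()
# The metaclass collected every crawl_* method at class-creation time
print(crawler.__CrawlFuncCount__)  # 6 crawl_* methods defined above
print(crawler.__CrawlFunc__)       # ['crawl_daili66', 'crawl_ip3366', ...]
# Dispatch one of them by name through the generic runner
proxies = crawler.get_proxies('crawl_daili66')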
Proxy storage
proxypool.getter
from proxypool.db import RedisClient
from proxypool.crawler import Crawler
from proxypool.setting import *
import sys

class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the pool has reached its size limit
        """
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print('Getter is running')
        if not self.is_over_threshold():
            # Iterate over the crawl_* crawlers collected on the Crawler class and run each of them
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # Fetch proxies
                proxies = self.crawler.get_proxies(callback)
                sys.stdout.flush()
                for proxy in proxies:
                    # Add the proxies to the database one by one
                    self.redis.add(proxy)
Testing module
proxypool.tester
import asyncio
import aiohttp
import time
import sys
try:
    from aiohttp import ClientError
except ImportError:
    # Older aiohttp versions expose the connection error under a different name
    from aiohttp import ClientProxyConnectionError as ClientError
from proxypool.db import RedisClient
from proxypool.setting import *

class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        Test a single proxy
        :param proxy: proxy to test
        :return:
        """
        # Disable certificate verification (newer aiohttp versions spell this ssl=False)
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                # Request TEST_URL through the proxy; if the status code is in
                # VALID_STATUS_CODES ([200, 302]) the proxy is usable, otherwise its score drops by 1
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is usable', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        """
        Main test routine
        :return:
        """
        print('Tester is running')
        try:
            # Number of proxies in the database
            count = self.redis.count()
            print('Currently', count, 'proxies remaining')
            # Walk through all proxies in steps of BATCH_TEST_SIZE (10)
            for i in range(0, count, BATCH_TEST_SIZE):
                # Indexes of the first and last proxy in this batch
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, '-', stop)
                # Fetch one batch of proxies (up to 10) via redis.batch()
                test_proxies = self.redis.batch(start, stop)
                # asyncio.get_event_loop() returns the event loop for asynchronous tasks
                loop = asyncio.get_event_loop()
                # Test the batch concurrently: run_until_complete() runs the event loop
                # until asyncio.wait(tasks) reports that all test coroutines have finished
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                # flush() empties the output buffer; flushing also happens automatically
                # when the buffer is full, and when the file is closed or the program exits
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('Tester encountered an error', e.args)
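On newer Python versions, calling asyncio.get_event_loop() outside a running loop is deprecated (3.10+), and asyncio.wait() no longer accepts bare coroutines (3.11+). An equivalent batch step can be written with asyncio.run() and asyncio.gather(); a sketch of just that step, under those assumptions:

import asyncio

async def test_batch(tester, test_proxies):
    # Run all single-proxy tests concurrently and wait for them all to finish
    await asyncio.gather(*(tester.test_single_proxy(proxy) for proxy in test_proxies))

# Inside Tester.run(), the event-loop lines above would then become:
# asyncio.run(test_batch(self, test_proxies))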
API module
proxypool.api
from flask import Flask, g
from .db import RedisClient

__all__ = ['app']

# The __name__ argument tells Flask where the application lives.
app = Flask(__name__)

# Get a connection
def get_conn():
    # hasattr() checks whether the object has an attribute with the given name.
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis

# http://localhost:port
# http://127.0.0.1:5555
@app.route('/')
def index():
    # Return the heading "Welcome to Proxy Pool System"
    return '<h2>Welcome to Proxy Pool System</h2>'

# http://localhost:port/random
# http://127.0.0.1:5555/random
@app.route('/random')
def get_proxy():
    """
    Get a proxy
    :return: a random proxy
    """
    conn = get_conn()
    return conn.random()

# http://localhost:port/count
# http://127.0.0.1:5555/count
@app.route('/count')
def get_counts():
    """
    Get the count of proxies
    :return: total pool size
    """
    conn = get_conn()
    return str(conn.count())

if __name__ == '__main__':
    app.run()
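Once the scheduler below has started the API process, the three routes can be exercised with plain HTTP requests; a sketch:

import requests

print(requests.get('http://127.0.0.1:5555/').text)        # welcome page
print(requests.get('http://127.0.0.1:5555/count').text)   # current pool size
print(requests.get('http://127.0.0.1:5555/random').text)  # one proxy, e.g. '1.2.3.4:8080'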
Scheduler module
proxypool.scheduler
import time
# Run each component in its own process
from multiprocessing import Process
from proxypool.api import app
from proxypool.getter import Getter
from proxypool.tester import Tester
from proxypool.setting import *

class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test proxies on a schedule
        cycle = TESTER_CYCLE = 20: test cycle in seconds
        """
        tester = Tester()
        while True:
            print('Tester is running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch proxies on a schedule
        cycle = GETTER_CYCLE = 300: fetch cycle in seconds
        """
        getter = Getter()
        while True:
            print('Fetching proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool is running')
        '''
        Switches
        TESTER_ENABLED = True: enable the tester
        GETTER_ENABLED = True: enable the getter
        API_ENABLED = True: enable the API
        '''
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()
Running
run
from proxypool.scheduler import Scheduler
import sys
import io

# Wrap the standard output buffer (sys.stdout.buffer) in a text IO wrapper
# so that everything printed is encoded as UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

def main():
    try:
        s = Scheduler()
        s.run()
    except Exception:
        # Restart the scheduler if it crashes
        main()

if __name__ == '__main__':
    main()
Getting a proxy
With the code above in place, 'http://127.0.0.1:5555/random' returns a random usable proxy from the ones crawled into the database:
import requests
from requests.exceptions import ConnectionError

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'

def get_proxy():
    try:
        response = requests.get(PROXY_POOL_URL)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        return None

proxy = get_proxy()
proxies = {
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
}
try:
    response = requests.get('http://httpbin.org/get', proxies=proxies)
    print(response.text)
except requests.exceptions.ConnectionError as e:
    print('Error', e.args)