转发-来自http://www.redicecn.com/html/Python/20101209/204.html
与之前的版本http://www.redicecn.com/html/yuanchuangchengxu/20101205/201.html相比,这个使用了多线程。验证时间由原来的20分钟缩短到现在的1分钟左右。
直接上源码:
与之前的版本http://www.redicecn.com/html/yuanchuangchengxu/20101205/201.html相比,这个使用了多线程。验证时间由原来的20分钟缩短到现在的1分钟左右。
直接上源码:
- # coding:gbk
- # 验证最新可用代理 For http://www.5uproxy.net 多线程版
- # by redice 2010.12.09
- import sys
- reload(sys)
- sys.setdefaultencoding(\'gbk\')
- import urllib
- import urllib2
- from urllib2 import URLError, HTTPError
- DEBUG = True
- #html页面下载函数
def getHtml(url, post_data=None, cookie=None):
    """Fetch a URL and return the response body.

    url       - URL to fetch
    post_data - data handed to urllib.urlencode and sent as the POST body;
                when falsy, the request is sent without a body
    cookie    - value for the Cookie request header, or None to omit it

    Returns the response body, or '' when the body is empty or any error
    occurs. Exceptions are never propagated to the caller; they are only
    printed when DEBUG is true.
    """
    if DEBUG:
        print("getHtml:  %s" % (url,))
    try:
        request = urllib2.Request(url)
        request.add_header('User-Agent', 'Mozilla/5.0')
        # HTTP spells this header "Referer" (single "r" in the middle).
        request.add_header('Referer', url)
        if cookie:
            request.add_header('Cookie', cookie)

        opener = urllib2.build_opener()
        if post_data:
            response = opener.open(request, urllib.urlencode(post_data))
        else:
            response = opener.open(request)

        # Release the connection even when read() raises.
        try:
            result = response.read()
        finally:
            response.close()
        return result or ''
    except HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be handled first.
        if DEBUG:
            print('Error retrieving data: %s' % (e,))
            print('Server error document follows:\n')
        return ''
    except URLError as e:
        if DEBUG:
            if hasattr(e, 'reason'):
                print('Failed to reach a server.')
                print('Reason:  %s' % (e.reason,))
            elif hasattr(e, 'code'):
                print("The server couldn't fulfill the request.")
                print('Error code:  %s' % (e.code,))
        # Always return '' so callers never receive None.
        return ''
    except Exception as e:
        if DEBUG:
            print(e)
        return ''
# Listing pages to scrape for proxies; 'type' is the label stored on every
# proxy found on that page.
proxy_urls = [
    {'url': 'http://www.5uproxy.net/http_fast.html', 'type': 'http_fast'},
    {'url': 'http://www.5uproxy.net/http_anonymous.html', 'type': 'http_anonymous'},
    {'url': 'http://www.5uproxy.net/http_non_anonymous.html', 'type': 'http_transparent'},
    {'url': 'http://www.5uproxy.net/socks5.html', 'type': 'socks5'},
]
- import re
- import socket
- import time
- import threading
# Proxies that passed verification; filled in by the worker threads.
result = []

# One module-wide lock shared by every @synchronous function.
lock = threading.Lock()


def synchronous(f):
    """Decorator: run ``f`` while holding the module-level ``lock``.

    Every decorated function shares the same lock, so at most one of them
    executes at any moment. The lock is a plain (non-reentrant)
    threading.Lock: a decorated function must not call another decorated
    function, or it will deadlock.

    The lock is released even when ``f`` raises; the exception propagates.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(f)  # preserve f's name and docstring on the wrapper
    def call(*args, **kwargs):
        with lock:
            return f(*args, **kwargs)
    return call
# Collect every candidate proxy from the listing pages before verification
# starts.
#
# The pattern captures, per table row: host, port, country (the first cell,
# a numeric index, is matched but not captured).
# NOTE(review): the listing as published opened this raw string with five
# quote characters (r'''''<tr ...), which makes the regex require two literal
# apostrophes before "<tr" and therefore never match a row. The pattern below
# starts at "<tr" -- confirm against the live page markup.
_PROXY_ROW_RE = re.compile(
    r'''<tr .*?>[\s\S]*?<td .*?>\d+?</td>[\s\S]*?<td>(\S+?)</td>[\s\S]*?<td .*?>(\S+?)</td>[\s\S]*?<td .*?>(\S+?)</td>[\s\S]*?</tr>''',
    re.DOTALL)

proxies = []
# Keys of proxies already queued, so duplicates are skipped without
# rescanning the whole list.
_seen_proxies = set()
for proxy_url in proxy_urls:
    # Treat a failed download (falsy result) as an empty page.
    html = getHtml(proxy_url['url']) or ''
    for domain, port, state in _PROXY_ROW_RE.findall(html):
        key = (domain, port, state, proxy_url['type'])
        if key in _seen_proxies:
            continue
        _seen_proxies.add(key)
        proxies.append({
            'domain': domain,            # proxy host
            'port': port,                # proxy port (string, as scraped)
            'state': state,              # country
            'type': proxy_url['type'],   # label of the listing page
            'time': 0,                   # response time in ms, set on verify
        })
# Hand out one proxy that still needs verifying.
@synchronous
def getproxy():
    """Remove and return the last entry of the shared ``proxies`` list.

    Returns '' when the list is empty, which callers use as the
    "nothing left to verify" signal.
    """
    try:
        return proxies.pop()
    except IndexError:
        return ''
# Record a proxy that passed verification.
@synchronous
def saveresult(proxy):
    """Append ``proxy`` to the shared ``result`` list unless already present."""
    if proxy in result:
        return
    result.append(proxy)
# Worker thread body.
def verify():
    """Verify proxies taken from getproxy() until none remain.

    For each proxy a TCP connection to (domain, port) is attempted with a
    10-second socket timeout. On success the connect time in milliseconds is
    stored in proxy['time'] and the proxy is passed to saveresult(); on any
    failure a message is printed and the next proxy is tried.
    """
    while True:
        proxy = getproxy()
        # getproxy() returns '' once every proxy has been handed out.
        if not proxy:
            return
        print("正在验证:%s,%s" % (proxy['domain'], proxy['port']))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10)
        try:
            try:
                # Wall-clock time: time.clock() reports CPU time on Unix
                # (and no longer exists on Python 3.8+), which does not
                # measure how long the connect actually waited.
                start = time.time()
                sock.connect((proxy['domain'], int(proxy['port'])))
                proxy['time'] = int((time.time() - start) * 1000)
            finally:
                # Close the socket on failure too, not only on success.
                sock.close()
            saveresult(proxy)
            print("%s,%s 验证通过,响应时间:%d ms." % (proxy['domain'], proxy['port'], proxy['time']))
        except Exception as e:
            if DEBUG:
                print(e)
            print("%s,%s 验 证失败." % (proxy['domain'], proxy['port']))
# Build a pool of 20 worker threads, each running verify().
thread_pool = [threading.Thread(target=verify, args=()) for _ in range(20)]

# Launch the workers in order.
for worker in thread_pool:
    worker.start()

# Block until every worker has finished.
for worker in thread_pool:
    worker.join()
# Sort verified proxies by response time, fastest first.
result.sort(key=lambda item: item['time'])

# Output file is named after the current local time.
fname = 'proxy_' + time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.txt'

# "with" closes the file even if a write fails; the local names avoid
# shadowing the builtins "file" and "str".
with open(fname, 'w') as out:
    print("验证结果如下:")
    for item in result:
        line = '%s:%s %s,%s,%d' % (item['domain'], item['port'], item['type'], item['state'], item['time'])
        print(line)
        out.write(line + '\n')

print("所有代理已验证完 毕,共计%d个验证通过。验证通过的代理已存入%s" % (len(result), fname))