E站爬虫在网上已经有很多了,但多数都只能以图片为单位下载,且偶尔会遇到图片加载失败的情况;熟悉E站的朋友们应该知道,E站许多资源都是有提供BT种子的,而且通常打包的是比默认看图模式更高清的文件;但如果只下载种子,又会遇到某些资源未放种/种子已死的情况。本文将编写一个能自动检测最优下载来源并储存到本地的E站爬虫,该爬虫以数据库作为缓冲区,支持以后台服务方式运行,可以轻易进行分布式扩展,并对于网络错误有良好的鲁棒性。
环境要求
Python3,MySQL,安装了Aria2并开启PRC远程访问
Aria2是一个强大的命令行下载工具,并支持web界面管理,可以运行在window和Linux下。介绍及安装使用可参见
基础配置
在MySQL中按如下格式建表
| 字段名称 | 字段意义 |
| id | id主键 |
| comic_name | 本子名称 |
| starttime | 开始下载的时间 |
| endtime | 下载结束的时间 |
| status | 当前下载状态 |
| checktimes | 遇错重试次数 |
| raw_address | e-hentai页面地址 |
| failed_links | 记录由于网络波动暂时访问失败的页面地址 |
| failed_paths | 记录失败页面地址对应的图片本地路径 |
| inserttime | 记录地址进入到数据库的时间 |
| oldpage | 存放Aria2条目的gid |
| filepath | bt下载路径 |
本文之后假设MySQL数据库名为comics_local,表名为comic_urls
aria2配置为后台服务,假设RPC地址为:127.0.0.1:6800,token为12345678
需要安装pymysql, requests, filetype, zipfile, wget等Python包
pip install pymysql requests filetype zipfile wget
项目代码
工作流程
整个爬虫服务的工作流程如下:用户将待抓取的E站链接(形式如 https://e-hentai.org/g/xxxxxxx/yyyyyyyyyy/ )放入数据表的raw_address字段,设置状态字段为待爬取;爬取服务可以在后台轮询或回调触发,提取出数据库中待爬取的链接后访问页面,判断页面里是否提供了bt种子下载,如有则下载种子并传给Aria2下载,如无则直接下载图片(图片优先下载高清版)。
在图片下载模式下,如果一切正常,则结束后置状态字段为已完成;如出现了问题,则置字段为相应异常状态,在下次轮询/调用时进行处理。
在bt下载模式下,另开一个后台进程定时询问Aria2的下载状态,在Aria2返回下载完成报告后解压目标文件,并置状态字段为已完成;如出现了种子已死等问题,则删除Aria2任务并切换到图片下载模式。
数据库操作模块
该模块包装了一些MySQL的操作接口,遵照此逻辑,MySQL可以换成其他数据库,如Redis,进而支持分布式部署。
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 """ 4 filename: sql_module.py 5 6 Created on Sun Sep 22 23:24:39 2019 7 8 @author: qjfoidnh 9 """ 10 11 import pymysql 12 from pymysql.err import IntegrityError 13 14 class MySQLconn_url(object): 15 def __init__(self): 16 17 self.conn = pymysql.connect( 18 host='127.0.0.1', 19 port=3306, 20 user='username', 21 passwd='password', 22 db='comics_local' 23 ) 24 self.conn.autocommit(True) #开启自动提交,生产环境不建议数据库DBA这样做 25 self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) 26 #让MySQL以字典形式返回数据 27 28 29 def __del__(self): 30 31 self.conn.close() 32 33 #功能:取指定状态的一条数据 34 def fetchoneurl(self, mode="pending", tabname='comic_urls'): 35 36 sql = "SELECT * FROM %s \ 37 WHERE status = '%s'" %(tabname, mode) 38 self.conn.ping(True) #mysql长连接防止timeut自动断开 39 try: 40 self.cursor.execute(sql) 41 except Exception as e: 42 return e 43 else: 44 item = self.cursor.fetchone() 45 if not item: 46 return None 47 if mode=="pending" or mode=='aria2': 48 if item['checktimes']<3: 49 sql = "UPDATE %s SET starttime = now(), status = 'ongoing' \ 50 WHERE id = %d" %(tabname, item['id']) 51 else: 52 sql = "UPDATE %s SET status = 'error' \ 53 WHERE id = %d" %(tabname, item['id']) 54 if mode=='aria2': 55 sql = "UPDATE %s SET status = 'pending', checktimes = 0, raw_address=CONCAT('chmode',raw_address) \ 56 WHERE id = %d" %(tabname, item['id']) 57 self.cursor.execute(sql) 58 return 'toomany' 59 elif mode=="except": 60 sql = "UPDATE %s SET status = 'ongoing' \ 61 WHERE id = %d" %(tabname, item['id']) 62 try: 63 self.cursor.execute(sql) 64 except Exception as e: 65 self.conn.rollback() 66 return e 67 else: 68 return item 69 70 #功能:更新指定id条目的状态字段 71 def updateurl(self, itemid, status='finished', tabname='comic_urls'): 72 sql = "UPDATE %s SET endtime = now(),status = '%s' WHERE id = %d" %(tabname, status, itemid) 73 self.conn.ping(True) 74 try: 75 self.cursor.execute(sql) 76 except Exception as e: 77 self.conn.rollback() 78 return e 79 else: 80 return itemid 81 82 #功能:更新指定id条目状态及重试次数字段 83 def reseturl(self, itemid, mode, count=0, tabname='comic_urls'): 84 85 sql = "UPDATE %s SET status = '%s', checktimes=checktimes+%d WHERE id = %d" %(tabname, mode, count, itemid) 86 self.conn.ping(True) 87 try: 88 self.cursor.execute(sql) 89 except Exception as e: 90 print(e) 91 self.conn.rollback() 92 return e 93 else: 94 return itemid 95 96 #功能:将未下载完成图片的网址列表写入数据库, 97 def fixunfinish(self, itemid, img_urls, filepaths, tabname='comic_urls'): 98 99 img_urls = "Š".join(img_urls) #用不常见拉丁字母做分隔符,避免真实地址中有分隔符导致错误分割 100 filepaths = "Š".join(filepaths) 101 sql = "UPDATE %s SET failed_links = '%s', failed_paths = '%s', status='except' WHERE id = %d" %(tabname, img_urls, filepaths, itemid) 102 self.conn.ping(True) 103 try: 104 self.cursor.execute(sql) 105 except Exception as e: 106 self.conn.rollback() 107 return e 108 else: 109 return 0 110 111 #功能:在尝试完一次未完成补全后,更新未完成列表 112 def resetunfinish(self, itemid, img_urls, filepaths, tabname='comic_urls'): 113 failed_num = len(img_urls) 114 if failed_num==0: 115 sql = "UPDATE %s SET failed_links = null, failed_paths = null, status = 'finished', endtime = now() WHERE id = %d" %(tabname, itemid) 116 else: 117 img_urls = "Š".join(img_urls) #用拉丁字母做分隔符,避免真实地址中有分隔符导致错误分割 118 filepaths = "Š".join(filepaths) 119 sql = "UPDATE %s SET failed_links = '%s', failed_paths = '%s', status = 'except' WHERE id = %d" %(tabname, img_urls, filepaths, itemid) 120 self.conn.ping(True) 121 try: 122 self.cursor.execute(sql) 123 except Exception as e: 124 self.conn.rollback() 125 return e 126 else: 127 return failed_num 128 129 #功能:为条目补上资源名称 130 def addcomicname(self, address, title, tabname='comic_urls'): 131 sql = "UPDATE %s SET comic_name = '%s' WHERE raw_address = '%s'" %(tabname, title, address) #由于调用地点处没有id值,所以这里用address定位。也是本项目中唯二处用address定位的 132 self.conn.ping(True) 133 try: 134 self.cursor.execute(sql) 135 except IntegrityError: 136 self.conn.rollback() 137 sql_sk = "UPDATE %s SET status = 'skipped' \ 138 WHERE raw_address = '%s'" %(tabname, address) 139 self.cursor.execute(sql_sk) 140 return Exception(title+" Already downloaded!") 141 except Exception as e: 142 self.conn.rollback() 143 return e 144 else: 145 return 0 146 147 #功能:通过网址查询标识Aria2里对应的gid 148 def fetchonegid(self, address, tabname='comic_urls'): 149 sql = "SELECT * FROM %s \ 150 WHERE raw_address = '%s'" %(tabname, address) 151 self.conn.ping(True) 152 try: 153 self.cursor.execute(sql) 154 except Exception as e: 155 return e 156 else: 157 item = self.cursor.fetchone() 158 if not item: 159 return None 160 else: 161 return item.get('oldpage') 162 163 mq = MySQLconn_url()