1.豆瓣采集
# coding:utf-8
# Scrape book info (title, author, publisher, rating) and cover images from a
# Douban tag page, save the images under douban/, and insert rows into MySQL.

from urllib import request
from lxml import etree
import pymysql

url = "https://book.douban.com/tag/%E5%B0%8F%E8%AF%B4"
headers = {
    'Host': 'book.douban.com',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36',
}

req = request.Request(url=url, headers=headers, method="GET")
content = request.urlopen(req).read().decode("utf-8")
content_dict = etree.HTML(content)  # parse into an lxml element tree
# One <li> per book listing
content_dict_allli = content_dict.xpath(r'//*[@id="subject_list"]/ul/li')

# Open the connection ONCE for the whole run; the old code reconnected for
# every single book and only the very last connection was ever closed.
db = pymysql.connect(host='localhost', port=3306, user="root", password='root',
                     db='douban', charset='utf8')
cur = db.cursor()

try:
    for li in content_dict_allli:
        # Title (note: the xpath is relative to the <li>, unlike bs4)
        title = li.xpath(r'div[2]/h2/a/@title')[0].replace(" ", '')
        print(title)

        # Author / publisher come from a single "author / publisher / ..." string
        info = li.xpath(r'div[2]/div[1]/text()')[0]
        author = info.split('/')[0].replace('\n', '').replace(" ", '')
        chubanshe = info.split('/')[1]
        print(author)
        print(chubanshe)

        # Rating
        pingfen = li.xpath(r'div[2]/div[2]/span[2]/text()')[0]
        print(pingfen)

        # Cover image: download and write to douban/<title>.jpg
        img_net_addr = li.xpath(r'div[1]/a/img/@src')[0]
        print(img_net_addr)
        data = request.urlopen(img_net_addr).read()
        img_name = 'douban/' + title + '.jpg'
        with open(img_name, 'wb') as f:
            f.write(data)

        # Parameterized query: the old %-interpolated SQL broke (and was
        # injectable) as soon as a title contained a quote.
        sql = "insert into douban(title,author,chubanshe,pingfen) values (%s,%s,%s,%s)"
        cur.execute(sql, (title, author, chubanshe, pingfen))
        db.commit()
finally:
    db.close()
2.链家
# coding:utf-8
# Scrape second-hand housing listings from Lianjia (Xi'an): save cover images
# under lianjia/images/ and insert title/address rows into MySQL.

from urllib import request, error
from bs4 import BeautifulSoup
import pymysql

db = pymysql.connect(host='localhost', user='root', password='root',
                     db='lianjia', charset='utf8')
cur = db.cursor()  # one cursor for the whole run

for i in range(1, 33):  # 32 result pages
    req = request.urlopen('https://xa.lianjia.com/ershoufang/pg' + str(i)).read().decode('utf-8')
    req_bs4 = BeautifulSoup(req, 'html.parser')
    body_ul = req_bs4.find('ul', class_="sellListContent")
    if body_ul is None:
        # layout changed or empty page — nothing to scrape here
        continue
    # find_all('li') skips the whitespace text nodes that direct iteration over
    # body_ul used to yield; those crashed li.find() and the old blanket
    # try/except then silently abandoned the REST of the page.
    for li in body_ul.find_all('li', recursive=False):
        try:
            tit = li.find('div', class_="title").get_text()      # title
            addr = li.find('div', class_="houseInfo").get_text()  # address
            pric = li.find('div', class_="totalPrice").get_text()  # price (kept for parity; not stored)
        except AttributeError:
            continue  # listing missing an expected field — skip just this one
        print(i)  # progress indicator: which page we are on

        # --- cover image ---------------------------------------------------
        try:
            img = li.find("img", class_='lj-lazy')['data-original']  # image URL
            img_format = img.split('.')[-1]  # file extension after the last dot
            img_name = 'lianjia/images/' + li.find("img", class_='lj-lazy')['alt'] + '.' + img_format
            adr = request.urlopen(img).read()  # raw image bytes
            with open(img_name, 'wb') as f:
                f.write(adr)
        except (TypeError, KeyError, error.URLError, OSError):
            pass  # no image / bad name — best-effort, same as before
        # --- image done ----------------------------------------------------

        # Parameterized insert: the old %-formatted SQL broke on quotes in
        # titles/addresses and was injectable.
        sql = "insert into lianjia_hotel(title,address) values (%s,%s)"
        cur.execute(sql, (tit, addr))
        db.commit()
    print("本页完毕~")

db.close()
3.今日头条
# Open toutiao.com in Chrome via Selenium, click the tech channel, scroll to
# trigger the lazy-loading feed, then append the headline titles to toutiao.txt.

from selenium import webdriver
from lxml import etree
import time

driver = webdriver.Chrome()
driver.maximize_window()
driver.get('https://www.toutiao.com/')
driver.implicitly_wait(10)
driver.find_element_by_link_text('科技').click()
driver.implicitly_wait(10)

# Scroll down in steps so the infinite feed loads more items.
for x in range(3):
    js = "var q=document.documentElement.scrollTop=" + str(x * 500)
    driver.execute_script(js)
    time.sleep(2)
time.sleep(5)  # final settle time for the last batch of items

page = driver.page_source
# Parse the rendered HTML directly. The old code round-tripped it through
# PyQuery (pq(page) -> str -> etree.HTML) which did nothing but waste a parse.
doc = etree.HTML(page)
contents = doc.xpath('//div[@class="wcommonFeed"]/ul/li')
print(contents)
for x in contents:
    title = x.xpath('div/div[1]/div/div[1]/a/text()')
    if title:  # some <li> are ads/placeholders with no headline link
        title = title[0]
        with open('toutiao.txt', 'a+', encoding='utf8') as f:
            f.write(title + '\n')
        print(title)
4.微信群信息(包括成员)和联系人
# -*- coding:utf-8 -*-
'''
Log in to web WeChat by scanning a QR code, then fetch this account's group
chats (including members) and address-book contacts. [Note: may be incomplete]
'''
import os
import re
import time
import sys
import subprocess
import requests
import xml.dom.minidom
import json


class WebwxLogin(object):
    """Drives the web-WeChat QR login flow and contact retrieval."""

    def __init__(self):
        self.session = requests.session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'}
        # QR code image is written next to this script
        self.QRImgPath = os.path.split(os.path.realpath(__file__))[0] + os.sep + 'webWeixinQr.jpg'
        self.uuid = ''
        self.tip = 0
        self.base_uri = ''
        self.redirect_uri = ''
        self.skey = ''
        self.wxsid = ''
        self.wxuin = ''
        self.pass_ticket = ''
        self.deviceId = 'e000000000000000'
        self.BaseRequest = {}
        self.ContactList = []
        self.My = []
        self.SyncKey = ''

    def getUUID(self):
        """Request a login session uuid; return True when the server says 200."""
        url = 'https://login.weixin.qq.com/jslogin'
        params = {
            'appid': 'wx782c26e4c19acffb',
            'redirect_uri': 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxnewloginpage',
            'fun': 'new',
            'lang': 'zh_CN',
            '_': int(time.time() * 1000),  # timestamp
        }
        resp = self.session.get(url, params=params)
        body = resp.content.decode('utf-8')
        pattern = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
        match = re.search(pattern, body)  # pull the uuid out with a regex
        code = match.group(1)
        self.uuid = match.group(2)
        return code == '200'  # success check

    def showQRImage(self):
        """Download the login QR code and open it with the OS image viewer."""
        url = 'https://login.weixin.qq.com/qrcode/' + self.uuid
        resp = self.session.get(url)
        self.tip = 1
        with open(self.QRImgPath, 'wb') as f:
            f.write(resp.content)

        # Open the QR image with whatever this platform provides.
        if sys.platform.find('darwin') >= 0:
            subprocess.call(['open', self.QRImgPath])      # macOS
        elif sys.platform.find('linux') >= 0:
            subprocess.call(['xdg-open', self.QRImgPath])  # Linux
        else:
            os.startfile(self.QRImgPath)                   # Windows
        print('请使用微信扫描二维码登录')

    def checkLogin(self):
        """Poll the login endpoint once; return the status code string."""
        url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (
            self.tip, self.uuid, int(time.time() * 1000))
        resp = self.session.get(url)
        body = resp.content.decode('utf-8')
        match = re.search(r'window.code=(\d+);', body)
        code = match.group(1)

        if code == '201':
            # QR scanned, waiting for confirmation on the phone
            print('成功扫描,请在手机上点击确认登录')
            self.tip = 0
        elif code == '200':
            # Confirmed — capture the redirect used for the actual login
            print('正在登录中...')
            match = re.search(r'window.redirect_uri="(\S+?)";', body)
            self.redirect_uri = match.group(1) + '&fun=new'
            self.base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
        elif code == '408':
            pass  # poll timed out; caller just polls again
        return code

    def login(self):
        """Follow the redirect and harvest the session credentials from XML."""
        resp = self.session.get(self.redirect_uri, verify=False)
        doc = xml.dom.minidom.parseString(resp.content.decode('utf-8'))
        # Map XML node names to the attributes they populate.
        wanted = {'skey': 'skey', 'wxsid': 'wxsid',
                  'wxuin': 'wxuin', 'pass_ticket': 'pass_ticket'}
        for node in doc.documentElement.childNodes:
            attr = wanted.get(node.nodeName)
            if attr is not None:
                setattr(self, attr, node.childNodes[0].data)

        if not all((self.skey, self.wxsid, self.wxuin, self.pass_ticket)):
            return False

        self.BaseRequest = {
            'Uin': int(self.wxuin),
            'Sid': self.wxsid,
            'Skey': self.skey,
            'DeviceID': self.deviceId,
        }
        return True

    def webwxinit(self):
        """Initialize the web session; populate ContactList/My/SyncKey."""
        url = self.base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
            self.pass_ticket, self.skey, int(time.time() * 1000))
        params = {'BaseRequest': self.BaseRequest}
        h = self.headers
        h['ContentType'] = 'application/json; charset=UTF-8'
        resp = self.session.post(url, data=json.dumps(params), headers=h, verify=False)
        data = resp.content.decode('utf-8')
        print(data)
        dic = json.loads(data)
        self.ContactList = dic['ContactList']
        self.My = dic['User']
        # SyncKey entries are joined as "Key_Val|Key_Val|..."
        self.SyncKey = '|'.join(
            '%s_%s' % (item['Key'], item['Val']) for item in dic['SyncKey']['List'])
        return dic['BaseResponse']['Ret'] == 0

    def webwxgetcontact(self):
        """Fetch the full contact list, filtered down to real friends."""
        url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
            self.pass_ticket, self.skey, int(time.time()))
        h = self.headers
        h['ContentType'] = 'application/json; charset=UTF-8'
        resp = self.session.get(url, headers=h, verify=False)
        dic = json.loads(resp.content.decode('utf-8'))

        # Built-in/system accounts that should never count as friends.
        SpecialUsers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage",
                        "qqsync", "floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp",
                        "blogapp", "facebookapp", "masssendapp",
                        "meishiapp", "feedsapp", "voip", "blogappweixin", "weixin", "brandsessionholder",
                        "weixinreminder", "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts",
                        "notification_messages", "wxitil", "userexperience_alarm"]

        kept = []
        for member in dic['MemberList']:
            if member['VerifyFlag'] & 8 != 0:
                continue  # official/service account
            if member['UserName'] in SpecialUsers:
                continue  # special system account
            if member['UserName'].find('@@') != -1:
                continue  # group chat
            if member['UserName'] == self.My['UserName']:
                continue  # myself
            kept.append(member)
        return kept

    def main(self):
        """Run the whole flow: QR login, init, then print every friend."""
        if not self.getUUID():
            print('获取uuid失败')
            return

        self.showQRImage()
        time.sleep(1)

        # Keep polling until the user has confirmed login on the phone.
        while self.checkLogin() != '200':
            pass

        os.remove(self.QRImgPath)  # QR no longer needed once logged in

        if not self.login():
            print('登录失败')
            return
        # Logged in — now query friends.
        if not self.webwxinit():
            print('初始化失败')
            return

        MemberList = self.webwxgetcontact()
        print('通讯录共%s位好友' % len(MemberList))

        for x in MemberList:
            sex = '未知' if x['Sex'] == 0 else '男' if x['Sex'] == 1 else '女'
            print('昵称:%s, 性别:%s, 备注:%s, 签名:%s' % (
                x['NickName'], sex, x['RemarkName'], x['Signature']))


if __name__ == '__main__':
    print('开始')
    wx = WebwxLogin()
    wx.main()
5.爬取淘宝固定类别商品信息+保存到mysql数据库【格式很规范】
# Scrape a fixed Taobao search category ("咖啡") for price/title pairs and
# store them in a MySQL table, printing a numbered table as it goes.

import requests
import re
import pymysql


def getHTMLtext(url):
    """GET *url* and return the page text, or '' on any request failure."""
    try:
        r = requests.get(url, timeout=100)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except requests.RequestException:
        return ""


def getpage(itl, html):
    """Extract [price, title] string pairs from *html* into list *itl*.

    Uses regex capture groups instead of the old eval() on scraped text:
    eval was a code-execution hazard and crashed on titles containing ':'
    (the old split(':')[1] also truncated such titles).
    """
    plt = re.findall(r'"view_price":"([\d.]*)"', html)
    nlt = re.findall(r'"raw_title":"(.*?)"', html)
    for price, title in zip(plt, nlt):
        itl.append([price, title])


def printgoods(itl):
    """Print a numbered price/title table and insert the rows into MySQL."""
    tplt = "{:2}\t{:8}\t{:16}"
    print(tplt.format("序号", "价格", "商品名称"))
    count = 0

    conn = pymysql.connect(host='127.0.0.1', user='root', password='123456',
                           db='company', charset="utf8")
    cur = conn.cursor()
    sqlc = '''
        create table coffee(
        id int(11) not null auto_increment primary key,
        name varchar(255) not null,
        price float not null)DEFAULT CHARSET=utf8;
    '''
    try:
        cur.execute(sqlc)
        conn.commit()
        print('成功')
    except pymysql.MySQLError:  # table probably exists already
        print("错误")

    for g in itl:
        count = count + 1
        # The old code built this row string and never printed it, so the
        # table only ever showed its header.
        print(tplt.format(count, g[0], g[1]))
        sqla = '''
            insert into coffee(name,price) values(%s,%s);
        '''
        try:
            cur.execute(sqla, (g[1], g[0]))
            conn.commit()
            print('成功')
        except pymysql.MySQLError:
            print("错误")

    conn.commit()
    cur.close()
    conn.close()


def main():
    goods = "咖啡"
    depth = 2  # number of result pages to fetch
    start_url = 'https://s.taobao.com/search?q=' + goods
    List = []
    for i in range(depth):
        # &s= offsets by 44 items per page
        url = start_url + "&s=" + str(i * 44)
        html = getHTMLtext(url)
        getpage(List, html)
    # printgoods returns None; the old print(printgoods(...)) printed "None".
    printgoods(List)


if __name__ == '__main__':
    main()
6.Ajax请求,模范代码
# Model Ajax-scraping script: walk a paginated JSON index API, fetch each
# movie's detail record, and save it as results/<name>.json.

import requests
import logging
import json
from os import makedirs
from os.path import exists

# Total number of index pages to crawl
TOTAL_PAGE = 2
# Directory for the per-movie JSON output files
RESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)

# Logging setup
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# Index endpoint (paginated via limit/offset)
INDEX_URL = 'https://dynamic1.scrape.center/api/movie/?limit={limit}&offset={offset}'
# Items per index page
LIMIT = 2
# Detail endpoint
DETAIL_URL = 'https://dynamic1.scrape.center/api/movie/{id}'


def scrape_api(url):
    """GET *url* and return the decoded JSON dict, or None on any failure."""
    logging.info('scraping %s...', url)
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        logging.error('get invalid status code %s while scraping %s',
                      response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)


def scrape_index(page):
    """Fetch one page of the movie index (*page* is 1-based)."""
    url = INDEX_URL.format(limit=LIMIT, offset=LIMIT * (page - 1))
    return scrape_api(url)


def scrape_detail(movie_id):
    """Fetch the detail record for one movie id."""
    url = DETAIL_URL.format(id=movie_id)
    return scrape_api(url)


def save_data(data):
    """Write one movie dict to results/<name>.json."""
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    # 'with' closes the handle; the old json.dump(..., open(...)) leaked it.
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def main():
    for page in range(1, TOTAL_PAGE + 1):
        index_data = scrape_index(page)
        if not index_data:
            # scrape_api returns None on failure; the old code crashed here
            # with AttributeError on None.get('results').
            continue
        for item in index_data.get('results'):
            movie_id = item.get('id')
            detail_data = scrape_detail(movie_id)
            if not detail_data:
                continue  # skip movies whose detail fetch failed
            logging.info('detail data %s', detail_data)
            save_data(detail_data)


if __name__ == '__main__':
    main()