酷狗
代码实现
知道了要爬的标签,写起来就很轻松了,确定流程如下:
1.设置Headers的User-Agent伪装浏览器访问
2.获取酷狗主站的编码格式(<meta charset=xxx>),并把它赋值给requests响应对象的res.encoding,这样res.text就会按该编码重新解码,soup在使用lxml解析器时就不会出现乱码了.
3.以写入方式打开文件kugou_500.txt,并在open()时把encoding指定为上一步得到的网站编码(之后在get_info()中令res.encoding=f.encoding),统一读取和写入的编码格式.
4.对榜单的23个网页逐个执行一次get_info()获取信息,然后按格式写入到文件kugou_500.txt中.
5.等待爬虫结束,然后查看爬取结果.
# Scrape the Kugou TOP500 chart (rank, title, duration) into kugou_500.txt.
import re
import time

import requests
from bs4 import BeautifulSoup

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
}

# The accompanying write-up states that the chart spans 23 pages; the previous
# range(1, 23) stopped after page 22 and silently dropped the last page.
PAGE_COUNT = 23

# Seconds to wait for the server before giving up instead of hanging forever.
REQUEST_TIMEOUT = 10


def get_info(url, file):
    """Fetch one chart page and append its rows to *file*.

    Args:
        url: Address of a single chart page.
        file: Writable text file object. Its ``encoding`` attribute is also
            used to decode the HTTP response, so reading and writing agree.
    """
    res = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    res.encoding = file.encoding  # decode with the same encoding we write with
    soup = BeautifulSoup(res.text, 'lxml')
    ranks = soup.select('span.pc_temp_num')
    titles = soup.select('a.pc_temp_songname')
    durations = soup.select('span.pc_temp_time')
    # "duration" rather than "time" so the time module is not shadowed.
    for rank, title, duration in zip(ranks, titles, durations):
        data = {
            'rank': rank.get_text().strip(),
            'title': title.get_text().strip(),
            'time': duration.get_text().strip(),
        }
        # Left-aligned, fixed-width columns.
        file.write("{: <10}{: <30}{: <10}\n".format(data['rank'], data['title'], data['time']))


def get_website_encoding(url):
    """Return the charset declared in the page at *url*.

    A site normally uses one encoding for all of its pages, so looking at the
    home page once is enough.

    Returns:
        The text following ``charset=`` up to the next ``>``, with quotes,
        spaces and slashes removed. Falls back to the encoding detected by
        requests when no usable declaration is found.
    """
    res = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    match = re.search("charset=(.*?)>", res.text)
    if match is None:
        return res.encoding  # no declaration found: use the response default
    blocked = {"'", ' ', '"', '/'}
    cleaned = ''.join(c for c in match.group(1) if c not in blocked)
    # An empty charset would make open(..., encoding='') fail later on.
    return cleaned or res.encoding


if __name__ == '__main__':
    encoding = get_website_encoding('http://www.kugou.com')
    urls = [
        'http://www.kugou.com/yy/rank/home/{}-8888.html?from=rank'.format(str(i))
        for i in range(1, PAGE_COUNT + 1)
    ]
    with open(r'kugou_500.txt', 'w', encoding=encoding) as f:
        f.write("排名 歌手 歌名 长度\n")
        for url in urls:
            get_info(url, f)
            time.sleep(1)  # wait a second so requests are not sent too fast
# Search Kugou for a song and resolve the playable MP3 URL of every hit.
import json
import time
from urllib.parse import quote

import requests

header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}

# Seconds to wait for the server before giving up instead of hanging forever.
REQUEST_TIMEOUT = 10


class KgDownLoader(object):
    """Small client for the Kugou song-search and play-data endpoints."""

    def __init__(self):
        # "{}" receives the URL-quoted keyword, e.g. '张玮 - 吻得太逼真 (Live)'.
        self.search_url = 'https://songsearch.kugou.com/song_search_v2?callback=jQuery1124016889794375320966_1569464279066&keyword={}&page=1&pagesize=30&userid=-1&clientver=&platform=WebFilter&tag=em&filter=2&iscorrection=1&privilege_filter=0&_=1569464279068'
        # "{}" receives the file hash of a search hit.
        self.play_url = 'https://wwwapi.kugou.com/yy/index.php?r=play/getdata&hash={}&mid=dc68c15234b49b8e6ae736862010f747'
        # Most recently processed song; updated by the two generators below.
        self.song_info = {
            '歌名': None,
            '演唱者': None,
            '专辑': None,
            'filehash': None,
            'mp3url': None
        }

    @staticmethod
    def _strip_jsonp(text):
        """Return the payload between the first '(' and the last ')' of *text*.

        Only the wrapper is removed, so parentheses inside the JSON payload
        (song titles such as "(Live)") are preserved.
        """
        start = text.find('(')
        end = text.rfind(')')
        if start == -1 or end <= start:
            return text
        return text[start + 1:end]

    @staticmethod
    def _strip_em(text):
        """Remove the <em> highlight tags found in search results."""
        return text.replace('<em>', '').replace('</em>', '')

    def get_search_data(self, keys):
        """Search for *keys* and yield one info dict per hit.

        Each yielded dict is an independent snapshot with the keys of
        ``self.song_info``; ``self.song_info`` itself is updated as well.
        """
        # Quote the keyword so characters such as '&' or '#' cannot break the
        # query string.
        search_file = requests.get(
            self.search_url.format(quote(keys, safe='')),
            headers=header,
            timeout=REQUEST_TIMEOUT,
        )
        views = json.loads(self._strip_jsonp(search_file.text))
        for view in views['data']['lists']:
            album_name = self._strip_em(view['AlbumName'])
            new_info = {
                '歌名': self._strip_em(view['SongName']),
                '演唱者': self._strip_em(view['SingerName']),
                '专辑': album_name if album_name else None,
                'filehash': view['FileHash'],
                'mp3url': None
            }
            self.song_info.update(new_info)
            # Yield a copy: handing out the shared dict made every collected
            # result alias the same (last) song.
            yield dict(self.song_info)

    def get_mp3_url(self, filehash):
        """Resolve the play URL for *filehash*.

        Stores the URL in ``self.song_info['mp3url']`` and yields
        ``self.song_info`` exactly once.
        """
        mp3_file = requests.get(
            self.play_url.format(filehash),
            headers=header,
            timeout=REQUEST_TIMEOUT,
        ).text
        mp3_json = json.loads(mp3_file)
        real_url = mp3_json['data']['play_url']
        self.song_info['mp3url'] = real_url
        yield self.song_info

    def save_mp3(self, song_name, real_url):
        """Download *real_url* into ``<song_name>.mp3``.

        Raises:
            requests.HTTPError: if the server answers with an error status,
                so an error page is never saved as an MP3 file.
        """
        # Stream in chunks and release the connection when done.
        with requests.get(real_url, headers=header, stream=True, timeout=REQUEST_TIMEOUT) as response:
            response.raise_for_status()
            with open(song_name + '.mp3', 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)


if __name__ == '__main__':
    kg = KgDownLoader()
    mp3_info = kg.get_search_data(input('请输入歌名:'))
    for x in mp3_info:
        mp3info = kg.get_mp3_url(x['filehash'])
        for i in mp3info:
            print(i)
        time.sleep(5)  # throttle between play-data requests
qq音乐
主要使用的库:
- requests 向服务器发起请求
- urllib 构建url地址
- re 提取需要的数据
"""QQ Music command-line demo: search, download and play songs.

Note carried over from the original author: this code has known bugs and
still needs optimisation.
"""
import ctypes
import inspect
import json
import os
import shutil
import threading
import time
from urllib import parse

import requests
from faker import Factory  # used to generate a random User-Agent
from pydub import AudioSegment, playback

# Seconds to wait for the server before giving up instead of hanging forever.
REQUEST_TIMEOUT = 10


def _async_raise(tid, exctype):
    """Raise *exctype* asynchronously in the thread identified by *tid*.

    Raises:
        ValueError: if no thread state was modified (unknown thread id).
        SystemError: if more than one thread state was modified.
    """
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread state was touched: call again with NULL to
        # revert the effect before reporting the failure.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    """Ask *thread* to stop by raising SystemExit inside it.

    Does nothing when the thread was never started or has already finished;
    previously that case surfaced as ValueError("invalid thread id").
    """
    if thread.ident is None or not thread.is_alive():
        return
    try:
        _async_raise(thread.ident, SystemExit)
    except ValueError:
        # The thread ended between the liveness check and the call.
        pass


fake = Factory().create('Zh_cn')
user_agent = fake.user_agent()
HEADERS = {'User-Agent': user_agent}


class Player():
    """Plays an audio file on a background thread with pause/resume."""

    def __init__(self):
        self.file_path = ''
        self.segment = None
        self.thread = None      # playback thread, or None when idle/paused
        self.start = 0          # resume offset into the segment, milliseconds
        self.record_time = 0.0  # wall-clock time the current thread started
        self._paused = False

    def _start_playback(self, segment):
        """Start playing *segment* on a new thread and note the start time."""
        self.thread = threading.Thread(target=playback.play, args=(segment,))
        self.record_time = time.time()
        self.thread.start()
        print('正在播放:{}'.format(os.path.basename(self.file_path)))

    def play(self, file_path):
        """Stop whatever is playing and play *file_path* from the beginning.

        Raises:
            FileNotFoundError: if *file_path* is not an existing file.
        """
        if not os.path.isfile(file_path):
            raise FileNotFoundError('文件不存在')
        if isinstance(self.thread, threading.Thread):
            stop_thread(self.thread)
            self.thread = None
        self.start = 0
        self._paused = False
        self.file_path = file_path
        self.segment = AudioSegment.from_file(self.file_path)
        self._start_playback(self.segment)

    def pause(self):
        """Stop the playback thread and remember how far playback got."""
        if isinstance(self.thread, threading.Thread):
            stop_thread(self.thread)
            # Accumulate: record_time is reset on every resume, so assigning
            # (as before) lost the offset of earlier play/pause cycles.
            self.start += int((time.time() - self.record_time) * 1000)
            self.thread = None
            self._paused = True
            print('暂停播放:{}'.format(os.path.basename(self.file_path)))

    def unpause(self):
        """Resume a paused song from the remembered offset; no-op otherwise."""
        if self._paused:
            self._paused = False
            self._start_playback(self.segment[self.start:])


class KuGou():
    """QQ Music search/download client (the class name is historical)."""

    def __init__(self):
        # Results of the most recent search; indices match the printed table.
        self.song_info_list = []

    def _file_name(self, index):
        """Return the '<song>-<singer>.mp3' file name for result *index*.

        Path separators are replaced so a title cannot escape the target
        directory or name a non-existent sub-directory.
        """
        info = self.song_info_list[index]
        name = '{}-{}'.format(info['song_name'], info['singer'])
        for sep in ('/', '\\'):
            name = name.replace(sep, '_')
        return '{}.mp3'.format(name)

    def search_song(self):
        """Prompt for a song name, print the hits and store them.

        ``self.song_info_list`` is replaced on every call so that the printed
        indices always refer to the current search.
        """
        music_name = input('输入歌名:')
        # "&notice=0" had been corrupted to "¬ice=0" (the "&not" prefix was
        # decoded as an HTML entity).
        search_url = 'https://c.y.qq.com/soso/fcgi-bin/client_search_cp?ct=24&qqmusic_ver=1298&new_json=1&remoteplace=txt.yqq.song&t=0&aggr=1&cr=1&catZhida=1&lossless=0&flag_qc=0&p=1&n=10&w={}&g_tk=5381&loginUin=0&hostUin=0&format=json&inCharset=utf8&outCharset=utf-8&notice=0&platform=yqq.json&needNewCode=0'.format(parse.quote(music_name))
        responses_json = requests.get(search_url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
        song_list = json.loads(responses_json.text)['data']['song']['list']
        print('{:<6}{:<50}{:<50}'.format('序号', '歌名', '歌手'))
        results = []
        for i, song in enumerate(song_list):
            info_dict = {
                'song_name': song['title_hilight'].replace('<em>', '').replace('</em>', ''),
                'singer': song['singer'][0]['name'],
                'song_id': song['mid'],
            }
            print('{:<6}{:<50}{:<50}'.format(i, info_dict['song_name'], info_dict['singer']))
            results.append(info_dict)
        self.song_info_list = results

    def download_song(self, index, out_dir):
        """Download search result *index* into *out_dir*.

        Returns:
            True when the file was written, False when the service returned
            no play URL (the song cannot be downloaded).
        """
        get_vkey_url = 'https://u.y.qq.com/cgi-bin/musicu.fcg?g_tk=5381&loginUin=0&hostUin=0&format=json&inCharset=utf8&outCharset=utf-8&notice=0&platform=yqq.json&needNewCode=0&data=%7B%22req%22%3A%7B%22module%22%3A%22CDN.SrfCdnDispatchServer%22%2C%22method%22%3A%22GetCdnDispatch%22%2C%22param%22%3A%7B%22guid%22%3A%224293542279%22%2C%22calltype%22%3A0%2C%22userip%22%3A%22%22%7D%7D%2C%22req_0%22%3A%7B%22module%22%3A%22vkey.GetVkeyServer%22%2C%22method%22%3A%22CgiGetVkey%22%2C%22param%22%3A%7B%22guid%22%3A%224293542279%22%2C%22songmid%22%3A%5B%22{}%22%5D%2C%22songtype%22%3A%5B0%5D%2C%22uin%22%3A%220%22%2C%22loginflag%22%3A1%2C%22platform%22%3A%2220%22%7D%7D%2C%22comm%22%3A%7B%22uin%22%3A0%2C%22format%22%3A%22json%22%2C%22ct%22%3A24%2C%22cv%22%3A0%7D%7D'
        get_vkey = requests.get(
            get_vkey_url.format(self.song_info_list[index]['song_id']),
            headers=HEADERS,
            timeout=REQUEST_TIMEOUT,
        )
        # Relative play address; empty when the song is not freely available.
        purl = json.loads(get_vkey.text)['req_0']['data']['midurlinfo'][0]['purl']
        if not purl:
            print('此歌曲为付费歌曲,只允许在客户端播放或下载')
            return False
        song_url = 'http://isure.stream.qqmusic.qq.com/' + purl
        target = os.path.join(out_dir, self._file_name(index))
        # Stream in chunks and release the connection when done.
        with requests.get(song_url, headers=HEADERS, stream=True, timeout=REQUEST_TIMEOUT) as response:
            response.raise_for_status()
            with open(target, 'wb') as file:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    file.write(chunk)
        return True

    def play_path(self, index, file_dir):
        """Download result *index* into *file_dir* and return the file path.

        The directory is created when missing. The returned path does not
        exist if the download was refused; Player.play reports that case.
        """
        if not os.path.exists(file_dir):
            os.makedirs(file_dir)
        self.download_song(index, file_dir)
        file = os.path.join(file_dir, self._file_name(index))
        print(file)
        return file


def _read_index(prompt):
    """Read an integer command from stdin; return None for non-numeric input."""
    try:
        return int(float(input(prompt)))
    except (ValueError, OverflowError):
        return None


if __name__ == '__main__':
    player = Player()
    isplay = False
    kg = KuGou()
    file_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
    try:
        while True:
            kg.search_song()
            print('输入序号:可播放对应的歌曲\n输入 -1:搜索歌曲\n输入 -2:下载歌曲\n输入 -3:播放\n输入 -4:暂停\n输入 -5:退出程序')
            play_index = None
            while True:
                play_index = _read_index('输入序号:')
                if play_index is None:
                    # A typo re-prompts; it used to terminate the program.
                    print('输入的命令不符合要求')
                elif play_index == -1:
                    break  # back to the search prompt
                elif play_index == -2:
                    if kg.song_info_list:
                        output_dir = input('保存目录:')
                        if os.path.exists(output_dir):
                            songid = _read_index('输入序号:')
                            if songid is not None and 0 <= songid <= len(kg.song_info_list) - 1:
                                try:
                                    kg.download_song(songid, output_dir)
                                except Exception as exc:  # UI boundary: report and go on
                                    print('下载失败:{}'.format(exc))
                            else:
                                print('您输入的序号没有对应的歌曲')
                        else:
                            print('目录不存在')
                elif 0 <= play_index <= len(kg.song_info_list) - 1:
                    try:
                        player.play(kg.play_path(play_index, file_dir))
                    except Exception as exc:  # UI boundary: report and go on
                        print('播放失败:{}'.format(exc))
                    else:
                        # Only mark as playing when playback really started.
                        isplay = True
                elif play_index == -3:
                    if not isplay:
                        player.unpause()
                        isplay = True
                elif play_index == -4:
                    if isplay:
                        player.pause()
                        isplay = False
                elif play_index == -5:
                    break
                else:
                    print('输入的命令不符合要求')
            if play_index == -5:
                break
    finally:
        # Runs on normal exit, Ctrl-C and errors alike, so the playback
        # thread is stopped and the temporary download directory is removed.
        if isinstance(player.thread, threading.Thread):
            stop_thread(player.thread)
        if os.path.exists(file_dir):
            shutil.rmtree(file_dir)
vip视频爬取
# Resolve a video page through a third-party parsing API and build a minimal
# HTML page that plays the resolved streams.
import html
import json
import re

import requests
from faker import Factory  # used to generate a random User-Agent

fake = Factory().create('Zh_cn')
user_agent = fake.user_agent()
headers = {'User-Agent': user_agent}

API_URL = "https://p2p.1616jx.com/api/api.php"

# Seconds to wait for the server before giving up instead of hanging forever.
REQUEST_TIMEOUT = 10

_FAILURE_MESSAGE = '解析失败,或者这视频站点不支持解析'


def get_play_url():
    """Prompt for a video page URL and resolve it through the parsing API.

    Returns:
        ``(title, current_url, result_list)`` on success, where
        ``result_list`` holds ``(label, stream_url)`` tuples extracted from
        the ``label$http...$`` entries of the response. ``None`` when the
        request fails or the API reports that it could not parse the page.
    """
    url = input('该接口支持腾讯视频,优酷视频,部分B站视频请输入:\n')
    # Pass the page URL via params so that '&', '?' and '#' inside it are
    # percent-encoded instead of being read as extra API parameters.
    r = requests.get(url=API_URL, params={'url': url.strip()},
                     headers=headers, timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        print(_FAILURE_MESSAGE)
        return None
    try:
        # The payload is wrapped as "(...);": trim the wrapper characters.
        data = json.loads(r.text.strip('();'))
    except ValueError:
        print(_FAILURE_MESSAGE)
        return None
    if not isinstance(data, dict) or data.get('success') != 1:
        print(_FAILURE_MESSAGE)
        return None

    title = data.get('title')
    current_url = data.get('url')
    info = data.get('info')
    result_list = []
    if isinstance(info, list) and info and isinstance(info[0], dict):
        for video in info[0].get('video') or []:
            if isinstance(video, str):
                result_list += re.findall(r'(.*?)\$(http.*?)\$', video)
    return title, current_url, result_list


def _js_string(value):
    """Return *value* as a JavaScript string literal safe inside <script>."""
    # "</" is escaped so the value cannot terminate the script element.
    return json.dumps(str(value)).replace('</', '<\\/')


def creat_play_html():
    """Resolve a video via get_play_url() and return a player page as HTML.

    The page holds a <video> element for the main URL plus one clickable
    tile per extracted stream that switches the video source.

    Raises:
        ValueError: if the video could not be resolved.
    """
    main_doc = (
        '<!DOCTYPE html>'
        '<html lang="en">'
        '<head>'
        '<meta charset="UTF-8"><title>{}</title>'
        '</head><body>'
        '<div style="background-color: black;height: auto;width: 980px;">'
        '<div style="width: 980px;height: 120px;background-color: blue;"></div>'
        '<div style="height: 600px;width: 980px;">'
        '<video id="video" class="vjs-tech" width="100%" height="100%" controls="controls" x-webkit-airplay="true" x5-video-player-fullscreen="true" preload="auto" playsinline="true" webkit-playsinline x5-video-player-type="h5">'
        '<source type="application/x-mpegURL" src="{}">'
        '</video></div>'
        '<div style="width: 980px;height: 120px;background-color: blue;"></div>'
        '<div style="width: 980px;height: auto;">{}</div></div>'
        '<script type="text/javascript">var video = document.getElementById("video");{}\n{}</script></body></html>'
    )
    show_list = '<div id="li%s" style="background-color: #cfcfcf;height: 100px;width: 100px; float:left; margin:11px;font-size:12px;line-height:100px;text-align: center;">%s</div>'
    get_element = 'var li%s = document.getElementById("li%s");'
    # The second %s receives a complete, already quoted JavaScript literal.
    click = 'li%s.onclick = function(){video.src = %s;video.play();}\n'

    get_data = get_play_url()
    if get_data is None:
        # Previously this surfaced as an opaque TypeError on get_data[2].
        raise ValueError('视频解析失败,无法生成播放页面')
    title, current_url, streams = get_data

    # Everything below comes from a remote service, so it is escaped before
    # being placed into HTML or JavaScript.
    tiles = []
    lookups = []
    handlers = []
    for i, (label, stream_url) in enumerate(streams):
        tiles.append(show_list % (i, html.escape(label)))
        lookups.append(get_element % (i, i))
        handlers.append(click % (i, _js_string(stream_url)))

    return main_doc.format(
        html.escape(str(title or '')),
        html.escape(str(current_url or ''), quote=True),
        ''.join(tiles),
        ''.join(lookups),
        ''.join(handlers),
    )


if __name__ == '__main__':
    # To write the player page instead of printing the resolved data:
    # with open('test.html', 'w', encoding='utf-8') as f:
    #     f.write(creat_play_html())
    print(get_play_url())