us-wjz

酷狗

代码实现
知道了要爬的标签,写起来就很轻松了,确定流程如下:
1.设置Headers的User-Agent伪装浏览器访问
2.获取酷狗主站的编码格式<meta charset=xxx>,并将其设置到requests响应对象的res.encoding上,这样res.text就会按该编码重新解码,soup在使用lxml解析器时就不会出现乱码了.
3.打开文件kugou_500.txt,设置为写入方式,f.encoding=res.encoding,统一编码格式
4.对23个网页逐一调用get_info()获取信息,然后按格式写入到文件kugou_500.txt中.
5.等待爬虫结束,然后查看爬取结果.

from bs4 import BeautifulSoup
import requests
import time
import re

# Request headers shared by every HTTP call in this script; the User-Agent
# makes the requests look like they come from a desktop browser.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
}

def get_info(url, file):
    """Download one ranking page and append its rows to an open text file.

    Args:
        url: Address of the ranking page to fetch.
        file: Writable text file object. Its ``encoding`` attribute is also
            used to decode the HTTP response, so reading and writing agree
            on a single encoding.
    """
    res = requests.get(url, headers=headers, timeout=10)
    res.encoding = file.encoding  # decode with the same encoding we write with
    soup = BeautifulSoup(res.text, 'lxml')
    ranks = soup.select('span.pc_temp_num')
    titles = soup.select('a.pc_temp_songname')
    durations = soup.select('span.pc_temp_time')
    # The loop variable is called ``duration`` (not ``time``) so that it does
    # not shadow the imported ``time`` module.
    for rank, title, duration in zip(ranks, titles, durations):
        # Left-aligned fixed-width columns: rank, title, duration.
        row = "{: <10}{: <30}{: <10}\n".format(
            rank.get_text().strip(),
            title.get_text().strip(),
            duration.get_text().strip(),
        )
        file.write(row)


def get_website_encoding(url):
    """Return the character encoding a site declares in its HTML.

    The pages of one site normally share a single encoding, so callers only
    need to query the home page once.

    Args:
        url: Address of the page to inspect.

    Returns:
        The text following ``charset=`` up to the next ``>``, with quotes,
        spaces and slashes removed. If no such declaration is found (or it is
        empty after cleaning), the encoding ``requests`` detected for the
        response is returned instead.
    """
    res = requests.get(url, headers=headers, timeout=10)
    match = re.search("charset=(.*?)>", res.text)
    if match is None:
        return res.encoding  # no declaration found: fall back to the response default
    blocked = {'\'', ' ', '\"', '/'}
    declared = ''.join(ch for ch in match.group(1) if ch not in blocked)
    # An empty name would make open(..., encoding='') fail later on.
    return declared or res.encoding


if __name__ == '__main__':
    encoding = get_website_encoding('http://www.kugou.com')
    # 23 ranking pages are to be fetched (pages 1-23); range() excludes its
    # stop value, so the stop has to be 24.
    urls = ['http://www.kugou.com/yy/rank/home/{}-8888.html?from=rank'.format(i) for i in range(1, 24)]
    with open(r'kugou_500.txt', 'w', encoding=encoding) as f:
        f.write("排名      歌手         歌名          长度\n")
        for url in urls:
            get_info(url, f)
            time.sleep(1)  # wait one second so requests are not sent too quickly
View Code
import requests
import json
# Request headers shared by every HTTP call of the downloader below.
header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}
class KgDownLoader(object):
    """Search KuGou for songs, resolve their MP3 address and save them.

    Instance attributes:
        search_url: JSONP search endpoint template; ``{}`` takes the keyword.
        play_url: play-data endpoint template; ``{}`` takes a file hash.
        song_info: details of the song processed most recently. The SAME dict
            object is yielded by ``get_search_data`` and ``get_mp3_url``, so
            copy it if you need to keep more than one result.
    """

    def __init__(self):
        self.search_url = 'https://songsearch.kugou.com/song_search_v2?callback=jQuery1124016889794375320966_1569464279066&keyword={}&page=1&pagesize=30&userid=-1&clientver=&platform=WebFilter&tag=em&filter=2&iscorrection=1&privilege_filter=0&_=1569464279068'
        # Example keyword: '张玮 - 吻得太逼真 (Live)'
        self.play_url = 'https://wwwapi.kugou.com/yy/index.php?r=play/getdata&hash={}&mid=dc68c15234b49b8e6ae736862010f747'
        self.song_info = {
            '歌名': None,
            '演唱者': None,
            '专辑': None,
            'filehash': None,
            'mp3url': None
        }

    @staticmethod
    def _strip_jsonp(text):
        """Return the JSON payload of a ``callback(<json>)`` response.

        Only the outermost parentheses are removed, so parentheses inside the
        payload (e.g. a title ending in "(Live)") are preserved. Text that is
        already plain JSON is returned unchanged (apart from outer whitespace).
        """
        body = text.strip()
        if body.startswith(('{', '[')):
            return body
        start = body.find('(')
        end = body.rfind(')')
        if start == -1 or end < start:
            return body
        return body[start + 1:end]

    @staticmethod
    def _strip_em(text):
        """Remove the ``<em>`` highlight tags found in search results."""
        return text.replace('<em>', '').replace('</em>', '')

    def get_search_data(self, keys):
        """Search for ``keys`` and yield one info dict per hit.

        Args:
            keys: Search keyword typed by the user.

        Yields:
            ``self.song_info`` after it has been updated with the hit's song
            name, singer, album (``None`` when empty) and file hash.
        """
        from urllib.parse import quote  # local import: keeps the file's import block untouched

        # Percent-encode the keyword so characters such as '&' or '#' cannot
        # break the query string.
        search_file = requests.get(self.search_url.format(quote(keys)), headers=header)
        views = json.loads(self._strip_jsonp(search_file.text))
        for view in views['data']['lists']:
            album_name = self._strip_em(view['AlbumName'])
            new_info = {
                '歌名': self._strip_em(view['SongName']),
                '演唱者': self._strip_em(view['SingerName']),
                '专辑': album_name if album_name else None,
                'filehash': view['FileHash'],
                'mp3url': None
            }
            self.song_info.update(new_info)
            yield self.song_info

    def get_mp3_url(self, filehash):
        """Resolve the playable address for ``filehash``.

        Yields:
            ``self.song_info`` once, with ``'mp3url'`` set to the ``play_url``
            value reported by the play-data endpoint.
        """
        mp3_file = requests.get(self.play_url.format(filehash), headers=header).text
        mp3_json = json.loads(mp3_file)
        self.song_info['mp3url'] = mp3_json['data']['play_url']
        yield self.song_info

    def save_mp3(self, song_name, real_url):
        """Download ``real_url`` into ``<song_name>.mp3``.

        The body is streamed to disk chunk by chunk and the connection is
        released when done, instead of buffering the whole file in memory.
        """
        with requests.get(real_url, headers=header, stream=True) as response:
            with open(song_name + '.mp3', 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
#
if __name__ == '__main__':
    import time

    kg = KgDownLoader()
    # For every search hit, resolve its MP3 address and print the details.
    for song in kg.get_search_data(input('请输入歌名:')):
        for info in kg.get_mp3_url(song['filehash']):
            print(info)
            time.sleep(5)  # pause between consecutive play-data requests
View Code

QQ音乐

主要使用的库: 
requests 向服务器发起请求 
urllib 构建url地址 
re 提取需要的数据

'该代码存在bug,有待优化'
import json
import requests
from urllib import parse
from faker import Factory   #用于生成随机User-Agent
import os
from pydub import AudioSegment,playback
import shutil
import time
import threading
import inspect
import ctypes


def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you\'re in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):
    """Ask ``thread`` to terminate by raising ``SystemExit`` inside it."""
    thread_id = thread.ident
    _async_raise(thread_id, SystemExit)

# A random User-Agent is generated once at start-up and reused for all requests.
fake = Factory().create('Zh_cn')
user_agent = fake.user_agent()
HEADERS = {'User-Agent': user_agent}


class Player():
    """Play, pause and resume a local audio file on a background thread.

    Instance attributes:
        file_path: path most recently passed to ``play``.
        segment: audio loaded from ``file_path`` (``None`` before first play).
        thread: the playback thread; ``None`` before anything was played and
            ``0`` while playback is paused.
        start: offset in milliseconds at which playback resumes.
        record_time: ``time.time()`` value taken when the current playback
            run was started.

    NOTE(review): playback is stopped by raising SystemExit asynchronously in
    the playback thread; presumably this cannot interrupt a blocking native
    call inside ``playback.play`` - confirm that audio really stops promptly.
    """

    def __init__(self):
        self.file_path = ''
        self.segment = None
        self.thread = None
        # Initialised here so pause()/unpause() never hit a missing attribute.
        self.start = 0
        self.record_time = 0.0

    def _stop_current(self):
        """Stop the playback thread, if there is one and it is still running."""
        if isinstance(self.thread, threading.Thread) and self.thread.is_alive():
            try:
                stop_thread(self.thread)
            except ValueError:
                # The thread ended between the liveness check and the stop
                # request; there is nothing left to stop.
                pass

    def _start_playback(self, segment):
        """Start playing ``segment`` on a new thread and note the start time."""
        self.thread = threading.Thread(target=playback.play, args=(segment,))
        self.record_time = time.time()
        self.thread.start()

    def play(self, file_path):
        """Play ``file_path`` from the beginning, replacing any current song.

        Raises:
            FileNotFoundError: ``file_path`` is not an existing file.
        """
        if not os.path.isfile(file_path):
            raise FileNotFoundError('文件不存在')
        # A finished thread must not be "stopped": stop_thread() raises
        # ValueError for it, which used to prevent the next song from playing.
        self._stop_current()
        self.thread = None
        self.start = 0
        self.file_path = file_path
        self.segment = AudioSegment.from_file(self.file_path)
        self._start_playback(self.segment)
        print('正在播放:{}'.format(os.path.basename(self.file_path)))

    def pause(self):
        """Stop playback and remember how far into the song we are."""
        if isinstance(self.thread, threading.Thread):
            self._stop_current()
            # Accumulate: after an earlier resume, record_time marks the
            # resume moment, so the elapsed time must be added to the offset
            # already played rather than replace it.
            self.start += int((time.time() - self.record_time) * 1000)
            self.thread = 0  # sentinel meaning "paused"
            print('暂停播放:{}'.format(os.path.basename(self.file_path)))

    def unpause(self):
        """Resume a paused song from the remembered offset."""
        if self.thread == 0:
            self._start_playback(self.segment[self.start:])
            print('正在播放:{}'.format(os.path.basename(self.file_path)))

class KuGou():
    """Search for songs and download them via the QQ Music web endpoints.

    Despite its name, the class talks to ``c.y.qq.com`` / ``u.y.qq.com``.

    Instance attributes:
        song_info_list: one dict (``song_name``, ``singer``, ``song_id``) per
            hit of the most recent search, in the order they were printed.
    """

    # Characters that are replaced by '_' when a file name is generated.
    _UNSAFE_NAME_CHARS = str.maketrans({ch: '_' for ch in '\\/:*?"<>|'})

    def __init__(self):
        self.song_info_list = []

    def _song_file_name(self, index):
        """Return the ``<song>-<singer>.mp3`` file name for a search hit.

        Path separators and other characters that are invalid in file names
        are replaced, so a title such as "AC/DC" cannot point into another
        directory or make ``open`` fail.
        """
        info = self.song_info_list[index]
        name = '{}-{}'.format(info['song_name'], info['singer'])
        return '{}.mp3'.format(name.translate(self._UNSAFE_NAME_CHARS))

    def search_song(self):
        """Prompt for a song name, print the hits and store them.

        The stored list is replaced on every search so that the printed
        sequence numbers always match the indices of ``song_info_list``.
        """
        music_name = input('输入歌名:')
        search_url = 'https://c.y.qq.com/soso/fcgi-bin/client_search_cp?ct=24&qqmusic_ver=1298&new_json=1&remoteplace=txt.yqq.song&t=0&aggr=1&cr=1&catZhida=1&lossless=0&flag_qc=0&p=1&n=10&w={}&g_tk=5381&loginUin=0&hostUin=0&format=json&inCharset=utf8&outCharset=utf-8&notice=0&platform=yqq.json&needNewCode=0'.format(parse.quote(music_name))
        responses_json = requests.get(search_url, headers=HEADERS)
        song_list = json.loads(responses_json.text)['data']['song']['list']
        print('{:<6}{:<50}{:<50}'.format('序号', '歌名', '歌手'))
        results = []
        for i, song in enumerate(song_list):
            info_dict = {
                'song_name': song['title_hilight'].replace('<em>', '').replace('</em>', ''),
                'singer': song['singer'][0]['name'],
                'song_id': song['mid'],
            }
            print('{:<6}{:<50}{:<50}'.format(i, info_dict['song_name'], info_dict['singer']))
            results.append(info_dict)
        # Swap in the new results only after the whole response was processed.
        self.song_info_list = results

    def download_song(self, index, out_dir):
        """Download search hit ``index`` into directory ``out_dir``.

        Prints a notice and writes nothing when the service returns no play
        address for the song.
        """
        get_vkey_url = 'https://u.y.qq.com/cgi-bin/musicu.fcg?g_tk=5381&loginUin=0&hostUin=0&format=json&inCharset=utf8&outCharset=utf-8&notice=0&platform=yqq.json&needNewCode=0&data=%7B%22req%22%3A%7B%22module%22%3A%22CDN.SrfCdnDispatchServer%22%2C%22method%22%3A%22GetCdnDispatch%22%2C%22param%22%3A%7B%22guid%22%3A%224293542279%22%2C%22calltype%22%3A0%2C%22userip%22%3A%22%22%7D%7D%2C%22req_0%22%3A%7B%22module%22%3A%22vkey.GetVkeyServer%22%2C%22method%22%3A%22CgiGetVkey%22%2C%22param%22%3A%7B%22guid%22%3A%224293542279%22%2C%22songmid%22%3A%5B%22{}%22%5D%2C%22songtype%22%3A%5B0%5D%2C%22uin%22%3A%220%22%2C%22loginflag%22%3A1%2C%22platform%22%3A%2220%22%7D%7D%2C%22comm%22%3A%7B%22uin%22%3A0%2C%22format%22%3A%22json%22%2C%22ct%22%3A24%2C%22cv%22%3A0%7D%7D'
        get_vkey = requests.get(get_vkey_url.format(self.song_info_list[index]['song_id']), headers=HEADERS)
        # Extract the play address from the response.
        purl = json.loads(get_vkey.text)['req_0']['data']['midurlinfo'][0]['purl']
        if not purl:
            print('此歌曲为付费歌曲,只允许在客户端播放或下载')
            return
        song_url = 'http://isure.stream.qqmusic.qq.com/' + purl
        target = os.path.join(out_dir, self._song_file_name(index))
        # Stream to disk in chunks and release the connection afterwards.
        with requests.get(song_url, headers=HEADERS, stream=True) as response:
            with open(target, 'wb') as file:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    file.write(chunk)

    def play_path(self, index, file_dir):
        """Download hit ``index`` into ``file_dir`` and return the file path.

        ``file_dir`` is created when missing. The returned path is the one
        the download is written to; the file is absent when no play address
        was available.
        """
        if not os.path.exists(file_dir):
            os.makedirs(file_dir)
        self.download_song(index, file_dir)
        file = os.path.join(file_dir, self._song_file_name(index))
        print(file)
        return file



if __name__ == '__main__':
    player = Player()
    isplay = False  # True while a song is (believed to be) playing
    kg = KuGou()
    # Songs chosen for playback are downloaded into this temporary directory
    # next to the script; it is removed again when the program ends.
    file_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
    try:
        play_index = None
        while play_index != -5:
            kg.search_song()
            print('输入序号:可播放对应的歌曲\n输入 -1:搜索歌曲\n输入 -2:下载歌曲\n输入 -3:播放\n输入 -4:暂停\n输入 -5:退出程序')
            while True:
                # Only the number parsing is guarded: a typo re-prompts
                # instead of terminating the whole program.
                try:
                    play_index = int(float(input('输入序号:')))
                except (ValueError, OverflowError):
                    print('输入的命令不符合要求')
                    continue
                if play_index == -1:
                    break  # back to the outer loop for a new search
                elif play_index == -2:
                    if kg.song_info_list:
                        output_dir = input('保存目录:')
                        if os.path.exists(output_dir):
                            try:
                                songid = int(float(input('输入序号:')))
                            except (ValueError, OverflowError):
                                print('输入的命令不符合要求')
                                continue
                            if 0 <= songid <= len(kg.song_info_list) - 1:
                                kg.download_song(songid, output_dir)
                            else:
                                print('您输入的序号没有对应的歌曲')
                        else:
                            print('目录不存在')
                elif play_index == -3:
                    if not isplay:
                        player.unpause()
                        isplay = True
                elif play_index == -4:
                    if isplay:
                        player.pause()
                        isplay = False
                elif play_index == -5:
                    # Stop playback only if the thread is still running;
                    # stop_thread() raises ValueError for a finished thread.
                    if isinstance(player.thread, threading.Thread) and player.thread.is_alive():
                        try:
                            stop_thread(player.thread)
                        except ValueError:
                            pass  # the thread ended on its own in the meantime
                    break
                elif 0 <= play_index <= len(kg.song_info_list) - 1:
                    try:
                        player.play(kg.play_path(play_index, file_dir))
                    except Exception as exc:
                        # Report the failure instead of hiding it, and leave
                        # the playing flag untouched.
                        print('播放失败:{}'.format(exc))
                    else:
                        isplay = True
                else:
                    print('输入的命令不符合要求')
    finally:
        # Runs on normal exit as well as on Ctrl-C or an unexpected error, so
        # downloaded temp files are never left behind.
        if os.path.exists(file_dir):
            shutil.rmtree(file_dir)
View Code

vip视频爬取

import json
import re
import requests
from faker import Factory   #用于生成随机User-Agent

# A random User-Agent is generated once at start-up and reused for all requests.
fake = Factory().create('Zh_cn')
user_agent = fake.user_agent()
headers = {'User-Agent': user_agent}
def get_play_url():
    """Ask for a video page address and resolve it through the parse API.

    Returns:
        ``(title, current_url, result_list)`` on success, where
        ``result_list`` holds ``(label, address)`` tuples extracted from the
        ``label$http...$`` entries of the first ``info`` item. ``None`` when
        the request fails, the reply is not valid JSON, or the API does not
        report success; a failure message is printed in those cases.
    """
    failure_message = '解析失败,或者这视频站点不支持解析'
    url = input('该接口支持腾讯视频,优酷视频,部分B站视频请输入:\n')
    # Pass the address via ``params`` so that '&', '?' or '#' inside it are
    # percent-encoded instead of being read as part of the API's own query.
    r = requests.get(
        url="https://p2p.1616jx.com/api/api.php",
        params={'url': url.strip()},
        headers=headers,
        timeout=10,
    )
    if r.status_code != 200:
        print(failure_message)
        return None
    try:
        # The reply is wrapped in "(...);" which is stripped before parsing.
        data = json.loads(r.text.strip('();'))
    except ValueError:
        print(failure_message)
        return None
    if not isinstance(data, dict) or data.get('success') != 1:
        print(failure_message)
        return None

    title = data.get('title')
    current_url = data.get('url')
    result_list = []
    info = data.get('info')
    if isinstance(info, list) and info and isinstance(info[0], dict):
        for video in info[0].get('video') or []:
            result_list += re.findall(r'(.*?)\$(http.*?)\$', video)
    return title, current_url, result_list

def creat_play_html():
    """Build a self-contained HTML page that plays the resolved video.

    Calls ``get_play_url()`` (which prompts the user) and renders a page with
    a ``<video>`` element plus one clickable tile per resolved source; a
    click on a tile switches the player to that source.

    Returns:
        The HTML document as a string, or ``''`` when ``get_play_url()``
        could not resolve the video (it has already printed the reason).
    """
    import html  # local import: keeps the file's import block untouched

    main_doc = (
        '<!DOCTYPE html>'
        '<html lang="en">'
        '<head>'
        '<meta charset="UTF-8"><title>{}</title>'
        '</head><body>'
        '<div style="background-color: black;height: auto;width: 980px;">'
        '<div style="width: 980px;height: 120px;background-color: blue;"></div>'
        '<div style="height: 600px;width: 980px;">'
        '<video id="video" class="vjs-tech" width="100%" height="100%" controls="controls" x-webkit-airplay="true" x5-video-player-fullscreen="true" preload="auto" playsinline="true" webkit-playsinline x5-video-player-typ="h5">'
        '<source type="application/x-mpegURL" src="{}">'
        '</video></div>'
        '<div style="width: 980px;height: 120px;background-color: blue;"></div>'
        '<div style="width: 980px;height: auto;">{}</div></div>'
        '<script type="text/javascript">var video = document.getElementById("video");{}\n{}</script></body></html>'
    )
    show_list = '<div id="li%s" style="background-color: #cfcfcf;height: 100px;width: 100px; float:left; margin:11px;font-size:12px;line-height:100px;text-align: center;">%s</div>'
    get_element = 'var li%s = document.getElementById("li%s");'
    # The address is inserted as a ready-made JavaScript string literal.
    click = 'li%s.onclick = function(){video.src = %s;video.play();}\n'

    get_data = get_play_url()
    if get_data is None:
        return ''
    title, current_url, sources = get_data

    tiles = []
    lookups = []
    handlers = []
    for i, (label, address) in enumerate(sources):
        # Text coming from the remote API is escaped for the context it is
        # placed in: HTML for the tile label, a JSON string literal for the
        # script (with "</" broken up so it cannot close the <script> tag).
        js_address = json.dumps(address).replace('</', '<\\/')
        tiles.append(show_list % (i, html.escape(label)))
        lookups.append(get_element % (i, i))
        handlers.append(click % (i, js_address))

    return main_doc.format(
        html.escape(str(title)),
        html.escape(str(current_url)),
        ''.join(tiles),
        ''.join(lookups),
        ''.join(handlers),
    )

if __name__ == '__main__':
    # To save a playable page instead of printing the parsed data:
    #     with open('test.html', 'w', encoding='utf-8') as f:
    #         f.write(creat_play_html())
    print(get_play_url())

 

分类:

技术点:

相关文章:

  • 2022-02-23
  • 2021-11-17
  • 2021-12-16
  • 2021-11-26
  • 2021-11-15
  • 2021-11-20
  • 2021-09-28
猜你喜欢
  • 2022-02-07
  • 2022-12-23
  • 2021-12-05
  • 2021-08-26
  • 2021-10-18
  • 2021-12-19
  • 2021-12-12
相关资源
相似解决方案