Cristy 发表于 2023-1-11 12:28

【Pyton】 根据关键词爬取音乐网音乐

本帖最后由 Cristy 于 2023-1-12 00:38 编辑

该功能块会实现搜索、下载某音乐网站的功能。可以当做一个便捷下载器使用。
输入的关键字会自动创建文件夹,将下载好的歌曲放入对应的文件夹中。

执行效果:




本机使用的python3.9


目前有两个版本都有点小问题,后续会慢慢调整。
初始版本:
    问题:单线程下载速度慢。
新版本:
    多线程下载速度快。
    问题:目前下载一个关键字没有问题。
    但是多个关键字的歌曲会出现问题。获取到多个关键字的所有歌曲后,会下载多次放到各个目录下。
    比如设定2个关键字A、B,每个下10首,则会出现A的10首歌即出现在A目录下也出现在B目录下,B的10首也是。会下载多次




源代码


# 初始版本
# 下载MP3文件到本地
import os
import requests
import json
import urllib.parse

#---------启动配置区----------

keyNames = ["张国荣"]
save_path = "D:/Music"
# 每个关键字下载多少首   每个关键词下载 pageNum * 10首
pageNum = 1


# 单线程下载
def DownloadFile(mp3_url, save_url, file_name):
    try:
      if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None
      # 文件夹不存在,则创建文件夹
      folder = os.path.exists(save_url)
      if not folder:
            os.makedirs(save_url)
      # 读取MP3资源
      res = requests.get(mp3_url, stream=True)
      # 获取文件地址
      file_path = os.path.join(save_url, file_name)
      print('开始写入文件:', file_path)
      # 打开本地文件夹路径file_path,以二进制流方式写入,保存到本地
      with open(file_path, 'wb') as fd:
            for chunk in res.iter_content():
                fd.write(chunk)
      print(file_name + ' 成功下载!')
    except:
      print("下载程序错误!")


def downLoadMp3Files(url, save_url, title, author):
    file_name = title + "-" + author + ".mp3"
    DownloadFile(url, save_url, file_name)


def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
      'authority': 'www.jbsou.cn',
      'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
      'origin': 'https://www.jbsou.cn',
      'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
      'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
      'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
      payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
      response = requests.request("POST", url, headers=headers, data=payload, files=files)
      try:
            songList = json.loads(response.text)["data"]
            for i in songList:
                tempDict = {}
                tempDict["url"] = i["url"]
                tempDict["author"] = i["author"]
                tempDict["title"] = i["title"]
                resultList.append(tempDict)
            # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
            print("第{}页一共有{}首歌".format(page + 1, len(songList)))

      except:
            print("获取歌曲信息错误!")
    return resultList


for keyName in keyNames:
    save_url = save_path + "/" + keyName + "/"
    # 获取所有页面歌曲信息
    mp3pageList = getPageList(keyName, pageNum)
    print("——【{}】获取下载信息完毕,共有{}首歌曲".format(keyName, len(mp3pageList)))
    # 打印所有歌曲信息
    for i in mp3pageList:
      print(i)

    # 下载歌曲
    for i in mp3pageList:
      # 单线程下载
      downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("——【{}】所有歌曲下载完毕,共有{}首歌曲!!——".format(keyName, len(mp3pageList)))



# 新版
# -*- coding: UTF-8 -*-

# 多线程相关
from __future__ import annotations
# 用于显示进度条
import random

from tqdm import tqdm
# 用于发起网络请求
import requests
# 用于多线程操作
import multitasking
import signal
# 导入 retry 库以方便进行下载出错重试
from retry import retry

# 常用导入
# 下载MP3文件到本地
import time
import json
import urllib.parse
import os
from multiprocessing.dummy import Pool as ThreadPool# 线程池
from sys import stdout

"""
    配置区域
"""
# 存储路径
save_path = "G:/"
# 关键字
keyNames = ["小虎队", "邓丽君", "郭富城", "刘德华", "张学友"]

waittime = 0# 每次下载等待时间

# 每个关键字下载多少首   每个关键词下载 songNum * 10首
songNum = 5
# 从第一页开始下载
start_page = 1

# 是否使用多线程
use_multhread = True
# 是否使用分块下载
use_block = False
totalThread = 8# 多线程数量

############################################################
# 公用变量
resultListMul = []

user_agent_list = [
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
    "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10.5; en-US; rv:1.9.2.15) Gecko/20110303 Firefox/3.6.15",
]


# 此处为多线程处理事件
def doingFunction(i):
    downLoadMp3Files(i["url"], save_url, i["title"], i["author"])


# 多线程工具
def mulTool(totalThread, inputList, function):
    """
    :param totalThread: 线程数
    :param inputList: 传入的参数列表(列表里依次存放每一次处理的参数)
    :param function: 每个线程处理的函数名
    :return:
    """

    pool = ThreadPool(totalThread)# 创建一个包含 totalThread 个线程的线程池
    pool.map(function, inputList)
    pool.close()# 关闭线程池的写入
    pool.join()# 阻塞,保证子线程运行完毕后再继续主进程
    print("main func end")


# 下载文件方法
def DownloadFile(mp3_url, save_url, file_name, otherFuncUse=0):
    try:
      if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None

      # 读取MP3资源
      res = requests.get(mp3_url, stream=True)
      if otherFuncUse != 0:
            if "Location" not in res.headers or "404" in res.headers["Location"] or "Content-Length" not in res.headers:
                print("文件名:{} - url:{}获取服务器端文件为空,不做处理!".format(file_name, mp3_url))
                return
      try:
            if "很抱歉,你要查找的网页找不到" in res.content.decode('utf-8'):
                print("文件名:{} - url:{}页面404,不做处理!".format(file_name, mp3_url))
                return
      except:
            None
      # 获取文件地址
      file_path = os.path.join(save_url, file_name)
      print('开始写入文件:', file_path)

      if "Content-Length" in res.headers:
            filesize = res.headers["Content-Length"]
      else:
            filesize = len(res.content)
      with open(file_path, "wb") as f:
            f.write(res.content)
            chunk_size = 128
            times = int(filesize) // chunk_size
            show = 1 / times
            show2 = 1 / times
            start = 1

            for chunk in res.iter_content(chunk_size):
                f.write(chunk)
                if start <= times:
                  stdout.write(f"{file_name}-下载进度: {show:.2%}\n")
                  start += 1
                  show += show2
                else:
                  stdout.write("下载进度: 100%")
      print(file_name + ' 成功下载!')
      time.sleep(waittime)
      print("开始等待!")
    except Exception as e:
      print("filename:{} - url:{} - 下载程序错误!{}".format(file_name, mp3_url, e))


# ----多线程下载工具----
# https://zhuanlan.zhihu.com/p/369531344

signal.signal(signal.SIGINT, multitasking.killall)
B = 1024
headers = {
    'User-Agent': random.choice(user_agent_list)
}


def split(start: int, end: int, step: int, total: int) -> list]:
    # 分多块
    parts = [(start, min(start + step, total)) for start in range(0, total, step)]
    return parts


def get_file_size(url: str, raise_error: bool = False) -> int:
    '''
    获取文件大小

    Parameters
    ----------
    url : 文件直链
    raise_error : 如果无法获取文件大小,是否引发错误

    Return
    ------
    文件大小(B为单位)
    如果不支持则会报错

    '''

    # allow_redirects=True 自动处理302
    # response = requests.head(url, allow_redirects=True)
    session = requests.session()
    while (1):
      try:
            response = session.get(url, headers=headers)
            if "404" in url or ("Location" in response.headers and "404" in response.headers["Location"]):
                return None
            if (response.status_code == requests.codes.ok):
                break
            elif (response.status_code == 302 or response.status_code == 301):
                print(url + " --重定向到--> " + response.headers["Location"])
                url = response.headers["Location"]
            else:
                print("出问题啦!等待3秒继续!", response.status_code)
                time.sleep(5)
      except Exception as e:
            print("出问题啦!等待3秒继续!", e)
            if '404' in e:
                return None
            time.sleep(3)

    file_size = response.headers.get('Content-Length')
    if file_size is None:
      if raise_error is True:
            raise ValueError('该文件不支持多线程分段下载!')
      return file_size
    return int(file_size)


def downloadMulTool(save_url: str, url: str, file_name: str, retry_times: int = 3, each_size=1000 * B) -> None:
    '''
    根据文件直链和文件名下载文件

    Parameters
    ----------
    save_url: 存储路径
    url : 文件直链
    file_name : 文件名
    retry_times: 可选的,每次连接失败重试次数
    Return
    ------
    None

    '''
    file_path = os.path.join(save_url, file_name)
    for i in range(5):
      file_size = get_file_size(url)
      if file_size is None or file_size <= 0:
            continue
      else:
            break

    if i >= 4 and (file_size is None or file_size <= 0):
      # print("{}--查询方法显示该文件为空,尝试使用单线程方法下载".format(file_name))
      # DownloadFile(url, save_url, file_name, 1)
      return

    f = open(file_path, 'wb')

    @retry(tries=retry_times)
    @multitasking.task
    def start_download(start: int, end: int) -> None:
      '''
      根据文件起止位置下载文件

      Parameters
      ----------
      start : 开始位置
      end : 结束位置
      '''
      _headers = headers.copy()
      # 分段下载的核心
      _headers['Range'] = f'bytes={start}-{end}'
      # 发起请求并获取响应(流式)
      response = session.get(url, headers=_headers, stream=True)
      # 每次读取的流式响应大小
      chunk_size = 128
      # 暂存已获取的响应,后续循环写入
      chunks = []
      for chunk in response.iter_content(chunk_size=chunk_size):
            # 暂存获取的响应
            chunks.append(chunk)
            # 更新进度条
            bar.update(chunk_size)
      f.seek(start)
      for chunk in chunks:
            f.write(chunk)
      # 释放已写入的资源
      del chunks

    session = requests.Session()
    # 分块文件如果比文件大,就取文件大小为分块大小
    each_size = min(each_size, file_size)
    print("文件名:{} -- 文件大小:{} -- 分块大小:{}".format(file_name, file_size, each_size))

    # 分块
    parts = split(0, file_size, each_size, file_size)
    print(f'分块数:{len(parts)}')
    # 创建进度条
    bar = tqdm(total=file_size, desc=f'下载文件:{file_name}')
    for part in parts:
      start, end = part
      start_download(start, end)
    # 等待全部线程结束
    multitasking.wait_for_tasks()
    f.close()
    bar.close()


# ----多线程下载工具 end ----

def downLoadMp3Files(url, save_url, title, author):
    # 文件夹不存在,则创建文件夹
    folder = os.path.exists(save_url)
    if not folder:
      try:
            os.makedirs(save_url)
      except:
            print("{}创建失败!".format(folder))

    file_name = title + "-" + author + ".mp3"
    if use_block:
      downloadMulTool(save_url, url, file_name)
    else:
      DownloadFile(url, save_url, file_name)


# 单线程获取列表
def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
      'authority': 'www.jbsou.cn',
      'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
      'origin': 'https://www.jbsou.cn',
      'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
      'User-Agent': random.choice(user_agent_list),
      'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
      payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
      temp = 0
      nowpageSongNum = 0
      while 1:
            try:
                response = requests.request("POST", url, headers=headers, data=payload, files=files)
                songList = json.loads(response.text)["data"]
                for i in songList:
                  tempDict = {}
                  tempDict["url"] = i["url"]
                  tempDict["author"] = i["author"]
                  tempDict["title"] = i["title"]
                  resultList.append(tempDict)
                # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
                print("第{}页一共有{}首歌".format(page + 1, len(songList)))
                nowpageSongNum = len(songList)
                break
            except Exception as e:
                temp += 1
                print("获取歌曲信息错误,准备重试!错误信息:{}".format(e))
            if temp == 5:
                print("获取歌曲信息多次错误!跳过处理")
                break
      if nowpageSongNum < 10:
            print("该关键字无更多歌曲!")
            break

    return resultList


# 多线程获取工具
def getPageListMulTool(i):
    print("getPageListMulTool入参: ", i)
    x = i.split("###")
    author = x
    page = int(x)

    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
      'authority': 'www.jbsou.cn',
      'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
      'origin': 'https://www.jbsou.cn',
      'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
      'User-Agent': urllib.parse.quote(random.choice(user_agent_list).encode('gb2312')),
      'x-requested-with': 'XMLHttpRequest',
      'Accept-Language': 'zh-CN,zh;q=0.9'
    }

    payload = {'input': author,
               'filter': 'name',
               'type': 'netease',
               'page': str(page + 1)}
    try:
      s = requests.session()
      requests.adapters.DEFAULT_RETRIES = 5
      response = s.request("POST", url, headers=headers, data=payload, files=files)
      songList = json.loads(response.text)["data"]
      for i in songList:
            tempDict = {}
            tempDict["url"] = i["url"]
            tempDict["author"] = i["author"]
            tempDict["title"] = i["title"]
            resultListMul.append(tempDict)
      # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
      print("第{}页一共有{}首歌".format(page + 1, len(songList)))

    except:
      print("获取歌曲信息错误!")


#---------启动配置区----------

for keyName in keyNames:
    save_url = save_path + keyName + "/"

    # 获取所有页面歌曲信息
    if use_multhread:
      tempList = []
      for i in range(len(keyNames)):
            for j in range(start_page - 1, start_page + songNum - 1):
                tempList.append(str(keyNames) + '###' + str(j))
      mulTool(totalThread, tempList, getPageListMulTool)
      mp3pageList = resultListMul
    else:
      mp3pageList = getPageList(keyName, songNum)

    print("——【{}】获取下载信息完毕,共有{}首歌曲".format(keyName, len(mp3pageList)))
    # 打印所有歌曲信息
    for i in mp3pageList:
      print(i)

    # 下载歌曲

    if use_multhread:
      mulTool(totalThread, mp3pageList, doingFunction)
    else:
      for i in mp3pageList:
            # 单线程下载
            downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("——【{}】所有歌曲下载完毕,共有{}首歌曲!!——".format(keyName, len(mp3pageList)))

爱的人 发表于 2023-1-19 05:02

建议改成面向对象

bywdyz2005 发表于 2023-1-11 17:01

学到了,谢谢

thengcee 发表于 2023-1-11 17:33

FIzz001 发表于 2023-1-11 18:26

很不错,谢谢分享

suilibin 发表于 2023-1-11 18:46

感谢分享!

iapeng 发表于 2023-1-11 19:56

不错,学习了

a725 发表于 2023-1-16 09:03

多谢分享!

风流人物 发表于 2023-1-18 17:16

有点东西,感谢分享

iapeng 发表于 2023-1-19 15:40

学习了,有很多以前没了解的
页: [1]
查看完整版本: 【Pyton】 根据关键词爬取音乐网音乐