吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2393|回复: 8
收起左侧

[Python 原创] 【Pyton】 根据关键词爬取音乐网音乐

[复制链接]
Cristy 发表于 2023-1-11 12:28
本帖最后由 Cristy 于 2023-1-12 00:38 编辑

该功能块会实现搜索、下载某音乐网站的功能。可以当做一个便捷下载器使用。
输入的关键字会自动创建文件夹,将下载好的歌曲放入对应的文件夹中。

执行效果
image.png image.png



本机使用的python3.9


目前有两个版本都有点小问题,后续会慢慢调整。
初始版本:
    问题:单线程下载速度慢。

新版本:
    多线程下载速度快。
    问题:目前下载一个关键字没有问题。
    但是多个关键字的歌曲会出现问题。获取到多个关键字的所有歌曲后,会下载多次放到各个目录下。
    比如设定2个关键字A、B,每个下10首,则会出现A的10首歌即出现在A目录下也出现在B目录下,B的10首也是。会下载多次




源代码


# 初始版本
[Python] 纯文本查看 复制代码
# 下载MP3文件到本地
import os
import requests
import json
import urllib.parse

#  ---------启动配置区----------

keyNames = ["张国荣"]
save_path = "D:/Music"
# 每个关键字下载多少首   每个关键词下载 pageNum * 10首
pageNum = 1


# 单线程下载
def DownloadFile(mp3_url, save_url, file_name):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None
        # 文件夹不存在,则创建文件夹
        folder = os.path.exists(save_url)
        if not folder:
            os.makedirs(save_url)
        # 读取MP3资源
        res = requests.get(mp3_url, stream=True)
        # 获取文件地址
        file_path = os.path.join(save_url, file_name)
        print('开始写入文件:', file_path)
        # 打开本地文件夹路径file_path,以二进制流方式写入,保存到本地
        with open(file_path, 'wb') as fd:
            for chunk in res.iter_content():
                fd.write(chunk)
        print(file_name + ' 成功下载!')
    except:
        print("下载程序错误!")


def downLoadMp3Files(url, save_url, title, author):
    file_name = title + "-" + author + ".mp3"
    DownloadFile(url, save_url, file_name)


def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        response = requests.request("POST", url, headers=headers, data=payload, files=files)
        try:
            songList = json.loads(response.text)["data"]
            for i in songList:
                tempDict = {}
                tempDict["url"] = i["url"]
                tempDict["author"] = i["author"]
                tempDict["title"] = i["title"]
                resultList.append(tempDict)
            # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
            print("第{}页一共有{}首歌".format(page + 1, len(songList)))

        except:
            print("获取歌曲信息错误!")
    return resultList


for keyName in keyNames:
    save_url = save_path + "/" + keyName + "/"
    # 获取所有页面歌曲信息
    mp3pageList = getPageList(keyName, pageNum)
    print("——【{}】获取下载信息完毕,共有{}首歌曲".format(keyName, len(mp3pageList)))
    # 打印所有歌曲信息
    for i in mp3pageList:
        print(i)

    # 下载歌曲
    for i in mp3pageList:
        # 单线程下载
        downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("——【{}】所有歌曲下载完毕,共有{}首歌曲!!——".format(keyName, len(mp3pageList)))



# 新版
[Python] 纯文本查看 复制代码
# -*- coding: UTF-8 -*-

# 多线程相关
from __future__ import annotations
# 用于显示进度条
import random

from tqdm import tqdm
# 用于发起网络请求
import requests
# 用于多线程操作
import multitasking
import signal
# 导入 retry 库以方便进行下载出错重试
from retry import retry

# 常用导入
# 下载MP3文件到本地
import time
import json
import urllib.parse
import os
from multiprocessing.dummy import Pool as ThreadPool  # 线程池
from sys import stdout

"""
    配置区域
"""
# 存储路径
save_path = "G:/"
# 关键字
keyNames = ["小虎队", "邓丽君", "郭富城", "刘德华", "张学友"]

waittime = 0  # 每次下载等待时间

# 每个关键字下载多少首   每个关键词下载 songNum * 10首
songNum = 5
# 从第一页开始下载
start_page = 1

# 是否使用多线程
use_multhread = True
# 是否使用分块下载
use_block = False
totalThread = 8  # 多线程数量

############################################################
# 公用变量
resultListMul = []

user_agent_list = [
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
    "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10.5; en-US; rv:1.9.2.15) Gecko/20110303 Firefox/3.6.15",
]


# 此处为多线程处理事件
def doingFunction(i):
    downLoadMp3Files(i["url"], save_url, i["title"], i["author"])


# 多线程工具
def mulTool(totalThread, inputList, function):
    """
    :param totalThread: 线程数
    :param inputList: 传入的参数列表(列表里依次存放每一次处理的参数)
    :param function: 每个线程处理的函数名
    :return:
    """

    pool = ThreadPool(totalThread)  # 创建一个包含 totalThread 个线程的线程池
    pool.map(function, inputList)
    pool.close()  # 关闭线程池的写入
    pool.join()  # 阻塞,保证子线程运行完毕后再继续主进程
    print("main func end")


# 下载文件方法
def DownloadFile(mp3_url, save_url, file_name, otherFuncUse=0):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None

        # 读取MP3资源
        res = requests.get(mp3_url, stream=True)
        if otherFuncUse != 0:
            if "Location" not in res.headers or "404" in res.headers["Location"] or "Content-Length" not in res.headers:
                print("文件名:{} - url:{}  获取服务器端文件为空,不做处理!".format(file_name, mp3_url))
                return
        try:
            if "很抱歉,你要查找的网页找不到" in res.content.decode('utf-8'):
                print("文件名:{} - url:{}  页面404,不做处理!".format(file_name, mp3_url))
                return
        except:
            None
        # 获取文件地址
        file_path = os.path.join(save_url, file_name)
        print('开始写入文件:', file_path)

        if "Content-Length" in res.headers:
            filesize = res.headers["Content-Length"]
        else:
            filesize = len(res.content)
        with open(file_path, "wb") as f:
            f.write(res.content)
            chunk_size = 128
            times = int(filesize) // chunk_size
            show = 1 / times
            show2 = 1 / times
            start = 1

            for chunk in res.iter_content(chunk_size):
                f.write(chunk)
                if start <= times:
                    stdout.write(f"{file_name}-下载进度: {show:.2%}\n")
                    start += 1
                    show += show2
                else:
                    stdout.write("下载进度: 100%")
        print(file_name + ' 成功下载!')
        time.sleep(waittime)
        print("开始等待!")
    except Exception as e:
        print("filename:{} - url:{} - 下载程序错误!{}".format(file_name, mp3_url, e))


# ----多线程下载工具----
# https://zhuanlan.zhihu.com/p/369531344

signal.signal(signal.SIGINT, multitasking.killall)
B = 1024
headers = {
    'User-Agent': random.choice(user_agent_list)
}


def split(start: int, end: int, step: int, total: int) -> list[tuple[int, int]]:
    # 分多块
    parts = [(start, min(start + step, total)) for start in range(0, total, step)]
    return parts


def get_file_size(url: str, raise_error: bool = False) -> int:
    '''
    获取文件大小

    Parameters
    ----------
    url : 文件直链
    raise_error : 如果无法获取文件大小,是否引发错误

    Return
    ------
    文件大小(B为单位)
    如果不支持则会报错

    '''

    # allow_redirects=True 自动处理302
    # response = requests.head(url, allow_redirects=True)
    session = requests.session()
    while (1):
        try:
            response = session.get(url, headers=headers)
            if "404" in url or ("Location" in response.headers and "404" in response.headers["Location"]):
                return None
            if (response.status_code == requests.codes.ok):
                break
            elif (response.status_code == 302 or response.status_code == 301):
                print(url + " --重定向到--> " + response.headers["Location"])
                url = response.headers["Location"]
            else:
                print("出问题啦!等待3秒继续!", response.status_code)
                time.sleep(5)
        except Exception as e:
            print("出问题啦!等待3秒继续!", e)
            if '404' in e:
                return None
            time.sleep(3)

    file_size = response.headers.get('Content-Length')
    if file_size is None:
        if raise_error is True:
            raise ValueError('该文件不支持多线程分段下载!')
        return file_size
    return int(file_size)


def downloadMulTool(save_url: str, url: str, file_name: str, retry_times: int = 3, each_size=1000 * B) -> None:
    '''
    根据文件直链和文件名下载文件

    Parameters
    ----------
    save_url: 存储路径
    url : 文件直链
    file_name : 文件名
    retry_times: 可选的,每次连接失败重试次数
    Return
    ------
    None

    '''
    file_path = os.path.join(save_url, file_name)
    for i in range(5):
        file_size = get_file_size(url)
        if file_size is None or file_size <= 0:
            continue
        else:
            break

    if i >= 4 and (file_size is None or file_size <= 0):
        # print("{}--查询方法显示该文件为空,尝试使用单线程方法下载".format(file_name))
        # DownloadFile(url, save_url, file_name, 1)
        return

    f = open(file_path, 'wb')

    @retry(tries=retry_times)
    @multitasking.task
    def start_download(start: int, end: int) -> None:
        '''
        根据文件起止位置下载文件

        Parameters
        ----------
        start : 开始位置
        end : 结束位置
        '''
        _headers = headers.copy()
        # 分段下载的核心
        _headers['Range'] = f'bytes={start}-{end}'
        # 发起请求并获取响应(流式)
        response = session.get(url, headers=_headers, stream=True)
        # 每次读取的流式响应大小
        chunk_size = 128
        # 暂存已获取的响应,后续循环写入
        chunks = []
        for chunk in response.iter_content(chunk_size=chunk_size):
            # 暂存获取的响应
            chunks.append(chunk)
            # 更新进度条
            bar.update(chunk_size)
        f.seek(start)
        for chunk in chunks:
            f.write(chunk)
        # 释放已写入的资源
        del chunks

    session = requests.Session()
    # 分块文件如果比文件大,就取文件大小为分块大小
    each_size = min(each_size, file_size)
    print("文件名:{} -- 文件大小:{} -- 分块大小:{}".format(file_name, file_size, each_size))

    # 分块
    parts = split(0, file_size, each_size, file_size)
    print(f'分块数:{len(parts)}')
    # 创建进度条
    bar = tqdm(total=file_size, desc=f'下载文件:{file_name}')
    for part in parts:
        start, end = part
        start_download(start, end)
    # 等待全部线程结束
    multitasking.wait_for_tasks()
    f.close()
    bar.close()


# ----多线程下载工具 end ----

def downLoadMp3Files(url, save_url, title, author):
    # 文件夹不存在,则创建文件夹
    folder = os.path.exists(save_url)
    if not folder:
        try:
            os.makedirs(save_url)
        except:
            print("{}  创建失败!".format(folder))

    file_name = title + "-" + author + ".mp3"
    if use_block:
        downloadMulTool(save_url, url, file_name)
    else:
        DownloadFile(url, save_url, file_name)


# 单线程获取列表
def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'User-Agent': random.choice(user_agent_list),
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        temp = 0
        nowpageSongNum = 0
        while 1:
            try:
                response = requests.request("POST", url, headers=headers, data=payload, files=files)
                songList = json.loads(response.text)["data"]
                for i in songList:
                    tempDict = {}
                    tempDict["url"] = i["url"]
                    tempDict["author"] = i["author"]
                    tempDict["title"] = i["title"]
                    resultList.append(tempDict)
                # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
                print("第{}页一共有{}首歌".format(page + 1, len(songList)))
                nowpageSongNum = len(songList)
                break
            except Exception as e:
                temp += 1
                print("获取歌曲信息错误,准备重试!错误信息:{}".format(e))
            if temp == 5:
                print("获取歌曲信息多次错误!跳过处理")
                break
        if nowpageSongNum < 10:
            print("该关键字无更多歌曲!")
            break

    return resultList


# 多线程获取工具
def getPageListMulTool(i):
    print("getPageListMulTool入参: ", i)
    x = i.split("###")
    author = x[0]
    page = int(x[1])

    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'User-Agent': urllib.parse.quote(random.choice(user_agent_list).encode('gb2312')),
        'x-requested-with': 'XMLHttpRequest',
        'Accept-Language': 'zh-CN,zh;q=0.9'
    }

    payload = {'input': author,
               'filter': 'name',
               'type': 'netease',
               'page': str(page + 1)}
    try:
        s = requests.session()
        requests.adapters.DEFAULT_RETRIES = 5
        response = s.request("POST", url, headers=headers, data=payload, files=files)
        songList = json.loads(response.text)["data"]
        for i in songList:
            tempDict = {}
            tempDict["url"] = i["url"]
            tempDict["author"] = i["author"]
            tempDict["title"] = i["title"]
            resultListMul.append(tempDict)
        # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
        print("第{}页一共有{}首歌".format(page + 1, len(songList)))

    except:
        print("获取歌曲信息错误!")


#  ---------启动配置区----------

for keyName in keyNames:
    save_url = save_path + keyName + "/"

    # 获取所有页面歌曲信息
    if use_multhread:
        tempList = []
        for i in range(len(keyNames)):
            for j in range(start_page - 1, start_page + songNum - 1):
                tempList.append(str(keyNames[i]) + '###' + str(j))
        mulTool(totalThread, tempList, getPageListMulTool)
        mp3pageList = resultListMul
    else:
        mp3pageList = getPageList(keyName, songNum)

    print("——【{}】获取下载信息完毕,共有{}首歌曲".format(keyName, len(mp3pageList)))
    # 打印所有歌曲信息
    for i in mp3pageList:
        print(i)

    # 下载歌曲

    if use_multhread:
        mulTool(totalThread, mp3pageList, doingFunction)
    else:
        for i in mp3pageList:
            # 单线程下载
            downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("——【{}】所有歌曲下载完毕,共有{}首歌曲!!——".format(keyName, len(mp3pageList)))

免费评分

参与人数 4吾爱币 +9 热心值 +4 收起 理由
zhaoqingdz + 1 谢谢@Thanks!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
zg2600 + 1 + 1 用心讨论,共获提升!
刚刚刚刚好 + 1 + 1 我很赞同!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

爱的人 发表于 2023-1-19 05:02
建议改成面向对象
bywdyz2005 发表于 2023-1-11 17:01
头像被屏蔽
thengcee 发表于 2023-1-11 17:33
FIzz001 发表于 2023-1-11 18:26
很不错,谢谢分享
suilibin 发表于 2023-1-11 18:46
感谢分享!
iapeng 发表于 2023-1-11 19:56
不错,学习了
a725 发表于 2023-1-16 09:03
多谢分享!
风流人物 发表于 2023-1-18 17:16
有点东西,感谢分享
iapeng 发表于 2023-1-19 15:40
学习了,有很多以前没了解的
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-25 00:44

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表