吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 3632|回复: 25
收起左侧

[Python 原创] [Python] 某站有声小说全本下载代码分享

  [复制链接]
nguaduot 发表于 2023-9-18 22:50
因为喜欢子慕播音的《全职高手》,所以花了点时间研究 ting55,用 Python 写了全本下载工具,分享一些经验。

关于 ting55 的访问限制

有声小说的相关信息(比如书名、播音演员、章节名等)可以从 HTML 页面抓取(参考下文源码),不是难点,难点在于音频资源接口的处理。
ting55 核心接口是 ting55.com/nlinka,POST 调用,用于解析音频资源直链 URL(参考下文源码),ting55 主要对该接口做了访问控制:
  • 同一 IP 短时间连续请求间隔不得低于6秒(否则返回 503 状态码),不超过6次,否则返回空的 URL;
  • 单 IP 超过限制大概30分钟后可继续请求,一天能发起有效请求(获取到音频直链)20~40次(不严谨测试)。

注意,不要频繁请求该接口,无论是否成功获取到音频 URL,都会被视为请求一次,导致推迟下次请求时间。

代码实现

访问控制搞清楚后,来想想思路,多线程不用考虑,意义不大。有两种思路:
  • 基于定时器,卡着时间请求;
  • 基于代{过}{滤}理 IP,单 IP 受限后更换 IP 继续。

本工具初版是按第一种思路实现,但过于缓慢,在服务器上挂一天只能获取几十章,《全职高手》全本1104章,难等,于是按第二种思路更新了代码(参考下文源码)。

下文代码为当前版本,后续可能更新,追踪开源代码即可:Gitee - nguaduot / audiobook

代{过}{滤}理 IP

参考:https://www.dailiservers.com/free-proxy-list/
将搜集的代{过}{滤}理 IP 按行分割存放在文本文件中,通过参数引入即可。运行后会逐个检查使用,失效代{过}{滤}理会即时从文件中剔除。

使用参考

命令行工具调用,建议先查看命令参数:python audiobook.py -h

[Asm] 纯文本查看 复制代码
usage: audiobook.py [-h] [--name NAME] [--actor ACTOR] [--start START] [--end END] [--out OUT]
                    [--proxy PROXY]
                    url

audiobook thief, just for https://ting55.com/

positional arguments:
  url            novel url (e.g. https://ting55.com/book/9200)

options:
  -h, --help     show this help message and exit
  --name NAME    novel name
  --actor ACTOR  voice actor
  --start START  starting chapter index (default 1)
  --end END      last chapter index (default 1)
  --out OUT      output folder (default same as novel name)
  --proxy PROXY  text file containing proxy IPs (e.g. 81.10.80.155:8080)


下载《全职高手》前500章,并手动设置小说名和播音演员(用于音频文件命名):

python audiobook.py https://ting55.com/book/9200 --name 全职高手 --actor 子慕 --start 1 --end 500


下载《雪中悍刀行》第1章(省略参数则自动抓取小说名和播音演员,不过只下载第1章):

python audiobook.py https://ting55.com/book/1419


运行后会自动生成代{过}{滤}理文件 audiobook.proxy,为纯文本文件。如需使用代{过}{滤}理,打开按行添加代{过}{滤}理即可,或手动指定其他代{过}{滤}理文件。

屏幕截图 2023-09-18 224711.png

TODO

  • ting55《全职高手》全本免费,本工具未对收费内容做研究,后续会研究其 APP,抓包分析一下接口。





[Python] 纯文本查看 复制代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-

import argparse
import datetime
import math
import os
import re
import time
import urllib.parse
from typing import List, Dict, Tuple

import requests


class Proxy(object):
    """
    https://www.dailiservers.com/free-proxy-list/
    https://proxyscrape.com/free-proxy-list-f
    """
    DEF_PROXY_FILE_NAME = "audiobook.proxy"

    def __init__(self, file_proxy: str):
        self.file_proxy = file_proxy
        if not self.file_proxy:
            self.file_proxy = os.path.join(os.path.dirname(__file__), self.DEF_PROXY_FILE_NAME)
        self.data_work = self.__load()
        self.data_sleep = []
        self.data_dead = []

    @staticmethod
    def __ip_with_port(proxy) -> bool:
        return True if re.match("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$", proxy) else False

    def __load(self) -> List[str]:
        """
        始终将空字符串作为第一个值,表示不使用代{过}{滤}理
        """
        if not os.path.isfile(self.file_proxy):
            return [""]
        data = set()
        with open(self.file_proxy, "r", encoding="utf-8") as f:
            for line in f.readlines():
                line = line.strip()
                if self.__ip_with_port(line):
                    data.add(line)
        return [""] + list(data)

    def get(self) -> str:
        return self.data_work[0] if self.data_work else ""

    def count(self) -> int:
        return len(self.data_work)

    def empty(self) -> bool:
        return self.count() == 0

    def feedback_dead(self, proxy: str):
        if proxy in self.data_work:
            self.data_work.remove(proxy)
        self.data_dead.append(proxy)
        self.save()

    def feedback_sleep(self, proxy: str):
        if proxy in self.data_work:
            self.data_work.remove(proxy)
        # self.data_work.append(proxy)
        self.data_sleep.append(proxy)
        self.save()

    def save(self):
        with open(self.file_proxy, "w", encoding="utf-8") as f:
            f.write("\n".join([p for p in self.data_work if self.__ip_with_port(p)]))
            f.write("\n".join([p for p in self.data_sleep if self.__ip_with_port(p)]))


class Ting55Thief(object):
    """
    恋听网 https://ting55.com/
    访问限制规则:
    1. 同一 IP 短时间只能请求6次 nlinka 接口
    """
    URL_NOVEL = "https://ting55.com/book/%s"  # GET
    URL_CHAPTER = "https://ting55.com/book/%s-%d"  # GET
    URL_API_RES = "https://ting55.com/nlinka"  # POST
    USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                  " AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.76")

    NLINKA_INTERVAL = 6  # nlinka 请求间隔时长
    FILE_SIZE_VALID = 100 * 1024  # 有效文件大小校验

    def __init__(self, novel_id: str, novel_name: str, voice_actor: str, folder_out: str, file_proxy: str):
        self.novel_id = novel_id  # 小说ID
        self.novel_name = novel_name  # 小说名
        self.voice_actor = voice_actor  # 播音演员
        self.folder_out = folder_out  # 音频资源输出文件夹
        self.chapter_max = 1  # 最大章节,用于固定宽度命名
        self.log_nlinka = [0.0]  # nlinka 请求时间日志
        self.proxy = Proxy(file_proxy)  # 代{过}{滤}理,用于 nlinka 接口

        self.session = requests.session()
        self.session.headers.update({
            "User-Agent": self.USER_AGENT,  # 必填
        })

        self.fix_novel_info()  # 修正小说名、播音演员、输出文件夹、最大章节

    def __res_url(self, url_page: str, chapter_id: int, token: str, proxy="") -> Tuple[str, int, str]:
        """
        响应示例:{'ourl': '', 'plink': '', 'url': '', 'status': 1}
        不使用代{过}{滤}理:proxies={"http": "", "https": ""}
        返回值:-2 - 错误,-1 - 该资源失效,0 - 失败,1 - 成功且还剩余请求次数
        """
        t = time.time()
        try:
            res = self.session.post(self.URL_API_RES, data={
                "bookId": self.novel_id,  # 必填
                "page": chapter_id  # 必填
                # "isPay": 0  # 选填
            }, headers={
                "Referer": url_page,  # 必填
                "xt": token,  # 必填
                "l": "1"  # 必填
            }, proxies={
                "http": proxy,
                "https": proxy
            }, timeout=8)
            self.log_nlinka.append(t)  # 无论是否获取到 URL,都计1次
            if res.status_code != 200:
                if res.status_code == 503:
                    return "", 0, "503: too fast"
                return "", 0, "status code %d" % res.status_code
            data = res.json()
            if data["status"] != 1:
                if data["status"] == -1:
                    return "", -1, "non-free content"
                return "", -1, "unexpected response: %s" % data
            if not data["url"]:  # 达到请求限制
                return "", 0, "too many requests"
            return data["url"], 1, ""
        except requests.exceptions.ReadTimeout:
            return "", -2, "timeout"
        except requests.exceptions.ProxyError:
            return "", -2, "useless proxy"
        except requests.exceptions.ConnectionError:
            return "", -2, "connection error"

    def fix_novel_info(self):
        res = self.session.get(self.URL_NOVEL % self.novel_id)
        text = res.text
        novel_name, voice_actor, chapter_max = "", "", 1
        match = re.search("<p>播音:<a[^>]+class=\"by\"[^>]*>(.+?)</a>", text)
        if match:
            voice_actor = match.group(1).strip()
        match = re.search("class=\"binfo\"><h1>(.+?)</h1>", text)
        if match:
            novel_name = match.group(1).strip()
            if voice_actor:
                novel_name = re.sub("有声小说$", "", novel_name)
                novel_name = re.sub("[((]子慕[))]", "", novel_name)
                novel_name = novel_name.strip()
        match = re.search(">(\\d+)</a></li></ul>", text)
        if match:
            chapter_max = int(match.group(1))

        if not self.novel_name:  # 修正小说名
            self.novel_name = novel_name if novel_name else "未知小说"
        if not self.voice_actor:  # 修正播音演员
            self.voice_actor = voice_actor if voice_actor else "未知播音"
        if not self.folder_out:  # 修正输出文件夹
            self.folder_out = os.path.join(os.path.dirname(__file__), self.novel_name)
        self.chapter_max = chapter_max  # 修正最大章节

    def chapter_url(self, chapter_id: int) -> str:
        return self.URL_CHAPTER % (self.novel_id, chapter_id)

    def file_name(self, chapter_id: int, chapter_name: str, fmt: str) -> str:
        """
        格式:{小说名}-{播音演员}-{章节索引}-{章节名}.{扩展名}
        """
        pattern = "%%s-%%s-%%0%dd-%%s%%s" % len(str(self.chapter_max))
        name = pattern % (self.novel_name, self.voice_actor, chapter_id, chapter_name, fmt)
        name = re.sub("[/\\\\:*?\"<>|]+", "_", name)  # 替换不可用字符
        return name

    def get(self, chapter_id: int) -> Tuple[str, str]:
        """
        Args:
            chapter_id:
        Returns:
            音频资源直链、章节名
        """
        url_page = self.chapter_url(chapter_id)
        res = self.session.get(url_page)
        text = res.text
        match = re.search("name=\"_c\" content=\"(.+?)\"", text)
        if not match:
            print("token not found")
            return "", ""
        token = match.group(1)
        while True:
            proxy = self.proxy.get()
            print("try proxy %s" % (proxy if proxy else "null"))
            url_audio, status, msg = self.__res_url(url_page, chapter_id, token, proxy)
            if status == 1:
                break
            if status == -1:  # 非免费资源或资源失效
                print(msg)
                break
            if status == 0:  # 代{过}{滤}理有效但数据异常
                print(msg)
                self.proxy.feedback_sleep(proxy)
            else:  # -2 无效代{过}{滤}理
                print(msg)
                self.proxy.feedback_dead(proxy)
            if self.proxy.empty():  # 代{过}{滤}理耗尽
                break
        if not url_audio:
            print("error in parsing chapter url")
            return "", ""
        match = re.search(">\\s*第(\\d+)章\\s*(.+?)在线收听\\s*<", text)
        if not match:
            print("chapter id and name not found")
            return "", ""
        chapter_id_src = int(match.group(1))
        if chapter_id_src != chapter_id:
            print("chapter id not matched")
            return "", ""
        chapter_name = match.group(2)
        return url_audio, chapter_name

    def download(self, chapter_url: str, chapter_id: int, chapter_name: str) -> str:
        """
        链接有时效
        Args:
            chapter_url:
            chapter_id:
            chapter_name:
        Returns:
            下载完成文件路径
        """
        if not chapter_url:
            print("empty chapter url")
            return ""
        _, fmt = os.path.splitext(urllib.parse.urlparse(chapter_url).path)
        name = self.file_name(chapter_id, chapter_name, fmt)
        if not os.path.isdir(self.folder_out):
            os.mkdir(self.folder_out)
        path = os.path.join(self.folder_out, name)
        res = self.session.get(chapter_url)
        with open(path, "wb") as f:
            f.write(res.content)
        if os.path.getsize(path) < self.FILE_SIZE_VALID:
            print("wrong file size: %.2fkb" % (os.path.getsize(path) / 1024))
            return ""
        return path

    def downloaded(self) -> List[int]:
        data = []
        if not os.path.isdir(self.folder_out):
            return []
        for name in os.listdir(self.folder_out):
            m = re.match("%s-%s-(\\d+)-.+" % (self.novel_name, self.voice_actor), name)
            if m:
                data.append(int(m.group(1)))
        return data

    def available(self) -> bool:
        return time.time() - self.log_nlinka[-1] > self.NLINKA_INTERVAL

    def wait(self) -> int:
        return max(math.ceil(self.NLINKA_INTERVAL - (time.time() - self.log_nlinka[-1])), 0)

    def config(self) -> Dict:
        return {
            "novel_id": self.novel_id,
            "novel_name": self.novel_name,
            "voice_actor": self.voice_actor,
            "chapter_max": self.chapter_max,
            "folder_out": self.folder_out,
            "file_proxy": self.proxy.file_proxy
        }

    @staticmethod
    def parse_novel_id(novel_url: str) -> str:
        if not novel_url:
            return ""
        m = re.search("ting55\\.com/book/(\\d+)", novel_url)
        if m:
            return m.group(1)
        return ""


def parse_config() -> Dict:
    """
    argparse:
    https://docs.python.org/zh-cn/3/howto/argparse.html
    """
    parser = argparse.ArgumentParser(description="audiobook thief, just for https://ting55.com/")
    parser.add_argument(
        "url",
        type=str,
        help="novel url (e.g. https://ting55.com/book/9200)"
    )
    parser.add_argument(
        "--name",
        type=str, default="", required=False,
        help="novel name"
    )
    parser.add_argument(
        "--actor",
        type=str, default="", required=False,
        help="voice actor"
    )
    parser.add_argument(
        "--start",
        type=int, default=1, required=False,
        help="starting chapter index (default 1)"
    )
    parser.add_argument(
        "--end",
        type=int, default=1, required=False,
        help="last chapter index (default 1)"
    )
    parser.add_argument(
        "--out",
        type=str, default="", required=False,
        help="output folder (default same as novel name)"
    )
    parser.add_argument(
        "--proxy",
        type=str, default="", required=False,
        help="text file containing proxy IPs (e.g. 81.10.80.155:8080)"
    )
    args = parser.parse_args()
    novel_id = Ting55Thief.parse_novel_id(args.url)
    start = args.start
    end = args.end
    if end < start:
        end = start
    config = {
        "id": novel_id,
        "name": args.name,
        "actor": args.actor,
        "start": start,
        "end": end,
        "out": args.out,
        "proxy": args.proxy
    }
    if not novel_id:
        print("invalid novel url (e.g. https://ting55.com/book/9200)")
        exit()
    return config


def now() -> str:
    return datetime.datetime.now().strftime("%H:%M:%S")


def run():
    thief = Ting55Thief(CONFIG["id"], CONFIG["name"], CONFIG["actor"], CONFIG["out"], CONFIG["proxy"])
    print("config:", thief.config())
    data_downloaded = thief.downloaded()
    data_todo = [i for i in range(CONFIG["start"], CONFIG["end"] + 1) if i not in data_downloaded]
    print("todo: %d" % len(data_todo))
    if not data_todo:
        return
    index = 0
    while True:
        if index >= len(data_todo):
            break
        chapter_id = data_todo[index]
        print("[%s] %d/%d: %s" % (now(), chapter_id, CONFIG["end"], thief.chapter_url(chapter_id)))
        chapter_url, chapter_name = thief.get(chapter_id)
        print("[%s] %d/%d: %s %s" % (
            now(), chapter_id, CONFIG["end"],
            chapter_name if chapter_name else "null", chapter_url if chapter_url else "null"
        ))
        if not chapter_url or not chapter_name:
            break
        file_saved = thief.download(chapter_url, chapter_id, chapter_name)
        if not file_saved:
            break
        print("[%s] %d/%d: %s %.2fmb" % (
            now(), chapter_id, CONFIG["end"], file_saved, os.path.getsize(file_saved) / 1024 / 1024
        ))
        index += 1
        print("sleep %d sec..." % thief.wait())
        time.sleep(thief.wait())


if __name__ == "__main__":
    VERSION = "1.5.230918"
    print("audiobook thief v%s gitee.com/nguaduot/audiobook" % VERSION)
    CONFIG = parse_config()
    run()

免费评分

参与人数 5吾爱币 +11 热心值 +4 收起 理由
hshcompass + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
lunker2019 + 1 + 1 用心讨论,共获提升!
gaoxiangfei + 1 谢谢@Thanks!
jojo198365 + 1 + 1 用心讨论,共获提升!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

指尖的阳光 发表于 2023-9-21 11:28
能分享成品工具使用吗
pentium 发表于 2023-9-20 12:06
magicny 发表于 2023-9-20 12:18
MAOSKE 发表于 2023-9-20 12:29
感谢分享!
adm1nSQL 发表于 2023-9-21 23:26
好帖帮顶
HR741158 发表于 2023-9-26 20:31
感谢楼主的分享
kukukikibear 发表于 2023-11-2 13:40
这个可以 谢谢分享
lingwushexi 发表于 2023-11-2 14:05
感谢分享
roddick1122 发表于 2023-11-2 15:05
感谢分享!!!
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-21 22:57

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表