nguaduot 发表于 2023-9-18 22:50

[Python] 某站有声小说全本下载代码分享

因为喜欢子慕播音的《全职高手》,所以花了点时间研究 ting55,用 Python 写了全本下载工具,分享一些经验。

关于 ting55 的访问限制

有声小说的相关信息(比如书名、播音演员、章节名等)可以从 HTML 页面抓取(参考下文源码),不是难点,难点在于音频资源接口的处理。
ting55 核心接口是 ting55.com/nlinka,POST 调用,用于解析音频资源直链 URL(参考下文源码),ting55 主要对该接口做了访问控制:

[*]同一 IP 短时间连续请求间隔不得低于6秒(否则返回 503 状态码),不超过6次,否则返回空的 URL;
[*]单 IP 超过限制大概30分钟后可继续请求,一天能发起有效请求(获取到音频直链)20~40次(不严谨测试)。

注意,不要频繁请求该接口,无论是否成功获取到音频 URL,都会被视为请求一次,导致推迟下次请求时间。

代码实现

访问控制搞清楚后,来想想思路,多线程不用考虑,意义不大。有两种思路:

[*]基于定时器,卡着时间请求;
[*]基于代{过}{滤}理 IP,单 IP 受限后更换 IP 继续。

本工具初版是按第一种思路实现,但过于缓慢,在服务器上挂一天只能获取几十章,《全职高手》全本1104章,难等,于是按第二种思路更新了代码(参考下文源码)。

下文代码为当前版本,后续可能更新,追踪开源代码即可:Gitee - nguaduot / audiobook

代{过}{滤}理 IP

参考:https://www.dailiservers.com/free-proxy-list/
将搜集的代{过}{滤}理 IP 按行分割存放在文本文件中,通过参数引入即可。运行后会逐个检查使用,失效代{过}{滤}理会即时从文件中剔除。

使用参考

命令行工具调用,建议先查看命令参数:python audiobook.py -h

usage: audiobook.py [-h] [--name NAME] [--actor ACTOR] [--start START] [--end END] [--out OUT]
                  [--proxy PROXY]
                  url

audiobook thief, just for https://ting55.com/

positional arguments:
url            novel url (e.g. https://ting55.com/book/9200)

options:
-h, --help   show this help message and exit
--name NAME    novel name
--actor ACTORvoice actor
--start STARTstarting chapter index (default 1)
--end END      last chapter index (default 1)
--out OUT      output folder (default same as novel name)
--proxy PROXYtext file containing proxy IPs (e.g. 81.10.80.155:8080)

下载《全职高手》前500章,并手动设置小说名和播音演员(用于音频文件命名):

python audiobook.py https://ting55.com/book/9200 --name 全职高手 --actor 子慕 --start 1 --end 500

下载《雪中悍刀行》第1章(省略参数则自动抓取小说名和播音演员,不过只下载第1章):

python audiobook.py https://ting55.com/book/1419

运行后会自动生成代{过}{滤}理文件 audiobook.proxy,为纯文本文件。如需使用代{过}{滤}理,打开按行添加代{过}{滤}理即可,或手动指定其他代{过}{滤}理文件。



TODO


[*]ting55《全职高手》全本免费,本工具未对收费内容做研究,后续会研究其 APP,抓包分析一下接口。




#!/usr/bin/python3
# -*- coding: utf-8 -*-

import argparse
import datetime
import math
import os
import re
import time
import urllib.parse
from typing import List, Dict, Tuple

import requests


class Proxy(object):
    """
    https://www.dailiservers.com/free-proxy-list/
    https://proxyscrape.com/free-proxy-list-f
    """
    DEF_PROXY_FILE_NAME = "audiobook.proxy"

    def __init__(self, file_proxy: str):
      self.file_proxy = file_proxy
      if not self.file_proxy:
            self.file_proxy = os.path.join(os.path.dirname(__file__), self.DEF_PROXY_FILE_NAME)
      self.data_work = self.__load()
      self.data_sleep = []
      self.data_dead = []

    @staticmethod
    def __ip_with_port(proxy) -> bool:
      return True if re.match("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$", proxy) else False

    def __load(self) -> List:
      """
      始终将空字符串作为第一个值,表示不使用代{过}{滤}理
      """
      if not os.path.isfile(self.file_proxy):
            return [""]
      data = set()
      with open(self.file_proxy, "r", encoding="utf-8") as f:
            for line in f.readlines():
                line = line.strip()
                if self.__ip_with_port(line):
                  data.add(line)
      return [""] + list(data)

    def get(self) -> str:
      return self.data_work if self.data_work else ""

    def count(self) -> int:
      return len(self.data_work)

    def empty(self) -> bool:
      return self.count() == 0

    def feedback_dead(self, proxy: str):
      if proxy in self.data_work:
            self.data_work.remove(proxy)
      self.data_dead.append(proxy)
      self.save()

    def feedback_sleep(self, proxy: str):
      if proxy in self.data_work:
            self.data_work.remove(proxy)
      # self.data_work.append(proxy)
      self.data_sleep.append(proxy)
      self.save()

    def save(self):
      with open(self.file_proxy, "w", encoding="utf-8") as f:
            f.write("\n".join())
            f.write("\n".join())


class Ting55Thief(object):
    """
    恋听网 https://ting55.com/
    访问限制规则:
    1. 同一 IP 短时间只能请求6次 nlinka 接口
    """
    URL_NOVEL = "https://ting55.com/book/%s"# GET
    URL_CHAPTER = "https://ting55.com/book/%s-%d"# GET
    URL_API_RES = "https://ting55.com/nlinka"# POST
    USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                  " AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.76")

    NLINKA_INTERVAL = 6# nlinka 请求间隔时长
    FILE_SIZE_VALID = 100 * 1024# 有效文件大小校验

    def __init__(self, novel_id: str, novel_name: str, voice_actor: str, folder_out: str, file_proxy: str):
      self.novel_id = novel_id# 小说ID
      self.novel_name = novel_name# 小说名
      self.voice_actor = voice_actor# 播音演员
      self.folder_out = folder_out# 音频资源输出文件夹
      self.chapter_max = 1# 最大章节,用于固定宽度命名
      self.log_nlinka = # nlinka 请求时间日志
      self.proxy = Proxy(file_proxy)# 代{过}{滤}理,用于 nlinka 接口

      self.session = requests.session()
      self.session.headers.update({
            "User-Agent": self.USER_AGENT,# 必填
      })

      self.fix_novel_info()# 修正小说名、播音演员、输出文件夹、最大章节

    def __res_url(self, url_page: str, chapter_id: int, token: str, proxy="") -> Tuple:
      """
      响应示例:{'ourl': '', 'plink': '', 'url': '', 'status': 1}
      不使用代{过}{滤}理:proxies={"http": "", "https": ""}
      返回值:-2 - 错误,-1 - 该资源失效,0 - 失败,1 - 成功且还剩余请求次数
      """
      t = time.time()
      try:
            res = self.session.post(self.URL_API_RES, data={
                "bookId": self.novel_id,# 必填
                "page": chapter_id# 必填
                # "isPay": 0# 选填
            }, headers={
                "Referer": url_page,# 必填
                "xt": token,# 必填
                "l": "1"# 必填
            }, proxies={
                "http": proxy,
                "https": proxy
            }, timeout=8)
            self.log_nlinka.append(t)# 无论是否获取到 URL,都计1次
            if res.status_code != 200:
                if res.status_code == 503:
                  return "", 0, "503: too fast"
                return "", 0, "status code %d" % res.status_code
            data = res.json()
            if data["status"] != 1:
                if data["status"] == -1:
                  return "", -1, "non-free content"
                return "", -1, "unexpected response: %s" % data
            if not data["url"]:# 达到请求限制
                return "", 0, "too many requests"
            return data["url"], 1, ""
      except requests.exceptions.ReadTimeout:
            return "", -2, "timeout"
      except requests.exceptions.ProxyError:
            return "", -2, "useless proxy"
      except requests.exceptions.ConnectionError:
            return "", -2, "connection error"

    def fix_novel_info(self):
      res = self.session.get(self.URL_NOVEL % self.novel_id)
      text = res.text
      novel_name, voice_actor, chapter_max = "", "", 1
      match = re.search("<p>播音:<a[^>]+class=\"by\"[^>]*>(.+?)</a>", text)
      if match:
            voice_actor = match.group(1).strip()
      match = re.search("class=\"binfo\"><h1>(.+?)</h1>", text)
      if match:
            novel_name = match.group(1).strip()
            if voice_actor:
                novel_name = re.sub("有声小说$", "", novel_name)
                novel_name = re.sub("[((]子慕[))]", "", novel_name)
                novel_name = novel_name.strip()
      match = re.search(">(\\d+)</a></li></ul>", text)
      if match:
            chapter_max = int(match.group(1))

      if not self.novel_name:# 修正小说名
            self.novel_name = novel_name if novel_name else "未知小说"
      if not self.voice_actor:# 修正播音演员
            self.voice_actor = voice_actor if voice_actor else "未知播音"
      if not self.folder_out:# 修正输出文件夹
            self.folder_out = os.path.join(os.path.dirname(__file__), self.novel_name)
      self.chapter_max = chapter_max# 修正最大章节

    def chapter_url(self, chapter_id: int) -> str:
      return self.URL_CHAPTER % (self.novel_id, chapter_id)

    def file_name(self, chapter_id: int, chapter_name: str, fmt: str) -> str:
      """
      格式:{小说名}-{播音演员}-{章节索引}-{章节名}.{扩展名}
      """
      pattern = "%%s-%%s-%%0%dd-%%s%%s" % len(str(self.chapter_max))
      name = pattern % (self.novel_name, self.voice_actor, chapter_id, chapter_name, fmt)
      name = re.sub("[/\\\\:*?\"<>|]+", "_", name)# 替换不可用字符
      return name

    def get(self, chapter_id: int) -> Tuple:
      """
      Args:
            chapter_id:
      Returns:
            音频资源直链、章节名
      """
      url_page = self.chapter_url(chapter_id)
      res = self.session.get(url_page)
      text = res.text
      match = re.search("name=\"_c\" content=\"(.+?)\"", text)
      if not match:
            print("token not found")
            return "", ""
      token = match.group(1)
      while True:
            proxy = self.proxy.get()
            print("try proxy %s" % (proxy if proxy else "null"))
            url_audio, status, msg = self.__res_url(url_page, chapter_id, token, proxy)
            if status == 1:
                break
            if status == -1:# 非免费资源或资源失效
                print(msg)
                break
            if status == 0:# 代{过}{滤}理有效但数据异常
                print(msg)
                self.proxy.feedback_sleep(proxy)
            else:# -2 无效代{过}{滤}理
                print(msg)
                self.proxy.feedback_dead(proxy)
            if self.proxy.empty():# 代{过}{滤}理耗尽
                break
      if not url_audio:
            print("error in parsing chapter url")
            return "", ""
      match = re.search(">\\s*第(\\d+)章\\s*(.+?)在线收听\\s*<", text)
      if not match:
            print("chapter id and name not found")
            return "", ""
      chapter_id_src = int(match.group(1))
      if chapter_id_src != chapter_id:
            print("chapter id not matched")
            return "", ""
      chapter_name = match.group(2)
      return url_audio, chapter_name

    def download(self, chapter_url: str, chapter_id: int, chapter_name: str) -> str:
      """
      链接有时效
      Args:
            chapter_url:
            chapter_id:
            chapter_name:
      Returns:
            下载完成文件路径
      """
      if not chapter_url:
            print("empty chapter url")
            return ""
      _, fmt = os.path.splitext(urllib.parse.urlparse(chapter_url).path)
      name = self.file_name(chapter_id, chapter_name, fmt)
      if not os.path.isdir(self.folder_out):
            os.mkdir(self.folder_out)
      path = os.path.join(self.folder_out, name)
      res = self.session.get(chapter_url)
      with open(path, "wb") as f:
            f.write(res.content)
      if os.path.getsize(path) < self.FILE_SIZE_VALID:
            print("wrong file size: %.2fkb" % (os.path.getsize(path) / 1024))
            return ""
      return path

    def downloaded(self) -> List:
      data = []
      if not os.path.isdir(self.folder_out):
            return []
      for name in os.listdir(self.folder_out):
            m = re.match("%s-%s-(\\d+)-.+" % (self.novel_name, self.voice_actor), name)
            if m:
                data.append(int(m.group(1)))
      return data

    def available(self) -> bool:
      return time.time() - self.log_nlinka[-1] > self.NLINKA_INTERVAL

    def wait(self) -> int:
      return max(math.ceil(self.NLINKA_INTERVAL - (time.time() - self.log_nlinka[-1])), 0)

    def config(self) -> Dict:
      return {
            "novel_id": self.novel_id,
            "novel_name": self.novel_name,
            "voice_actor": self.voice_actor,
            "chapter_max": self.chapter_max,
            "folder_out": self.folder_out,
            "file_proxy": self.proxy.file_proxy
      }

    @staticmethod
    def parse_novel_id(novel_url: str) -> str:
      if not novel_url:
            return ""
      m = re.search("ting55\\.com/book/(\\d+)", novel_url)
      if m:
            return m.group(1)
      return ""


def parse_config() -> Dict:
    """
    argparse:
    https://docs.python.org/zh-cn/3/howto/argparse.html
    """
    parser = argparse.ArgumentParser(description="audiobook thief, just for https://ting55.com/")
    parser.add_argument(
      "url",
      type=str,
      help="novel url (e.g. https://ting55.com/book/9200)"
    )
    parser.add_argument(
      "--name",
      type=str, default="", required=False,
      help="novel name"
    )
    parser.add_argument(
      "--actor",
      type=str, default="", required=False,
      help="voice actor"
    )
    parser.add_argument(
      "--start",
      type=int, default=1, required=False,
      help="starting chapter index (default 1)"
    )
    parser.add_argument(
      "--end",
      type=int, default=1, required=False,
      help="last chapter index (default 1)"
    )
    parser.add_argument(
      "--out",
      type=str, default="", required=False,
      help="output folder (default same as novel name)"
    )
    parser.add_argument(
      "--proxy",
      type=str, default="", required=False,
      help="text file containing proxy IPs (e.g. 81.10.80.155:8080)"
    )
    args = parser.parse_args()
    novel_id = Ting55Thief.parse_novel_id(args.url)
    start = args.start
    end = args.end
    if end < start:
      end = start
    config = {
      "id": novel_id,
      "name": args.name,
      "actor": args.actor,
      "start": start,
      "end": end,
      "out": args.out,
      "proxy": args.proxy
    }
    if not novel_id:
      print("invalid novel url (e.g. https://ting55.com/book/9200)")
      exit()
    return config


def now() -> str:
    return datetime.datetime.now().strftime("%H:%M:%S")


def run():
    thief = Ting55Thief(CONFIG["id"], CONFIG["name"], CONFIG["actor"], CONFIG["out"], CONFIG["proxy"])
    print("config:", thief.config())
    data_downloaded = thief.downloaded()
    data_todo = , CONFIG["end"] + 1) if i not in data_downloaded]
    print("todo: %d" % len(data_todo))
    if not data_todo:
      return
    index = 0
    while True:
      if index >= len(data_todo):
            break
      chapter_id = data_todo
      print("[%s] %d/%d: %s" % (now(), chapter_id, CONFIG["end"], thief.chapter_url(chapter_id)))
      chapter_url, chapter_name = thief.get(chapter_id)
      print("[%s] %d/%d: %s %s" % (
            now(), chapter_id, CONFIG["end"],
            chapter_name if chapter_name else "null", chapter_url if chapter_url else "null"
      ))
      if not chapter_url or not chapter_name:
            break
      file_saved = thief.download(chapter_url, chapter_id, chapter_name)
      if not file_saved:
            break
      print("[%s] %d/%d: %s %.2fmb" % (
            now(), chapter_id, CONFIG["end"], file_saved, os.path.getsize(file_saved) / 1024 / 1024
      ))
      index += 1
      print("sleep %d sec..." % thief.wait())
      time.sleep(thief.wait())


if __name__ == "__main__":
    VERSION = "1.5.230918"
    print("audiobook thief v%s gitee.com/nguaduot/audiobook" % VERSION)
    CONFIG = parse_config()
    run()

指尖的阳光 发表于 2023-9-21 11:28

能分享成品工具使用吗

pentium 发表于 2023-9-20 12:06

感谢分享,源码继续研究

magicny 发表于 2023-9-20 12:18

不明觉厉,多谢分享。

MAOSKE 发表于 2023-9-20 12:29

感谢分享!

adm1nSQL 发表于 2023-9-21 23:26

好帖帮顶

HR741158 发表于 2023-9-26 20:31

感谢楼主的分享{:1_893:}

kukukikibear 发表于 2023-11-2 13:40

这个可以 谢谢分享

lingwushexi 发表于 2023-11-2 14:05

感谢分享{:1_893:}

roddick1122 发表于 2023-11-2 15:05

感谢分享!!!
页: [1] 2 3
查看完整版本: [Python] 某站有声小说全本下载代码分享