好友
阅读权限 10
听众
最后登录 1970-1-1
因为喜欢子慕播音的《全职高手》,所以花了点时间研究 ting55,用 Python 写了全本下载工具,分享一些经验。
关于 ting55 的访问限制
有声小说的相关信息(比如书名、播音演员、章节名等)可以从 HTML 页面抓取(参考下文源码),不是难点,难点在于音频资源接口的处理。
ting55 核心接口是 ting55.com/nlinka ,POST 调用,用于解析音频资源直链 URL(参考下文源码),ting55 主要对该接口做了访问控制:
同一 IP 短时间连续请求间隔不得低于6秒(否则返回 503 状态码),不超过6次,否则返回空的 URL; 单 IP 超过限制大概30分钟后可继续请求,一天能发起有效请求(获取到音频直链)20~40次(不严谨测试)。
注意,不要频繁请求该接口,无论是否成功获取到音频 URL,都会被视为请求一次,导致推迟下次请求时间。
代码实现
访问控制搞清楚后,来想想思路,多线程不用考虑,意义不大。有两种思路:
基于定时器,卡着时间请求; 基于代{过}{滤}理 IP,单 IP 受限后更换 IP 继续。
本工具初版是按第一种思路实现,但过于缓慢,在服务器上挂一天只能获取几十章,《全职高手》全本1104章,难等,于是按第二种思路更新了代码(参考下文源码)。
下文代码为当前版本,后续可能更新,追踪开源代码即可:Gitee - nguaduot / audiobook
代{过}{滤}理 IP
参考:https://www.dailiservers.com/free-proxy-list/
将搜集的代{过}{滤}理 IP 按行分割存放在文本文件中,通过参数引入即可。运行后会逐个检查使用,失效代{过}{滤}理会即时从文件中剔除。
使用参考
命令行工具调用,建议先查看命令参数:python audiobook.py -h
[Asm] 纯文本查看 复制代码
usage: audiobook.py [-h] [--name NAME] [--actor ACTOR] [--start START] [--end END] [--out OUT]
[--proxy PROXY]
url
audiobook thief, just for https://ting55.com/
positional arguments:
url novel url (e.g. https://ting55.com/book/9200)
options:
-h, --help show this help message and exit
--name NAME novel name
--actor ACTOR voice actor
--start START starting chapter index (default 1)
--end END last chapter index (default 1)
--out OUT output folder (default same as novel name)
--proxy PROXY text file containing proxy IPs (e.g. 81.10.80.155:8080)
下载《全职高手》前500章,并手动设置小说名和播音演员(用于音频文件命名):
python audiobook.py https://ting55.com/book/9200 --name 全职高手 --actor 子慕 --start 1 --end 500
下载《雪中悍刀行》第1章(省略参数则自动抓取小说名和播音演员,不过只下载第1章):
python audiobook.py https://ting55.com/book/1419
运行后会自动生成代{过}{滤}理文件 audiobook.proxy ,为纯文本文件。如需使用代{过}{滤}理,打开按行添加代{过}{滤}理即可,或手动指定其他代{过}{滤}理文件。
TODO
ting55《全职高手》全本免费,本工具未对收费内容做研究 ,后续会研究其 APP,抓包分析一下接口。
[Python] 纯文本查看 复制代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import math
import os
import re
import time
import urllib.parse
from typing import List, Dict, Tuple
import requests
class Proxy(object):
"""
https://www.dailiservers.com/free-proxy-list/
https://proxyscrape.com/free-proxy-list-f
"""
DEF_PROXY_FILE_NAME = "audiobook.proxy"
def __init__(self, file_proxy: str):
self.file_proxy = file_proxy
if not self.file_proxy:
self.file_proxy = os.path.join(os.path.dirname(__file__), self.DEF_PROXY_FILE_NAME)
self.data_work = self.__load()
self.data_sleep = []
self.data_dead = []
@staticmethod
def __ip_with_port(proxy) -> bool:
return True if re.match("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$", proxy) else False
def __load(self) -> List[str]:
"""
始终将空字符串作为第一个值,表示不使用代{过}{滤}理
"""
if not os.path.isfile(self.file_proxy):
return [""]
data = set()
with open(self.file_proxy, "r", encoding="utf-8") as f:
for line in f.readlines():
line = line.strip()
if self.__ip_with_port(line):
data.add(line)
return [""] + list(data)
def get(self) -> str:
return self.data_work[0] if self.data_work else ""
def count(self) -> int:
return len(self.data_work)
def empty(self) -> bool:
return self.count() == 0
def feedback_dead(self, proxy: str):
if proxy in self.data_work:
self.data_work.remove(proxy)
self.data_dead.append(proxy)
self.save()
def feedback_sleep(self, proxy: str):
if proxy in self.data_work:
self.data_work.remove(proxy)
# self.data_work.append(proxy)
self.data_sleep.append(proxy)
self.save()
def save(self):
with open(self.file_proxy, "w", encoding="utf-8") as f:
f.write("\n".join([p for p in self.data_work if self.__ip_with_port(p)]))
f.write("\n".join([p for p in self.data_sleep if self.__ip_with_port(p)]))
class Ting55Thief(object):
"""
恋听网 https://ting55.com/
访问限制规则:
1. 同一 IP 短时间只能请求6次 nlinka 接口
"""
URL_NOVEL = "https://ting55.com/book/%s" # GET
URL_CHAPTER = "https://ting55.com/book/%s-%d" # GET
URL_API_RES = "https://ting55.com/nlinka" # POST
USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
" AppleWebKit/537.36 (KHTML, like Gecko)"
" Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.76")
NLINKA_INTERVAL = 6 # nlinka 请求间隔时长
FILE_SIZE_VALID = 100 * 1024 # 有效文件大小校验
def __init__(self, novel_id: str, novel_name: str, voice_actor: str, folder_out: str, file_proxy: str):
self.novel_id = novel_id # 小说ID
self.novel_name = novel_name # 小说名
self.voice_actor = voice_actor # 播音演员
self.folder_out = folder_out # 音频资源输出文件夹
self.chapter_max = 1 # 最大章节,用于固定宽度命名
self.log_nlinka = [0.0] # nlinka 请求时间日志
self.proxy = Proxy(file_proxy) # 代{过}{滤}理,用于 nlinka 接口
self.session = requests.session()
self.session.headers.update({
"User-Agent": self.USER_AGENT, # 必填
})
self.fix_novel_info() # 修正小说名、播音演员、输出文件夹、最大章节
def __res_url(self, url_page: str, chapter_id: int, token: str, proxy="") -> Tuple[str, int, str]:
"""
响应示例:{'ourl': '', 'plink': '', 'url': '', 'status': 1}
不使用代{过}{滤}理:proxies={"http": "", "https": ""}
返回值:-2 - 错误,-1 - 该资源失效,0 - 失败,1 - 成功且还剩余请求次数
"""
t = time.time()
try:
res = self.session.post(self.URL_API_RES, data={
"bookId": self.novel_id, # 必填
"page": chapter_id # 必填
# "isPay": 0 # 选填
}, headers={
"Referer": url_page, # 必填
"xt": token, # 必填
"l": "1" # 必填
}, proxies={
"http": proxy,
"https": proxy
}, timeout=8)
self.log_nlinka.append(t) # 无论是否获取到 URL,都计1次
if res.status_code != 200:
if res.status_code == 503:
return "", 0, "503: too fast"
return "", 0, "status code %d" % res.status_code
data = res.json()
if data["status"] != 1:
if data["status"] == -1:
return "", -1, "non-free content"
return "", -1, "unexpected response: %s" % data
if not data["url"]: # 达到请求限制
return "", 0, "too many requests"
return data["url"], 1, ""
except requests.exceptions.ReadTimeout:
return "", -2, "timeout"
except requests.exceptions.ProxyError:
return "", -2, "useless proxy"
except requests.exceptions.ConnectionError:
return "", -2, "connection error"
def fix_novel_info(self):
res = self.session.get(self.URL_NOVEL % self.novel_id)
text = res.text
novel_name, voice_actor, chapter_max = "", "", 1
match = re.search("<p>播音:<a[^>]+class=\"by\"[^>]*>(.+?)</a>", text)
if match:
voice_actor = match.group(1).strip()
match = re.search("class=\"binfo\"><h1>(.+?)</h1>", text)
if match:
novel_name = match.group(1).strip()
if voice_actor:
novel_name = re.sub("有声小说$", "", novel_name)
novel_name = re.sub("[((]子慕[))]", "", novel_name)
novel_name = novel_name.strip()
match = re.search(">(\\d+)</a></li></ul>", text)
if match:
chapter_max = int(match.group(1))
if not self.novel_name: # 修正小说名
self.novel_name = novel_name if novel_name else "未知小说"
if not self.voice_actor: # 修正播音演员
self.voice_actor = voice_actor if voice_actor else "未知播音"
if not self.folder_out: # 修正输出文件夹
self.folder_out = os.path.join(os.path.dirname(__file__), self.novel_name)
self.chapter_max = chapter_max # 修正最大章节
def chapter_url(self, chapter_id: int) -> str:
return self.URL_CHAPTER % (self.novel_id, chapter_id)
def file_name(self, chapter_id: int, chapter_name: str, fmt: str) -> str:
"""
格式:{小说名}-{播音演员}-{章节索引}-{章节名}.{扩展名}
"""
pattern = "%%s-%%s-%%0%dd-%%s%%s" % len(str(self.chapter_max))
name = pattern % (self.novel_name, self.voice_actor, chapter_id, chapter_name, fmt)
name = re.sub("[/\\\\:*?\"<>|]+", "_", name) # 替换不可用字符
return name
def get(self, chapter_id: int) -> Tuple[str, str]:
"""
Args:
chapter_id:
Returns:
音频资源直链、章节名
"""
url_page = self.chapter_url(chapter_id)
res = self.session.get(url_page)
text = res.text
match = re.search("name=\"_c\" content=\"(.+?)\"", text)
if not match:
print("token not found")
return "", ""
token = match.group(1)
while True:
proxy = self.proxy.get()
print("try proxy %s" % (proxy if proxy else "null"))
url_audio, status, msg = self.__res_url(url_page, chapter_id, token, proxy)
if status == 1:
break
if status == -1: # 非免费资源或资源失效
print(msg)
break
if status == 0: # 代{过}{滤}理有效但数据异常
print(msg)
self.proxy.feedback_sleep(proxy)
else: # -2 无效代{过}{滤}理
print(msg)
self.proxy.feedback_dead(proxy)
if self.proxy.empty(): # 代{过}{滤}理耗尽
break
if not url_audio:
print("error in parsing chapter url")
return "", ""
match = re.search(">\\s*第(\\d+)章\\s*(.+?)在线收听\\s*<", text)
if not match:
print("chapter id and name not found")
return "", ""
chapter_id_src = int(match.group(1))
if chapter_id_src != chapter_id:
print("chapter id not matched")
return "", ""
chapter_name = match.group(2)
return url_audio, chapter_name
def download(self, chapter_url: str, chapter_id: int, chapter_name: str) -> str:
"""
链接有时效
Args:
chapter_url:
chapter_id:
chapter_name:
Returns:
下载完成文件路径
"""
if not chapter_url:
print("empty chapter url")
return ""
_, fmt = os.path.splitext(urllib.parse.urlparse(chapter_url).path)
name = self.file_name(chapter_id, chapter_name, fmt)
if not os.path.isdir(self.folder_out):
os.mkdir(self.folder_out)
path = os.path.join(self.folder_out, name)
res = self.session.get(chapter_url)
with open(path, "wb") as f:
f.write(res.content)
if os.path.getsize(path) < self.FILE_SIZE_VALID:
print("wrong file size: %.2fkb" % (os.path.getsize(path) / 1024))
return ""
return path
def downloaded(self) -> List[int]:
data = []
if not os.path.isdir(self.folder_out):
return []
for name in os.listdir(self.folder_out):
m = re.match("%s-%s-(\\d+)-.+" % (self.novel_name, self.voice_actor), name)
if m:
data.append(int(m.group(1)))
return data
def available(self) -> bool:
return time.time() - self.log_nlinka[-1] > self.NLINKA_INTERVAL
def wait(self) -> int:
return max(math.ceil(self.NLINKA_INTERVAL - (time.time() - self.log_nlinka[-1])), 0)
def config(self) -> Dict:
return {
"novel_id": self.novel_id,
"novel_name": self.novel_name,
"voice_actor": self.voice_actor,
"chapter_max": self.chapter_max,
"folder_out": self.folder_out,
"file_proxy": self.proxy.file_proxy
}
@staticmethod
def parse_novel_id(novel_url: str) -> str:
if not novel_url:
return ""
m = re.search("ting55\\.com/book/(\\d+)", novel_url)
if m:
return m.group(1)
return ""
def parse_config() -> Dict:
"""
argparse:
https://docs.python.org/zh-cn/3/howto/argparse.html
"""
parser = argparse.ArgumentParser(description="audiobook thief, just for https://ting55.com/")
parser.add_argument(
"url",
type=str,
help="novel url (e.g. https://ting55.com/book/9200)"
)
parser.add_argument(
"--name",
type=str, default="", required=False,
help="novel name"
)
parser.add_argument(
"--actor",
type=str, default="", required=False,
help="voice actor"
)
parser.add_argument(
"--start",
type=int, default=1, required=False,
help="starting chapter index (default 1)"
)
parser.add_argument(
"--end",
type=int, default=1, required=False,
help="last chapter index (default 1)"
)
parser.add_argument(
"--out",
type=str, default="", required=False,
help="output folder (default same as novel name)"
)
parser.add_argument(
"--proxy",
type=str, default="", required=False,
help="text file containing proxy IPs (e.g. 81.10.80.155:8080)"
)
args = parser.parse_args()
novel_id = Ting55Thief.parse_novel_id(args.url)
start = args.start
end = args.end
if end < start:
end = start
config = {
"id": novel_id,
"name": args.name,
"actor": args.actor,
"start": start,
"end": end,
"out": args.out,
"proxy": args.proxy
}
if not novel_id:
print("invalid novel url (e.g. https://ting55.com/book/9200)")
exit()
return config
def now() -> str:
return datetime.datetime.now().strftime("%H:%M:%S")
def run():
thief = Ting55Thief(CONFIG["id"], CONFIG["name"], CONFIG["actor"], CONFIG["out"], CONFIG["proxy"])
print("config:", thief.config())
data_downloaded = thief.downloaded()
data_todo = [i for i in range(CONFIG["start"], CONFIG["end"] + 1) if i not in data_downloaded]
print("todo: %d" % len(data_todo))
if not data_todo:
return
index = 0
while True:
if index >= len(data_todo):
break
chapter_id = data_todo[index]
print("[%s] %d/%d: %s" % (now(), chapter_id, CONFIG["end"], thief.chapter_url(chapter_id)))
chapter_url, chapter_name = thief.get(chapter_id)
print("[%s] %d/%d: %s %s" % (
now(), chapter_id, CONFIG["end"],
chapter_name if chapter_name else "null", chapter_url if chapter_url else "null"
))
if not chapter_url or not chapter_name:
break
file_saved = thief.download(chapter_url, chapter_id, chapter_name)
if not file_saved:
break
print("[%s] %d/%d: %s %.2fmb" % (
now(), chapter_id, CONFIG["end"], file_saved, os.path.getsize(file_saved) / 1024 / 1024
))
index += 1
print("sleep %d sec..." % thief.wait())
time.sleep(thief.wait())
if __name__ == "__main__":
VERSION = "1.5.230918"
print("audiobook thief v%s gitee.com/nguaduot/audiobook" % VERSION)
CONFIG = parse_config()
run()
免费评分
查看全部评分