Batch Downloading Videos from a Certain Messaging App's Official Accounts
Example URL (Base64-encoded): aHR0cHM6Ly9tcC53ZWl4aW4ucXEuY29tL21wL2FwcG1zZ2FsYnVtP2FjdGlvbj1nZXRhbGJ1bSZhbGJ1bV9pZD0xNjQwODY5NjU4MTU1MDczNTQxI3dlY2hhdF9yZWRpcmVjdA==
Note: the forum decoded some characters that should not have been decoded, so the places in the code that deal with decoding `&amp;` may appear broken. Please refer to the source code at the link given at the end.
Fetching the album page HTML
This step is straightforward: copy the usual request headers into the code, then fetch the page with requests:
from requests import RequestException, Session
HEADERS = {
# ...
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0",
}
x = Session()
x.headers.update(HEADERS)
def fetch_article_urls(album_url: str) -> list:
"""Fetch the list of article urls from the `album_url`."""
assert album_url.startswith(
"https://<domain>/mp/appmsgalbum?"
), "Invalid album URL"
r = x.get(album_url)
album_info = extract_album_info(r.text) # TODO
article_urls = extract_article_urls(album_info) # TODO
return article_urls
Note that the code sends requests through Session.get rather than calling requests.get directly. This keeps the session's cookies around, which helps sidestep some anti-scraping measures; in addition, a Session reuses the underlying connection, which speeds up subsequent requests.
Extracting article links from the album
First open the URL, click one of the articles in the album, and copy a distinctive keyword from its link (e.g. __biz=Mzg5ODU0MjM2NA). Then go back to the album page, open the page-source panel, and search for that keyword in the source. The data we need turns up right away. Next, let GPT write a regular expression to pull out the data held in this videoList variable:
var\s+videoList\s+=\s*(\[\s*\{.*?\}\s*\]);
The slightly painful part is parsing the matched string into a list object. If it were standard JSON, json.loads would handle it easily; but the keys are not wrapped in double quotes, so json.loads cannot parse it. After some searching online, json5 turns out to fit the bill nicely: it allows comments in the data, unquoted dictionary keys, and a trailing comma after the last array element. One more small detail to watch out for: each item's pos_num is followed by a `* 1`, which needs special handling; we simply delete it. With that, we can implement a function that extracts the data from the album HTML:
from re import DOTALL, search, sub

from json5 import loads

def extract_album_info(html: str) -> list:
    """Extract the album information from the HTML."""
    # Extract the videoList array string
    match = search(r"var\s+videoList\s+=\s*(\[\s*\{.*?\}\s*\]);", html, DOTALL)
    if not match:
        return []
    json_str = match.group(1)
    # Remove the trailing ' * 1' after each pos_num
    json_str = sub(r"\s+\*\s+1", "", json_str)
    # json5 tolerates unquoted keys and trailing commas
    data = loads(json_str)
    return data
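As a quick sanity check of the json5 approach, here is a minimal, self-contained demo; the sample string is made up for illustration (only pos_num and the trailing `* 1` come from the real data):

```python
from re import sub

from json5 import loads

# Shaped like the inline videoList data: unquoted keys, single quotes,
# trailing commas, and a `* 1` tacked onto pos_num. json.loads would
# reject every one of these constructs.
raw = "[{title: 'demo', pos_num: '1' * 1,},]"

cleaned = sub(r"\s+\*\s+1", "", raw)
print(loads(cleaned))  # [{'title': 'demo', 'pos_num': '1'}]
```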
Next, we need to extract the part we actually care about: the URL of each article. This part is relatively simple; the only thing to watch is decoding `&amp;` back to `&`. I'll omit my code here.
Fetching the article page HTML
This step works just like [Fetching the album page HTML](#fetching-the-album-page-html), so I won't repeat it here.
def download_single(article_url: str, filename: str):
    """Download a single video from the given `article_url`."""
    assert article_url.startswith("https://<domain>/s?"), "Invalid article URL"
    print(f"Extracting video from {article_url}...")
    r = x.get(article_url)
    if "环境异常" in r.text:  # the anti-bot "environment exception" page
        print("Aborted due to detected environment exception.")
        return
    data = extract_video_info(r.text)  # TODO
    if not data:
        print("No video found.")
        return
    item = best_quality(data)  # TODO
    item["url"] = item["url"].replace("&amp;", "&")  # decode the HTML entity
    download_video(item["url"], f"{filename}.mp4")  # TODO
Extracting the best-quality video link from an article
This step is essentially the same as "Extracting article links from the album"; only the regular expression, the decoding, and the special-case handling differ, so I won't walk through it again.
def extract_video_info(html: str) -> list:
"""Extract the video information from the HTML."""
# Extract the __mpVideoTransInfo array string
match = search(
r"window\.__mpVideoTransInfo\s*=\s*(\[\s*\{.*?\},\s*\]);", html, DOTALL
)
if not match:
return []
json_str = match.group(1)
# Remove '* 1 || 0'
json_str = sub(r"\s*\*\s*1\s*\|\|\s*0", "", json_str)
# Only keep url in '(url).replace(/^http(s?):/, location.protocol)'
json_str = sub(
r"\(\s*(\'http[^\)]*)\)\.replace\(\s*/\^http\(s\?\):/, location\.protocol\s*\)",
r"\1",
json_str,
)
data = loads(json_str)
return data
def best_quality(data: list) -> dict:
    """Return the info dict of the best-quality video."""
    # Consider `video_quality_level` first, then `filesize`
    # Both are strings (possibly empty), so convert them to integers
    item = max(
        data,
        key=lambda v: (int(v["video_quality_level"] or 0), int(v["filesize"] or 0)),
    )
    return item
Downloading videos with resume support
The video download is where the anti-scraping measures are strictest: the Host, Origin, and Referer request headers must be supplied on top of the usual ones. Also, since we want to support resuming interrupted downloads, we construct the Range request header ourselves to fetch the byte range we want. The resume and streaming-download code is adapted from elsewhere; the sources are listed at the end of this post. I'll be lazy here and just paste my download function.
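The function below refers to `VIDEO_HEADERS` and a few constants defined elsewhere in my script. A sketch of plausible definitions follows; the header values and constant values here are assumptions, not the exact originals:

```python
from os import path
from shutil import move
from time import sleep

VIDEO_HEADERS = {
    **HEADERS,
    # Placeholder values; fill in the actual video host
    "Host": "<video-domain>",
    "Origin": "https://<domain>",
    "Referer": "https://<domain>/",
}
RETRIES = 3               # times to re-issue the ranged request
CHUNK_SIZE = 1024 * 1024  # stream in 1 MiB chunks
INTERVAL = 5              # seconds between retries
```

With those in place, the download function itself: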
def download_video(video_url: str, filename: str):
    """Download the video given the video URL."""
    print(f" 🔍 Downloading {filename}...", end="\r")
    tmp_file_path = filename + ".tmp"
    if not path.exists(filename) or path.exists(tmp_file_path):
        try:
            # Probe the headers first to learn the total file size
            r = x.get(video_url, headers=VIDEO_HEADERS, stream=True)
            r.raise_for_status()  # Raise an exception if the response is not 200 OK
            total_size = int(r.headers["Content-Length"])
            r.close()
            if path.exists(tmp_file_path):
                tmp_size = path.getsize(tmp_file_path)
                print(f" Already downloaded {tmp_size} Bytes out of {total_size} Bytes ({100 * tmp_size / total_size:.2f}%)")
                if tmp_size == total_size:
                    move(tmp_file_path, filename)
                    print(f" ✅ Downloaded {filename} successfully.")
                    return True
                elif tmp_size > total_size:
                    print(" ❌ The downloaded .tmp file is larger than the remote file. It is likely corrupted.")
                    return False
            else:
                tmp_size = 0
                print(f" File is {total_size} Bytes, downloading...")
            with open(tmp_file_path, "ab") as f:
                retries = 0
                while retries < RETRIES:
                    try:
                        # Resume from the current .tmp size via the Range header
                        res = x.get(video_url, headers={**VIDEO_HEADERS, "Range": f"bytes={tmp_size}-"}, stream=True)
                        for chunk in res.iter_content(chunk_size=CHUNK_SIZE):
                            tmp_size += len(chunk)
                            f.write(chunk)
                            f.flush()
                            done = int(50 * tmp_size / total_size)
                            print(f"\r [{'█' * done}{' ' * (50 - done)}] {100 * tmp_size / total_size:.0f}%", end="")
                        break
                    except RequestException:
                        retries += 1
                        print(f"\n ⚠️ Retrying... ({retries}/{RETRIES})")
                        sleep(INTERVAL)
                else:
                    print(f"\n ❌ Failed to download {filename} after {RETRIES} retries.")
                    return False
            if tmp_size == total_size:
                move(tmp_file_path, filename)
                print(f"\n ✅ Downloaded {filename} successfully.")
        except RequestException as e:
            # Log the error so failed URLs can be retried later
            print(e)
            with open(filename + "_log.txt", "a+", encoding="utf-8") as f:
                f.write(f"{video_url}, {e}\n")
            print(f" ❌ Failed to download {filename}.")
    else:
        print(f" ✅ {filename} already downloaded, skipping.")
Open-source repository and references