# [Python] (forum code-viewer header "view as plain text / copy code", kept as a comment so the file parses)
import asyncio
import time
from Crypto.Cipher import AES
import requests
import re
import os
from urllib.parse import urljoin
import aiohttp
import aiofiles
import shutil
# Default HTTP request headers: identify the client as a desktop Chrome browser.
hearders = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; WOW64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/114.0.5735.289 Safari/537.36'
    ),
}
def get_m3u8_1(url):
    """
    Fetch the play page, extract the video title and the first m3u8 URL,
    create the working directories and save the playlist as ``<name>/1.m3u8``.

    :param url: URL of the play page
    :return: tuple ``(name, m3u8_url)``
    :raises RuntimeError: if the page does not answer with HTTP 200, or the
        playlist URL / title cannot be found in it, or the title is empty
        once characters that are illegal in file names are removed
    """
    resp = requests.get(url, headers=hearders)
    if resp.status_code != 200:
        # Fail loudly here; falling through would return None and make the
        # caller's tuple unpacking crash with an unrelated TypeError.
        raise RuntimeError(f'request for {url} failed with HTTP status {resp.status_code}')

    link_match = re.search(r'link_pre":"(.*?)","url":"(.*?)"', resp.text)
    title_match = re.search(r'<title>《(.*?)》', resp.text)
    if link_match is None or title_match is None:
        raise RuntimeError(f'could not find the m3u8 link or the title in {url}')

    # The URL is embedded in the page with backslash-escaped slashes.
    m3u8_url = link_match.group(2).replace('\\', '')

    name = title_match.group(1).replace(' ', '')
    # The title comes from a web page and is used as a directory name below:
    # drop path separators and other characters that are illegal in file names.
    for illegal in '\\/:*?"<>|':
        name = name.replace(illegal, '')
    name = name.strip('.')
    if not name:
        raise RuntimeError(f'the title found in {url} cannot be used as a directory name')

    # exist_ok: a previous (possibly interrupted) run may have left the directories behind.
    os.makedirs(f'{name}/encryption', exist_ok=True)
    os.makedirs(f'{name}/decryption', exist_ok=True)
    with open(name + '/' + '1.m3u8', 'w') as f:
        f.write(requests.get(m3u8_url, headers=hearders).text)
    return name, m3u8_url
def get_m3u8_2(name, m3u8_url):
    """
    Resolve the media playlist and store it as ``<name>/2.m3u8``.

    ``<name>/1.m3u8`` is either a media playlist (it lists segments, each
    preceded by an ``#EXTINF`` tag) or a master playlist that points at
    further playlists. A media playlist is copied as is; for a master
    playlist the first referenced playlist is downloaded.

    :param name: video name (working directory)
    :param m3u8_url: URL that ``<name>/1.m3u8`` was downloaded from
    :return: URL of the playlist stored in ``<name>/2.m3u8``
    :raises FileNotFoundError: if ``<name>/1.m3u8`` does not exist
    :raises ValueError: if the playlist lists neither segments nor playlists
    """
    with open(f'{name}/1.m3u8', 'r') as f:
        m3u8_lst = f.readlines()

    # Classify by tag rather than by line count: a short video has a short
    # media playlist, and a master playlist with many variants is long.
    if any(line.startswith('#EXTINF') for line in m3u8_lst):
        with open(f'{name}/2.m3u8', 'w') as f:
            f.writelines(m3u8_lst)
        return m3u8_url

    for line in m3u8_lst:
        uri = line.strip()
        if not uri or uri.startswith('#'):
            # Tags, comments and blank lines are not playlist URIs.
            continue
        m3u8_url_2 = urljoin(m3u8_url, uri)
        # Always rewrite 2.m3u8 so a stale file from an earlier run is never reused.
        with open(f'{name}/2.m3u8', 'w') as f:
            f.write(requests.get(m3u8_url_2, headers=hearders).text)
        return m3u8_url_2

    raise ValueError(f'{name}/1.m3u8 contains neither media segments nor playlist URIs')
def get_movie_lsts(name, m3u8_url_2):
    """
    Extract the segment URLs and the decryption key from ``<name>/2.m3u8``.

    The absolute segment URLs are written to ``<name>/movie_lsts.txt`` (one
    per line) and the key referenced by the ``#EXT-X-KEY`` tag is downloaded
    to ``<name>/key.key``. If several key tags carry a URI, the last one is
    used.

    :param name: video name (working directory)
    :param m3u8_url_2: URL of the media playlist, used to resolve relative URIs
    :raises ValueError: if the playlist has no ``#EXT-X-KEY`` tag with a URI
    """
    segment_urls = []
    key_uri = None
    with open(f'{name}/2.m3u8', 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # A blank line would otherwise resolve to the playlist URL itself.
                continue
            if line.startswith('#'):
                # Match the tag name: testing for lowercase 'key' only worked
                # when the key URI happened to contain that word.
                if line.startswith('#EXT-X-KEY'):
                    uri_match = re.search(r'URI="(.*?)"', line)
                    if uri_match:
                        key_uri = uri_match.group(1)
                continue
            segment_urls.append(urljoin(m3u8_url_2, line))

    with open(f'{name}/movie_lsts.txt', 'w') as f1:
        f1.writelines(segment_url + '\n' for segment_url in segment_urls)

    if key_uri is None:
        raise ValueError(f'{name}/2.m3u8 has no #EXT-X-KEY tag with a URI, no key to download')
    with open(f'{name}/key.key', 'wb') as f:
        f.write(requests.get(urljoin(m3u8_url_2, key_uri), headers=hearders).content)
async def down_load(movie_url, sema, name, retries=10):
    """
    Download one segment into ``<name>/encryption/``, retrying on failure.

    :param movie_url: URL of the segment; its last path component is the file name
    :param sema: asyncio semaphore that limits the number of concurrent downloads
    :param name: video name (working directory)
    :param retries: maximum number of download attempts
    """
    file_path = f'{name}/encryption/{movie_url.split("/")[-1]}'
    async with sema:
        for i in range(retries):
            try:
                print(f'{file_path}第{i + 1}次下载开始下载')
                async with aiohttp.ClientSession() as session:
                    async with session.get(movie_url, headers=hearders) as response:
                        # Treat 4xx/5xx answers as failures so that an error
                        # page is retried instead of being saved as a segment.
                        response.raise_for_status()
                        content = await response.content.read()
                async with aiofiles.open(file_path, 'wb') as f:
                    await f.write(content)
                print(f'{file_path}下载成功')
                break
            except Exception as e:
                # Best effort: any network or file error triggers the next attempt.
                print(f'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}')
        else:
            # Every attempt failed; say so instead of ending silently.
            print(f'{file_path}下载失败,已重试{retries}次')
async def main(name):
    """
    Download every segment listed for the video concurrently.

    :param name: video name (working directory)
    """
    segment_urls = get_movie_names(name)
    # At most 100 downloads run at the same time.
    limiter = asyncio.Semaphore(100)
    jobs = [
        down_load(entry.strip().replace('\n', ''), limiter, name)
        for entry in segment_urls
    ]
    await asyncio.gather(*jobs)
def get_movie_names(name):
    """
    Read the segment list stored for the video.

    :param name: video name (working directory)
    :return: lines of ``<name>/movie_lsts.txt``, line endings included
    """
    list_path = f'{name}/movie_lsts.txt'
    with open(list_path, 'r') as handle:
        return handle.readlines()
async def decrypt_file(input_filename, output_filename, key, iv=None):
    """
    Decrypt one AES-CBC encrypted file and write the result to another file.

    Read, decrypt or write errors are reported on stdout and not raised, so
    one bad segment does not abort the other decryption tasks.

    :param input_filename: path of the encrypted file
    :param output_filename: path the decrypted data is written to
    :param key: AES key bytes
    :param iv: initialization vector; ``None`` means 16 zero bytes
    """
    if iv is None:
        # Without an explicit IV PyCryptodome generates a random one, which
        # turns the first 16 decrypted bytes of every file into garbage.
        # Callers in this file never pass an IV, so use an all-zero IV instead.
        iv = bytes(AES.block_size)
    cipher = AES.new(key, AES.MODE_CBC, iv=iv)
    try:
        async with aiofiles.open(input_filename, 'rb') as infile:
            encrypted_data = await infile.read()
        async with aiofiles.open(output_filename, 'wb') as outfile:
            await outfile.write(cipher.decrypt(encrypted_data))
    except Exception as e:
        print(f'{input_filename}解密失败,原因:应该是未加密的广告{e}')
async def file_lsts(name):
    """
    Decrypt every downloaded segment of the video concurrently.

    Segments are read from ``<name>/encryption`` and written under the same
    file name to ``<name>/decryption``, using the key in ``<name>/key.key``.

    :param name: video name (working directory)
    """
    segment_urls = get_movie_names(name)
    with open(f'{name}/key.key', 'rb') as key_file:
        aes_key = key_file.read()

    pending = []
    for segment_url in segment_urls:
        # Only the last path component of the URL is used as the file name.
        base_name = segment_url.split('/')[-1].replace('\n', '')
        job = decrypt_file(
            f'{name}/encryption/{base_name}',
            f'{name}/decryption/{base_name}',
            aes_key,
        )
        pending.append(asyncio.create_task(job))
    await asyncio.gather(*pending)
def merge_movie(name):
    """
    Concatenate the decrypted segments into ``<name>/decryption/<name>.mp4``.

    Segments are appended in the order of ``<name>/movie_lsts.txt``. This is
    a plain byte-for-byte concatenation; no remuxing is performed.

    The merge is done in Python instead of the Windows ``copy /b`` shell
    command. That keeps it portable, keeps the scraped title out of a shell
    command line, and avoids intermediate ``1.ts``, ``2.ts`` ... batch files
    that could overwrite segments of the same name. It also works when the
    number of segments is a multiple of the former batch size.

    :param name: video name (working directory)
    """
    decrypted_dir = f'{name}/decryption'
    with open(f'{name}/movie_lsts.txt', 'r') as f:
        # Each line is a segment URL; the file name is its last path component.
        file_names = [line.strip().split('/')[-1] for line in f]

    with open(f'{decrypted_dir}/{name}.mp4', 'wb') as merged:
        for file_name in file_names:
            if not file_name:
                continue
            try:
                with open(f'{decrypted_dir}/{file_name}', 'rb') as segment:
                    merged.write(segment.read())
            except FileNotFoundError:
                # A segment whose decryption failed has no output file: skip it.
                print(f'{decrypted_dir}/{file_name}不存在,已跳过')
    print('合并完成')
def last_work(name):
    """
    Move the merged video into ``<name>/`` and remove the working files.

    An existing ``<name>/<name>.mp4`` is replaced by the new one. The
    ``encryption`` / ``decryption`` directories, the key, both playlists and
    the segment list are deleted afterwards.

    :param name: video name (working directory)
    """
    det_file_path = f'{name}/'
    src_file_path = f'{name}/decryption/{name}.mp4'
    if os.path.exists(det_file_path) and os.path.exists(src_file_path):
        final_path = f'{name}/{name}.mp4'
        if os.path.exists(final_path):
            # shutil.move refuses to move into a directory that already
            # holds a file of that name, so drop the old video first.
            os.remove(final_path)
        shutil.move(src_file_path, det_file_path)
    del_dir(f'{name}/decryption')
    del_dir(f'{name}/encryption')
    # Remove each file on its own: one missing file must not stop the
    # removal of the others.
    for leftover in ('key.key', '1.m3u8', '2.m3u8', 'movie_lsts.txt'):
        try:
            os.remove(f'{name}/{leftover}')
        except OSError as e:
            print(e)
def del_dir(directory_path):
    """
    Remove a directory tree and report the outcome on stdout.

    Nothing is raised: a missing directory and any other failure are only
    printed.

    :param directory_path: path of the directory to delete
    """
    try:
        shutil.rmtree(directory_path)
        print(f"目录 {directory_path} 及其内容已删除")
    except FileNotFoundError:
        print(f"目录 {directory_path} 不存在")
    except Exception as err:
        print(f"删除目录时出错: {err}")
def all_func(url):
    """
    Run the whole pipeline for one play page: resolve the playlists, list
    the segments, download, decrypt, merge and clean up.

    :param url: URL of the play page
    """
    title, playlist_url = get_m3u8_1(url)
    media_playlist_url = get_m3u8_2(title, playlist_url)
    get_movie_lsts(title, media_playlist_url)
    # Download first, then decrypt; each stage runs in its own event loop.
    for stage in (main, file_lsts):
        asyncio.run(stage(title))
    merge_movie(title)
    last_work(title)
    print('全部完成')
if __name__ == '__main__':
    # Keep asking for play-page URLs and process them one after another.
    while True:
        print(
            'http://www.ahljtj.com/list/1.html',
            '只作为学习研究,请勿用于非法用途',
            sep='\n',
        )
        url = input('请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):')
        all_func(url)