论坛二创第三方全网视频电视剧全网电影解析多线程下载
本帖最后由 蜗牛很牛 于 2024-9-2 16:02 编辑
第三方全网视频电视剧全网电影解析多线程下载
感谢原创。本人学艺不精,还请大家有好的意见一起交流。
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import asyncio
import aiohttp
import aiofiles
import hashlib
import shutil
import os
import re
import requests
from tqdm import tqdm
import datetime
# Browser-like User-Agent sent with the HTTP requests that pass `headers`.
_DESKTOP_CHROME_UA = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/124.0.0.0 Safari/537.36'
)
headers = {'user-agent': _DESKTOP_CHROME_UA}
def download_m3u8(urls, film_name, path):
    """Fetch the media playlist of every episode and save it as ``first.m3u8``.

    For each entry a directory ``<path>/<film_name>/<episode name>`` is
    created and the playlist text is written into it.

    Args:
        urls: Iterable of dicts with a ``"name"`` key (episode title) and a
            ``"url"`` key (playlist address).
        film_name: Film title, used as the parent directory name.
        path: Base download directory.

    Returns:
        A ``(film_lists, film_name)`` tuple; ``film_lists`` holds the cleaned
        episode names in the same order as ``urls``.

    Raises:
        ValueError: If a response is not a finished media playlist and holds
            no line pointing at a nested ``m3u8`` playlist.
    """
    from urllib.parse import urljoin

    print("请等待 m3u8 文件下载完成!")
    film_lists = []
    film_dir = os.path.join(path, film_name)
    os.makedirs(film_dir, exist_ok=True)
    for url in tqdm(urls):
        episode_name = url["name"].strip().replace('\t', '')
        film_lists.append(episode_name)
        episode_dir = os.path.join(film_dir, episode_name)
        os.makedirs(episode_dir, exist_ok=True)
        response = requests.get(url['url'], headers=headers)
        if '#EXT-X-ENDLIST' in response.text:
            # Already the media playlist: reuse the body instead of fetching
            # the same URL a second time.
            m3u8_content = response.text
        else:
            # Otherwise look for the first non-tag line that references a
            # nested playlist. Tag lines (starting with '#') are skipped so an
            # attribute such as URI="x.m3u8" is never mistaken for the URL.
            nested = next(
                (
                    candidate.strip()
                    for candidate in response.text.splitlines()
                    if 'm3u8' in candidate
                    and not candidate.lstrip().startswith('#')
                ),
                None,
            )
            if nested is None:
                raise ValueError(f"{episode_name} 未找到 m3u8 播放列表地址")
            # urljoin leaves absolute URLs untouched and resolves relative
            # ones against the address that was just requested.
            m3u8_url = urljoin(url['url'], nested)
            m3u8_content = requests.get(m3u8_url, headers=headers).text
        # Written as UTF-8 because the playlist is read back with
        # encoding='utf-8' elsewhere in this file.
        with open(os.path.join(episode_dir, 'first.m3u8'), 'w', encoding='utf-8') as f:
            f.write(m3u8_content)
    return film_lists, film_name
def get_key(key_url):
    """Download the decryption key located at ``key_url``.

    Args:
        key_url: Address of the key file.

    Returns:
        The raw response body as ``bytes``.

    Raises:
        requests.HTTPError: If the server answers with an error status, so an
            error page is never handed on as key material.
    """
    response = requests.get(key_url, headers=headers)
    response.raise_for_status()
    return response.content
async def download_one(path, url, sem, key, episode):
    """Download one segment, optionally decrypt it, and store it in ``path``.

    The file is named after the last ``/``-separated component of ``url``.
    Up to 10 attempts are made; every failure is printed and retried, and a
    final message is printed when all attempts fail.

    Args:
        path: Directory the segment file is written to.
        url: Address of the segment.
        sem: ``asyncio.Semaphore`` limiting the number of concurrent downloads.
        key: AES key bytes; a falsy value means the segment is stored as is.
        episode: Episode name, used only in progress messages.
    """
    file_name = url.split('/')[-1]
    target = os.path.join(path, file_name)
    async with sem:
        for _ in range(10):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        # Treat 4xx/5xx as a failed attempt instead of saving
                        # the error body as video data.
                        response.raise_for_status()
                        content = await response.read()
                if key:
                    # NOTE(review): the IV is sixteen ASCII '0' characters.
                    # A playlist may declare a different IV in its
                    # EXT-X-KEY tag - confirm against the target source.
                    cipher = Cipher(algorithms.AES(key), modes.CBC(b"0000000000000000"),
                                    backend=default_backend())
                    decryptor = cipher.decryptor()
                    content = decryptor.update(content) + decryptor.finalize()
                async with aiofiles.open(target, mode='wb') as f:
                    await f.write(content)
            except Exception as e:
                # Retry boundary: any failure of this attempt triggers another.
                print(f"{episode} - {file_name} 超时重试!", e)
            else:
                print(f"{episode} - {file_name} 完成片段!")
                break
        else:
            # All attempts failed; say so instead of giving up silently.
            print(f"{episode} - {file_name} 下载失败!")
def merge(path, episode):
    """Concatenate an episode's segments into ``<path>/<episode>.mp4``.

    The segment order comes from ``<path>/<episode>/first.m3u8``. Every
    non-blank line that does not start with ``#`` names one segment, and the
    segment file is expected in the episode directory under the last
    ``/``-separated component of that line. Segments are appended byte for
    byte; missing ones are reported and skipped. Merged segment files are
    deleted afterwards.

    The concatenation is done in Python, so it needs no shell command and
    leaves the process working directory untouched.

    Args:
        path: Directory that contains the episode directory and receives the
            merged file.
        episode: Episode directory name, also the stem of the output file.
    """
    print("开始合并")
    episode_dir = os.path.join(path, episode)
    with open(os.path.join(episode_dir, "first.m3u8"), "r", encoding='utf-8') as fa:
        ts_files = [
            line.strip().split('/')[-1]
            for line in fa
            if line.strip() and not line.startswith("#")
        ]
    output_file = os.path.join(path, f"{episode}.mp4")
    merged = []
    with open(output_file, "wb") as out:
        for ts_file in ts_files:
            segment_path = os.path.join(episode_dir, ts_file)
            try:
                with open(segment_path, "rb") as segment:
                    shutil.copyfileobj(segment, out)
            except FileNotFoundError:
                # A segment whose download failed must not abort the merge.
                print(f"{episode} - {ts_file} 缺失,已跳过")
                continue
            merged.append(segment_path)
    # dict.fromkeys drops duplicates while keeping order, so a segment listed
    # twice in the playlist is removed only once.
    for segment_path in dict.fromkeys(merged):
        os.remove(segment_path)
    print("合并完成")
def videos_del(ts_files):
    """Delete every file in ``ts_files``; files that are already gone are skipped.

    Args:
        ts_files: Iterable of file paths to remove.
    """
    for ts_file in ts_files:
        try:
            os.remove(ts_file)
        except FileNotFoundError:
            # The goal is only that the file no longer exists; a missing
            # segment must not stop the remaining ones from being cleaned up.
            continue
async def download_all_videos(film_list, film_name, film_path, m3u8_urls):
    """Download and merge every episode, one episode after another.

    For each episode the segment addresses are read from the saved
    ``first.m3u8``, downloaded concurrently (at most 100 at a time), and then
    merged in a worker thread.

    Args:
        film_list: Episode names, in the same order as ``m3u8_urls``.
        film_name: Film title; episodes live under ``<film_path>/<film_name>``.
        film_path: Base download directory.
        m3u8_urls: Dicts with a ``"url"`` key holding each playlist address.
    """
    sem = asyncio.Semaphore(100)
    film_path = os.path.join(film_path, film_name)
    loop = asyncio.get_running_loop()
    for episode, m3u8_url in zip(film_list, m3u8_urls):
        key_value = ''
        resp = requests.get(m3u8_url['url'])
        if 'AES-128' in resp.text:
            # NOTE(review): the key address is derived by swapping
            # 'index.m3u8' for 'enc.key' in the playlist URL rather than read
            # from the playlist - confirm this holds for the target source.
            key_url = m3u8_url['url'].replace('index.m3u8', 'enc.key')
            key_value = get_key(key_url)
        episode_dir = os.path.join(film_path, episode)
        with open(os.path.join(episode_dir, 'first.m3u8'), 'r', encoding='utf-8') as f:
            # Blank lines are skipped so they never become empty-URL downloads.
            segment_urls = [
                line.strip()
                for line in f
                if not line.startswith('#') and line.strip()
            ]
        # A fresh task list per episode; earlier episodes are already finished.
        tasks = [
            asyncio.create_task(
                download_one(episode_dir, segment_url, sem, key_value, episode))
            for segment_url in segment_urls
        ]
        if not tasks:
            # asyncio.wait() raises ValueError when given nothing to wait for.
            print(f"{episode} 没有可下载的片段")
            continue
        await asyncio.wait(tasks)
        # merge() does blocking file I/O, so it runs in the default executor.
        await loop.run_in_executor(None, merge, film_path, episode)
        print(f"{episode} 完成")
    print("全部下载完成!")
def get_md5_day(now=None):
    """Build the two day-dependent tokens sent with the search request.

    Args:
        now: Object with a ``day`` attribute (e.g. ``datetime.datetime``).
            Defaults to the current local time.

    Returns:
        A ``(md5_hash, day_token)`` tuple of strings. ``md5_hash`` is the MD5
        hex digest of the first 10 hex characters of the MD5 of
        ``str((day + 18) ^ 10)``; ``day_token`` is ``str(day + 11372)``.
    """
    if now is None:
        now = datetime.datetime.now()
    # '+' binds tighter than '^', so the original `day + 9 + 9 ^ 10` already
    # meant (day + 18) XOR 10; the parentheses only make that visible.
    day_of_num = str((now.day + 9 + 9) ^ 10)
    md5_hash = hashlib.md5(day_of_num.encode()).hexdigest()[:10]
    md5_hash = hashlib.md5(md5_hash.encode()).hexdigest()
    return md5_hash, str(now.day + 11372)
def get_m3u8_url(film):
    """Query the remote search API for ``film``.

    Args:
        film: Search keyword (film title).

    Returns:
        The value stored under the ``"data"`` key of the JSON response, or
        ``None`` when the request fails, the status is not 200, the body is
        not valid JSON, or the response has no ``"data"`` key.
    """
    md5_hash, one_day = get_md5_day()
    params = {'z': md5_hash, 'jx': film, 's1ig': one_day, 'g': ''}
    try:
        # The timeout keeps a dead endpoint from blocking the caller forever.
        response = requests.get(
            'https://m1-a1.cloud.nnpp.vip:2223/api/v/',
            params=params,
            headers=headers,
            timeout=15,
        )
    except requests.RequestException as exc:
        print(f"请求失败:{exc}")
        return None
    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return None
    try:
        response_data = response.json()
    except ValueError:
        # JSON decode errors derive from ValueError, and catching it also
        # works with requests versions that lack
        # requests.exceptions.JSONDecodeError.
        print("响应内容不是有效的 JSON 格式")
        return None
    if not isinstance(response_data, dict) or 'data' not in response_data:
        print("未搜索到相关片源")
        return None
    return response_data['data']
def _select_episodes(m3u8_urls, spec):
    """Return the entries of ``m3u8_urls`` chosen by the 1-based ``spec``.

    ``spec`` is ``""`` (everything), ``"a-b"`` (inclusive range), ``"a,b,c"``
    (individual episodes) or a single number. Range bounds and a single
    number are clamped to the available episodes; comma-listed numbers
    outside the available range are ignored.

    Raises:
        ValueError: If a part of ``spec`` is not an integer.
    """
    last = len(m3u8_urls) - 1
    if spec == '':
        return list(m3u8_urls)
    if '-' in spec:
        start_text, end_text = spec.split('-', 1)
        start_idx = max(int(start_text) - 1, 0)
        end_idx = min(int(end_text) - 1, last)
        return list(m3u8_urls[start_idx:end_idx + 1])
    if ',' in spec:
        indexes = [int(part) - 1 for part in spec.split(',') if part.strip()]
        return [m3u8_urls[i] for i in indexes if 0 <= i <= last]
    start_idx = min(max(int(spec) - 1, 0), last)
    return [m3u8_urls[start_idx]]


def download_via_ui(film, selected_idx, episodes, path):
    """Search for ``film`` and download the requested episodes of one result.

    Args:
        film: Search keyword.
        selected_idx: Index of the chosen film within the search results.
        episodes: Episode selection, either the raw text (``"1-3"``,
            ``"1,2,3"``, ``"5"`` or ``""`` for all) or that text already split
            on ``'-'`` into a list of strings.
        path: Base download directory.
    """
    film_data = get_m3u8_url(film)
    if not film_data:
        return
    if not 0 <= selected_idx < len(film_data):
        print("所选影片不存在")
        return
    selected_film = film_data[selected_idx]
    film_name = selected_film['name']
    m3u8_urls = selected_film['source']['eps']
    if not m3u8_urls:
        print("该影片没有可下载的剧集")
        return
    # A list means the text was split on '-' by the caller; joining it
    # restores the original text so every input form is parsed the same way.
    if isinstance(episodes, str):
        target_episode = episodes
    else:
        target_episode = '-'.join(episodes)
    # Accept the full-width comma as well as the ASCII one.
    target_episode = target_episode.strip().replace(',', ',')
    try:
        m3u8_list = _select_episodes(m3u8_urls, target_episode)
    except ValueError:
        print(f"集数格式错误:{target_episode}")
        return
    if not m3u8_list:
        print("未选中任何剧集")
        return
    film_lists, _ = download_m3u8(m3u8_list, film_name, path)
    asyncio.run(download_all_videos(film_lists, film_name, path, m3u8_list))
def main_ui():
    """Build the Tk window and run its event loop until it is closed.

    The window offers a film search, a result selector, an episode selection
    field and a target directory. Downloads run in a background thread so the
    window stays responsive while they are in progress.
    """
    import threading

    # Holds the current download thread so a second one is not started while
    # the first is still running.
    worker = {'thread': None}

    def select_directory():
        """Let the user pick the download directory."""
        path = filedialog.askdirectory()
        if path:
            entry_path.delete(0, tk.END)
            entry_path.insert(0, path)

    def search_film():
        """Search for the entered title and list the results in the combobox."""
        film = entry_film.get().strip()
        if not film:
            messagebox.showwarning("警告", "请输入影片名")
            return
        film_data = get_m3u8_url(film)
        if not film_data:
            messagebox.showwarning("警告", "未搜索到相关片源")
            return
        combo_film['values'] = [
            f"{item['name']} ({len(item['source']['eps'])}集)" for item in film_data
        ]
        combo_film.current(0)

    def start_download():
        """Validate the form and start the download in a background thread."""
        film = entry_film.get().strip()
        # download_via_ui receives the episode text split on '-', as before.
        episodes = entry_episodes.get().strip().split('-')
        path = entry_path.get().strip() or os.getcwd()
        selected_idx = combo_film.current()
        if not film or not episodes or not path or selected_idx < 0:
            messagebox.showwarning("警告", "请填写所有信息")
            return
        running = worker['thread']
        if running is not None and running.is_alive():
            messagebox.showwarning("警告", "已有下载任务正在进行")
            return
        # Running the blocking download on the Tk thread would freeze the
        # window until every episode has finished.
        worker['thread'] = threading.Thread(
            target=download_via_ui,
            args=(film, selected_idx, episodes, path),
            daemon=True,
        )
        worker['thread'].start()

    root = tk.Tk()
    root.title("M3U8 批量下载工具")

    label_film = ttk.Label(root, text="影片名:")
    label_film.grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
    entry_film = ttk.Entry(root)
    entry_film.grid(row=0, column=1, sticky=tk.EW, padx=5, pady=5)
    button_search = ttk.Button(root, text="搜索影片", command=search_film)
    button_search.grid(row=0, column=2, sticky=tk.EW, padx=5, pady=5)

    label_rule = ttk.Label(root, text="集数输入规则: 1-3 或 1,2,3 或 空")
    label_rule.grid(row=1, column=0, columnspan=3, sticky=tk.W, padx=5, pady=5)

    label_episodes = ttk.Label(root, text="集数:")
    label_episodes.grid(row=2, column=0, sticky=tk.W, padx=5, pady=5)
    entry_episodes = ttk.Entry(root)
    entry_episodes.grid(row=2, column=1, columnspan=2, sticky=tk.EW, padx=5, pady=5)

    label_combo_film = ttk.Label(root, text="选择影片:")
    label_combo_film.grid(row=3, column=0, sticky=tk.W, padx=5, pady=5)
    # Read-only so the selection always maps to an index in the result list.
    combo_film = ttk.Combobox(root, state="readonly")
    combo_film.grid(row=3, column=1, columnspan=2, sticky=tk.EW, padx=5, pady=5)

    label_path = ttk.Label(root, text="下载路径:")
    label_path.grid(row=4, column=0, sticky=tk.W, padx=5, pady=5)
    entry_path = ttk.Entry(root)
    entry_path.grid(row=4, column=1, sticky=tk.EW, padx=5, pady=5)
    button_browse = ttk.Button(root, text="浏览", command=select_directory)
    button_browse.grid(row=4, column=2, sticky=tk.EW, padx=5, pady=5)

    button_start = ttk.Button(root, text="开始下载", command=start_download)
    button_start.grid(row=5, column=0, columnspan=3, sticky=tk.EW, padx=5, pady=5)

    # Let the middle column absorb horizontal resizing.
    root.columnconfigure(1, weight=1)
    root.mainloop()
# Start the GUI only when the file is run as a script, not when imported.
if __name__ == "__main__":
    main_ui()
感谢分享,这个应该不能下VIP吧。
搜索返回:{'type': 4000}
是哪个参数有问题吗?下载界面很容易卡顿。
谢谢楼主的分享!这个绝对要顶!!!
谢谢分享
搜索任何关键字都是返回:{'type': 4000}
未搜索到相关片源
感谢分享!光是学习研究就很值当了
收藏了,感谢分享!
不太好用啊
画质不高啊
chaozhi 发表于 2024-9-2 17:28
搜索任何关键字都是返回:{'type': 4000}
未搜索到相关片源
直接调用的别人的解析接口,有时候会抽风{:1_896:}
页:
[1]
2