蜗牛很牛 发表于 2024-9-2 15:52

论坛二创第三方全网视频电视剧全网电影解析多线程下载

本帖最后由 蜗牛很牛 于 2024-9-2 16:02 编辑

第三方全网视频电视剧全网电影解析多线程下载
感谢原创。本人学艺不精,还请大家有好的意见一起交流。

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import asyncio
import aiohttp
import aiofiles
import hashlib
import shutil
import os
import re
import requests
from tqdm import tqdm
import datetime

# Default HTTP headers (a desktop Chrome user-agent string) for the requests
# in this file that pass `headers=headers`.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
}


def download_m3u8(urls, film_name, path):
    """Fetch the playlist of every episode and save it as ``first.m3u8``.

    For each entry a directory ``<path>/<film_name>/<episode name>`` is
    created. If the first response already contains ``#EXT-X-ENDLIST`` it is
    stored as-is; otherwise the first line mentioning ``m3u8`` is taken as the
    location of a nested playlist, which is fetched and stored instead.

    Args:
        urls: Iterable of dicts with a ``"name"`` (episode title) and a
            ``"url"`` (playlist URL) key.
        film_name: Name of the sub-directory created under *path*.
        path: Base download directory.

    Returns:
        A ``(film_lists, film_name)`` tuple, where ``film_lists`` holds the
        cleaned episode names in the same order as *urls*.

    Raises:
        ValueError: If a response is neither a finished playlist nor contains
            a line pointing at a nested ``m3u8`` playlist.
    """
    from urllib.parse import urljoin

    print("请等待 m3u8 文件下载完成!")
    film_lists = []

    film_dir = os.path.join(path, film_name)
    os.makedirs(film_dir, exist_ok=True)

    for url in tqdm(urls):
        episode_name = url["name"].strip().replace('\t', '')
        film_lists.append(episode_name)
        episode_dir = os.path.join(film_dir, episode_name)
        os.makedirs(episode_dir, exist_ok=True)

        response = requests.get(url['url'], headers=headers, timeout=30)
        if '#EXT-X-ENDLIST' in response.text:
            # Already the final playlist: reuse it instead of fetching it twice.
            m3u8_content = response.text
        else:
            nested = re.search(r'.*m3u8.*', response.text)
            if nested is None:
                raise ValueError(f"no m3u8 playlist found at {url['url']}")
            # The nested entry may be relative to the outer playlist; urljoin
            # leaves absolute URLs untouched.
            m3u8_url = urljoin(url['url'], nested.group(0).strip())
            m3u8_content = requests.get(m3u8_url, headers=headers, timeout=30).text

        # The playlist is read back as UTF-8 elsewhere in this file, so it must
        # be written as UTF-8 rather than in the platform's default encoding.
        with open(os.path.join(episode_dir, 'first.m3u8'), 'w', encoding='utf-8') as f:
            f.write(m3u8_content)

    return film_lists, film_name


def get_key(key_url):
    """Download and return the raw response body served at *key_url*.

    Args:
        key_url: URL of the key file.

    Returns:
        The response body as ``bytes``.
    """
    # Without a timeout a stalled key server would block the download forever.
    return requests.get(key_url, timeout=30).content


async def download_one(path, url, sem, key, episode):
    """Download one segment, optionally decrypt it, and write it to *path*.

    The file is named after the last ``/``-separated component of *url*.
    Up to 10 attempts are made; every failure is printed and retried.

    Args:
        path: Directory the segment file is written into.
        url: Segment URL.
        sem: ``asyncio.Semaphore`` limiting the number of concurrent downloads.
        key: AES key bytes, or a falsy value when the segment is not encrypted.
        episode: Episode name, used only in progress messages.
    """
    file_name = url.split('/')[-1]
    target = os.path.join(path, file_name)
    async with sem:
        # One session serves all retries of this segment.
        async with aiohttp.ClientSession() as session:
            for _ in range(10):
                try:
                    async with session.get(url) as response:
                        # Treat HTTP errors as failures so that an error page
                        # is never stored as video data.
                        response.raise_for_status()
                        content = await response.read()
                    if key:
                        # NOTE(review): the IV is fixed to sixteen ASCII "0"
                        # characters and no padding is removed - confirm this
                        # matches what the source playlists expect.
                        cipher = Cipher(algorithms.AES(key), modes.CBC(b"0000000000000000"),
                                        backend=default_backend())
                        decryptor = cipher.decryptor()
                        content = decryptor.update(content) + decryptor.finalize()
                    async with aiofiles.open(target, mode='wb') as f:
                        await f.write(content)
                except Exception as e:
                    # Deliberately broad: any failure simply triggers a retry.
                    print(f"{episode} - {file_name} 超时重试!", e)
                else:
                    print(f"{episode} - {file_name} 完成片段!")
                    return
            # All attempts failed: say so instead of giving up silently.
            print(f"{episode} - {file_name} 下载失败,已放弃!")


def merge(path, episode):
    """Concatenate an episode's downloaded segments into ``<episode>.mp4``.

    Segment order is taken from ``<path>/<episode>/first.m3u8``: every
    non-blank line that does not start with ``#`` names one segment, and only
    the last ``/``-separated component is used as the local file name. The
    segments are appended byte-for-byte to ``<path>/<episode>.mp4`` and each
    one is deleted once it has been copied.

    The work is done in pure Python, so it does not depend on the Windows
    ``copy /b`` command, needs no shell quoting of file names, and leaves the
    process working directory untouched.

    Args:
        path: Directory containing the episode directory; also receives the
            merged file.
        episode: Episode directory name and base name of the output file.
    """
    print("开始合并")
    episode_dir = os.path.join(path, episode)

    with open(os.path.join(episode_dir, "first.m3u8"), "r", encoding='utf-8') as fa:
        ts_files = [
            line.strip().split('/')[-1]
            for line in fa
            if line.strip() and not line.startswith("#")
        ]

    missing = []
    with open(os.path.join(path, f"{episode}.mp4"), "wb") as merged:
        for ts_file in ts_files:
            segment_path = os.path.join(episode_dir, ts_file)
            try:
                with open(segment_path, "rb") as segment:
                    shutil.copyfileobj(segment, merged)
            except FileNotFoundError:
                # A segment that was never downloaded must not abort the merge.
                missing.append(ts_file)
                continue
            os.remove(segment_path)

    if missing:
        print(f"{episode} 缺少 {len(missing)} 个片段,合并结果可能不完整")
    print("合并完成")


def videos_del(ts_files):
    """Delete every path in *ts_files*, in iteration order.

    Any error raised by ``os.remove`` (for example a missing file) propagates
    to the caller and stops the deletion at that point.
    """
    # map() is lazy, so draining it removes the files one at a time.
    for _ in map(os.remove, ts_files):
        pass


async def download_all_videos(film_list, film_name, film_path, m3u8_urls):
    """Download and merge every episode, one episode after another.

    For each episode the segment URLs are read from the previously saved
    ``<film_path>/<film_name>/<episode>/first.m3u8``, downloaded concurrently
    (at most 100 at a time) and then merged in a worker thread.

    Args:
        film_list: Episode names, in the same order as *m3u8_urls*.
        film_name: Name of the film directory below *film_path*.
        film_path: Base download directory.
        m3u8_urls: Dicts with a ``"url"`` key holding each episode's playlist
            URL.
    """
    sem = asyncio.Semaphore(100)
    film_path = os.path.join(film_path, film_name)
    loop = asyncio.get_running_loop()

    for episode, m3u8_url in zip(film_list, m3u8_urls):
        key_value = ''
        resp = requests.get(m3u8_url['url'], timeout=30)

        if 'AES-128' in resp.text:
            # NOTE(review): assumes the key sits next to the playlist as
            # "enc.key" - confirm against the playlist's key declaration.
            key_url = m3u8_url['url'].replace('index.m3u8', 'enc.key')
            key_value = get_key(key_url)

        episode_dir = os.path.join(film_path, episode)
        with open(os.path.join(episode_dir, 'first.m3u8'), 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            # Blank lines and "#" tag lines are not segment URLs.
            segment_urls = [
                line for line in stripped_lines
                if line and not line.startswith('#')
            ]

        # The job list is rebuilt per episode so finished downloads from
        # earlier episodes are not awaited again; an empty playlist is skipped.
        if segment_urls:
            await asyncio.gather(*(
                download_one(episode_dir, segment_url, sem, key_value, episode)
                for segment_url in segment_urls
            ))

        await loop.run_in_executor(None, merge, film_path, episode)
        print(f"{episode} 完成")

    print("全部下载完成!")


def get_md5_day():
    """Build the two day-dependent values used as request parameters.

    Returns:
        A ``(digest, day_code)`` tuple. ``digest`` is the MD5 hex digest of
        the first 10 hex characters of the MD5 of ``str((day + 18) ^ 10)``;
        ``day_code`` is ``str(day + 11372)``. ``day`` is today's day of the
        month in local time.
    """
    today = datetime.datetime.now().day
    # "+" binds tighter than "^", so the seed is (day + 9 + 9) XOR 10.
    seed = str((today + 9 + 9) ^ 10)
    first_pass = hashlib.md5(seed.encode()).hexdigest()
    second_pass = hashlib.md5(first_pass[:10].encode()).hexdigest()
    return second_pass, str(today + 11372)


def get_m3u8_url(film):
    """Query the parsing API for *film* and return the ``data`` field.

    Args:
        film: Search text, sent as the ``jx`` query parameter.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the request fails, the status code is not 200, the body is not
        JSON, or no non-empty ``"data"`` entry is present.
    """
    md5_hash, one_day = get_md5_day()
    params = {'z': md5_hash, 'jx': film, 's1ig': one_day, 'g': ''}

    try:
        response = requests.get('https://m1-a1.cloud.nnpp.vip:2223/api/v/', params=params,
                                headers=headers, timeout=15)
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts are reported like any other failure.
        print(f"请求失败:{e}")
        return None

    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return None

    try:
        response_data = response.json()
    except ValueError:
        # Every JSON decode error raised by requests is a ValueError subclass,
        # including on versions without requests.exceptions.JSONDecodeError.
        print("响应内容不是有效的 JSON 格式")
        return None

    if not isinstance(response_data, dict) or not response_data.get('data'):
        print("未搜索到相关片源")
        return None

    return response_data['data']


def _select_episodes(m3u8_urls, spec):
    """Return the entries of *m3u8_urls* selected by the 1-based *spec* string."""
    spec = spec.strip().replace(',', ',')
    if not spec:
        return m3u8_urls

    last_idx = len(m3u8_urls) - 1
    if '-' in spec:
        # "a-b": inclusive range, clamped to the available episodes.
        start_idx, end_idx = (int(part) - 1 for part in spec.split('-'))
        return m3u8_urls[max(start_idx, 0):min(end_idx, last_idx) + 1]
    if ',' in spec:
        # "a,b,c": explicit list; numbers outside the valid range are ignored.
        indexes = (int(part) - 1 for part in spec.split(',') if part.strip())
        return [m3u8_urls[i] for i in indexes if 0 <= i <= last_idx]

    # Single number, clamped to the available episodes.
    idx = min(max(int(spec) - 1, 0), last_idx)
    return [m3u8_urls[idx]]


def download_via_ui(film, selected_idx, episodes, path):
    """Search for *film* again and download the requested episodes.

    Args:
        film: Search text passed to ``get_m3u8_url``.
        selected_idx: Index of the chosen search result; only used when the
            search returns a sequence of results rather than a single dict.
        episodes: Episode selection: ``"1-3"``, ``"1,2,3"``, a single number,
            or empty for all episodes. A list produced by splitting the input
            on ``"-"`` is also accepted and re-joined.
        path: Base download directory.
    """
    film_data = get_m3u8_url(film)
    if not film_data:
        return

    # The search result is handled both as a single film dict and as a
    # sequence of film dicts, because this file uses it both ways.
    if isinstance(film_data, dict):
        selected_film = film_data
    else:
        if not 0 <= selected_idx < len(film_data):
            selected_idx = 0
        selected_film = film_data[selected_idx]

    film_name = selected_film['name']
    m3u8_urls = selected_film['source']['eps']
    if not m3u8_urls:
        print("该影片没有可下载的剧集")
        return

    # The UI passes the input already split on "-"; joining restores the text.
    target_episode = episodes if isinstance(episodes, str) else '-'.join(episodes)
    try:
        m3u8_list = _select_episodes(m3u8_urls, target_episode)
    except ValueError:
        print(f"集数格式错误: {target_episode}")
        return
    if not m3u8_list:
        print("没有匹配的集数")
        return

    film_lists, _ = download_m3u8(m3u8_list, film_name, path)
    asyncio.run(download_all_videos(film_lists, film_name, path, m3u8_list))


def main_ui():
    """Build the Tk window and run its main loop.

    The window offers a film search box, an episode selection field, a result
    chooser, a download directory picker and a start button. Downloads run in
    a background thread so the window stays responsive while they are active.
    """
    import threading

    def select_directory():
        # Put the chosen directory into the path entry; cancel leaves it as-is.
        path = filedialog.askdirectory()
        if path:
            entry_path.delete(0, tk.END)
            entry_path.insert(0, path)

    def search_film():
        # Fill the combobox with "<name> (<episode count>集)" for each result.
        film = entry_film.get().strip()
        if not film:
            messagebox.showwarning("警告", "请输入影片名")
            return
        film_data = get_m3u8_url(film)
        if not film_data:
            messagebox.showwarning("警告", "未搜索到相关片源")
            return
        # A single film dict is shown as a one-entry result list.
        results = [film_data] if isinstance(film_data, dict) else film_data
        combo_film['values'] = [
            f"{item['name']} ({len(item['source']['eps'])}集)" for item in results
        ]
        combo_film.current(0)

    def wait_for_download(worker):
        # Poll from the Tk thread; widgets are never touched by the worker.
        if worker.is_alive():
            root.after(500, wait_for_download, worker)
        else:
            button_start.state(['!disabled'])

    def start_download():
        film = entry_film.get().strip()
        episodes = entry_episodes.get().strip().split('-')
        path = entry_path.get().strip() or os.getcwd()
        selected_idx = combo_film.current()

        if not film or not episodes or not path or selected_idx < 0:
            messagebox.showwarning("警告", "请填写所有信息")
            return

        # Running the download on the Tk thread would freeze the window until
        # it finishes, so it goes to a daemon thread (which ends with the
        # program when the window is closed).
        button_start.state(['disabled'])
        worker = threading.Thread(
            target=download_via_ui,
            args=(film, selected_idx, episodes, path),
            daemon=True,
        )
        worker.start()
        wait_for_download(worker)

    root = tk.Tk()
    root.title("M3U8 批量下载工具")

    label_film = ttk.Label(root, text="影片名:")
    label_film.grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
    entry_film = ttk.Entry(root)
    entry_film.grid(row=0, column=1, sticky=tk.EW, padx=5, pady=5)

    button_search = ttk.Button(root, text="搜索影片", command=search_film)
    button_search.grid(row=0, column=2, sticky=tk.EW, padx=5, pady=5)

    label_rule = ttk.Label(root, text="集数输入规则: 1-3 或 1,2,3 或 空")
    label_rule.grid(row=1, column=0, columnspan=3, sticky=tk.W, padx=5, pady=5)

    label_episodes = ttk.Label(root, text="集数:")
    label_episodes.grid(row=2, column=0, sticky=tk.W, padx=5, pady=5)
    entry_episodes = ttk.Entry(root)
    entry_episodes.grid(row=2, column=1, columnspan=2, sticky=tk.EW, padx=5, pady=5)

    label_combo_film = ttk.Label(root, text="选择影片:")
    label_combo_film.grid(row=3, column=0, sticky=tk.W, padx=5, pady=5)
    combo_film = ttk.Combobox(root)
    combo_film.grid(row=3, column=1, columnspan=2, sticky=tk.EW, padx=5, pady=5)

    label_path = ttk.Label(root, text="下载路径:")
    label_path.grid(row=4, column=0, sticky=tk.W, padx=5, pady=5)
    entry_path = ttk.Entry(root)
    entry_path.grid(row=4, column=1, sticky=tk.EW, padx=5, pady=5)
    button_browse = ttk.Button(root, text="浏览", command=select_directory)
    button_browse.grid(row=4, column=2, sticky=tk.EW, padx=5, pady=5)

    button_start = ttk.Button(root, text="开始下载", command=start_download)
    button_start.grid(row=5, column=0, columnspan=3, sticky=tk.EW, padx=5, pady=5)

    # Let the middle column absorb horizontal resizing.
    root.columnconfigure(1, weight=1)
    root.mainloop()


# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main_ui()

Haoyua 发表于 2024-9-2 19:00

感谢分享,这个应该不能下VIP吧。
搜索返回:{'type': 4000}
是哪个参数有问题吗,下载界面很容易卡顿

zelen 发表于 2024-9-2 16:45

谢谢楼主的分享!这个绝对要顶!!!

蛋疼王子 发表于 2024-9-2 17:06

谢谢分享

chaozhi 发表于 2024-9-2 17:28

搜索任何关键字都是返回:{'type': 4000}
未搜索到相关片源

gun008 发表于 2024-9-2 17:34

感谢分享!光是学习研究就很值当了

jtui6999 发表于 2024-9-2 19:04

收藏了,感谢分享!

xuanqi521 发表于 2024-9-2 19:21

不太好用啊

Dj927 发表于 2024-9-2 21:15

画质不高啊

蜗牛很牛 发表于 2024-9-2 21:23

chaozhi 发表于 2024-9-2 17:28
搜索任何关键字都是返回:{'type': 4000}
未搜索到相关片源

直接调用的别人的解析接口,有时候会抽风{:1_896:}
页: [1] 2
查看完整版本: 论坛二创第三方全网视频电视剧全网电影解析多线程下载