star0angel 发表于 2024-5-21 01:05

新手勿喷 加密电影爬取,理论上电视剧也可以

本帖最后由 star0angel 于 2024-5-21 11:52 编辑

第一次尝试加密的视频缺少麻烦一点
本来想尝试一下ffmpeg.exe下载和合并鼓捣了大半天没整明白 老是报错有大神教下怎么用吗
还有ffmpeg.exe用的话能一起打包吗有大神能教教嘛百度了半天没弄明白 一堆报错

最后还是土办法仅做学习交流用 请勿用于非法用途请勿滥用!!!!
增加了sem限制 免得总是下载错误增加了删除过度文件的操作

放个链接吧:仅做学习交流用 请勿用于非法用途请勿滥用!!!!
https://star0angel.lanzouw.com/inTvo1zbjb7a
密码:c80s
关于刚刚看到有些朋友说闪退粘贴链接最好看看能不能播放或者是不是网址有问题比如多了空格啥的反正我没遇到过这个网站很多链接根本不能播放我也是win11刚刚特意测试了下代码中的示例网站没啥问题速度还可以这个图片摆不来第一张和第二章是刚刚测试的最后一张是昨天上传之前测试电视剧的就这样吧!!





import asyncio
import time
from Crypto.Cipher import AES
import requests
import re
import os
from urllib.parse import urljoin
import aiohttp
import aiofiles
import shutil

# 定义请求头
hearders = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36'
}


def get_m3u8_1(url):
    """
    从给定的URL获取初始的m3u8链接和视频名称,并创建相关目录。

    :param url: 视频链接
    :return: 视频名称和m3u8链接的元组
    """
    resp = requests.get(url, headers=hearders)
    if resp.status_code == 200:
      # print(resp.text)
      pattern = re.compile(r'link_pre":"(.*?)","url":"(.*?)"')
      pattern_name = re.compile(r'<title>《(.*?)》')
      result = re.search(pattern, resp.text)
      m3u8_url = result.group(2)
      m3u8_url = m3u8_url.replace('\\', '')
      result_name = re.search(pattern_name, resp.text)
      name = result_name.group(1)
      name = name.replace(' ', '')

      os.makedirs(f'{name}/encryption')
      os.makedirs(f'{name}/decryption')
      with open(name + '/' + '1.m3u8', 'w') as f:
            f.write(requests.get(m3u8_url, headers=hearders).text)
      return name, m3u8_url


def get_m3u8_2(name, m3u8_url):
    """
    获取深层次的m3u8链接(如果存在)。

    :param name: 视频名称
    :param m3u8_url: 初始或上层的m3u8链接
    :return: 深层次m3u8链接或原链接
    """
    path = f'{name}/1.m3u8'
    if os.path.exists(path):
      with open(path, 'r') as f:
            m3u8_lst = f.readlines()
            # print(len(m3u8_lst))
            if (len(m3u8_lst) < 10):
                for line in m3u8_lst:
                  if line.startswith('#'):
                        continue
                  else:
                        m3u8_url_2 = urljoin(m3u8_url, line.strip().replace('\n', ''))
                        # print(m3u8_url_2)
                if not os.path.exists(f'{name}/2.m3u8'):
                  with open(f'{name}/2.m3u8', 'w') as f:
                        f.write(requests.get(m3u8_url_2, headers=hearders).text)
                  return m3u8_url_2
            else:
                with open(f'{name}/2.m3u8', 'w') as f:
                  f.writelines(m3u8_lst)
                return m3u8_url


def get_movie_lsts(name, m3u8_url_2):
    """
    从m3u8链接中提取电影片段列表和密钥。

    :param name: 视频名称
    :param m3u8_url_2: m3u8链接(可能是深层次的)
    """
    with open(f'{name}/movie_lsts.txt', 'w') as f1:
      with open(f'{name}/2.m3u8', 'r') as f:
            for line in f.readlines():
                if line.startswith('#'):
                  if 'key' in line:
                        key = re.search(r'URI="(.*?)"', line).group(1)
                        if 'IV' in line:
                            # 这玩意好像没什么用 都是0用none一样
                            iv = line.split('=')[-1].replace('"', '').strip()
                        # print(key, iv)
                        # print(urljoin(m3u8_url_2, key))
                  continue
                else:
                  f1.write(urljoin(m3u8_url_2, line.strip()) + '\n')
    with open(f'{name}/key.key', 'wb') as f:
      f.write(requests.get(urljoin(m3u8_url_2, key), headers=hearders).content)


async def down_load(movie_url, sema, name):
    """
    异步下载电影片段。

    :param movie_url: 电影片段链接
    :param sema: 线程锁
    :param name: 视频名称
    """
    file_path = f'{name}/encryption/{movie_url.split("/")[-1]}'
    async with sema:
      for i in range(10):
            try:
                print(f'{file_path}第{i + 1}次下载开始下载')
                async with aiohttp.ClientSession() as session:
                  async with session.get(movie_url, headers=hearders) as response:
                        content = await response.content.read()
                        async with aiofiles.open(file_path, 'wb') as f:
                            await f.write(content)
                print(f'{file_path}下载成功')
                break
            except Exception as e:
                print(f'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}')
                continue


async def main(name):
    """
    主异步下载函数,管理下载任务。

    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    tasks = []
    sema = asyncio.Semaphore(100)
    for movie_lst in movie_lsts:
      tasks.append(down_load(movie_lst.strip().replace('\n', ''), sema, name))
    await asyncio.gather(*tasks)


def get_movie_names(name):
    """
    获取电影片段名称列表。

    :param name: 视频名称
    :return: 电影片段链接列表
    """
    with open(f'{name}/movie_lsts.txt', 'r') as f:
      movie_lsts = f.readlines()
    return movie_lsts


async def decrypt_file(input_filename, output_filename, key, iv=None):
    """
    异步解密文件。

    :param input_filename: 输入文件名
    :param output_filename: 输出文件名
    :param key: 解密密钥
    :param iv: 初始化向量
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    try:
      async with aiofiles.open(input_filename, 'rb') as infile:
            encrypted_data = await infile.read()
      async with aiofiles.open(output_filename, 'wb') as outfile:
            await outfile.write(cipher.decrypt(encrypted_data))
    except Exception as e:
      print(f'{input_filename}解密失败,原因:应该是未加密的广告{e}')


async def file_lsts(name):
    """
    管理文件解密任务。

    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    with open(f'{name}/key.key', 'rb') as f:
      key = f.read()
    tasks = []
    for movie_name in movie_lsts:
      movie_name = movie_name.split('/')[-1].replace('\n', '')
      input_filename = f'{name}/encryption/{movie_name}'
      output_filename = f'{name}/decryption/{movie_name}'
      tasks.append(asyncio.create_task(decrypt_file(input_filename, output_filename, key)))
    await asyncio.gather(*tasks)


def merge_movie(name):
    """
    合并解密后的电影文件为MP4格式。

    :param name: 视频名称
    """
    temp = []# 临时存储合成批次的文件名
    n = 1# 初始化批次号
    now_path = os.getcwd()# 获取当前工作目录
    lst_movies = get_movie_names(name)
    path = f'{name}/decryption'# 设置合成前文件存放路径
    os.chdir(path)# 切换到合成前文件存放路径
    # 循环处理每个电影文件,直到处理完所有文件
    for i in range(len(lst_movies)):
      file_name = lst_movies.replace('\n', '').split('/')[-1]
      temp.append(file_name)# 添加文件名到临时列表
      # 当临时列表达到20个文件名时,进行一次合成
      if len(temp) == 20:
            cmd = f'copy /b {"+".join(temp)} {n}.ts'# 构造合成命令
            r = os.popen(cmd)# 执行合成命令
            print(r.read())# 打印命令执行结果
            n += 1# 更新批次号
            temp = []# 清空临时列表

    # 处理剩余的文件名,进行最后一次合成
    cmd = f'copy /b {"+".join(temp)} {n}.ts'
    r = os.popen(cmd)
    print(r.read())
    last_temp = []# 存储所有合成批次的文件名
    for i in range(1, n + 1):
      last_temp.append(f'{i}.ts')
    cmd = f'copy /b {"+".join(last_temp)} {name}.mp4'# 构造最终合成命令
    r = os.popen(cmd)# 执行最终合成命令
    print(r.read())# 打印最终合成命令的执行结果
    os.chdir(now_path)# 返回初始工作目录
    print('合并完成')


def last_work(name):
    """
    清理工作,移动文件和删除临时目录。

    :param name: 视频名称
    """
    det_file_path = f'{name}/'
    src_file_path = f'{name}/decryption/{name}.mp4'
    if os.path.exists(det_file_path) and os.path.exists(src_file_path):
      shutil.move(src_file_path, det_file_path)
    del_dir(f'{name}/decryption')
    del_dir(f'{name}/encryption')
    try:
      os.remove(f'{name}/key.key')
      os.remove(f'{name}/1.m3u8')
      os.remove(f'{name}/2.m3u8')
      os.remove(f'{name}/movie_lsts.txt')
    except Exception as e:
      print(e)


def del_dir(directory_path):
    """
    删除指定目录及其内容。

    :param directory_path: 目录路径
    """
    try:
      shutil.rmtree(directory_path)
      print(f"目录 {directory_path} 及其内容已删除")
    except FileNotFoundError:
      print(f"目录 {directory_path} 不存在")
    except Exception as e:
      print(f"删除目录时出错: {e}")


def all_func(url):
    """
    执行全部功能的入口函数。
    """
    # url = 'http://www.ahljtj.com/play/5618346-2-1.html'
    name, m3u8_url = get_m3u8_1(url)
    m3u8_url_2 = get_m3u8_2(name, m3u8_url)
    get_movie_lsts(name, m3u8_url_2)

    asyncio.run(main(name))
    # name = '庆余年第二季'
    asyncio.run(file_lsts(name))
    merge_movie(name)
    last_work(name)
    print('全部完成')


if __name__ == '__main__':
    while True:
      print('http://www.ahljtj.com/list/1.html')
      print('只作为学习研究,请勿用于非法用途')
      url = input('请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):')
      all_func(url)

最新的 发表于 2024-5-21 19:36

本帖最后由 最新的 于 2024-5-21 19:37 编辑

import asyncio
import time
from Crypto.Cipher import AES
import requests
import re
import os
from urllib.parse import urljoin
import aiohttp
import aiofiles
import shutil

# 定义请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36'
}

def get_m3u8_1(url):
    """
    从给定的URL获取初始的m3u8链接和视频名称,并创建相关目录。
    :param url: 视频链接
    :return: 视频名称和m3u8链接的元组
    """
    resp = requests.get(url, headers=headers)
    if resp.status_code == 200:
      pattern = re.compile(r'link_pre":"(.*?)","url":"(.*?)"')
      pattern_name = re.compile(r'<title>《(.*?)》')
      result = re.search(pattern, resp.text)
      if result:
            m3u8_url = result.group(2)
            m3u8_url = m3u8_url.replace('\\', '')
      else:
            raise ValueError("No m3u8 URL found in the provided URL")

      result_name = re.search(pattern_name, resp.text)
      if result_name:
            name = result_name.group(1)
            name = name.replace(' ', '')
      else:
            raise ValueError("No video name found in the provided URL")

      os.makedirs(f'{name}/encryption', exist_ok=True)
      os.makedirs(f'{name}/decryption', exist_ok=True)
      with open(name + '/' + '1.m3u8', 'w') as f:
            f.write(requests.get(m3u8_url, headers=headers).text)
      return name, m3u8_url

def get_m3u8_2(name, m3u8_url):
    """
    获取深层次的m3u8链接(如果存在)。
    :param name: 视频名称
    :param m3u8_url: 初始或上层的m3u8链接
    :return: 深层次m3u8链接或原链接
    """
    path = f'{name}/1.m3u8'
    if os.path.exists(path):
      with open(path, 'r') as f:
            m3u8_lst = f.readlines()
            if (len(m3u8_lst) < 10):
                for line in m3u8_lst:
                  if line.startswith('#'):
                        continue
                  else:
                        m3u8_url_2 = urljoin(m3u8_url, line.strip().replace('\n', ''))
                if not os.path.exists(f'{name}/2.m3u8'):
                  with open(f'{name}/2.m3u8', 'w') as f:
                        f.write(requests.get(m3u8_url_2, headers=headers).text)
                  return m3u8_url_2
            else:
                with open(f'{name}/2.m3u8', 'w') as f:
                  f.writelines(m3u8_lst)
                return m3u8_url

def get_movie_lsts(name, m3u8_url_2):
    """
    从m3u8链接中提取电影片段列表和密钥。
    :param name: 视频名称
    :param m3u8_url_2: m3u8链接(可能是深层次的)
    """
    with open(f'{name}/movie_lsts.txt', 'w') as f1:
      with open(f'{name}/2.m3u8', 'r') as f:
            for line in f.readlines():
                if line.startswith('#'):
                  if 'key' in line:
                        key = re.search(r'URI="(.*?)"', line).group(1)
                        if 'IV' in line:
                            iv = line.split('=')[-1].replace('"', '').strip()
                  continue
                else:
                  f1.write(urljoin(m3u8_url_2, line.strip()) + '\n')
    with open(f'{name}/key.key', 'wb') as f:
      f.write(requests.get(urljoin(m3u8_url_2, key), headers=headers).content)

async def down_load(movie_url, sema, name):
    """
    异步下载电影片段。
    :param movie_url: 电影片段链接
    :param sema: 线程锁
    :param name: 视频名称
    """
    file_path = f'{name}/encryption/{movie_url.split("/")[-1]}'
    async with sema:
      for i in range(10):
            try:
                print(f'{file_path}第{i + 1}次下载开始下载')
                async with aiohttp.ClientSession() as session:
                  async with session.get(movie_url, headers=headers) as response:
                        content = await response.content.read()
                        async with aiofiles.open(file_path, 'wb') as f:
                            await f.write(content)
                print(f'{file_path}下载成功')
                break
            except Exception as e:
                print(f'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}')
                continue

async def main(name):
    """
    主异步下载函数,管理下载任务。
    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    tasks = []
    sema = asyncio.Semaphore(100)
    for movie_lst in movie_lsts:
      tasks.append(down_load(movie_lst.strip().replace('\n', ''), sema, name))
    await asyncio.gather(*tasks)

def get_movie_names(name):
    """
    获取电影片段名称列表。
    :param name: 视频名称
    :return: 电影片段链接列表
    """
    with open(f'{name}/movie_lsts.txt', 'r') as f:
      movie_lsts = f.readlines()
    return movie_lsts

async def decrypt_file(input_filename, output_filename, key, iv=None):
    """
    异步解密文件。
    :param input_filename: 输入文件名
    :param output_filename: 输出文件名
    :param key: 解密密钥
    :param iv: 初始化向量
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    try:
      async with aiofiles.open(input_filename, 'rb') as infile:
            encrypted_data = await infile.read()
      async with aiofiles.open(output_filename, 'wb') as outfile:
            await outfile.write(cipher.decrypt(encrypted_data))
    except Exception as e:
      print(f'{input_filename}解密失败,原因:应该是未加密的广告{e}')

async def file_lsts(name):
    """
    管理文件解密任务。
    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    with open(f'{name}/key.key', 'rb') as f:
      key = f.read()
    tasks = []
    for movie_name in movie_lsts:
      movie_name = movie_name.split('/')[-1].replace('\n', '')
      input_filename = f'{name}/encryption/{movie_name}'
      output_filename = f'{name}/decryption/{movie_name}'
      tasks.append(asyncio.create_task(decrypt_file(input_filename, output_filename, key)))
    await asyncio.gather(*tasks)

def merge_movie(name):
    """
    合并解密后的电影文件为MP4格式。
    :param name: 视频名称
    """
    temp = []# 临时存储合成批次的文件名
    n = 1# 初始化批次号
    now_path = os.getcwd()# 获取当前工作目录
    lst_movies = get_movie_names(name)
    path = f'{name}/decryption'# 设置合成前文件存放路径
    os.chdir(path)# 切换到合成前文件存放路径
    # 循环处理每个电影文件,直到处理完所有文件
    for i in range(len(lst_movies)):
      file_name = lst_movies.replace('\n', '').split('/')[-1]
      temp.append(file_name)# 添加文件名到临时列表
      # 当临时列表达到20个文件名时,进行一次合成
      if len(temp) == 20:
            cmd = f'copy /b {"+".join(temp)} {n}.ts'# 构造合成命令
            r = os.popen(cmd)# 执行合成命令
            print(r.read())# 打印命令执行结果
            n += 1# 更新批次号
            temp = []# 清空临时列表

    # 处理剩余的文件名,进行最后一次合成
    cmd = f'copy /b {"+".join(temp)} {n}.ts'
    r = os.popen(cmd)
    print(r.read())
    last_temp = []# 存储所有合成批次的文件名
    for i in range(1, n + 1):
      last_temp.append(f'{i}.ts')
    cmd = f'copy /b {"+".join(last_temp)} {name}.mp4'# 构造最终合成命令
    r = os.popen(cmd)# 执行最终合成命令
    print(r.read())# 打印最终合成命令的执行结果
    os.chdir(now_path)# 返回初始工作目录
    print('合并完成')

def last_work(name):
    """
    清理工作,移动文件和删除临时目录。
    :param name: 视频名称
    """
    det_file_path = f'{name}/'
    src_file_path = f'{name}/decryption/{name}.mp4'
    if os.path.exists(det_file_path) and os.path.exists(src_file_path):
      shutil.move(src_file_path, det_file_path)
    del_dir(f'{name}/decryption')
    del_dir(f'{name}/encryption')
    try:
      os.remove(f'{name}/key.key')
      os.remove(f'{name}/1.m3u8')
      os.remove(f'{name}/2.m3u8')
      os.remove(f'{name}/movie_lsts.txt')
    except Exception as e:
      print(e)

def del_dir(directory_path):
    """
    删除指定目录及其内容。
    :param directory_path: 目录路径
    """
    try:
      shutil.rmtree(directory_path)
      print(f"目录 {directory_path} 及其内容已删除")
    except FileNotFoundError:
      print(f"目录 {directory_path} 不存在")
    except Exception as e:
      print(f"删除目录时出错: {e}")

def all_func(url):
    """
    执行全部功能的入口函数。
    """
    try:
      name, m3u8_url = get_m3u8_1(url)
      m3u8_url_2 = get_m3u8_2(name, m3u8_url)
      get_movie_lsts(name, m3u8_url_2)

      asyncio.run(main(name))
      asyncio.run(file_lsts(name))
      merge_movie(name)
      last_work(name)
      print('全部完成')
    except Exception as e:
      print(f"运行过程中发生错误: {e}")

if __name__ == '__main__':
    while True:
      print('http://www.ahljtj.com/list/1.html')
      print('只作为学习研究,请勿用于非法用途')
      url = input('请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):')
      all_func(url)


增加了异常处理,仅供学习使用
主要修改点
[*]get_m3u8_1 函数中检查正则表达式匹配结果:
[*]在调用 result.group 和 result_name.group 之前,检查 result 和 result_name 是否为 None。
[*]如果匹配失败,抛出一个 ValueError 异常,并在 all_func 中捕获和处理异常。

[*]all_func 函数中的异常处理:
[*]捕获 all_func 中任何函数可能抛出的异常,并打印错误消息。

通过这些修改,你可以确保在正则表达式没有匹配到任何内容时,程序不会崩溃,并且会输出适当的错误消息。

最新的 发表于 2024-5-22 19:28

本帖最后由 最新的 于 2024-5-22 19:33 编辑

为爱而伤心 发表于 2024-5-22 08:40
大佬作为小白 这个代码如何打包成EXE文件啊
安装 PyInstaller:

pip install pyinstaller
创建打包文件:
在你的命令行中导航到包含 Python 脚本的目录,并执行以下命令:

pyinstaller your_script.py

这样打包出来会有依赖文件

pyinstaller --onefile your_script.py

这样打包出来就是一个exe文件 示例:https://www.123pan.com/s/nTvKVv-Gez0h.html

dx163 发表于 2024-5-21 07:09

我win11上测试失败了,把网址粘贴,回车 ,程序就退出了。

cbkxh 发表于 2024-5-21 02:39

谢谢分享,研究一下

jswxll 发表于 2024-5-21 06:55

只为学习研究,不可非法用途!

于生 发表于 2024-5-21 07:05

这个牛逼,是所有电影(非其他电影)都可以下载吗?直接输入名称即可?

zsf123 发表于 2024-5-21 07:37

这个非常实用,感谢大神

想不出名字 发表于 2024-5-21 07:48

好像比较简单?我研究一下

林古月凡 发表于 2024-5-21 07:56

下来用看看效果如何?

sondycnc 发表于 2024-5-21 07:56

粘贴链接后回车就闪退,啥情况?

milu1123 发表于 2024-5-21 08:08

学习学习,,,,
页: [1] 2 3 4 5 6 7 8 9 10
查看完整版本: 新手勿喷 加密电影爬取,理论上电视剧也可以