吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 6237|回复: 109
上一主题 下一主题
收起左侧

[Python 原创] 新手勿喷 加密电影爬取,理论上电视剧也可以

[复制链接]
跳转到指定楼层
楼主
star0angel 发表于 2024-5-21 01:05 回帖奖励
本帖最后由 star0angel 于 2024-5-21 11:52 编辑

第一次尝试加密的视频  缺少麻烦一点
本来想尝试一下ffmpeg.exe下载和合并  鼓捣了大半天没整明白 老是报错  有大神教下怎么用吗
还有ffmpeg.exe用的话能一起打包吗  有大神能教教嘛  百度了半天没弄明白 一堆报错

最后还是土办法  仅做学习交流用 请勿用于非法用途  请勿滥用!!!!
增加了sem限制 免得总是下载错误  增加了删除过度文件的操作

放个链接吧:仅做学习交流用 请勿用于非法用途  请勿滥用!!!!
https://star0angel.lanzouw.com/inTvo1zbjb7a
密码:c80s
关于刚刚看到有些朋友说闪退  粘贴链接最好看看能不能播放或者是不是网址有问题比如多了空格啥的反正我没遇到过  这个网站很多链接根本不能播放  我也是win11  刚刚特意测试了下代码中的示例网站没啥问题速度还可以  这个图片摆不来  第一张和第二章是刚刚测试的  最后一张是昨天上传之前测试电视剧的就这样吧!!





[Python] 纯文本查看 复制代码
import asyncio
import time
from Crypto.Cipher import AES
import requests
import re
import os
from urllib.parse import urljoin
import aiohttp
import aiofiles
import shutil

# 定义请求头
hearders = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36'
}


def get_m3u8_1(url):
    """
    从给定的URL获取初始的m3u8链接和视频名称,并创建相关目录。

    :param url: 视频链接
    :return: 视频名称和m3u8链接的元组
    """
    resp = requests.get(url, headers=hearders)
    if resp.status_code == 200:
        # print(resp.text)
        pattern = re.compile(r'link_pre":"(.*?)","url":"(.*?)"')
        pattern_name = re.compile(r'<title>《(.*?)》')
        result = re.search(pattern, resp.text)
        m3u8_url = result.group(2)
        m3u8_url = m3u8_url.replace('\\', '')
        result_name = re.search(pattern_name, resp.text)
        name = result_name.group(1)
        name = name.replace(' ', '')

        os.makedirs(f'{name}/encryption')
        os.makedirs(f'{name}/decryption')
        with open(name + '/' + '1.m3u8', 'w') as f:
            f.write(requests.get(m3u8_url, headers=hearders).text)
        return name, m3u8_url


def get_m3u8_2(name, m3u8_url):
    """
    获取深层次的m3u8链接(如果存在)。

    :param name: 视频名称
    :param m3u8_url: 初始或上层的m3u8链接
    :return: 深层次m3u8链接或原链接
    """
    path = f'{name}/1.m3u8'
    if os.path.exists(path):
        with open(path, 'r') as f:
            m3u8_lst = f.readlines()
            # print(len(m3u8_lst))
            if (len(m3u8_lst) < 10):
                for line in m3u8_lst:
                    if line.startswith('#'):
                        continue
                    else:
                        m3u8_url_2 = urljoin(m3u8_url, line.strip().replace('\n', ''))
                        # print(m3u8_url_2)
                if not os.path.exists(f'{name}/2.m3u8'):
                    with open(f'{name}/2.m3u8', 'w') as f:
                        f.write(requests.get(m3u8_url_2, headers=hearders).text)
                    return m3u8_url_2
            else:
                with open(f'{name}/2.m3u8', 'w') as f:
                    f.writelines(m3u8_lst)
                return m3u8_url


def get_movie_lsts(name, m3u8_url_2):
    """
    从m3u8链接中提取电影片段列表和密钥。

    :param name: 视频名称
    :param m3u8_url_2: m3u8链接(可能是深层次的)
    """
    with open(f'{name}/movie_lsts.txt', 'w') as f1:
        with open(f'{name}/2.m3u8', 'r') as f:
            for line in f.readlines():
                if line.startswith('#'):
                    if 'key' in line:
                        key = re.search(r'URI="(.*?)"', line).group(1)
                        if 'IV' in line:
                            # 这玩意好像没什么用 都是0用none一样
                            iv = line.split('=')[-1].replace('"', '').strip()
                        # print(key, iv)
                        # print(urljoin(m3u8_url_2, key))
                    continue
                else:
                    f1.write(urljoin(m3u8_url_2, line.strip()) + '\n')
    with open(f'{name}/key.key', 'wb') as f:
        f.write(requests.get(urljoin(m3u8_url_2, key), headers=hearders).content)


async def down_load(movie_url, sema, name):
    """
    异步下载电影片段。

    :param movie_url: 电影片段链接
    :param sema: 线程锁
    :param name: 视频名称
    """
    file_path = f'{name}/encryption/{movie_url.split("/")[-1]}'
    async with sema:
        for i in range(10):
            try:
                print(f'{file_path}第{i + 1}次下载开始下载')
                async with aiohttp.ClientSession() as session:
                    async with session.get(movie_url, headers=hearders) as response:
                        content = await response.content.read()
                        async with aiofiles.open(file_path, 'wb') as f:
                            await f.write(content)
                print(f'{file_path}下载成功')
                break
            except Exception as e:
                print(f'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}')
                continue


async def main(name):
    """
    主异步下载函数,管理下载任务。

    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    tasks = []
    sema = asyncio.Semaphore(100)
    for movie_lst in movie_lsts:
        tasks.append(down_load(movie_lst.strip().replace('\n', ''), sema, name))
    await asyncio.gather(*tasks)


def get_movie_names(name):
    """
    获取电影片段名称列表。

    :param name: 视频名称
    :return: 电影片段链接列表
    """
    with open(f'{name}/movie_lsts.txt', 'r') as f:
        movie_lsts = f.readlines()
    return movie_lsts


async def decrypt_file(input_filename, output_filename, key, iv=None):
    """
    异步解密文件。

    :param input_filename: 输入文件名
    :param output_filename: 输出文件名
    :param key: 解密密钥
    :param iv: 初始化向量
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    try:
        async with aiofiles.open(input_filename, 'rb') as infile:
            encrypted_data = await infile.read()
        async with aiofiles.open(output_filename, 'wb') as outfile:
            await outfile.write(cipher.decrypt(encrypted_data))
    except Exception as e:
        print(f'{input_filename}解密失败,原因:应该是未加密的广告{e}')


async def file_lsts(name):
    """
    管理文件解密任务。

    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    with open(f'{name}/key.key', 'rb') as f:
        key = f.read()
    tasks = []
    for movie_name in movie_lsts:
        movie_name = movie_name.split('/')[-1].replace('\n', '')
        input_filename = f'{name}/encryption/{movie_name}'
        output_filename = f'{name}/decryption/{movie_name}'
        tasks.append(asyncio.create_task(decrypt_file(input_filename, output_filename, key)))
    await asyncio.gather(*tasks)


def merge_movie(name):
    """
    合并解密后的电影文件为MP4格式。

    :param name: 视频名称
    """
    temp = []  # 临时存储合成批次的文件名
    n = 1  # 初始化批次号
    now_path = os.getcwd()  # 获取当前工作目录
    lst_movies = get_movie_names(name)
    path = f'{name}/decryption'  # 设置合成前文件存放路径
    os.chdir(path)  # 切换到合成前文件存放路径
    # 循环处理每个电影文件,直到处理完所有文件
    for i in range(len(lst_movies)):
        file_name = lst_movies[i].replace('\n', '').split('/')[-1]
        temp.append(file_name)  # 添加文件名到临时列表
        # 当临时列表达到20个文件名时,进行一次合成
        if len(temp) == 20:
            cmd = f'copy /b {"+".join(temp)} {n}.ts'  # 构造合成命令
            r = os.popen(cmd)  # 执行合成命令
            print(r.read())  # 打印命令执行结果
            n += 1  # 更新批次号
            temp = []  # 清空临时列表

    # 处理剩余的文件名,进行最后一次合成
    cmd = f'copy /b {"+".join(temp)} {n}.ts'
    r = os.popen(cmd)
    print(r.read())
    last_temp = []  # 存储所有合成批次的文件名
    for i in range(1, n + 1):
        last_temp.append(f'{i}.ts')
    cmd = f'copy /b {"+".join(last_temp)} {name}.mp4'  # 构造最终合成命令
    r = os.popen(cmd)  # 执行最终合成命令
    print(r.read())  # 打印最终合成命令的执行结果
    os.chdir(now_path)  # 返回初始工作目录
    print('合并完成')


def last_work(name):
    """
    清理工作,移动文件和删除临时目录。

    :param name: 视频名称
    """
    det_file_path = f'{name}/'
    src_file_path = f'{name}/decryption/{name}.mp4'
    if os.path.exists(det_file_path) and os.path.exists(src_file_path):
        shutil.move(src_file_path, det_file_path)
    del_dir(f'{name}/decryption')
    del_dir(f'{name}/encryption')
    try:
        os.remove(f'{name}/key.key')
        os.remove(f'{name}/1.m3u8')
        os.remove(f'{name}/2.m3u8')
        os.remove(f'{name}/movie_lsts.txt')
    except Exception as e:
        print(e)


def del_dir(directory_path):
    """
    删除指定目录及其内容。

    :param directory_path: 目录路径
    """
    try:
        shutil.rmtree(directory_path)
        print(f"目录 {directory_path} 及其内容已删除")
    except FileNotFoundError:
        print(f"目录 {directory_path} 不存在")
    except Exception as e:
        print(f"删除目录时出错: {e}")


def all_func(url):
    """
    执行全部功能的入口函数。
    """
    # url = 'http://www.ahljtj.com/play/5618346-2-1.html'
    name, m3u8_url = get_m3u8_1(url)
    m3u8_url_2 = get_m3u8_2(name, m3u8_url)
    get_movie_lsts(name, m3u8_url_2)

    asyncio.run(main(name))
    # name = '庆余年第二季'
    asyncio.run(file_lsts(name))
    merge_movie(name)
    last_work(name)
    print('全部完成')


if __name__ == '__main__':
    while True:
        print('http://www.ahljtj.com/list/1.html')
        print('只作为学习研究,请勿用于非法用途')
        url = input('请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):')
        all_func(url)

1.jpg (267.55 KB, 下载次数: 2)

1.jpg

免费评分

参与人数 19吾爱币 +21 热心值 +16 收起 理由
gndj + 1 + 1 我很赞同!
hongwei1112 + 1 用心讨论,共获提升!
qiulujun + 1 谢谢@Thanks!
bjjette + 1 + 1 我很赞同!
抱歉、 + 1 用心讨论,共获提升!
lxf0356 + 1 谢谢@Thanks!
liujuran00 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
xiaovvv + 1 + 1 谢谢@Thanks!
yuzaizi521 + 1 + 1 谢谢@Thanks!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
timeni + 1 + 1 用心讨论,共获提升!
醉月清风 + 1 + 1 只发一个加密电影嘛,别的一起分享一下啊
Fiftyisnt100 + 1 + 1 鼓励转贴优秀软件安全工具和文档!
elan + 1 + 1 谢谢@Thanks!
百里梦想 + 1 用心讨论,共获提升!
weihai0631 + 1 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
bscn + 1 + 1 用心讨论,共获提升!
marinek + 1 用心讨论,共获提升!
zhaopengdoctor + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

推荐
最新的 发表于 2024-5-21 19:36
本帖最后由 最新的 于 2024-5-21 19:37 编辑

[Python] 纯文本查看 复制代码
import asyncio
import time
from Crypto.Cipher import AES
import requests
import re
import os
from urllib.parse import urljoin
import aiohttp
import aiofiles
import shutil

# 定义请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36'
}

def get_m3u8_1(url):
    """
    从给定的URL获取初始的m3u8链接和视频名称,并创建相关目录。
    :param url: 视频链接
    :return: 视频名称和m3u8链接的元组
    """
    resp = requests.get(url, headers=headers)
    if resp.status_code == 200:
        pattern = re.compile(r'link_pre":"(.*?)","url":"(.*?)"')
        pattern_name = re.compile(r'<title>《(.*?)》')
        result = re.search(pattern, resp.text)
        if result:
            m3u8_url = result.group(2)
            m3u8_url = m3u8_url.replace('\\', '')
        else:
            raise ValueError("No m3u8 URL found in the provided URL")

        result_name = re.search(pattern_name, resp.text)
        if result_name:
            name = result_name.group(1)
            name = name.replace(' ', '')
        else:
            raise ValueError("No video name found in the provided URL")

        os.makedirs(f'{name}/encryption', exist_ok=True)
        os.makedirs(f'{name}/decryption', exist_ok=True)
        with open(name + '/' + '1.m3u8', 'w') as f:
            f.write(requests.get(m3u8_url, headers=headers).text)
        return name, m3u8_url

def get_m3u8_2(name, m3u8_url):
    """
    获取深层次的m3u8链接(如果存在)。
    :param name: 视频名称
    :param m3u8_url: 初始或上层的m3u8链接
    :return: 深层次m3u8链接或原链接
    """
    path = f'{name}/1.m3u8'
    if os.path.exists(path):
        with open(path, 'r') as f:
            m3u8_lst = f.readlines()
            if (len(m3u8_lst) < 10):
                for line in m3u8_lst:
                    if line.startswith('#'):
                        continue
                    else:
                        m3u8_url_2 = urljoin(m3u8_url, line.strip().replace('\n', ''))
                if not os.path.exists(f'{name}/2.m3u8'):
                    with open(f'{name}/2.m3u8', 'w') as f:
                        f.write(requests.get(m3u8_url_2, headers=headers).text)
                    return m3u8_url_2
            else:
                with open(f'{name}/2.m3u8', 'w') as f:
                    f.writelines(m3u8_lst)
                return m3u8_url

def get_movie_lsts(name, m3u8_url_2):
    """
    从m3u8链接中提取电影片段列表和密钥。
    :param name: 视频名称
    :param m3u8_url_2: m3u8链接(可能是深层次的)
    """
    with open(f'{name}/movie_lsts.txt', 'w') as f1:
        with open(f'{name}/2.m3u8', 'r') as f:
            for line in f.readlines():
                if line.startswith('#'):
                    if 'key' in line:
                        key = re.search(r'URI="(.*?)"', line).group(1)
                        if 'IV' in line:
                            iv = line.split('=')[-1].replace('"', '').strip()
                    continue
                else:
                    f1.write(urljoin(m3u8_url_2, line.strip()) + '\n')
    with open(f'{name}/key.key', 'wb') as f:
        f.write(requests.get(urljoin(m3u8_url_2, key), headers=headers).content)

async def down_load(movie_url, sema, name):
    """
    异步下载电影片段。
    :param movie_url: 电影片段链接
    :param sema: 线程锁
    :param name: 视频名称
    """
    file_path = f'{name}/encryption/{movie_url.split("/")[-1]}'
    async with sema:
        for i in range(10):
            try:
                print(f'{file_path}第{i + 1}次下载开始下载')
                async with aiohttp.ClientSession() as session:
                    async with session.get(movie_url, headers=headers) as response:
                        content = await response.content.read()
                        async with aiofiles.open(file_path, 'wb') as f:
                            await f.write(content)
                print(f'{file_path}下载成功')
                break
            except Exception as e:
                print(f'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}')
                continue

async def main(name):
    """
    主异步下载函数,管理下载任务。
    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    tasks = []
    sema = asyncio.Semaphore(100)
    for movie_lst in movie_lsts:
        tasks.append(down_load(movie_lst.strip().replace('\n', ''), sema, name))
    await asyncio.gather(*tasks)

def get_movie_names(name):
    """
    获取电影片段名称列表。
    :param name: 视频名称
    :return: 电影片段链接列表
    """
    with open(f'{name}/movie_lsts.txt', 'r') as f:
        movie_lsts = f.readlines()
    return movie_lsts

async def decrypt_file(input_filename, output_filename, key, iv=None):
    """
    异步解密文件。
    :param input_filename: 输入文件名
    :param output_filename: 输出文件名
    :param key: 解密密钥
    :param iv: 初始化向量
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    try:
        async with aiofiles.open(input_filename, 'rb') as infile:
            encrypted_data = await infile.read()
        async with aiofiles.open(output_filename, 'wb') as outfile:
            await outfile.write(cipher.decrypt(encrypted_data))
    except Exception as e:
        print(f'{input_filename}解密失败,原因:应该是未加密的广告{e}')

async def file_lsts(name):
    """
    管理文件解密任务。
    :param name: 视频名称
    """
    movie_lsts = get_movie_names(name)
    with open(f'{name}/key.key', 'rb') as f:
        key = f.read()
    tasks = []
    for movie_name in movie_lsts:
        movie_name = movie_name.split('/')[-1].replace('\n', '')
        input_filename = f'{name}/encryption/{movie_name}'
        output_filename = f'{name}/decryption/{movie_name}'
        tasks.append(asyncio.create_task(decrypt_file(input_filename, output_filename, key)))
    await asyncio.gather(*tasks)

def merge_movie(name):
    """
    合并解密后的电影文件为MP4格式。
    :param name: 视频名称
    """
    temp = []  # 临时存储合成批次的文件名
    n = 1  # 初始化批次号
    now_path = os.getcwd()  # 获取当前工作目录
    lst_movies = get_movie_names(name)
    path = f'{name}/decryption'  # 设置合成前文件存放路径
    os.chdir(path)  # 切换到合成前文件存放路径
    # 循环处理每个电影文件,直到处理完所有文件
    for i in range(len(lst_movies)):
        file_name = lst_movies[i].replace('\n', '').split('/')[-1]
        temp.append(file_name)  # 添加文件名到临时列表
        # 当临时列表达到20个文件名时,进行一次合成
        if len(temp) == 20:
            cmd = f'copy /b {"+".join(temp)} {n}.ts'  # 构造合成命令
            r = os.popen(cmd)  # 执行合成命令
            print(r.read())  # 打印命令执行结果
            n += 1  # 更新批次号
            temp = []  # 清空临时列表

    # 处理剩余的文件名,进行最后一次合成
    cmd = f'copy /b {"+".join(temp)} {n}.ts'
    r = os.popen(cmd)
    print(r.read())
    last_temp = []  # 存储所有合成批次的文件名
    for i in range(1, n + 1):
        last_temp.append(f'{i}.ts')
    cmd = f'copy /b {"+".join(last_temp)} {name}.mp4'  # 构造最终合成命令
    r = os.popen(cmd)  # 执行最终合成命令
    print(r.read())  # 打印最终合成命令的执行结果
    os.chdir(now_path)  # 返回初始工作目录
    print('合并完成')

def last_work(name):
    """
    清理工作,移动文件和删除临时目录。
    :param name: 视频名称
    """
    det_file_path = f'{name}/'
    src_file_path = f'{name}/decryption/{name}.mp4'
    if os.path.exists(det_file_path) and os.path.exists(src_file_path):
        shutil.move(src_file_path, det_file_path)
    del_dir(f'{name}/decryption')
    del_dir(f'{name}/encryption')
    try:
        os.remove(f'{name}/key.key')
        os.remove(f'{name}/1.m3u8')
        os.remove(f'{name}/2.m3u8')
        os.remove(f'{name}/movie_lsts.txt')
    except Exception as e:
        print(e)

def del_dir(directory_path):
    """
    删除指定目录及其内容。
    :param directory_path: 目录路径
    """
    try:
        shutil.rmtree(directory_path)
        print(f"目录 {directory_path} 及其内容已删除")
    except FileNotFoundError:
        print(f"目录 {directory_path} 不存在")
    except Exception as e:
        print(f"删除目录时出错: {e}")

def all_func(url):
    """
    执行全部功能的入口函数。
    """
    try:
        name, m3u8_url = get_m3u8_1(url)
        m3u8_url_2 = get_m3u8_2(name, m3u8_url)
        get_movie_lsts(name, m3u8_url_2)

        asyncio.run(main(name))
        asyncio.run(file_lsts(name))
        merge_movie(name)
        last_work(name)
        print('全部完成')
    except Exception as e:
        print(f"运行过程中发生错误: {e}")

if __name__ == '__main__':
    while True:
        print('http://www.ahljtj.com/list/1.html')
        print('只作为学习研究,请勿用于非法用途')
        url = input('请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):')
        all_func(url)


增加了异常处理,仅供学习使用
主要修改点
  • [color=var(--tw-prose-bold)]get_m3u8_1 函数中检查正则表达式匹配结果:
    • 在调用 result.group 和 result_name.group 之前,检查 result 和 result_name 是否为 None。
    • 如果匹配失败,抛出一个 ValueError 异常,并在 all_func 中捕获和处理异常。
  • [color=var(--tw-prose-bold)]all_func 函数中的异常处理:
    • 捕获 all_func 中任何函数可能抛出的异常,并打印错误消息。

通过这些修改,你可以确保在正则表达式没有匹配到任何内容时,程序不会崩溃,并且会输出适当的错误消息。

免费评分

参与人数 2吾爱币 +3 热心值 +2 收起 理由
aabbcc123123 + 1 + 1 谢谢@Thanks!
star0angel + 2 + 1 用心讨论,共获提升!

查看全部评分

推荐
最新的 发表于 2024-5-22 19:28
本帖最后由 最新的 于 2024-5-22 19:33 编辑
为爱而伤心 发表于 2024-5-22 08:40
大佬作为小白 这个代码如何打包成EXE文件啊

安装 PyInstaller:

[Python] 纯文本查看 复制代码
pip install pyinstaller

创建打包文件:
在你的命令行中导航到包含 Python 脚本的目录,并执行以下命令:

[Python] 纯文本查看 复制代码
pyinstaller your_script.py


这样打包出来会有依赖文件

[Python] 纯文本查看 复制代码
pyinstaller --onefile your_script.py

这样打包出来就是一个exe文件 示例:https://www.123pan.com/s/nTvKVv-Gez0h.html
推荐
dx163 发表于 2024-5-21 07:09
我win11上测试失败了,把网址粘贴,回车 ,程序就退出了。
沙发
cbkxh 发表于 2024-5-21 02:39
谢谢分享,研究一下
3#
jswxll 发表于 2024-5-21 06:55
只为学习研究,不可非法用途!
4#
于生 发表于 2024-5-21 07:05
这个牛逼,是所有电影(非其他电影)都可以下载吗?直接输入名称即可?
6#
zsf123 发表于 2024-5-21 07:37
这个非常实用,感谢大神
7#
想不出名字 发表于 2024-5-21 07:48
好像比较简单?我研究一下
8#
林古月凡 发表于 2024-5-21 07:56
下来用看看效果如何?
9#
sondycnc 发表于 2024-5-21 07:56
粘贴链接后回车就闪退,啥情况?
10#
milu1123 发表于 2024-5-21 08:08
学习学习,,,,
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-24 06:32

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表