吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1303|回复: 11
收起左侧

[Python 原创] 这个包可很方便的集成在您的项目中

[复制链接]
wutljs 发表于 2023-11-13 11:10
本帖最后由 wutljs 于 2023-11-14 09:47 编辑

一个高效的python包 : from_m3u8_get_mp4

编写背景

作为一个爬虫爱好者,笔者经常从各种网页中下载MP4视频。有些视频只需要一键下载即可,而有些视频则需要使用m3u8文件来下载。对于前者没什么好说的,直接点击一键下载或者使用requests库拿到content写入文件即可,而对于后者,在自己不写脚本加工m3u8文件的话,则需要使用第三方工具来完成任务。

不过显然,当爬取任务量非常大的时候,这种方法就显得低效。

出于以上原因,笔者写了一个包用来完成常规m3u8文件的转化。

相关说明

github地址

wutljs/from_m3u8_get_mp4 (github.com)

pypi下载方式

pip install from_m3u8_get_mp4

使用方式

  • 您的爬虫获取了目标网址

    g = GetVideoFromUrl(save_file_path=save_file_path, video_name=video_name, m3u8_url=m3u8_url)

    g.start()

  • 您的爬虫获得了目标文件

    g = GetVideoFromFile(save_file_path=save_file_path, video_name=video_name, m3u8_file_path=m3u8_file_path)

    g.start()

注意事项

缺少相关依赖

  1. 在您的相关pythonProject位置打开终端,如下图:

pythonProject下的终端

pythonProject下的终端

  1. 使用以下命令:

    python detect_moudle.py的文件地址

    一般来说,可以使用以下命令来代替上述命令:

    python venv/lib/site-packages/from_m3u8_get_mp4/detect_moudle.py

添加代{过}{滤}理、UA

如果您有这方面需要的话,可以修改from_m3u8_get_mp4包的settings.py文件。

其他

寄语

希望这个包可以提高您的开发效率,如果您遇见了什么问题,欢迎与笔者交流。

关键代码展示

get_video.py:

# -*- coding: utf-8 -*-
# @AuThor  : LouJingshuo
# @E-mail  : 3480339804@qq.com
# @Time    : 2023/4/16 15:13
# @Function: Infringement must be investigated, please indicate the source of reproduction!
"""This module is used to download the video that the user wants"""

from from_m3u8_get_mp4.decrypt_video import DecryptAES
from from_m3u8_get_mp4 import settings
import os
import shutil
import re
import asyncio
import aiohttp
import aiofiles

class GetVideoFromUrl:
    """
    This class is used to download M3U8 files from the given M3U8 file download address,
    and the downloaded M3U8 file should contain all TS video file download addresses.
    """

    proxy = ''

    def __init__(self, save_file_path, video_name, m3u8_url):
        self.save_file_path = save_file_path
        self.video_name = video_name
        self.m3u8_url = m3u8_url

    def creat_file(self):
        os.system(rf'md {self.save_file_path}\{self.video_name}\ts')
        os.system(rf'md {self.save_file_path}\{self.video_name}\decode_ts')

    async def decrypt_video(self, key):
        da = DecryptAES(save_file_path=self.save_file_path, video_name=self.video_name, key=key)
        await da.decrypt_all_ts()

    @staticmethod
    async def get_key(session, key_url):
        async with session.get(url=key_url) as resp:
            key = await resp.read()
        return key

    async def get_data(self, session, ts_url):
        """This method is used to troubleshoot various problems that can occur with connecting to the destination server"""

        headers = {
            'user-agent': settings.headers
        }

        try:
            async with session.get(url=ts_url, proxy=self.proxy, headers=headers) as resp:
                data = await resp.content.read()
                if len(data) == 0:
                    return False, None
                return True, data
        except asyncio.exceptions.TimeoutError:
            return False, None
        except aiohttp.client_exceptions.ServerDisconnectedError:
            return False, None
        except aiohttp.client_exceptions.ClientOSError:
            return False, None

    async def one_ts_download(self, session, ts_url, name):
        while True:
            judge, data = await self.get_data(session, ts_url)
            if judge == True:
                break
        async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\ts\{name}', 'wb') as fp:
            await fp.write(data)
        print(name + ' finished!')

    async def download_all_ts(self, session, ts_urls):
        async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\ts_name.text', 'w') as fp:
            tasks = []
            num = 0
            for ts_url in ts_urls:
                name = '{:0>5}.ts'.format(num)
                await fp.write(name + '\n')
                tasks.append(asyncio.create_task(self.one_ts_download(session, ts_url, name)))
                num += 1
            await asyncio.wait(tasks)

    @staticmethod
    async def url_compose(url_header, uncomplete_url):
        result_url_list = []
        uncomplete_url_list = uncomplete_url.split('/')
        for item in uncomplete_url_list:
            if item not in url_header:
                result_url_list.append(item)
        complete_url = url_header + '/'.join(result_url_list)
        return complete_url

    async def all_urls_get(self, session):
        async with session.get(self.m3u8_url) as resp:
            m3u8_content = await resp.read()
        async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\{self.video_name}.m3u8', 'wb') as fp:
            await fp.write(m3u8_content)

        url_header = self.m3u8_url.strip('index.m3u8')
        key_url = ''
        ts_urls = []
        async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\{self.video_name}.m3u8', 'r') as fp:
            async for item in fp:
                # extract the URL of the key
                if 'key' in item:
                    key_url = re.compile(r'URI="(?P<key_url>.*?)"', re.S).search(item).group('key_url')
                    if 'http' not in key_url:
                        key_url = await self.url_compose(url_header, key_url)

                # extract the addresses of all TS files in the file and set up a proxy
                if '#' not in item:
                    ts_url = item.strip()

                    try:
                        assert 'https' in ts_url
                        self.proxy = settings.https_proxy
                    except AssertionError:
                        self.proxy = settings.http_proxy

                    if 'http' not in ts_url:
                        ts_url = await self.url_compose(url_header, ts_url)
                    ts_urls.append(ts_url)

        return key_url, ts_urls

    async def main(self):
        # Create a folder based on the storage path provided by the user to store the corresponding video files.
        self.creat_file()
        print('The corresponding folder has been created!')

        # download the video
        timeout = aiohttp.ClientTimeout(total=30)  # Set the timeout period, which is 30 seconds by default.
        async with aiohttp.ClientSession(timeout=timeout) as session:
            key_url, ts_urls = await self.all_urls_get(session)
            await self.download_all_ts(session, ts_urls)

        # determine whether the video is encrypted
        if key_url == '':
            os.system(
                rf'copy /b {self.save_file_path}\{self.video_name}\ts\*.ts {self.save_file_path}\{self.video_name}.mp4')
        else:
            try:
                key = await self.get_key(session, key_url)
            except RuntimeError:
                async with aiohttp.ClientSession() as session1:
                    key = await self.get_key(session1, key_url)
            await self.decrypt_video(key)
            os.system(
                rf'copy /b {self.save_file_path}\{self.video_name}\decode_ts\*.ts {self.save_file_path}\{self.video_name}.mp4')
        print(self.video_name + ' finished!!')

        # clean up currently downloaded files in preparation for subsequent downloads
        shutil.rmtree(rf'{self.save_file_path}\{self.video_name}')

    def start(self):
        # encapsulates the main function for easy user calling
        asyncio.get_event_loop().run_until_complete(self.main())

class GetVideoFromFile(GetVideoFromUrl):
    """
    This class converts the user-supplied M3U8 file into an MP4 file.
    Note that this file contains all TS file (the key) download addresses in full.
    """

    def __init__(self, save_file_path, video_name, m3u8_file_path):
        super().__init__(save_file_path=save_file_path, video_name=video_name, m3u8_url=None)
        self.m3u8_file_path = m3u8_file_path

    async def all_urls_get(self, session):
        key_url = ''
        ts_urls = []
        async with aiofiles.open(rf'{self.m3u8_file_path}', 'r') as fp:
            async for item in fp:
                # extract the URL of the key
                if 'key' in item:
                    key_url = re.compile(r'URI="(?P<key_url>.*?)"', re.S).search(item).group('key_url')
                    if 'http' not in key_url:
                        key_url = await self.url_compose(url_header, key_url)

                # extract the addresses of all TS files in the file and set up a proxy
                if '#' not in item:
                    ts_url = item.strip()

                    try:
                        assert 'https' in ts_url
                        self.proxy = settings.https_proxy
                    except AssertionError:
                        self.proxy = settings.http_proxy

                    ts_urls.append(ts_url)

        return key_url, ts_urls

免费评分

参与人数 4威望 +1 吾爱币 +23 热心值 +4 收起 理由
苏紫方璇 + 1 + 20 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
LeonSmith123 + 1 我很赞同!
helian147 + 1 + 1 热心回复!
Arcticlyc + 2 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

HR741158 发表于 2023-11-15 12:42
感谢分享
LeonSmith123 发表于 2023-11-15 01:23
看起来能方便将视频爬取功能整合到python爬虫流程中,赞一个!
icode_isky 发表于 2023-11-13 14:37
fatyung 发表于 2023-11-13 14:55
感谢楼主分享,非常实用的一个包
FruitBaby 发表于 2023-11-13 22:35
加到项目中,正好试试
 楼主| wutljs 发表于 2023-11-14 09:21
本帖最后由 wutljs 于 2023-11-14 21:31 编辑
FruitBaby 发表于 2023-11-13 22:35
加到项目中,正好试试

感谢支持!欢迎交流。
RKCN 发表于 2023-11-15 00:48
感谢楼主分享,非常实用的一个包
xiaoermua 发表于 2023-11-15 09:06
支持支持
游不动 发表于 2023-11-17 16:38
感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-11 05:54

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表