wutljs 发表于 2023-11-13 11:10

这个包可很方便的集成在您的项目中

本帖最后由 wutljs 于 2023-11-14 09:47 编辑

# 一个高效的python包 : from_m3u8_get_mp4

## 编写背景

作为一个爬虫爱好者,笔者经常从各种网页中下载MP4视频。有些视频只需要一键下载即可,而有些视频则需要使用m3u8文件来下载。对于前者没什么好说的,直接点击一键下载或者使用requests库拿到content写入文件即可,而对于后者,在自己不写脚本加工m3u8文件的话,则需要使用第三方工具来完成任务。

不过显然,当爬取任务量非常大的时候,这种方法就显得低效。

出于以上原因,笔者写了一个包用来完成常规m3u8文件的转化。

## 相关说明

### github地址

(https://github.com/wutljs/from_m3u8_get_mp4)

### pypi下载方式

**pip install from_m3u8_get_mp4**

### 使用方式

- 您的爬虫获取了目标网址

**g = GetVideoFromUrl(save_file_path=save_file_path, video_name=video_name, m3u8_url=m3u8_url)**

**g.start()**

- 您的爬虫获得了目标文件

**g = GetVideoFromFile(save_file_path=save_file_path, video_name=video_name, m3u8_file_path=m3u8_file_path)**

**g.start()**

## 注意事项

### 缺少相关依赖

1. 在您的相关pythonProject位置打开终端,如下图:



2. 使用以下命令:

   ```cmd
   python detect_moudle.py的文件地址
   ```

   一般来说,可以使用以下命令来代替上述命令:

   ```
   python venv/lib/site-packages/from_m3u8_get_mp4/detect_moudle.py
   ```

### 添加代{过}{滤}理、UA

如果您有这方面需要的话,可以修改from_m3u8_get_mp4包的settings.py文件。

## 其他

### 寄语

希望这个包可以提高您的开发效率,如果您遇见了什么问题,欢迎与笔者交流。

### 关键代码展示

get_video.py:

```python
# -*- coding: utf-8 -*-
# @AuThor: LouJingshuo
# @E-mail: 3480339804@qq.com
# @Time    : 2023/4/16 15:13
# @Function: Infringement must be investigated, please indicate the source of reproduction!
"""This module is used to download the video that the user wants"""

from from_m3u8_get_mp4.decrypt_video import DecryptAES
from from_m3u8_get_mp4 import settings
import os
import shutil
import re
import asyncio
import aiohttp
import aiofiles


class GetVideoFromUrl:
    """
    This class is used to download M3U8 files from the given M3U8 file download address,
    and the downloaded M3U8 file should contain all TS video file download addresses.
    """

    proxy = ''

    def __init__(self, save_file_path, video_name, m3u8_url):
      self.save_file_path = save_file_path
      self.video_name = video_name
      self.m3u8_url = m3u8_url

    def creat_file(self):
      os.system(rf'md {self.save_file_path}\{self.video_name}\ts')
      os.system(rf'md {self.save_file_path}\{self.video_name}\decode_ts')

    async def decrypt_video(self, key):
      da = DecryptAES(save_file_path=self.save_file_path, video_name=self.video_name, key=key)
      await da.decrypt_all_ts()

    @staticmethod
    async def get_key(session, key_url):
      async with session.get(url=key_url) as resp:
            key = await resp.read()
      return key

    async def get_data(self, session, ts_url):
      """This method is used to troubleshoot various problems that can occur with connecting to the destination server"""

      headers = {
            'user-agent': settings.headers
      }

      try:
            async with session.get(url=ts_url, proxy=self.proxy, headers=headers) as resp:
                data = await resp.content.read()
                if len(data) == 0:
                  return False, None
                return True, data
      except asyncio.exceptions.TimeoutError:
            return False, None
      except aiohttp.client_exceptions.ServerDisconnectedError:
            return False, None
      except aiohttp.client_exceptions.ClientOSError:
            return False, None

    async def one_ts_download(self, session, ts_url, name):
      while True:
            judge, data = await self.get_data(session, ts_url)
            if judge == True:
                break
      async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\ts\{name}', 'wb') as fp:
            await fp.write(data)
      print(name + ' finished!')

    async def download_all_ts(self, session, ts_urls):
      async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\ts_name.text', 'w') as fp:
            tasks = []
            num = 0
            for ts_url in ts_urls:
                name = '{:0>5}.ts'.format(num)
                await fp.write(name + '\n')
                tasks.append(asyncio.create_task(self.one_ts_download(session, ts_url, name)))
                num += 1
            await asyncio.wait(tasks)

    @staticmethod
    async def url_compose(url_header, uncomplete_url):
      result_url_list = []
      uncomplete_url_list = uncomplete_url.split('/')
      for item in uncomplete_url_list:
            if item not in url_header:
                result_url_list.append(item)
      complete_url = url_header + '/'.join(result_url_list)
      return complete_url

    async def all_urls_get(self, session):
      async with session.get(self.m3u8_url) as resp:
            m3u8_content = await resp.read()
      async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\{self.video_name}.m3u8', 'wb') as fp:
            await fp.write(m3u8_content)

      url_header = self.m3u8_url.strip('index.m3u8')
      key_url = ''
      ts_urls = []
      async with aiofiles.open(rf'{self.save_file_path}\{self.video_name}\{self.video_name}.m3u8', 'r') as fp:
            async for item in fp:
                # extract the URL of the key
                if 'key' in item:
                  key_url = re.compile(r'URI="(?P<key_url>.*?)"', re.S).search(item).group('key_url')
                  if 'http' not in key_url:
                        key_url = await self.url_compose(url_header, key_url)

                # extract the addresses of all TS files in the file and set up a proxy
                if '#' not in item:
                  ts_url = item.strip()

                  try:
                        assert 'https' in ts_url
                        self.proxy = settings.https_proxy
                  except AssertionError:
                        self.proxy = settings.http_proxy

                  if 'http' not in ts_url:
                        ts_url = await self.url_compose(url_header, ts_url)
                  ts_urls.append(ts_url)

      return key_url, ts_urls

    async def main(self):
      # Create a folder based on the storage path provided by the user to store the corresponding video files.
      self.creat_file()
      print('The corresponding folder has been created!')

      # download the video
      timeout = aiohttp.ClientTimeout(total=30)# Set the timeout period, which is 30 seconds by default.
      async with aiohttp.ClientSession(timeout=timeout) as session:
            key_url, ts_urls = await self.all_urls_get(session)
            await self.download_all_ts(session, ts_urls)

      # determine whether the video is encrypted
      if key_url == '':
            os.system(
                rf'copy /b {self.save_file_path}\{self.video_name}\ts\*.ts {self.save_file_path}\{self.video_name}.mp4')
      else:
            try:
                key = await self.get_key(session, key_url)
            except RuntimeError:
                async with aiohttp.ClientSession() as session1:
                  key = await self.get_key(session1, key_url)
            await self.decrypt_video(key)
            os.system(
                rf'copy /b {self.save_file_path}\{self.video_name}\decode_ts\*.ts {self.save_file_path}\{self.video_name}.mp4')
      print(self.video_name + ' finished!!')

      # clean up currently downloaded files in preparation for subsequent downloads
      shutil.rmtree(rf'{self.save_file_path}\{self.video_name}')

    def start(self):
      # encapsulates the main function for easy user calling
      asyncio.get_event_loop().run_until_complete(self.main())


class GetVideoFromFile(GetVideoFromUrl):
    """
    This class converts the user-supplied M3U8 file into an MP4 file.
    Note that this file contains all TS file (the key) download addresses in full.
    """

    def __init__(self, save_file_path, video_name, m3u8_file_path):
      super().__init__(save_file_path=save_file_path, video_name=video_name, m3u8_url=None)
      self.m3u8_file_path = m3u8_file_path

    async def all_urls_get(self, session):
      key_url = ''
      ts_urls = []
      async with aiofiles.open(rf'{self.m3u8_file_path}', 'r') as fp:
            async for item in fp:
                # extract the URL of the key
                if 'key' in item:
                  key_url = re.compile(r'URI="(?P<key_url>.*?)"', re.S).search(item).group('key_url')
                  if 'http' not in key_url:
                        key_url = await self.url_compose(url_header, key_url)

                # extract the addresses of all TS files in the file and set up a proxy
                if '#' not in item:
                  ts_url = item.strip()

                  try:
                        assert 'https' in ts_url
                        self.proxy = settings.https_proxy
                  except AssertionError:
                        self.proxy = settings.http_proxy

                  ts_urls.append(ts_url)

      return key_url, ts_urls

```

HR741158 发表于 2023-11-15 12:42

感谢分享

LeonSmith123 发表于 2023-11-15 01:23

看起来能方便将视频爬取功能整合到python爬虫流程中,赞一个!

icode_isky 发表于 2023-11-13 14:37

感谢楼主分享,非常实用的一个包

fatyung 发表于 2023-11-13 14:55

感谢楼主分享,非常实用的一个包

FruitBaby 发表于 2023-11-13 22:35

加到项目中,正好试试

wutljs 发表于 2023-11-14 09:21

本帖最后由 wutljs 于 2023-11-14 21:31 编辑

FruitBaby 发表于 2023-11-13 22:35
加到项目中,正好试试
感谢支持!欢迎交流。

RKCN 发表于 2023-11-15 00:48

感谢楼主分享,非常实用的一个包

xiaoermua 发表于 2023-11-15 09:06

支持支持

游不动 发表于 2023-11-17 16:38

感谢分享
页: [1] 2
查看完整版本: 这个包可很方便的集成在您的项目中