hestyle 发表于 2020-10-6 20:56

基于Python3实现的m3u8批量下载器 解密&合并&多线程 (开车新姿势~)

本帖最后由 hestyle 于 2020-11-20 12:48 编辑

# 一、前言
在上一篇帖子 https://www.52pojie.cn/thread-1184085-1-1.html 展示了怎么抓到晃video的m3u8,现在发个最近闲来无聊写的m3u8批量下载器,实现了多线程下载、AES常规解密、合并、批量下载四大功能。

# 二、m3u8概述
M3U8 是 Unicode 版本的 M3U,用 UTF-8 编码。"M3U" 和 "M3U8" 文件都是苹果公司使用的 HTTP Live Streaming(HLS) 协议格式的基础,HLS 的工作原理是把整个流分成一个个小的基于 HTTP 的文件来下载,每次只下载一些。当媒体流正在播放时,客户端可以选择从许多不同的备用源中以不同的速率下载同样的资源,允许流媒体会话适应不同的网络速率,所以广泛用于在线视频的播放、传输。

一个m3u8文件主要由信息头(记录版本、是否加密、key的位置)、ts流列表两个部分构成。下面是常见的两类m3u8文件。

## m3u8表现形式1(假设m3u8文件的url=https://www.xxx.com/yyy/zzz/index.m3u8)
这种m3u8链接对应的视频可能有多种分辨率,比如下面这个例子只有720x480这个分辨率,对应的相对url为1000kb/hls/index.m3u8,是一个相对路径,720x480分辨率的视频绝对路径就是https://www.xxx.com/yyy/zzz/1000kb/hls/index.m3u8,需要再次访问这个链接下载这个分辨率对应的m3u8文件。
```
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=720x480
1000kb/hls/index.m3u8
```
## m3u8表现形式2(假设m3u8文件的url=https://www.xxx.com/yyy/zzz/index.m3u8)
这种m3u8已经把ts列表直接放进来了,所以下载所有的ts流组合即可得到视频的全部内容。
注意第5行,注明了该视频进行了加密,加密方式为AES-128(默认是CBC模式),加密key的路径为key.key,是一个相对路径,套上基路径就是https://www.xxx.com/yyy/zzz/key.key,(注:key也可能这个key的url是一个http开头的绝对路径)。有的m3u8可能还在这一行加一个IV,也就是AES-CBC加密、解密中的IV。
下面的例子中ts也是相对路径,同样需要加上基路径,比如第1个ts的绝对路径就是https://www.xxx.com/yyy/zzz/QxiMvI3688000.ts
```
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="key.key"
#EXTINF:0.834167,
QxiMvI3688000.ts
#EXTINF:0.834167,
QxiMvI3688001.ts
#EXTINF:0.834167,
QxiMvI3688002.ts
#EXTINF:0.834167,
QxiMvI3688003.ts
#EXTINF:0.834167,
QxiMvI3688004.ts
#EXTINF:0.834167,
QxiMvI3688005.ts
...
```
# 三、基于Python3实现的m3u8批量下载器(解密&多线程&合并)
## 1、下载思路
经过简单的分析m3u8协议及其文件格式,现在只要把他们串起来就好了。
```
①、下载m3u8文件,如果其内容的表示形式是第1种,则还需要再次访问对应的分辨率的url,重新下载m3u8
②、解析m3u8,判断是否加密了(需要提取加密方式、加密key、IV),提取ts列表
③、多线程下载所有ts(注意别打乱顺序,在m3u8文件中的顺序就是在完整视频中的顺序,所以需要记录原来的顺序,或者按照顺序进行ts重命名)
④、合并(如果加密了,则对每个ts解密)
⑤、调用FFmpeg,将合并好的视频信息放入一个mp4容器中(直接放在mp4文件也行)
⑥、回到①,开始下载下一个m3u8
```
## 2、Python源码实现
受博主编码能力影响,加上博主又很懒,怎么简单怎么来,代码冗余度比较高。。。下面的代码已经过了4-5千个m3u8的下载测试,但是不能保证没有bug,如有问题欢迎斧正哈~
```
# UTF-8
# author hestyle
# desc: 必须在终端直接执行,不能在pycharm等IDE中直接执行,否则看不到动态进度条效果

import os
import sys
import m3u8
import requests
import traceback
import threadpool
from Crypto.Cipher import AES


headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36"
}
######################配置信息##########################
# m3u8链接批量输入文件
m3u8InputFilePath = "D:/input/m3u8s_input.txt"
# 视频保存路径
saveRootDirPath = "D:/output"
# 下载出错的m3u8保存文件
errorM3u8InfoDirPath = "D:/output/error.txt"
# m3u8文件、key文件下载尝试次数,ts流默认无限次尝试下载,直到成功
m3u8TryCountConf = 10
# 线程数(同时下载的分片数)
processCountConf = 50
#######################################################

# 全局变量
# 全局线程池
taskThreadPool = None
# 当前下载的m3u8 url
m3u8Url = None
# url前缀
rootUrlPath = None
# title
title = None
# ts count
sumCount = None
# 已处理的ts
doneCount = None
# cache path
cachePath = saveRootDirPath + "/cache"
# log path
logPath = cachePath + "/log.log"
# log file
logFile = None

# 1、下载m3u8文件
def getM3u8Info():
    global m3u8Url
    global logFile
    global rootUrlPath
    tryCount = m3u8TryCountConf
    while True:
      if tryCount < 0:
            print("\t{0}下载失败!".format(m3u8Url))
            logFile.write("\t{0}下载失败!".format(m3u8Url))
            return None
      tryCount = tryCount - 1
      try:
            response = requests.get(m3u8Url, headers=headers, timeout=20, allow_redirects=True)
            if response.status_code == 301:
                nowM3u8Url = response.headers["location"]
                print("\t{0}重定向至{1}!".format(m3u8Url, nowM3u8Url))
                logFile.write("\t{0}重定向至{1}!\n".format(m3u8Url, nowM3u8Url))
                m3u8Url = nowM3u8Url
                continue
            expected_length = int(response.headers.get('Content-Length'))
            actual_length = len(response.content)
            if expected_length > actual_length:
                raise Exception("m3u8下载不完整")
            print("\t{0}下载成功!".format(m3u8Url))
            logFile.write("\t{0}下载成功!".format(m3u8Url))
            rootUrlPath = m3u8Url
            break
      except TimeoutError:
            print("\t{0}下载失败!正在重试".format(m3u8Url))
            logFile.write("\t{0}下载失败!正在重试".format(m3u8Url))
            traceback.print_exc()
    # 解析m3u8中的内容
    m3u8Info = m3u8.loads(response.text)
    # 有可能m3u8Url是一个多级码流
    if m3u8Info.is_variant:
      print("\t{0}为多级码流!".format(m3u8Url))
      logFile.write("\t{0}为多级码流!".format(m3u8Url))
      for rowData in response.text.split('\n'):
            # 寻找响应内容的中的m3u8
            if rowData.endswith(".m3u8"):
                m3u8Url = m3u8Url.replace("index.m3u8", rowData)
                rootUrlPath = m3u8Url
                return getM3u8Info()
      # 遍历未找到就返回None
      print("\t{0}响应未寻找到m3u8!".format(response.text))
      logFile.write("\t{0}响应未寻找到m3u8!".format(response.text))
      return None
    else:
      return m3u8Info

# 2、下载key文件
def getKey(keyUrl):
    global logFile
    tryCount = m3u8TryCountConf
    while True:
      if tryCount < 0:
            print("\t{0}下载失败!".format(keyUrl))
            logFile.write("\t{0}下载失败!".format(keyUrl))
            return None
      tryCount = tryCount - 1
      try:
            response = requests.get(keyUrl, headers=headers, timeout=20, allow_redirects=True)
            if response.status_code == 301:
                nowKeyUrl = response.headers["location"]
                print("\t{0}重定向至{1}!".format(keyUrl, nowKeyUrl))
                logFile.write("\t{0}重定向至{1}!\n".format(keyUrl, nowKeyUrl))
                keyUrl = nowKeyUrl
                continue
            expected_length = int(response.headers.get('Content-Length'))
            actual_length = len(response.content)
            if expected_length > actual_length:
                raise Exception("key下载不完整")
            print("\t{0}下载成功!key = {1}".format(keyUrl, response.content.decode("utf-8")))
            logFile.write("\t{0}下载成功! key = {1}".format(keyUrl, response.content.decode("utf-8")))
            break
      except :
            print("\t{0}下载失败!".format(keyUrl))
            logFile.write("\t{0}下载失败!".format(keyUrl))
    return response.text

# 3、多线程下载ts流
def mutliDownloadTs(playlist):
    global logFile
    global sumCount
    global doneCount
    global taskThreadPool
    taskList = []
    # 每个ts单独作为一个task
    for index in range(len(playlist)):
      dict = {"playlist": playlist, "index": index}
      taskList.append((None, dict))
    # 重新设置ts数量,已下载的ts数量
    doneCount = 0
    sumCount = len(taskList)
    printProcessBar(sumCount, doneCount, 50)
    # 构造thread pool
    requests = threadpool.makeRequests(downloadTs, taskList)
   
    # 等待所有任务处理完成
    taskThreadPool.wait()
    print("")
    return True

# 4、下载单个ts playlists
def downloadTs(playlist, index):
    global logFile
    global sumCount
    global doneCount
    global cachePath
    global rootUrlPath
    succeed = False
    while not succeed:
      # 文件名格式为 "00000001.ts",index不足8位补充0
      outputPath = cachePath + "/" + "{0:0>8}.ts".format(index)
      outputFp = open(outputPath, "wb+")
      if playlist.startswith("http"):
            tsUrl = playlist
      else:
            tsUrl = rootUrlPath + "/" + playlist
      try:
            response = requests.get(tsUrl, timeout=10, headers=headers, stream=True)
            if response.status_code == 200:
                expected_length = int(response.headers.get('Content-Length'))
                actual_length = len(response.content)
                if expected_length > actual_length:
                  raise Exception("分片下载不完整")
                outputFp.write(response.content)
                doneCount += 1
                printProcessBar(sumCount, doneCount, 50)
                logFile.write("\t分片{0:0>8} url = {1} 下载成功!".format(index, tsUrl))
                succeed = True
      except Exception as exception:
            logFile.write("\t分片{0:0>8} url = {1} 下载失败!正在重试...msg = {2}".format(index, tsUrl, exception))
      outputFp.close()

# 5、合并ts
def mergeTs(tsFileDir, outputFilePath, cryptor, count):
    global logFile
    outputFp = open(outputFilePath, "wb+")
    for index in range(count):
      printProcessBar(count, index + 1, 50)
      logFile.write("\t{0}\n".format(index))
      inputFilePath = tsFileDir + "/" + "{0:0>8}.ts".format(index)
      if not os.path.exists(outputFilePath):
            print("\n分片{0:0>8}.ts, 不存在,已跳过!".format(index))
            logFile.write("分片{0:0>8}.ts, 不存在,已跳过!\n".format(index))
            continue
      inputFp = open(inputFilePath, "rb")
      fileData = inputFp.read()
      try:
            if cryptor is None:
                outputFp.write(fileData)
            else:
                outputFp.write(cryptor.decrypt(fileData))
      except Exception as exception:
            inputFp.close()
            outputFp.close()
            print(exception)
            return False
      inputFp.close()
    print("")
    outputFp.close()
    return True

# 6、删除ts文件
def removeTsDir(tsFileDir):
    # 先清空文件夹
    for root, dirs, files in os.walk(tsFileDir, topdown=False):
      for name in files:
            os.remove(os.path.join(root, name))
      for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(tsFileDir)
    return True

# 7、convert to mp4(调用了FFmpeg,将合并好的视频内容放置到一个mp4容器中)
def ffmpegConvertToMp4(inputFilePath, ouputFilePath):
    global logFile
    if not os.path.exists(inputFilePath):
      print(inputFilePath + " 路径不存在!")
      logFile.write(inputFilePath + " 路径不存在!\n")
      return False
    cmd = r'.\ffmpeg -i "{0}" -vcodec copy -acodec copy "{1}"'.format(inputFilePath, ouputFilePath)
    if os.system(cmd) == 0:
      print(inputFilePath + "转换成功!")
      logFile.write(inputFilePath + "转换成功!\n")
      return True
    else:
      print(inputFilePath + "转换失败!")
      logFile.write(inputFilePath + "转换失败!\n")
      return False

# 8、模拟输出进度条
def printProcessBar(sumCount, doneCount, width):
    precent = doneCount / sumCount
    useCount = int(precent * width)
    spaceCount = int(width - useCount)
    precent = precent*100
    print('\t{0}/{1} {2}{3} {4:.2f}%'.format(sumCount, doneCount, useCount*'■', spaceCount*'□', precent), file=sys.stdout, flush=True, end='\r')

# m3u8下载器
def m3uVideo8Downloader():
    global title
    global logFile
    global m3u8Url
    global cachePath
    # 1、下载m3u8
    print("\t1、开始下载m3u8...")
    logFile.write("\t1、开始下载m3u8...\n")
    m3u8Info = getM3u8Info()
    if m3u8Info is None:
      return False
    tsList = []
    for playlist in m3u8Info.segments:
      tsList.append(playlist.uri)
    # 2、获取key
    keyText = ""
    cryptor = None
    # 判断是否加密
    if (len(m3u8Info.keys) != 0) and (m3u8Info.keys is not None):
      # 默认选择第一个key,且AES-128算法
      key = m3u8Info.keys
      if key.method != "AES-128":
            print("\t{0}不支持的解密方式!".format(key.method))
            logFile.write("\t{0}不支持的解密方式!\n".format(key.method))
            return False
      # 如果key的url是相对路径,加上m3u8Url的路径
      keyUrl = key.uri
      if not keyUrl.startswith("http"):
            keyUrl = m3u8Url.replace("index.m3u8", keyUrl)
      print("\t2、开始下载key...")
      logFile.write("\t2、开始下载key...\n")
      keyText = getKey(keyUrl)
      if keyText is None:
            return False
      # 判断是否有偏移量
      if key.iv is not None:
            cryptor = AES.new(bytes(keyText, encoding='utf8'), AES.MODE_CBC, bytes(key.iv, encoding='utf8'))
      else:
            cryptor = AES.new(bytes(keyText, encoding='utf8'), AES.MODE_CBC, bytes(keyText, encoding='utf8'))
    # 3、下载ts
    print("\t3、开始下载ts...")
    logFile.write("\t3、开始下载ts...\n")
    if mutliDownloadTs(tsList):
      print("\tts下载完成---------------------")
      logFile.write("\tts下载完成---------------------\n")
    # 4、合并ts
    print("\t4、开始合并ts...")
    logFile.write("\t4、开始合并ts...\n")
    if mergeTs(cachePath, cachePath + "/cache.flv", cryptor, len(tsList)):
      print("\tts合并完成---------------------")
      logFile.write("\tts合并完成---------------------\n")
    else:
      print(keyText)
      print("\tts合并失败!")
      logFile.write("\tts合并失败!\n")
      return False
    # 5、开始转换成mp4
    print("\t5、开始mp4转换...")
    logFile.write("\t5、开始mp4转换...\n")
    if not ffmpegConvertToMp4(cachePath + "/cache.flv", saveRootDirPath + "/" + title + ".mp4"):
      return False
    return True


if __name__ == '__main__':
    # 判断m3u8文件是否存在
    if not (os.path.exists(m3u8InputFilePath)):
      print("{0}文件不存在!".format(m3u8InputFilePath))
      exit(0)
    m3u8InputFp = open(m3u8InputFilePath, "r", encoding="utf-8")
    # 设置error的m3u8 url输出
    errorM3u8InfoFp = open(errorM3u8InfoDirPath, "a+", encoding="utf-8")
    # 设置log file
    if not os.path.exists(cachePath):
      os.makedirs(cachePath)
    logFile = open(logPath, "w+", encoding="utf-8")
    # 初始化线程池
    taskThreadPool = threadpool.ThreadPool(processCountConf)
    while True:
      rowData = m3u8InputFp.readline()
      rowData = rowData.strip('\n')
      if rowData == "":
            break
      m3u8Info = rowData.split(',')
      title = m3u8Info
      m3u8Url = m3u8Info
      try:
            print("{0} 开始下载:".format(m3u8Info))
            logFile.write("{0} 开始下载:\n".format(m3u8Info))
            if m3uVideo8Downloader():
                # 成功下载完一个m3u8则清空logFile
                logFile.truncate()
                print("{0} 下载成功!".format(m3u8Info))
            else:
                errorM3u8InfoFp.write(title + "," + m3u8Url + '\n')
                errorM3u8InfoFp.flush()
                print("{0} 下载失败!".format(m3u8Info))
                logFile.write("{0} 下载失败!\n".format(m3u8Info))
      except Exception as exception:
            print(exception)
            traceback.print_exc()
    # 关闭文件
    logFile.close()
    m3u8InputFp.close()
    errorM3u8InfoFp.close()
    print("----------------下载结束------------------")
```
# 四、开车姿势与车速展示
## 1、开车姿势
### ①、导入源码用到库m3u8、traceback、threadpool、pycryptodome、beautifulsoup4(用pip3安装就行)
### ②、将ffmpeg.exe放到源码文件所在目录(源码调用了cmd命令进而调用了ffmpeg.exe)
ffmpeg.exe文件下载链接: htt防ps://pan.ba封idu.com/s/1Q处ag-VlGlRajx理Ovp2d_2Uaw 提取码: 886z
### ③、将准备好的m3u8s_input.txt文件(必须是utf-8编码,格式如下)
```
title_1,m3u8_url_1
title_2,m3u8_url_2
title_3,m3u8_url_3
...这是省略号,小白别瞎搞...
title_n,m3u8_url_n
```
### ④、配置好源码中的m3u8 url文件路径、视频保存的地址、线程数
### ⑤、控制台/终端使用Python执行脚本
#### 注:为了尽量减少频繁的删除、创建ts流文件,源码实验时在output目录中创建一个cache缓存目录,里面暂存下载好的ts流。下载过程中不可删除!!!

## 2、车速展示(系好安全带)

# 五、git链接
未防止后期更新帖子无法编辑,已放置在github上
https://github.com/hestyle/m3u8_downloader
# 六、后记
以上源码仅作为Python技术学习、交流之用,切勿用于任何可能造成违法犯罪的场景,否则后果自负!

haoxuer 发表于 2020-10-7 08:19

我是来学习代码写法的,感谢提供

yilvz 发表于 2020-11-4 13:01

请教大佬,key是这样加密的怎么办,是什么加密#EXTM3U
#EXT-X-TARGETDURATION:10
#EXT-X-ALLOW-CACHE:YES
#EXT-SECRET-KEY-INDEX:2
#EXT-SECRET-KEY:XpDzBPuLf4hh3U6zY+NqvgtFDFW8fWwR1OnyISqBK1qHQqmuVsKfE0JpjLX/Dz5YaY/xFb9MlWmtoD2G7ELAFBro2pJ6OyaMD/hnQ+LcDWeRWVSHmK805qh0QOL6sdHlnPTSm9lO46zdEaxaL8Wmb/5IAF5a2K7D+AXf/jV5zs0=
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-VERSION:3

hestyle 发表于 2020-10-6 21:23

letheva 发表于 2020-10-6 21:13
大佬厉害啊,支持japonx这个网站吗?

这种问题就别问了吧,小心号没了,这是一个通用下载器,有些网站的视频有二次加密、防爬操作,需要针对其进行适配

ysnsn 发表于 2020-10-6 21:11

不会python,支持一下{:1_887:}:victory:

letheva 发表于 2020-10-6 21:13

本帖最后由 letheva 于 2020-10-6 21:31 编辑

大佬厉害啊,支持支持,{:301_997:}

爱飞的猫 发表于 2020-10-6 21:22

https://ytdl-org.github.io/youtube-dl/index.html

Youtube-dl 也是 python 做的视频下载工具,能直接下m3u8 + 合并

letheva 发表于 2020-10-6 21:31

hestyle 发表于 2020-10-6 21:23
这种问题就别问了吧,小心号没了,这是一个通用下载器,有些网站的视频有二次加密、防爬操作,需要针对其 ...

那我赶紧删了{:301_971:}

t1099778271 发表于 2020-10-6 21:32

好东西?!!!!!!!

立刻 发表于 2020-10-6 22:23

你这个帖子为什么这么卡

littlebit 发表于 2020-10-7 00:15

{:301_993:}火钳刘明!学习下!
页: [1] 2 3 4 5 6 7
查看完整版本: 基于Python3实现的m3u8批量下载器 解密&合并&多线程 (开车新姿势~)