BienBoy 发表于 2022-1-24 18:15

python爬取B站视频,实现实时进度查看,番剧下载全集

本帖最后由 BienBoy 于 2022-1-25 23:19 编辑

在之前的文章中,主要分析了B站如何获取不同清晰度视频下载链接的方法,现在具体写一下B站视频怎么下载。

一、完整代码
先把完整代码放出来,后面再讲解。
需要的模块:requests, re, os, shutil, BeautifulSoup, time,此外视频和音频的合成用到了ffmpeg。
import requests
import re
import os
import shutil
from bs4 import BeautifulSoup
import time

def type_of_video(url):
    response = requests.get(url, headers=headers1)
    type = BeautifulSoup(response.text, 'lxml').find(attrs={'property': 'og:type'})['content']
    if 'movie' in type:
      return 'movie'
    elif 'anime' in type:
      return 'anime'
    elif 'documentary' in type:
      return 'documentary'
    elif 'tv' in type:
      return 'tv'
    else:
      return 'video'


def get_all_url(url, type):
    if type == 'anime' or type == 'tv' or type == 'documentary':
      response = requests.get(url, headers=headers1)
      text = response.text
      pattern1 = 'upInfo.*</html>'
      pattern2 = '"share_url":"(http.*?)"'
      a = re.sub(pattern1, '', text)
      result = re.findall(pattern2, a)
      return result
    else:
      return url


def get_downloadurl(url, type):         #获取视频和音频的下载地址
    try:
      response = requests.get(url, headers=headers1)
      if response.status_code == 200:
            text = response.text
            title_of_series = BeautifulSoup(text, 'lxml').find(attrs={'property': 'og:title'})['content']
            if type == 'movie' or type == 'video':
                title_of_series = 'Bilibili下载视频'
                pattern_title = '.__INITIAL_STATE__.*?itle.*?:"(.*?)"'
            else:
                pattern_title = '.__INITIAL_STATE__.*?itle.*?:"' + title_of_series + '.*?:(.*?)"'
            pattern_video = '"video":.+?"baseUrl".*?"(https://.*?.m4s.*?)"'
            pattern_audio = '"audio":.+?"baseUrl".*?"(https://.*?.m4s.*?)"'
            url_video = re.search(pattern_video, text)
            url_audio = re.search(pattern_audio, text)
            title = re.search(pattern_title, text)
            urls = {
                'video': url_video,
                'audio': url_audio,
                'title': title,
                'title_of_series': title_of_series
            }
            return urls
    except ConnectionError as e:
      print(e.args)
      print('获取视频和音频的下载地址失败')
      return None


def merge(title, output):
    os.system('D:/ffmpeg/bin/ffmpeg -i ./download/"' + title + '.mp3" -i ./download/"' + title + '.mp4" \
-acodec copy -vcodec copy ./' + output + '/"' +title + '.mp4"')


def down_video(urls):
    if not os.path.exists('./download'):
      os.mkdir('./download')# 创建临时文件夹以便存放音频,视频
    if not os.path.exists(urls['title_of_series']):
      os.mkdir(urls['title_of_series'])
    try:
      video = requests.get(urls['video'], headers=headers2, stream=True)
      if video.status_code == 206:
            chunk_size = 1024
            content_size = int(video.headers['content-length'])
            data_count = 0
            with open('./download/' + urls['title'] + '.mp4', 'wb') as f:
                for data in video.iter_content(chunk_size=chunk_size):
                  f.write(data)
                  data_count += len(data)
                  progress = data_count * 100 / content_size
                  print('\r 正在下载视频:[%s%s] %d%%' % (int(progress) * '█', ' ' * (100 - int(progress)), progress), end=' ')
    except:
      print("Error!")
      shutil.rmtree('./download')
      return False
    try:
      audio = requests.get(urls['audio'], headers=headers2, stream=True)
      if audio.status_code == 206:
            chunk_size = 1024
            content_size = int(audio.headers['content-length'])
            data_count = 0
            with open('./download/' + urls['title'] + '.mp3', 'wb') as f:
                for data in audio.iter_content(chunk_size=chunk_size):
                  f.write(data)
                  data_count += len(data)
                  progress = data_count * 100 / content_size
                  print('\r 正在下载音频[%s%s] %d%%' % (int(progress) * '█', ' ' * (100 - int(progress)), progress), end=' ')
    except:
      print('Error!')
      shutil.rmtree('./download')
      return False
    merge(urls['title'], urls['title_of_series'])
    shutil.rmtree('./download')
    return True


if __name__ == '__main__':
    cookie = ''            #可以添加自己的cookies
    headers1 = {
      'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
   Chrome/97.0.4692.99 Safari/537.36 Edg/97.0.1072.69',
      'cookie': cookie
    }
    headers2 = {
      'referer': 'https://www.bilibili.com/bangumi/play/ep402225/',
      'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
   Chrome/97.0.4692.99 Safari/537.36 Edg/97.0.1072.69',
      'range': 'bytes=0-1000000000000'
    }
    end_download = False
    while not end_download:
      url = input('请输入要下载的视频网址:')
      type = type_of_video(url)
      allurls = get_all_url(url, type)
      if isinstance(allurls, str):
            urls = get_downloadurl(allurls, type)
            is_succeessful = down_video(urls)
            if is_succeessful:
                print("下载完成")
            else:
                print('下载失败')
      else:
            choice = input("是否下载全部剧集(y/n):")
            while choice != 'y' and choice != 'n':
                print(choice)
                choice = input('请输入y或n:')
            if choice == 'y':
                download_all = True
            else:
                download_all = False
            if download_all:
                i = 0
                while i < len(allurls):
                  urls = get_downloadurl(allurls.encode('utf8').decode("unicode_escape"), type)
                  if urls:
                        is_succeessful = down_video(urls)
                        if is_succeessful:
                            print("第%d集下载完成已完成:%.2f%%" % (i+1, i+1/len(allurls)-1))
                            i += 1
                        else:
                            print('第%d集下载失败,将于30分钟后再次尝试下载' % (i+1))
                            time.sleep(1800)
                  else:
                        print('第%d集下载失败,将于30分钟后再次尝试下载' % (i+1))
                        time.sleep(1800)
            else:
                urls = get_downloadurl(url, type)
                is_succeessful = down_video(urls)
                if is_succeessful:
                  print("下载完成")
                else:
                  print('下载失败')
      choice = input('是否继续下载?(y/n):')
      while choice != 'y' and choice != 'n':
            print(choice)
            choice = input('请输入y或n:')
      if choice == 'y':
            end_download = False
      else:
            end_download = True

如果有大会员,可以考虑在代码中添加cookies,以便下载会员专属视频及享受最高清晰度。没有可以留空。
简单介绍一下这段代码的功能:
1. 主要功能就是下载B站视频,对于番剧、电视剧以及纪录片这三类,可以选择下载全集,或只下载当前集数;其他类型视频一次只能下载一个
2. 由于音频、视频需要分开的,所以下载过程中会新建一个download文件夹,合成视频、音频后会自动删除。请确保当前目录内没有同名文件夹,否则将会被删除
3. 利用ffmpeg,自动将下载的音频和视频合成
4. 对于番剧、电视剧以及纪录片,会自动创建一个文件夹,命名为作品名称,单个视频命名为单集名称。其余类型视频,均放入命名为“Bilibili下载视频”的文件夹,视频名称同B站原视频名称
5. 视频下载采用分块的模式,减小内存消耗,并实现下载过程中,实时显示视频下载进度
6. 在下载全集时,如果下载出错,均视为请求出错,会暂停30分钟,30分钟后尝试再次下载,可自行编辑暂停时间
没有使用代{过}{滤}理,在下载大量视频时建议自己添加,防止ip被封。
二、代码分析
1. 获取视频类型
请求视频url后的响应体中,我们可以在meta标签中找到视频类型。如图




可以利用Beautifulsoup提取,代码如下:
type = BeautifulSoup(response.text, 'lxml').find(attrs={'property': 'og:type'})['content']
可以定义一个获取视频类型的函数type_of_video():
def type_of_video(url):
    response = requests.get(url, headers=headers1)
    type = BeautifulSoup(response.text, 'lxml').find(attrs={'property': 'og:type'})['content']
    if 'movie' in type:
      return 'movie'
    elif 'anime' in type:
      return 'anime'
    elif 'documentary' in type:
      return 'documentary'
    elif 'tv' in type:
      return 'tv'
    else:
      return 'video'
2. 获取番剧的所有网址
同样请求视频url后的响应体中,也可以找到所有视频的url。
先尝试搜索第一集的名称或网址,可以找到以下结果:




可以看到,其中的share_url对应的https:\u002F\u002Fwww.bilibili.com\u002Fbangumi\u002Fplay\u002Fep323085看起来像是第一集的网址,不过和真实的网址比起来,"\"是由'\u002F'来表示的,只要进行一下转码就可以了。
这里我采用正则表达式来进行匹配,"share_url":"(http.*?)"。但是得到了4373条结果,比总集数1094要多。
为了解决这个问题,我找到第二个第一集的url出现的位置,如图




在这里我看到了一个词“upInfo”,我尝试在里面搜索了一下,发现这个词只出现了一次,所以只要把它后面的信息全部删掉,再进行正则匹配就可以了。
可以定义get_all_url(url, type)函数,type为视频类型,这样就可以获取番剧、电视剧、纪录片的所有剧集的url,其余类型的视频仍返回其url。
def get_all_url(url, type):
    if type == 'anime' or type == 'tv' or type == 'documentary':
      response = requests.get(url, headers=headers1)
      text = response.text
      pattern1 = 'upInfo.*</html>'
      pattern2 = '"share_url":"(http.*?)"'
      a = re.sub(pattern1, '', text)
      result = re.findall(pattern2, a)
      return result
    else:
      return url
3. 根据视频网址获取视频、音频下载网址
网址的查找在之前的文章Python爬取b站视频并分析视频清晰度问题中介绍过了,可以前往查看,不再赘述。
直接给出提取视频和音频下载网址的正则表达式:"video":.+?"baseUrl".*?"(https://.*?.m4s.*?)"及"audio":.+?"baseUrl".*?"(https://.*?.m4s.*?)"
同时,我们也可以提取作品名称以及单集视频名称,以便之后进行文件夹及视频命名。
我们尝试搜索作品名称,发现可以可以在meta标签里找到。而且其获取方式与1. 获取视频类型里的方法大同小异。





再尝试搜索单集名称,并找到一个容易提取的位置。我找到的位置如下:





利用正则表达式.__INITIAL_STATE__.*?h1Title":.*?"(.*?)"可以获取单集名称。为了符合B站非番剧类的视频,做出一些修改:.__INITIAL_STATE__.*?itle":.*?"(.*?)"
结合上面获得的作品名称,可以构造正则表达式去除作品名称。
定义获取视频、音频下载地址的函数get_downloadurl(url)
def get_downloadurl(url, type):         #获取视频和音频的下载地址
    try:
      response = requests.get(url, headers=headers1)
      if response.status_code == 200:
            text = response.text
            title_of_series = BeautifulSoup(text, 'lxml').find(attrs={'property': 'og:title'})['content']
            if type == 'movie' or type == 'video':
                title_of_series = 'Bilibili下载视频'
                pattern_title = '.__INITIAL_STATE__.*?itle.*?:"(.*?)"'
            else:
                pattern_title = '.__INITIAL_STATE__.*?itle.*?:"' + title_of_series + '.*?:(.*?)"'
            pattern_video = '"video":.+?"baseUrl".*?"(https://.*?.m4s.*?)"'
            pattern_audio = '"audio":.+?"baseUrl".*?"(https://.*?.m4s.*?)"'
            url_video = re.search(pattern_video, text)
            url_audio = re.search(pattern_audio, text)
            title = re.search(pattern_title, text)
            urls = {
                'video': url_video,
                'audio': url_audio,
                'title': title,
                'title_of_series': title_of_series
            }
            return urls
    except ConnectionError as e:
      print(e.args)
      print('获取视频和音频的下载地址失败')
      return None
4. 下载视频
已经得到了视频下载url,下面就要利用requests发送请求进行下载。
为了可以得到正确完整的视频,需要构造请求头。
headers2 = {
    'referer': 'https://www.bilibili.com/bangumi/play/ep402225/',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
Chrome/97.0.4692.99 Safari/537.36 Edg/97.0.1072.69',
    'range': 'bytes=0-1000000000000'
}
range的范围要大一些以便可以获得完整视频。
referer需要有,不然会返回403
因为需要下载视频,属于比较大的文件,为了减轻内存负担,可以一边请求,一边写入。这样还有一个好处,就是可以实时查看下载进度。
为了达到这样的目的,我们需要使用requests.get()的stream参数。当把get函数的stream参数设置为True时,它不会立即开始下载,只有当调用iter_content或iter_lines遍历内容或访问内容属性时才开始下载。文件没有下载时仍需保持连接。
iter_content是一块一块遍历,而iter_lines是一行一行遍历,下载视频需要用iter_content遍历。
可以简单看一下例子:
video = requests.get(url, headers=headers, stream=True)
chunk_size = 1024      #按块下载的数据块大小
content_size = int(video.headers['content-length'])   #获取文件总大小
with open('test.mp4', 'wb') as f:
    for data in video.iter_content(chunk_size=chunk_size):   #遍历下载
      f.write(data)
用法比较简单。
为了实现下载进度的可视化,可以结合以上代码做出一个进度条。
video = requests.get(url, headers=headers, stream=True)
chunk_size = 1024      #按块下载的数据块大小
content_size = int(video.headers['content-length'])   #获取文件总大小
data_count = 0   #用于记录已下载的数据大小
with open('test.mp4', 'wb') as f:
    for data in video.iter_content(chunk_size=chunk_size):   #遍历下载
      f.write(data)
      data_count += len(data)
      progress = 100 * data_count / content_size         #已下载数据占比
      print('/r 正在下载:[%s%s] %d%%' % (int(progress) * '█', '' * (100 - int(progress)), progress), end= ' ')
      #/r为水平制表符,表示回到最左边;设置end=‘ ’,可以使得进度条在固定一行显示
至此,可以定义函数download_video(urls)
def down_video(urls):
    if not os.path.exists('./download'):
      os.mkdir('./download')# 创建临时文件夹以便存放音频,视频
    if not os.path.exists(urls['title_of_series']):
      os.mkdir(urls['title_of_series'])
    try:
      video = requests.get(urls['video'], headers=headers2, stream=True)
      if video.status_code == 206:
            chunk_size = 1024
            content_size = int(video.headers['content-length'])
            data_count = 0
            with open('./download/' + urls['title'] + '.mp4', 'wb') as f:
                for data in video.iter_content(chunk_size=chunk_size):
                  f.write(data)
                  data_count += len(data)
                  progress = data_count * 100 / content_size
                  print('\r 正在下载视频:[%s%s] %d%%' % (int(progress) * '█', ' ' * (100 - int(progress)), progress), end=' ')
    except:
      print("Error!")
      shutil.rmtree('./download')
      return False
    try:
      audio = requests.get(urls['audio'], headers=headers2, stream=True)
      if audio.status_code == 206:
            chunk_size = 1024
            content_size = int(audio.headers['content-length'])
            data_count = 0
            with open('./download/' + urls['title'] + '.mp3', 'wb') as f:
                for data in audio.iter_content(chunk_size=chunk_size):
                  f.write(data)
                  data_count += len(data)
                  progress = data_count * 100 / content_size
                  print('\r 正在下载音频[%s%s] %d%%' % (int(progress) * '█', ' ' * (100 - int(progress)), progress), end=' ')
    except:
      print('Error!')
      shutil.rmtree('./download')
      return False
    merge(urls['title'], urls['title_of_series'])
    shutil.rmtree('./download')
    return True
5. 合并视频和音频
其实上面download_video(urls)已经用到了自定义函数merge(title, output),传入视频名称以及存储文件夹名称,函数会调用ffmpeg进行合成。
关于ffmpeg,可以参考python 处理视频之 FFmpeg - 简书 (jianshu.com)下载使用。
我这里偷懒没有使用ffmpy库,而是通过调用cmd直接使用ffmpeg,因此导致运行时会输出一些信息。具体代码如下(其实只有一行):
def merge(title, output):
    os.system('D:/ffmpeg/bin/ffmpeg -i ./download/"' + title + '.mp3" -i ./download/"' + title + '.mp4" \
-acodec copy -vcodec copy ./' + output + '/"' +title + '.mp4"')
三、总结
到这里,下载B站视频需要的函数都已经写出了,在主函数中合理调用就可以下载所有(应该)的B站视频了。(详见一、完整代码)
尝试运行,结果为:


Helli 发表于 2022-11-24 12:42

我有个问题,之前跑的好好的,突然出现了这个错误,是什么意思?
Traceback (most recent call last):
File "C:\Users\wjdxs\Desktop\爬取哔哩哔哩视频.py", line 128, in <module>
    type = type_of_video(url)
File "C:\Users\wjdxs\Desktop\爬取哔哩哔哩视频.py", line 10, in type_of_video
    type = BeautifulSoup(response.text, 'lxml').find(attrs={'property': 'og:type'})['content']
File "D:\编程\Python\Python\Lib\site-packages\bs4\__init__.py", line 248, in __init__
    raise FeatureNotFound(
bs4.FeatureNotFound: Couldn't find a tree builder with the features you requested: lxml. Do you need to install a parser library?

mebyan 发表于 2023-6-22 14:33

本帖最后由 mebyan 于 2023-6-22 14:37 编辑

请教楼主和大佬们一个关于下载视频分辨率的问题。B站视频分辨率有多种,听说未登录用户有480P、720P、1080P,可选,最高可选1080P, 参见”Python爬取b站视频并分析视频清晰度问题 https://www.52pojie.cn/thread-1579770-1-1.html“。这个程序可以下载成功B站视频,但是默认的是480P的,请问怎样选择下载1080P更清晰的版本?我在url_video = re.search(pattern_video, text)之后加了一条语句:url_video= url_video.replace("30032","30080"),虽然将视频链接中的30032.m4s改成了30080.m4s,但是最后在程序跑到video = requests.get(urls['video'], headers=headers2, stream=True)时,video.status_code变成了403而不是206。无法访问B站,不能下载视频了。请高手们多指教。

踮起脚尖过日子 发表于 2022-1-24 19:00

技术厉害呀

eeeeda 发表于 2022-1-24 19:25

BienBoy 发表于 2022-1-24 19:28

eeeeda 发表于 2022-1-24 19:25
可以下载无水印的吗?

不可以,下载的都有水印

studentguo 发表于 2022-1-24 19:38

学习。。。

咔c君 发表于 2022-1-24 20:26

不错学习了

萌新与小白 发表于 2022-1-24 20:27

感谢楼主分享

xuexiba 发表于 2022-1-24 20:37

学习了,感谢楼主分享。

qzh阑珊梦 发表于 2022-1-24 21:01

我对b站视频网络抓包 发现m4s类的串流文件不是get型 是option加密型状态200的forbidden了 状态206的可以接收 这就让我无法解决了 但是后来发现浏览器不登录 请求那个串流包url可以下载 登录就403 玩到这我就放弃了……起码不登陆就720p 画质是真的不行 楼主这个不错 值得研究

seawaycao 发表于 2022-1-24 21:48

谢谢分享!{:1_921:}
页: [1] 2 3 4 5 6
查看完整版本: python爬取B站视频,实现实时进度查看,番剧下载全集