吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4360|回复: 56
上一主题 下一主题
收起左侧

[Python 原创] 获取腾、B、爱、芒、优平台弹幕

  [复制链接]
跳转到指定楼层
楼主
2513002960 发表于 2024-6-5 11:21 回帖奖励
本帖最后由 2513002960 于 2024-12-18 09:16 编辑

零、效果展示

1、更新指定链接弹幕

2、更新指定剧集弹幕


3、更新onelist所有弹幕

onelist见:基于alist的影视墙-可加载弹幕
https://www.52pojie.cn/thread-1931639-1-1.html
(出处: 吾爱破解论坛)

获取后会自动上传到alist对应的目录

一、使用教程

为了方便打包exe,我把所有的代码都放在了一个py文件内,所以代码会比较乱

1、使用更新onelist所有弹幕

①、配置config.ini文件


onelist_url:如你在本地搭建的,那么访问网址就是http://127.0.0.1:5246/
onelist_authorization:抓包获取,F12随便找一个接口就可以看到了

alist_host:alist的主页网址
alist_token:在alist的管理-设置-其他可以看到令牌选项

②、双击exe或运行py文件即可

2、更新指定弹幕

双击exe或运行py文件即可
输入影视名的话需要输入第几季和是否为电视剧
第几季为[forecolor=red]数字[/forecolor]
是否为电视剧:电视剧为1.其他直接回车

输入链接直接回车即可,这个链接是影视平台的链接

三、主要代码


# region 弹幕获取相关函数
import base64
import hashlib
import json
import re
import time
import traceback
import zlib
from dataclasses import dataclass, field, asdict
from typing import List
from urllib.parse import urljoin
from venv import logger

import parsel
# import requests
import xmltodict
from retrying import retry
from tqdm import tqdm

from Fuctions.Fuction import request_data

from curl_cffi import requests

@dataclass
class DanMuType:
    text: str = ''  # 弹幕文本
    time: int = 0  # 弹幕时间
    mode: int = 1  # 弹幕模式
    color: str = '#FFFFFF'  # 弹幕颜色
    border: bool = False  # 弹幕是否有描边
    style: dict = field(default_factory=dict)  # 弹幕自定义样式
    other: dict = field(default_factory=dict)  # 其他数据

    def __dict__(self):
        return dict(
            text=self.text.replace('&#', ''),
            time=int(self.time),
            mode=self.mode,
            color=str(self.color) if isinstance(self.color, str) and self.color.startswith(
                "#") else f'#{int(self.color):06X}',
            border=self.border,
            style=self.style
        )

    def escape_xml(self):
        # 定义需要转义的字符及其对应的转义序列
        escape_chars = {
            '&': '&',
            '<': '<',
            '>': '>',
            '"': '"',
            "'": '''
        }

        # 按照需要转义的字符顺序进行替换
        for char, escaped_char in escape_chars.items():
            self.text = self.text.replace(char, escaped_char)

        return self.text

@dataclass
class RetDanMuType:
    list: List[DanMuType]
    xml: str

class DataclassJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, DanMuType):
            return obj.__dict__()
        return super().default(obj)

class GetDanmuBase(object):
    base_xml = '''<?xml version="1.0" encoding="utf-8"?>
<i>
{}
</i>'''
    data_list = []
    name = ""
    domain = ""

    def error(self, msg):
        return {
            "msg": msg,
            "start": 500,
            "data": None,
            "name": self.name
        }

    def success(self, data):
        return {
            "msg": "success",
            "start": 0,
            "data": data,
            "name": self.name
        }

    def get_data_dict(self) -> DanMuType:
        return DanMuType()

    def main(self, url) -> list[DanMuType]:
        """
        获取弹幕的主逻辑
        """
        pass

    def parse(self):
        """
        解析返回的原始数据
        :param _type: 数据类型,xml 或 list
        """
        pass

    def get(self, url, _type='json'):
        self.data_list = []
        try:
            data = self.main(url)
            return RetDanMuType(json.loads(json.dumps(data, cls=DataclassJSONEncoder)),
                                self.base_xml.format('\n'.join([self.list2xml(d) for d in data])))
        except Exception as e:
            return self.error(f"程序出现错误:{traceback.print_exc()}")

    def getImg(self, url):
        """
        获取弹幕的表情链接
        """
        return []

    def list2xml(self, data: DanMuType):
        xml_str = f'    <d p="{data.time},{data.mode},{data.style.get("size", 25)},{int(data.color[1:], 16) if isinstance(data.color, str) and data.color.startswith("#") else data.color},0,0,0,0">{data.escape_xml()}</d>'
        return xml_str

    def time_to_second(self, _time: list):
        s = 0
        m = 1
        for d in _time[::-1]:
            s += m * int(d)
            m *= 60
        return s

class GetDanmuTencent(GetDanmuBase):
    name = "腾讯视频"
    domain = "v.qq.com"

    def __init__(self):
        self.api_danmaku_base = "https://dm.video.qq.com/barrage/base/"
        self.api_danmaku_segment = "https://dm.video.qq.com/barrage/segment/"

    def parse(self):
        data_list = []
        for data in tqdm(self.data_list):
            for item in data.get("barrage_list", []):
                _d = self.get_data_dict()
                _d.time = int(item.get("time_offset", 0)) / 1000
                _d.text = item.get("content", "")
                _d.other['create_time'] = item.get('create_time', "")
                if item.get("content_style") != "":
                    content_style = json.loads(item.get("content_style"))
                    if content_style.get("color") != "ffffff":
                        _d.color = int(content_style.get("color", "ffffff"), 16)
                data_list.append(_d)
        return data_list

    def main(self, url):
        self.data_list = []
        # res = request_data("GET", url)
        # sel = parsel.Selector(res.text)
        # title = sel.xpath('//title/text()').get()
        # vid = re.findall(f'"title":"{title}","vid":"(.*?)"', res.text)[-1]
        # if not vid:
        vid = re.search("/([a-zA-Z0-9]+)\.html", url)
        if vid:
            vid = vid.group(1)
        if not vid:
            return self.error("解析vid失败,请检查链接是否正确")
        res = request_data("GET", urljoin(self.api_danmaku_base, vid))
        if res.status_code != 200:
            return self.error("获取弹幕详情失败")

        for k, segment_index in res.json().get("segment_index", {}).items():
            self.data_list.append(
                request_data("GET",
                             urljoin(self.api_danmaku_segment,
                                     vid + "/" + segment_index.get("segment_name", "/"))).json())
        parse_data = self.parse()
        return parse_data

    def getImg(self, url):
        vid = re.search("/([a-zA-Z0-9]+)\.html", url)
        if vid:
            vid = vid.group(1)
        if not vid:
            return self.error("解析vid失败,请检查链接是否正确")
        cid = url.split('/')[-2]
        data = {
            "vid": vid,
            "cid": cid,
            "lid": "",
            "bIsGetUserCfg": True
        }
        res = request_data("POST",
                           "https://pbaccess.video.qq.com/trpc.danmu.danmu_switch_comm.DanmuSwitch/getVideoDanmuSwitch",
                           json=data)
        danmukey = res.json().get('data', {}).get('registResultInfo', {}).get('dataKey')
        res = request_data("POST", "https://pbaccess.video.qq.com/trpc.message.danmu_richdata.Richdata/GetRichData",
                           json={
                               "danmu_key": danmukey,
                               "vip_degree": 0
                           },
                           headers={
                               'origin': "https://v.qq.com",
                               'referer': "https://v.qq.com",
                           })
        emoji_infos = res.json().get('data', {}).get('emoji_configs', {}).get('emoji_infos', [])
        emoji_data_list = []
        for emoji_info in emoji_infos:
            emoji_data_list.append(
                {
                    'emoji_code': emoji_info.get('emoji_code'),
                    'emoji_url': emoji_info.get('emoji_url'),
                }
            )
        return emoji_data_list

class GetDanmuBilibili(GetDanmuBase):
    name = "B站"
    domain = "bilibili.com"

    def __init__(self):
        self.api_video_info = "https://api.bilibili.com/x/web-interface/view"
        self.api_epid_cid = "https://api.bilibili.com/pgc/view/web/season"

    def parsel(self, xml_data):
        data_list = re.findall('<d p="(.*?)">(.*?)<\/d>', xml_data)
        for data in tqdm(data_list):
            _d = self.get_data_dict()
            _d.text = data[1]
            data_time = data[0].split(",")
            _d.time = int(data_time[0])
            _d.mode = data_time[1]
            _d.style['size'] = data_time[2]
            _d.color = data_time[3]
            self.data_list.append(_d)
        return self.data_list

    def main(self, url: str):
        # 番剧
        if url.find("bangumi/") != -1 and url.find("ep") != -1:
            epid = url.split('?')[0].split('/')[-1]
            params = {
                "ep_id": epid[2:]
            }
            res = request_data("GET", url=self.api_epid_cid, params=params, impersonate="chrome110")
            res_json = res.json()
            if res_json.get("code") != 0:
                return self.error("获取番剧信息失败")
            for episode in res_json.get("result", {}).get("episodes", []):
                if episode.get("id", 0) == int(epid[2:]):
                    xml_data = request_data("GET", f'https://comment.bilibili.com/{episode.get("cid")}.xml',
                                            impersonate="chrome110").text
                    return self.parsel(xml_data)

class GetDanmuIqiyi(GetDanmuBase):
    name = "爱奇艺"
    domain = "iqiyi.com"

    def parse(self):
        data_list = []
        for data in tqdm(self.data_list):
            # 解压缩数据
            decompressed_data = zlib.decompress(data)
            data = decompressed_data.decode('utf-8')
            for d in re.findall('<bulletInfo>.*?</bulletInfo>', data, re.S):
                d = d.replace('&#', '')
                try:
                    d_dict = xmltodict.parse(d).get("bulletInfo")
                    _d = self.get_data_dict()
                    _d.time = int(d_dict.get("showTime"))
                    _d.text = d_dict.get("content")
                    _d.color = int(d_dict.get("color"), 16)
                    _d.style["size"] = int(d_dict.get("font"))
                    data_list.append(_d)
                except Exception as e:
                    logger.error(e)
                    pass
        return data_list

    def main(self, url):
        _req = requests.Session(impersonate="chrome124")
        res = _req.request("GET", url,
                           headers={"Accept-Encoding": "gzip,deflate,compress"}, impersonate="chrome124")
        res.encoding = "UTF-8"
        js_url = re.findall(r'<script src="(.*?)" referrerpolicy="no-referrer-when-downgrade">', res.text)[0]
        res = _req.request('GET', f'https:{js_url}', headers={'referer': url})
        tv_id = re.findall('"tvId":([0-9]+)', res.text)[0]
        i = 1
        while True:
            url = f"https://cmts.iqiyi.com/bullet/{tv_id[-4:-2]}/{tv_id[-2:]}/{tv_id}_300_{i}.z"
            params = {
            }
            r = request_data("GET", url=url, params=params,
                             headers={'Content-Type': 'application/octet-stream'})
            if r.status_code == 404: break
            self.data_list.append(r.content)
            i += 1

        parse_data = self.parse()
        return parse_data

class GetDanmuMgtv(GetDanmuBase):
    name = "芒果TV"
    domain = "mgtv.com"

    def __init__(self):
        self.api_video_info = "https://pcweb.api.mgtv.com/video/info"
        self.api_danmaku = "https://galaxy.bz.mgtv.com/rdbarrage"

    def parse(self):
        data_list = []
        for data in tqdm(self.data_list):
            if data.get("data", {}).get("items", []) is None:
                continue
            for d in data.get("data", {}).get("items", []):
                _d = self.get_data_dict()
                _d.time = d.get('time', 0) / 1000
                _d.text = d.get('content', '')
                data_list.append(_d)
        return data_list

    def main(self, url):
        _u = url.split(".")[-2].split("/")
        cid = _u[-2]
        vid = _u[-1]
        params = {
            'cid': cid,
            'vid': vid,
        }
        res = request_data("GET", url=self.api_video_info, params=params)
        _time = res.json().get("data", {}).get("info", {}).get("time")
        end_time = self.time_to_second(_time.split(":")) * 1000
        for _t in range(0, end_time, 60 * 1000):
            self.data_list.append(
                request_data("GET", self.api_danmaku, params={
                    'vid': vid,
                    "cid": cid,
                    "time": _t
                }).json()
            )
        parse_data = self.parse()
        return parse_data

class GetDanmuYouku(GetDanmuBase):
    name = "优酷"
    domain = "v.youku.com"

    def __init__(self):
        super().__init__()
        self.req = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        self.get_cna()
        self.get_tk_enc()

    @retry(stop_max_attempt_number=5, wait_random_min=1000, wait_random_max=2000)
    def request_data(self, method, url, status_code=None, **kwargs):
        """
        发送请求
        :param method: 请求方式
        :param url: 请求URL
        :param status_code: 成功的状态码
        :param kwargs:
        :return:
        """
        res = self.req.request(method, url, **kwargs)
        # res.encoding = res.apparent_encoding
        if status_code:
            if res.status_code == status_code:
                return res
            else:
                return
        return res

    def get_cna(self):
        url = "https://log.mmstat.com/eg.js"
        res = self.request_data("GET", url, headers=self.headers)

    def get_tk_enc(self):
        res = self.request_data("GET",
                                "https://acs.youku.com/h5/mtop.com.youku.aplatform.weakget/1.0/?jsv=2.5.1&appKey=24679788",
                                headers=self.headers)
        if '_m_h5_tk' in res.cookies.keys() and '_m_h5_tk_enc' in res.cookies.keys():
            return True
        return False

    def get_vinfos_by_video_id(self, video_id):
        url = "https://openapi.youku.com/v2/videos/show.json"
        params = {
            'client_id': '53e6cc67237fc59a',
            'video_id': video_id,
            'package': 'com.huawei.hwvplayer.youku',
            'ext': 'show',
        }
        res = self.request_data("GET", url, params=params, headers=self.headers)
        return res.json().get('duration')

    def get_msg_sign(self, msg_base64):
        secret_key = 'MkmC9SoIw6xCkSKHhJ7b5D2r51kBiREr'
        combined_msg = msg_base64 + secret_key
        hash_object = hashlib.md5(combined_msg.encode())
        return hash_object.hexdigest()

    def yk_t_sign(self, token, t, appkey, data):
        text = '&'.join([token, t, appkey, data])
        md5_hash = hashlib.md5(text.encode())
        return md5_hash.hexdigest()

    def parse(self):
        data_list = []
        for data in tqdm(self.data_list):
            result = json.loads(data.get('data', {}).get('result', {}))
            if result.get('code', '-1') == '-1':
                continue
            danmus = result.get('data', {}).get('result', [])
            for danmu in danmus:
                _d = self.get_data_dict()
                _d.time = danmu.get('playat') / 1000
                _d.color = json.loads(danmu.get('propertis', '{}')).get('color', _d.color)
                _d.text = danmu.get('content')
                data_list.append(_d)
        return data_list

    def main(self, url):

        video_id = url.split('?')[0].split('/')[-1].replace("id_", '').split('.html')[0]
        max_mat = self.get_vinfos_by_video_id(video_id)
        for mat in range(0, int(float(max_mat) / 60) + 1):
            url = "https://acs.youku.com/h5/mopen.youku.danmu.list/1.0/"
            msg = {
                'ctime': int(time.time() * 1000),
                'ctype': 10004,
                'cver': 'v1.0',
                'guid': self.req.cookies.get('cna'),
                'mat': mat,
                'mcount': 1,
                'pid': 0,
                'sver': '3.1.0',
                'type': 1,
                'vid': video_id
            }
            msg['msg'] = base64.b64encode(json.dumps(msg).replace(' ', '').encode('utf-8')).decode('utf-8')
            msg['sign'] = self.get_msg_sign(msg['msg'])
            t = int(time.time() * 1000)
            params = {
                'jsv': '2.5.6',
                'appKey': '24679788',
                't': t,
                'sign': self.yk_t_sign(self.req.cookies.get('_m_h5_tk')[:32], str(t), '24679788',
                                       json.dumps(msg).replace(' ', '')),
                'api': 'mopen.youku.danmu.list',
                'v': '1.0',
                'type': 'originaljson',
                'dataType': 'jsonp',
                'timeout': '20000',
                'jsonpIncPrefix': 'utility'
            }
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
            headers['Referer'] = 'https://v.youku.com'
            res = self.request_data("POST", url, data={"data": json.dumps(msg).replace(' ', '')}, headers=headers,
                                    params=params)
            self.data_list.append(res.json())
        parse_data = self.parse()
        return parse_data

    def getImg(self, url):
        api_url = 'https://acs.youku.com/h5/mtop.youku.danmu.common.profile/1.0'
        video_id = url.split('?')[0].split('/')[-1].replace("id_", '').split('.html')[0]
        msg = {
            'pid': 0,
            'ctype': 10004,
            'sver': '3.1.0',
            'cver': 'v1.0',
            'ctime': int(time.time() * 1000),
            'guid': self.req.cookies.get('cna'),
            'vid': video_id
        }
        msg['msg'] = base64.b64encode(json.dumps(msg).replace(' ', '').encode('utf-8')).decode('utf-8')
        msg['sign'] = self.get_msg_sign(msg['msg'])
        t = int(time.time() * 1000)
        params = {
            'jsv': '2.5.6',
            'appKey': '24679788',
            't': t,
            'sign': self.yk_t_sign(self.req.cookies.get('_m_h5_tk')[:32], str(t), '24679788',
                                   json.dumps(msg).replace(' ', '')),
            'api': 'mopen.youku.danmu.list',
            'v': '1.0',
            'type': 'originaljson',
            'dataType': 'jsonp',
            'timeout': '20000',
            'jsonpIncPrefix': 'utility'
        }
        headers = self.headers.copy()
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
        headers['Referer'] = 'https://v.youku.com'
        res = self.request_data("POST", api_url, data={"data": json.dumps(msg).replace(' ', '')}, headers=headers,
                                params=params)
        emoji_infos = res.json().get('data', {}).get('data', {}).get('danmuEmojiEnter', {}).get('danmuDynamicEmojiVO',
                                                                                                [])
        emoji_data_list = []
        for emoji_info in emoji_infos:
            emoji_data_list.append(
                {
                    'emoji_code': emoji_info.get('subtext'),
                    'emoji_url': emoji_info.get('previewPic'),
                }
            )
        return emoji_data_list

if __name__ == '__main__':
    print(GetDanmuBase.__subclasses__())
    danmu = GetDanmuTencent()
    a = danmu.get(
        "https://v.qq.com/x/cover/mzc002007sqbpce/r4100zvgz9i.html?ptag=newdouban.tv",
    )
    print()
    # danmu = GetDanmuMgtv()
    # a = danmu.get(
    #     "https://www.mgtv.com/b/594763/20422016.html?fpa=1217&fpos=&lastp=ch_home",
    #     _type="xml")
    # print()

# endregion


下载链接:https://shuxiaobai.lanzouw.com/i60FY20vtvfe

免费评分

参与人数 8吾爱币 +15 热心值 +7 收起 理由
weilai911 + 1 + 1 我很赞同!
lidong234ok + 1 + 1 牛啊,终于找到下载弹幕的软件呢
stone102 + 1 + 1 热心回复!
renrenrenjl + 1 + 1 我很赞同!
lztym + 1 + 1 很好用,希望后续更加完善!
wushaominkk + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
天空宫阙 + 2 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
WTYCUG + 1 我很赞同!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

推荐
asong95 发表于 2024-12-15 21:41
本帖最后由 asong95 于 2024-12-15 22:15 编辑

今天下载下来使用了一会,怎么下载了两三个电视剧弹幕后,现在输入完电视剧名,第几季,是否为电视剧输入1 ,之后就直接闪退了,之前最起码找不到 会让任意键退出,现在直接闪退了,什么都找不到了,是什么情况?麻烦给解答下,谢谢~下面是报错信息

image.png (397.92 KB, 下载次数: 0)

报错信息

报错信息
推荐
asong95 发表于 2024-12-16 01:00
本帖最后由 asong95 于 2024-12-16 18:54 编辑
asong95 发表于 2024-12-15 21:41
今天下载下来使用了一会,怎么下载了两三个电视剧弹幕后,现在输入完电视剧名,第几季,是否为电视剧输入1  ...

看了代码,打了日志,定位到是去豆瓣获取影视列表时,访问异常了,需要手动去访问下豆瓣的那个接口,后面代码就可以正常走了,,,不过豆瓣这个接口感觉调用几次就提示太频繁,这个地方就报错,走不下去了。。。

image.png (141.55 KB, 下载次数: 0)

image.png
沙发
ZhjhJZ 发表于 2024-6-5 11:44
3#
yunchu 发表于 2024-6-5 11:59
感觉有点儿厉害
4#
pbmmm 发表于 2024-6-5 12:07
想知道获取这个弹幕有什么用?
5#
twl102841 发表于 2024-6-5 12:31
弹幕能下到本地吗
6#
Yifan2007 发表于 2024-6-5 13:11
pbmmm 发表于 2024-6-5 12:07
想知道获取这个弹幕有什么用?

一般是做词云用的,分析情感指数。。等等
7#
moye15 发表于 2024-6-5 13:22
大牛!!!!!!
8#
cyh92 发表于 2024-6-5 13:53
厉害了!
9#
 楼主| 2513002960 发表于 2024-6-5 14:16 |楼主
twl102841 发表于 2024-6-5 12:31
弹幕能下到本地吗

可以的,用获取指定弹幕的程序就可以
10#
 楼主| 2513002960 发表于 2024-6-5 14:17 |楼主
pbmmm 发表于 2024-6-5 12:07
想知道获取这个弹幕有什么用?

加载到视频网站看,有弹幕看视频更对味~
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-2-19 03:24

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表