吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4499|回复: 4
收起左侧

[Python 原创] 【原创源码】【Python】教你从0搭建微信推送斗鱼直播提醒(单房间简化版)

  [复制链接]
Loker 发表于 2021-5-28 11:10
【原创源码】【PHP Python HTML】教你从0搭建微信推送斗鱼直播提醒之后
重新推出重制版,单房间推送提醒版本(后文有扩展多房间思路),适合没有服务器的小伙伴学习使用。

Gitee文档同步更新:https://gitee.com/LGW_space/dy-live-lintener
语雀文档更为详细(包含成品案例):https://www.yuque.com/books/share/fdc7c120-e4eb-47d5-93de-dcf5d347e5a8?# 《斗鱼直播提醒服务》


斗鱼直播提醒 ——简化版

功能介绍:

  • 主播开播后或更换房间标题后推送提醒到微信(单房间)

    > 理论上可以配置多个房间,可以达到和斗鱼全房间推送的效果一样。

项目截图:

微信截图_20210527164432.png

[TOC]

一、项目准备

1. 用到的技术内容

  • 云函数
  • 对象存储(为什么使用对象存储,下文有介绍)
  • python(3.6/3.7)
  • WxPusher

2. 需要的工具

  • 腾讯云(云函数和云对象存储)

二、开通对象存储

这里我们使用对象存储相当于数据库的一个作用,为什么使用对象存储呢?
> 对象存储采用扁平的文件组织方式,所以在文件量上升至千万、亿级别,容量在PB级别的时候,这种文件组织方式下的性能优势就显现出来了,文件不在有目录树深度的问题,历史和近线数据有同样的访问效率。另外,对象存储多采用分布式架构,

简而言之就是便宜效率高

官方入门步骤

1. 注册腾讯云

官方的注册方法

2. 开通 COS 服务

在 腾讯云控制台 中,选择【云产品】>【对象存储】,进入 COS 控制台,按照界面提示开通 COS 服务。(如果您已开通,请跳过该步骤。)

3. 创建存储桶

我们需要创建一个用于存放对象的存储桶:

  1. 在 对象存储控制台 左侧导航栏中单击【存储桶列表】,进入存储桶管理页。

  2. 单击【创建存储桶】,输入以下配置信息,其他配置保持默认即可。

    • 名称:输入存储桶名称。名称设置后不可修改。此处举例输入 examplebucket。
    • 所属地域:存储桶所属地域,选择与您业务最近的一个地区,例如广州地域。
    • 访问权限:存储桶访问权限,此处我们保持默认为“私有读写”。
  3. 单击【确定】,即可创建完成。

三、云函数的开发

云函数我们使用腾讯云函数作为示例,阿里云也同理。(阿里云似乎是不允许触发器的触发周期低于1分钟)

1. 注册腾讯云

官方的注册方法

2. 创建云函数

  • 登录腾讯云,在云产品里找到云函数,在【函数服务】里新建云函数
  • 选择【自定义创建】
  • 修改【函数名称】、【地域】、【运行环境(Python3.6)】、【提交方法(在线编辑)】、【执行方法(index.main_handler)】暂时不用修改默认代码。
  • 点击完成

3. 完善功能

在写代码之前,我们需要用到这几个关键信息:

  • appid :你的APPID(账号信息里有)
  • secret_id : 你的 SecretId (API密钥管理里获取)
  • secret_key :你的 SecretKey(API密钥管理里获取)
  • region :存储桶所在的地域
  • BUCKET:存储桶名称
  • token :没有可为空
  • FILE_NAME :你要上传的文件名 如room_info
  • WxPusherToken :WxPusher的用户appToken
3.1 引入所需包,完成基本配置
# -*- coding: utf8 -*-
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
from qcloud_cos_v5 import CosClientError
import datetime
import json
import logging
import requests
from urllib.parse import parse_qs

logger = logging.getLogger()
logger.setLevel(logging.ERROR) # 默认打印 INFO 级别日志,可根据需要调整为 DEBUG、WARNING、ERROR、CRITICAL 级日志

appid = 123123  # Please replace with your APPID. 请替换为您的 APPID
secret_id = u'xxx'  # Please replace with your SecretId. 请替换为您的 SecretId
secret_key = u'xxx'  # Please replace with your SecretKey. 请替换为您的 SecretKey
region = u'ap-shanghai'  # Please replace with the region where COS bucket located. 请替换为您bucket 所在的地域
BUCKET = 'xxx'
FILE_NAME = 'xxx'
token = ''
WxPusherToken = 'xxxxxx'

# 配置存储桶
config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region, Token=token)
client = CosS3Client(config)

注:qcloud_cos_v5 为 COS的SDK

3.2 创建方法 根据房间号获取直播状态

斗鱼第三方API,通过房间号获取房间信息,自行百度获取。
最后返回 房间信息

def getRoomInfo():
url = '斗鱼官方API/房间号'
try:
r = requests.get(url, timeout=5)
except requests.exceptions.RequestException as e:
print(str(e))
return 'timeout'
else:
if r.status_code == 404:
return '404'
json_str = json.loads(r.text)
if isinstance(json_str, dict):
if 'error' in json_str.keys() and json_str['error'] == 0:
room_info = json_str['data']
return room_info
3.3 创建方法 发送WxPusher消息
def send_WxPusher_msg(c):
    headers = {
        'Content-Type': 'application/json'
    }
    url = 'http://wxpusher.zjiecode.com/api/send/message'
    parameter = {
        "appToken": WxPusherToken,
        "content": c,
        "contentType": 1,    # 内容类型 1表示文字  2表示html(只发送body标签内部的数据即可,不包括body标签) 3表示markdown 
        "topicIds":[ 
            你的主题号
        ]
    }
    r = requests.post(url=url, headers=headers,
                      data=json.dumps(parameter, ensure_ascii=False).encode('utf-8'))
    json_str = json.loads(r.text)
    datas = json_str['data']
    for data in datas:
        if data['code'] == 1001:
            logger.error(data['status'])
3.4 创建方法 更新(上传)文件

设置更新时间字段,用于锁定更新,在指定时间内不允许更新/重复推送(我猜是腾讯云多线程的问题,会导致重复更新、重复推送)

def updateLocalInfo(roominfo):
DIC = {
"room_id": roominfo["room_id"],
"room_status": roominfo["room_status"],
"room_name": roominfo["room_name"],
"update_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
response = client.put_object(
Bucket=BUCKET,
Body=DIC,
Key=FILE_NAME
)
print("更新成功:",response['ETag'])
3.5 创建方法 获取文件

url获取:上传文件后,在存储桶内点击文件的详情基本信息对象地址信息
https://存储桶名.cos.地域.myqcloud.com/文件名

def getLocalInfo():
url = '文件的对象地址'
r = requests.get(url)
info_str = r.text
params = parse_qs(info_str)
result = {key: params[key][0] for key in params}
return result
3.6 创建方法 信息整合
def time_last(time_str): # 计算时间差
    t_r = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    now_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    t_n = datetime.datetime.strptime(now_time_str, "%Y-%m-%d %H:%M:%S")
    return (t_n - t_r).seconds

def checkRoomInfo():
    room = getLocalInfo() # 获取历史房间信息
    curr_info = getRoomInfo() # 获取当前真实房间信息
    time_l = time_last(room['update_time']) # 计算当前时间和上次更新时间的时间差
    # 直播开播提醒
    if curr_info['room_status'] == "1": # 正在直播
        if room['room_status'] == "2":  # 之前状态为 未开播
            if time_l > 300:  # 300秒内无法重复推送
                content = '【开播】您关注的主播:' + curr_info['owner_name'] + '开始直播啦' + \
                            '\n房间标题:' + curr_info['room_name'] + \
                            '\n正在直播:' + curr_info['cate_name']
                send_WxPusher_msg(content)
                updateLocalInfo(roominfo=curr_info)
            else:
                logger.error(
                        '【开播】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))
    else:
        if room['room_status'] == "1":  # 关闭了直播
            updateLocalInfo(roominfo=curr_info)

    # 更换房间标题提醒
    if curr_info['room_name'] != room['room_name']:
        if time_l > 300:
            content1 = '【更换标题】您关注的主播:' + curr_info['owner_name'] + '改变了房间标题' + \
                        '\n当前房间标题:' + curr_info['room_name']
            send_WxPusher_msg(content1)
            updateLocalInfo(roominfo=curr_info)
        else:
                logger.error(
                        '【更换标题】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))

到这所有工能就基本全部完成啦~

开发扩展思路

多房间支持:支持一个房间就一定支持多个房间。

  • 下载文件:文件内容为字典/json数组,每一个字典记录房间的信息。
  • 遍历:获取到存储桶内的房间信息后,用斗鱼API遍历得到每一个房间的真实信息。
  • 更新/上传:通过比对真实信息,更新字典数组,最后再上传到COS。

免费评分

参与人数 5吾爱币 +5 热心值 +5 收起 理由
pqkane + 1 + 1 我很赞同!
hudongliang + 1 + 1 谢谢@Thanks!
ccwuax + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
loker嘎嘎 + 1 + 1 感谢楼主大大分享( •̀ ω •́ )✧.
zoenbo + 1 + 1 大佬出品 双手点赞~

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

hudongliang 发表于 2021-5-28 18:38
感谢楼主奉献
第一品霄 发表于 2021-5-29 11:20
地狱猫 发表于 2021-9-29 20:02
你开发业务系统吗?现在网上有很多零代码搭建系统的平台,我们想借助这样的平台搭建业务系统,然后布置到我们自己的服务器。
 楼主| Loker 发表于 2021-9-29 20:53
地狱猫 发表于 2021-9-29 20:02
你开发业务系统吗?现在网上有很多零代码搭建系统的平台,我们想借助这样的平台搭建业务系统,然后布置到我 ...

开源的零代码平台有很多,在Gitee上也有很多开源项目。
简单的我可以帮你搭建和基础开发,复杂的我也没空弄。
具体业务可以私聊我详谈,但是我没大有时间。。。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-13 17:37

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表