[Python] 纯文本查看 复制代码
import hashlib
import os
import re
import subprocess as sub
import time
from urllib import parse
from pyfiglet import Figlet
import requests
import urllib3
from loguru import logger as log
from rich.console import Console
import threading
import signal
import urllib
from HEADERS import get_random_UA
urllib3.disable_warnings()
log.add('qi_e_log.log',encoding='utf8',level='INFO')
BASE_PATH = os.path.dirname(__file__)
def filter_title(title):
"""
过滤文件名
"""
new_title = re.sub(r"|[\\/:*?\"<>|]+", "", title.strip())
return new_title
@log.catch
def record_flv(url,rtmp_url):
'''
录制flv
'''
nick_name,room_name = get_room_info(url)
path = _mkdir(nick_name)
time_str = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(time.time()))
file_path_name = os.path.join(path,room_name + '_' + time_str +'.flv')
log.info('开始录制,播主:{},房间名称:{}\n保存位置:{}'.format(nick_name,room_name,file_path_name))
opener = urllib.request.build_opener()
opener.addheaders = [('Referer','https://live.qq.com/')]
urllib.request.install_opener(opener)
urllib.request.urlretrieve(rtmp_url, file_path_name)
@log.catch
def figlet_font():
f = Figlet(font='slant')
print(f.renderText('Qi-E Record'))
@log.catch
def get_room_info(url):
'''
获取房间信息
'''
res = requests.get(url)
room_name = re.findall(r'"room_name":"(.+?)","user_chat_level":',res.text)
nick_name = re.findall(r'"nickname":"(.+?)","child_name"',res.text)
room_name = filter_title(room_name[0])
nick_name = filter_title(nick_name[0])
return nick_name,room_name
@log.catch
def _mkdir(nickname):
'''
创建文件夹
'''
path = os.path.join(os.getcwd(),'qi_e',nickname)
if not os.path.exists(path):
os.makedirs(path)
return path
@log.catch
def get_params(url):
'''
获取加密参数
'''
key = 'RgP7DW01naDYQSV0'
room_id = url.split('/')[-1]
time_params = int((int(time.time()) / 60000)*1000)
keys = url.split('/')[-1] + key + str(time_params)
bkeys = keys.encode('utf8')
sign = hashlib.md5(bkeys).hexdigest()
return room_id,time_params,sign
@log.catch
def get_video_url(url):
'''
获取直播流地址
'''
# 'https://live.qq.com/swf_api/room/10165096?cdn=ws&nofan=yes&_t=27741715&sign=b9c7d334d44a7115c20850a5e8667e16'
room_id,time_params,sign = get_params(url)
request_url = 'https://live.qq.com/swf_api/room/{room_id}?cdn=ws&nofan=yes&_t={time}&sign={sign}'.format(room_id=room_id, time=time_params,sign=sign)
global headers
headers = {'User-Agent': get_random_UA(),'Connection': 'close'}
res = requests.get(request_url,headers=headers,verify=False).json()
rtmp_url = parse.urljoin(res.get('data').get('rtmp_url'),res.get('data').get('rtmp_live'))
return rtmp_url
def start_recording(url,record_url):
'''
调用录屏函数
'''
nick_name,room_name = get_room_info(url)
__ffmepg = os.path.join(BASE_PATH,'ffmpeg.exe')
path = _mkdir(nick_name)
time_str = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(time.time()))
file_path_name = os.path.join(path,room_name + '_' + time_str +'.mp4')
log.info('开始录制,播主:{},房间名称:{}\n保存位置:{}'.format(nick_name,room_name,file_path_name))
cmd = [__ffmepg, "-y",
"-v","verbose",
"-timeout","2000000",
"-loglevel","error",
"-hide_banner",
"-user_agent",headers.get("User-Agent"),
"-referer",'https://live.qq.com/'
"-analyzeduration","2147483647",
"-probesize","2147483647",
"-i",record_url,
'-bufsize','5000k',
"-map","0",
"-sn","-dn",
'-max_muxing_queue_size','64',
file_path_name]
sub.check_call(cmd)
def replay_recording(url,num):
'''
循环监控录屏状态
'''
while True:
try:
if int(num) == 1:
rtmp_url = get_video_url(url)
start_recording(url,rtmp_url)
time.sleep(5)
elif int(num) == 2:
rtmp_url = get_video_url(url)
record_flv(url,rtmp_url)
time.sleep(5)
except Exception as e:
if 'returned non-zero exit status 255.' in str(e):
break
elif 'returned non-zero exit status 1' in str(e):
log.error('录屏失败,主播未开播或者网络异常,8秒重新尝试,如需退出请关闭窗口!')
time.sleep(8)
elif 'unknown url type' in str(e):
log.error('录屏失败,主播未开播或者网络异常,8秒重新尝试,如需退出请关闭窗口!')
time.sleep(8)
else:
log.exception(e)
@log.catch
def exit_recording(handler,frame):
"""
退出录屏
"""
log.warning('退出录屏')
os._exit(0)
@log.catch
def main(url,num):
"""
主逻辑函数
"""
# 按Ctrl+C 退出
record = threading.Thread(target=replay_recording,daemon=True,args=(url,num))
record.start()
signal.signal(signal.SIGINT, exit_recording)
record.join()
if __name__ == '__main__':
Console().print(f'[yellow]欢迎使用话痨的企鹅录屏[/yellow]\n[blue]{"=="*50}[/blue]\n[red]请输入这样的的直播间地址\'https://live.qq.com/10163895\'[/red]')
Console().print("[red]{:*^100}[/red]".format('按下Ctrl+C退出直播录屏'))
# url = 'https://live.qq.com/10163895' # 直播间地址
url = input('请输入直播间地址:').strip()
num = input('请输入数字->1.录制mp4;->2.录制flv:').strip()
if int(num) == 2:
Console().print(f'[yellow]欢迎使用话痨的企鹅录屏[/yellow]\n[blue]{"=="*50}[/blue]\n[red]请输入这样的的直播间地址\'https://live.qq.com/10163895\'[/red]')
Console().print("[red]{:*^100}[/red]".format('停止录播直接关闭窗口'))
main(url,num)