Update
[Python]
# TVBox local live-source checker
# Update >>>
# ૧(●´৺`●)૭
# ✔ The original script below still works; the new version reworks the
# checking logic in get_parse_href_result, which distinguishes two cases:
#   1. The source cannot be reached: caught by except.
#   2. The source responds, but slower than the cutoff of 500 ms
#      (recommended; 1000 ms works too) -- it is rejected as too slow.
#      (A stand-alone sketch of this check follows the script.)
# Added separate China Mobile / China Unicom / China Telecom tests; the
# results differ per carrier, and so does the output file suffix.
# Fixed: responses with a non-200 status_code were not reported before.
# Update date: 2024-03-12
import os
import time
import requests
from urllib.parse import urlparse


def local_live_check(filename):
    print('''#1. 中国移动 -- chinamobile CMCC
#2. 中国联通 -- chinaunicom CUCC
#3. 中国电信 -- chinatelecom CTCC''')
    select = int(input('\nYour selection: '))
    if select == 1:
        suffix = 'yd'
    elif select == 2:
        suffix = 'lt'
    elif select == 3:
        suffix = 'dx'
    else:
        raise SystemExit('Unknown selection')
    path = os.path.abspath(filename)
    print(path)
    dirname, file = os.path.split(path)
    files = os.path.splitext(file)
    # The result file gets a per-carrier suffix, e.g. migu_yd.txt.
    newfile = os.path.join(dirname, files[0] + f'_{suffix}' + files[1])
    print(newfile)
    lives_data = get_lives_data(filename)
    test_url(newfile, lives_data)
def get_lives_data(filename):
    # Read the source list and drop blank lines.
    with open(filename, 'r') as f:
        lives_data = [x.strip() for x in f.readlines() if x.strip() != '']
    return lives_data
def test_url(newfile, lives_data):
    # Each line looks like 'name,http://...'; one name may carry several
    # URLs joined with '#'. invalids/valids cache the hosts that already
    # failed or passed, so no host is ever tested twice.
    invalids, valids = [], []
    with open(newfile, 'w+') as f:
        for line in lives_data:
            if line.find(',http') != -1:
                name = line.split(',http')[0]
                urls = 'http' + line.split(',http')[-1]
                if urls.find('#') != -1:
                    hrefs = urls.split('#')
                else:
                    hrefs = [urls]
                if len(hrefs) == 1:
                    url_parse = urlparse(hrefs[0]).netloc
                    if url_parse not in invalids:
                        result = get_parse_href_result(name, hrefs[0], valids, f)
                        invalids = list(set(invalids + result[0]))
                        valids = list(set(valids + result[1]))
                    else:
                        print(f'[Invalid] {name} -')
                else:  # several '#'-joined URLs under one name
                    content = name + ','
                    for i in range(len(hrefs)):
                        url_parse = urlparse(hrefs[i]).netloc
                        if url_parse not in invalids:
                            result2 = get_parse_href_result2(name, hrefs[i], valids, f)
                            invalids = list(set(invalids + result2[0]))
                            valids = list(set(valids + result2[1]))
                            content += result2[2]
                        else:
                            print(f'[Invalid] {name} -')
                    if content[:-1] != name:
                        f.write(content[:-1] + '\n')
            else:
                # Keep group headers ('...,#genre#') and other plain lines.
                if line[-7:] == '#genre#':
                    f.write('\n' + line + '\n')
                else:
                    f.write(line + '\n')
    print(f'\nInvalid host set [X]:\n{invalids}')
    print(f'\nValid host set [√]:\n{valids}')
def get_parse_href_result(name, href, valids, f):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                             'AppleWebKit/537.36 (KHTML, like Gecko) '
                             'Chrome/107.0.0.0 Safari/537.36'}
    invalid, valid = [], []
    now = time.time()
    try:
        netloc = urlparse(href).netloc
        if netloc not in valids:
            res = requests.get(href, headers=headers, timeout=5, stream=True)
            # Time from sending the request to receiving the headers, in ms.
            res_time = round(res.elapsed.total_seconds() * 1000)
            if res.status_code == 200:
                for chunk in res.iter_content(chunk_size=1024):
                    if chunk:
                        if res_time > 500:  # 1 s = 1000 ms
                            print(f'[X][{res_time}ms] {name}')
                        else:
                            print(f'[✔][{res_time}ms] {name}')
                            valid += [netloc]
                            f.write(name + ',' + href + '\n')
                        break
            else:
                print(f'[X] {name}, status_code: [{res.status_code}]')
                invalid += [netloc]
        else:
            # Host already in the valid set: keep the entry, no re-test.
            print(f'{time.time() - now:.2f}\t{name} +')
            f.write(name + ',' + href + '\n')
    except Exception:
        invalid += [urlparse(href).netloc]
        # Print X when the source cannot be connected or times out.
        print(f'Connection failed or timed out [X] {name}')
    return invalid, valid
def get_parse_href_result2(name, href, valids, f):
    # Variant for '#'-joined URLs: returns the surviving href as content.
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                             'AppleWebKit/537.36 (KHTML, like Gecko) '
                             'Chrome/107.0.0.0 Safari/537.36'}
    invalid, valid, content = [], [], ''
    now = time.time()
    try:
        netloc = urlparse(href).netloc
        if netloc not in valids:
            res = requests.get(href, headers=headers, timeout=5, stream=True)
            if res.status_code == 200:
                for chunk in res.iter_content(chunk_size=1024):
                    if chunk:
                        valid += [netloc]
                        print(f'{time.time() - now:.2f}\t{name}')
                        content += href + '#'
                        break
    except Exception:
        invalid += [urlparse(href).netloc]
        # Printed when the source cannot be connected or times out.
        print(f'[Invalid] {name}')
    return invalid, valid, content
if __name__ == '__main__':
    filename = '/storage/emulated/0/TVBoxx/lives/migu.txt'
    local_live_check(filename)
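The heart of the update is the response-time gate inside get_parse_href_result. Here is a minimal stand-alone sketch of the same idea, assuming a placeholder URL and the script's recommended 500 ms cutoff:

[Python]
# Sketch only: the URL is hypothetical and 500 is the recommended
# cutoff in milliseconds from the notes above.
import requests

def is_fast_enough(url, limit_ms=500):
    try:
        res = requests.get(url, timeout=5, stream=True)
        # res.elapsed covers sending the request up to parsing the headers.
        return (res.status_code == 200
                and res.elapsed.total_seconds() * 1000 <= limit_ms)
    except requests.RequestException:
        return False

# Hypothetical usage:
# print(is_fast_enough('http://example.com/live.m3u8'))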
Most of the TVBox live-source lists floating around on search engines and public accounts are just dumped together at random, and the usual checking tools only test connectivity, slowly at that. So I put this script together myself; it has worked well for me, and I'm sharing it here.
Notes on the checking process and output:
[The larger the file, the more repeated hosts it contains, and the faster the later checks run.]
# local TVBox live-source file to be checked
/storage/emulated/0/TVBoxx/lives/agit_live.txt
# new file the usable sources are saved to
/storage/emulated/0/TVBoxx/lives/agit_live_ttd.txt
# invalid address, with the channel name
[Invalid] CCTV1
# connection time: the smaller the number, the faster the source
0.12 CCTV1
# '-' means the host is already in the invalid set and is skipped, to save time
[Invalid] CCTV3 -
# '0.00 ... +' means the host is already in the valid set: usable, no re-check needed
0.00 伤感DJ串烧为爱流泪 +
[Invalid] 津南一套
[Invalid] 湖南张家界宝峰湖
[Invalid] 四川峨眉山云海日出 -
# the invalid/valid host sets collected during the run:
Invalid host set [X]:
['117.169.121.162:6610', '115.231.128.81', '115.231.128.80', 'yixing-tv-ori-hls.jstv.com', 'stream.ysbtv.net', '117.169.124.149:8080']
Valid host set [√]:
['112.45.133.129:90', 'pluslive.wrbtv.cn', '118.122.78.172:89', 'tv.drs.hhtv.cc:8100', 'm3u8.channel.wsrtv.com.cn', 'stream10.jlntv.cn', 'live.cms.anhuinews.com', 'stream.hrbtv.net', 'pili-live-hls.hfmt.net']
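The '-' and '+' markers above come from the host cache: once a host (netloc) has failed or passed, every later URL on the same host is decided without sending another request. A minimal sketch of that caching idea, with check_once as a hypothetical stand-in for the real request logic:

[Python]
# Sketch of the netloc cache that makes later checks faster; check_once
# is an assumed callable standing in for the real request-and-time test.
from urllib.parse import urlparse

invalids, valids = set(), set()

def cached_check(url, check_once):
    host = urlparse(url).netloc
    if host in invalids:
        return False  # shown as '[Invalid] name -' in the output
    if host in valids:
        return True   # shown as '0.00 name +' in the output
    ok = check_once(url)
    (valids if ok else invalids).add(host)
    return ok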
Come with me to seek the realm of the high sky, and give the whole world joy and delight.
— what the water said to the sky
[Python]
# Imports: comment out whatever you don't need.
# The invalids/valids lists collect the hosts of failed/passed sources;
# a host that has been checked once is never checked again -- that is
# what makes the script fast.
# The filename/newfile paths will differ between Windows and Linux
# (a platform-path sketch follows after this script).
# newfile holds the usable sources after checking, suffixed '_ttd';
# change the suffix if you want.
import os.path
import time
import requests
from urllib.parse import urlparse
# Extras, unused by the core script -- comment in only if you need them:
# import re, json, random
# from pprint import pprint
# from lxml import etree
# import pandas as pd
def get_lives_data(filename):
    # Read the source list and drop blank lines.
    with open(filename, 'r') as f:
        lives_data = [x.strip() for x in f.readlines() if x.strip() != '']
    return lives_data
def test_url(newfile, lives_data):
    # Each line looks like 'name,http://...'; one name may carry several
    # URLs joined with '#'. invalids/valids cache the hosts that already
    # failed or passed, so no host is ever tested twice.
    invalids, valids = [], []
    with open(newfile, 'a+') as f:  # append: the file is created beforehand
        for line in lives_data:
            if line.find(',http') != -1:
                name = line.split(',http')[0]
                urls = 'http' + line.split(',http')[-1]
                if urls.find('#') != -1:
                    hrefs = urls.split('#')
                else:
                    hrefs = [urls]
                if len(hrefs) == 1:
                    url_parse = urlparse(hrefs[0]).netloc
                    if url_parse not in invalids:
                        result = get_parse_href_result(name, hrefs[0], valids, f)
                        invalids = list(set(invalids + result[0]))
                        valids = list(set(valids + result[1]))
                    else:
                        print(f'[Invalid] {name} -')
                else:  # several '#'-joined URLs under one name
                    content = name + ','
                    for i in range(len(hrefs)):
                        url_parse = urlparse(hrefs[i]).netloc
                        if url_parse not in invalids:
                            result2 = get_parse_href_result2(name, hrefs[i], valids, f)
                            invalids = list(set(invalids + result2[0]))
                            valids = list(set(valids + result2[1]))
                            content += result2[2]
                        else:
                            print(f'[Invalid] {name} -')
                    if content[:-1] != name:
                        f.write(content[:-1] + '\n')
            else:
                # Keep group headers ('...,#genre#') and other plain lines.
                if line[-7:] == '#genre#':
                    f.write('\n' + line + '\n')
                else:
                    f.write(line + '\n')
    print(f'\nInvalid host set [X]:\n{invalids}')
    print(f'\nValid host set [√]:\n{valids}')
def local_live_check():
    filename = '/storage/emulated/0/TVBoxx/公测版/live_local.txt'
    path = os.path.abspath(filename)
    print(path)
    dirname, file = os.path.split(path)
    files = os.path.splitext(file)
    # The result file gets the '_ttd' suffix next to the source file.
    newfile = os.path.join(dirname, files[0] + '_ttd' + files[1])
    print(newfile)
    if not os.path.isfile(newfile):
        open(newfile, 'w').close()  # create it, since test_url appends
    lives_data = get_lives_data(filename)
    test_url(newfile, lives_data)
if __name__ == '__main__':
    local_live_check()
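As the notes above say, the hard-coded Android path will not exist on a desktop system. One way to keep the script portable is to pick the source file per platform; this is a sketch only, and both paths are assumptions to adjust:

[Python]
# Sketch: choose the source file per platform. Both paths are examples;
# point them at wherever your live-source file actually lives.
import sys

if sys.platform.startswith('win'):
    filename = r'C:\TVBox\lives\live_local.txt'  # hypothetical Windows path
else:
    filename = '/storage/emulated/0/TVBoxx/lives/live_local.txt'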