[Python] 纯文本查看 复制代码
import time,re,json,requests,random
import os.path
from urllib.parse import urlparse
from pprint import pprint
from lxml import etree
import pandas as pd
# *该方法可以实现URL的识别和分段
#result = urlparse('http://www.baidu.com:7788/index.html;user?id=5#comment').netloc
#class
#readme=Path('01-05.txt').home()#.cwd()#.exists()#绝对路径.resolve().stem#suffix#.parent.parent
#print(file.stem,file,str(Path.cwd()),Path(__file__),file.resolve(),file.parent)#suffix
#cwd = Path.cwd()
#print(list(cwd.rglob('*.txt')))
def get_lives_data(filename):
f=open(filename,'r+')
r = f.readlines()
lives_data = [x.strip() for x in r if x.strip() != '']
# lives_data= list(map(lambda x: x.strip(), r))
# lives_data=lives_data.remove('')
f.close()
return lives_data
def get_parse_href_result(name, href, valids, f):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36'}
invalid, valid = [], []
now = time.time()
try:
# print(hrefs[0])
netloc = urlparse(href).netloc
# print(netloc not in valids)
if netloc not in valids:
# print('get:url_parse not in valids')
res = requests.get(href, headers=headers, timeout=5, stream=True)
if res.status_code == 200:
for k in res.iter_content(chunk_size=1048576):
if k:
valid += [netloc]
print(f'{time.time() - now:.2f}\t{name}')
content = name + ',' + href + '\n'
# print(content)
f.write(content)
break
else:
print(f'{time.time() - now:.2f}\t{name} +')
content = name + ',' + href + '\n'
f.write(content)
except Exception:
invalid += [urlparse(href).netloc]
# 无法连接并超时的情况下输出“X”
print(f'[无效] {name}')
# print(f'{href}')
return invalid, valid
def get_parse_href_result2(name,href,valids,f):
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36'}
invalid,valid,content=[],[],''
now=time.time()
try:
netloc=urlparse(href).netloc
if netloc not in valids:
res=requests.get(href,headers=headers,timeout=5,stream=True)
if res.status_code==200:
for k in res.iter_content(chunk_size=1024):
#for k in res.iter_content(chunk_size=1048576):
if k:
valid+=[urlparse(href).netloc]
print(f'{time.time()-now:.2f}\t{name}')
content+=href+'#'
break
except Exception:
invalid+=[urlparse(href).netloc]
# 无法连接并超时的情况下输出“X”
print(f'[无效] {name}')
#print(f'{href}')
return invalid,valid,content
def test_url(newfile,lives_data):
# ll是电视直播源的链接列表
# ll=['http://........','https://.......']
invalids, valids = [], []
# 用于检测失败或成功的net,不再检测,提高效率
#l=lives_data.index('🌳电影直播,#genre#')
with open(newfile, 'a+') as f:
#for line in lives_data[:]:
for line in lives_data:
if line.find(',http') != -1:
name = line.split(',http')[0]
urls = 'http' + line.split(',http')[-1]
if urls.find('#') != -1:
hrefs = urls.split('#')
else:
hrefs = [urls]
if len(hrefs) == 1:
url_parse = urlparse(hrefs[0]).netloc
# print(url_parse,invalids,valids)
if url_parse not in invalids:
# print('url_parse not in invalids')
result = get_parse_href_result(name, hrefs[0], valids, f)
invalids = list(set(invalids + result[0]))
valids = list(set(valids + result[1]))
else:
print(f'[无效] {name} -')
# print(f'{hrefs[0]}')
else: # 包含#
content = name + ','
for i in range(len(hrefs)):
url_parse = urlparse(hrefs[i]).netloc
if url_parse not in invalids:
result2 = \
get_parse_href_result2(name, hrefs[i], valids, f)
nvalids = list(set(invalids + result2[0]))
valids = list(set(valids + result2[1]))
content += result2[2]
else:
print(f'[无效] {name} -')
# print(f'{hrefs[i]}')
if content[:-1] != name:
f.write(content[:-1] + '\n')
else:
if line[-7:] == '#genre#':f.write('\n' + line + '\n')
else:f.write(line + '\n')
f.close()
print(f'\n🈚效集合√:\n{invalids}')
print(f'\n🈶效集合X:\n{valids}')
def jiekou_jm():
url = 'https://api.lige.fit/ua' # 不要动
headers = {'user-agent': 'okhttp/3.15 Html5Plus/1.0 (Immersed/23.92157)'}
# 解密:https://tvbox.cainisi.cf/mao.php?url=https://tvbox.cainisi.cf/TV
# jar下载:https://tvbox.cainisi.cf/jarxz.php?url=http://饭太硬.top/tv
#jk_url = input('Enter jk url: ')
jk_url = 'http://124.220.63.232/关注码上放生/时光机' # 手动
# jk_url = 'http://gg.gg/liugongzi'
#jk_url = 'https://ghproxy.com/https://raw.githubusercontent.com/kebedd69/TVbox-interface/main/%E7%94%9C%E8%9C%9C.json'
#jk_url = 'https://szyyds.cn/tv/x.json'
addr_json = {"url": jk_url}
res = requests.post(url, headers=headers, json=addr_json)
content = res.content.decode()
try:
jk_json = json.loads(content)
pprint(jk_json)
live_addrs = re.findall("proxy://do=live&type=txt&ext=(.*?)", content, re.S)
live_url = jk_json['lives'][0]['url']
print('\nlive_addrs: ', live_addrs)
print('\nlive_url: ', live_url)
except Exception:
live_addrs = re.findall('"proxy://do=live&type=txt&ext=(.*?)"\n', res.text, re.S)
pprint(content)
print('\nlive_url: ', live_addrs)
# hz=os.path.(jk_url).stem
name = os.path.basename(jk_url)
fs = os.path.splitext(jk_url)
if fs[1] == '':
hz = '.json'
else:
hz = ''
filename = 'lives/' + name + hz
basename = os.path.basename(filename)
path = os.path.abspath(filename)
print(f'\n接口文件路径: {path}')
with open(filename, 'w+') as f:
f.write(content)
f.close()
print(f'\nFile [{basename}] Download OK! ')
def more_tv_adrs_get():
url='https://api.lige.fit/getJson'
headers = {'user-agent': 'okhttp/3.15 Html5Plus/1.0 (Immersed/23.92157)'}
res=requests.get(url,headers=headers).content.decode()
pprint(json.loads(res))
name=os.path.basename(url)
filename='lives/'+name+'.json'
basename=os.path.basename(filename)
path=os.path.abspath(filename)
print(f'/n单|多仓文件路径: {path}')
with open(filename ,'w+') as f:
f.write(res)
f.close()
print(f'\nFile [{basename}] Download OK! [note:最后四个是多仓地址]')
def daily_words():
#
url='https://www.lige.fit/'
#经纬度:https://restapi.amap.com/v3/ip?key=57eaea5833ff1616cfd1ff2c4cf9b58a
#https://api-meting.imsyy.top/api?server=netease&type=lrc&id=28815250
#lrc:
#url='https://api-meting.imsyy.top/api?server=netease&type=lrc&id=1389794615'
#X音乐playlisy🎶top:
#url='https://api-meting.imsyy.top/api?server=netease&type=playlist&id=7452421335'
#天气:
#url='https://restapi.amap.com/v3/weather/weatherInfo?key=57eaea5833ff1616cfd1ff2c4cf9b58a&city=320000'
#每日一言
url='https://v1.hitokoto.cn/'
headers = {'user-agent': 'okhttp/3.15 Html5Plus/1.0 (Immersed/23.92157)'}
res=requests.get(url,headers=headers)
content=res.content.decode()
c_json=json.loads(content)
#pprint(c_json)
hitokoto=c_json['hitokoto']
length=c_json['length']
#from_who=c_json['from_who']
who='——'+c_json['from']
#if who==None:who='佚名'
print('\n'+hitokoto)
#中文对齐#出处
print(who.rjust(length,chr(12288)))
def iptvsearch():
#1.first html requests,get search resulets,pages
url='https://www.foodieguide.com/iptvsearch/'
# url1='https://www.foodieguide.com/iptvsearch/?page=2&s=cctv6'
# host='https://www.foodieguide.com'
headers={'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.50'}
#params={'search':'CCTV6','Submit':'+'}
#search='CCTV6'
words=input('Entet iptvSearch words: ')
#params={'s':'CCTV6'}
params={'s':words}
res=requests.post(url,headers=headers,params=params)
content=res.content.decode()
# print(content)
_etree=etree.HTML(content)
top_search = _etree.xpath('//div/span[@class="sh"]/a/@href')
top_search = list(x[3:] for x in top_search)
print(f'\nTop_Search :{top_search}\n')
# pages = sorted(list(set(_etree.xpath('//div[@style="display:flex;justify-content:center;"]/a/@href'))))[1:]
result = _etree.xpath('//body/div[1]/div[2]/div/text()')[0].strip()
ps = _etree.xpath('//div[@style="display:flex;justify-content:center;"]/a/@href')
# print(ps)
if ps==[]:print('Search Result :None, pls try run again.')
else:
page_hrefs = _etree.r('/K;/div[@style="display:flex;justify-content:center;"]/a/@href')[-1]
pages=int(re.findall('page=(.*?)&s=.*?',page_hrefs,re.S)[0])
print(f'\n{result}, total pages: {pages}页\n')
t_channels,t_m3u8s=[],[]#addy channels,m3u8 of 5 pages.
channels = _etree.xpath('//div[@class="tables"]//div[@class="channel"]/a/div[1]/text()')
m3u8s = _etree.xpath('//div[@class="tables"]//div[@class="m3u8"]//td[2]/text()')
print(len(channels),len(m3u8s))
print(f'>>> requesting... page 1')
t_channels += channels
t_m3u8s += m3u8s
# #checked:09-07-2023 checked
# checked_ts=_etree.xpath('//div[@class="tables"]//div[3]/i/text()')
# cs = checked_ts[0].strip().split(' ')
# checked = cs[0].strip()
# print(checked)
#pa 5 page:
if pages>=10:page=10
else:page=pages
for p in range(2,page+1):
print(f'>>> requesting... pfffg''age {p}')
url1 = f'https://www.foodieguide.com/iptvsearch/?page={p}&s={words}'
res = requests.post(url1, headers=headers)
content = res.content.decode()
_etree = etree.HTML(content)
channels = _etree.xpath('//div[@class="tables"]//div[@class="channel"]/a/div[1]/text()')
m3u8s = _etree.xpath('//div[@class="tables"]//div[@class="m3u8"]//td[2]/text()')
#print(len(channels),len(m3u8s))
t_channels += channels
t_m3u8s += m3u8s
#2.cannels set
m3u8s_set=[]
#same channel ,use # join.
# t_channels=list(x.upper() for x in t_channels)
# print('t_channels: ', t_channels)
# channels_set = list(set(x.upper() for x in t_channels))
channels_set = list(set(x for x in t_channels))
print('\nchannels_set: ', channels_set)
# print(len(t_channels),len(channels_set))
m3u8_set = []
for i in range(len(channels_set)):
m3u8_set.append('')
for t in range(len(channels_set)):
for j in range(len(t_channels)):
if channels_set[t] == t_channels[j]:
if m3u8_set[t] == '':
m3u8_set[t] = t_m3u8s[j].strip()
else:
m3u8_set[t] = f'{m3u8_set[t]}#{t_m3u8s[j].strip()}'
#print(len(m3u8_set),m3u8_set)
#print(len(channels_set),channels_set)
save_iptv_search_result(words,channels_set,m3u8_set)
def save_iptv_search_result(words,channels,m3u8s):
#print(channels,m3u8s)
s='🎈🦋🦜🍀💋🎤🎗🎖🏅✨🎋🌳🍃🌱🌿☘🥇🥈🥉🌹🏵️🍂🌺🎍🌴'
pic=random.choice(s)
filename ='lives/iptv10.txt'
if not os.path.isfile(filename):f = open(filename, 'w')
else:f=open(filename,'a+')
f.write(f'\n{pic}{words}直播'+',#genre#\n')
for i in range(len(channels)):
# f.write(channels[i].upper() + ',' + m3u8s[i].strip() + '\n')
f.write(channels[i]+','+m3u8s[i].strip()+'\n')
f.close()
print(f'\n{pic}{words}直播'+',#genre# have saved.\n')
def m3u_convert_to_txt(m3u_filename,txt_filename):
try:
with open(m3u_filename, "r") as m3u_file:
lines = m3u_file.readlines()
with open(txt_filename, "w") as txt_file:
s = \
'🎈🦋🦜🍀💋✨🎤🎗🎖🏅🥇🥈🥉🌹🏵️🍂🌺🎍🌴🎋🌵🌳🍃🌱🌿☘️'
pic = random.choice(s)
name = txt_filename.split('/')[-1].split('.')[0].upper()
txt_file.write(f'\n{pic}{name} TV' + ',#genre#\n')
for i in range(len(lines)):
line = lines[i].strip()
# print(line)
if line.startswith("#EXTINF:-1"):
next_line = lines[i + 1].strip() if i + 1 < len(lines) else None
if next_line and next_line.startswith("http"):
line = lines[i].strip().split(',')[-1]
if line[0] == '.': line = line[1:]
print(line)
# print(next_line)
txt_file.write(line + ',' + next_line + '\n')
print("Conversion successful!")
except Exception as e:
print("An error occurred:", e)
def local_live_check():
filename = '/storage/emulated/0/TVBoxx//公测版/live_local.txt'
path = os.path.abspath(filename)
print(path)
dir, file = os.path.split(path)
# dir,file = os.path.split(file_path)
# print(dir,file)“
# basename=os.path.basename(filename)
files = os.path.splitext(file)
newfile = os.path.join(dir, files[0] + '_ttd' + files[1])
print(newfile)
if not os.path.isfile(newfile):
f = open(newfile, 'w')
f.close()
# print(os.path.isfile(newfile))
lives_data = get_lives_data(filename)
# print(lives_data)
test_url(newfile, lives_data)
def douyu_search():
url='https://m.douyu.com'
url='https://api.tiankongapi.com/api.php/provide/vod/at/xml/from/tkm3u8/'
headers={'user-agent':'Mozilla/5.0 (Linux; Android 10; SP300 Build/CMDCSP300;) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/88.0.4324.93 Mobile Safari/537.36'}
se=requests.session()
res=se.get(url,headers=headers)
print(res.text)
url2='https://m.douyu.com/api/search/anchor'
params={'limit':'50','offset':'0','sk':'电影'}#limit50
res=se.post(url2,headers=headers,params=params)
lst_json=json.loads(res.text)['data']['list']
df = pd.DataFrame(lst_json)
df1=df[["nickname","hn","isLive","roomId"]]#,"roomName"]]
df1.sort_values("hn",ascending=False, inplace=True)
print(df1)
print('#昵称,热度,在线,房间号,房间名长')
#print(res.content.decode())
#print(res.cookies)
def net_live_check():
pass
def get_api():
filename = '/storage/emulated/0/biubiutv/220512.txt'
path = os.path.abspath(filename)
print(path)
dir, file = os.path.split(path)
# dir,file = os.path.w(file_path)
# print(dir,file)
# basename=os.path.basename(filename)
files = os.path.splitext(file)
newfile = os.path.join(dir, files[0] + '_get' + files[1])
print(newfile)
if not os.path.isfile(newfile):
f = open(newfile, 'w');
f.close()
# print(os.path.isfile(newfile))
f=open(filename,'r+')
data=json.load(f)
pprint(data)
f.close()
print('1.biubiut 源采集:')
zhuyejiekou=data['zhuyejiekou']
caijizhan=data['caijizhan']
print(zhuyejiekou)
print(caijizhan)
f = open(newfile, 'a')
f.write('\n#zhuyejiekou\n')
for zjk in zhuyejiekou:
name,url=zjk['name'],zjk['url']
print(name,url)
f.write(name+','+url+'\n')
f.write('\n#caijizhan\n')
for cjz in caijizhan:
name,url=cjz['name'],cjz['url']
print(name,url)
f.write(name+','+url+'\n')
f.close()
#return lives_data
#lives_data = get_lives_data(filename)
# print(lives_data)
#test_url(newfile, lives_data)
def ZYPalerApp_add_json():
url='https://gitcode.net/-/snippets/1706/raw/master/ZY-Player.json'
headers={'user-agent':'Mozilla/5.0 (Linux; Android 10; SP300 Build/CMDCSP300;) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/88.0.4324.93 Mobile Safari/537.36'}
res=requests.get(url,headers=headers)
js=json.loads(res.content.decode())
print(js)
model=[]
md_json={"key":"B8628A5D7CC38F01600379E4E4E01600","name":"非凡资源+","api":"http:\/\/www.ffzy.tv\/api.php\/provide\/vod\/at\/xml\/","download":"","parseApi":"https:\/\/ffzyplayer.com\/?url="}
import hashlib
str = '天堂资源+'
md5 = hashlib.md5() #创建md5加密对象
md5.update(str.encode('utf-8'))
#指定需要加密的字符串
str_md5 = md5.hexdigest().upper()
# 加密后的字符串
print(str_md5==md_json['key'])
#https://ghproxy.com/https://usercontent.githubfast.com/raw/hjdhnx/dr_py/main/js/人人.js #结果:e10adc3949ba59abbe56e057f20f883e
def download_file():
#https://agit.ai/guot54/ygbh/raw/branch/master/JS/直播转分线点播2.js
url='https://agit.ai/guot54/ygbh/raw/branch/master/zB/直播.js'
filename='lives/直播json.js'
res=requests.get(url)
#print(res.content.decode())
with open(filename,'w') as f:
f.write(res.content.decode())
print(f'{filename} download ok!')
f.close()
def yhdu_json():
#1.资源猫json 源:'https://videocat.oss-cn-hangzhou.aliyuncs.com/const/searchers.json'
import requests,json
from pprint import pprint
#2.永恒国度APP源
url='http://api.sqpon.gq/zyplyer.site'
headers={
'pkName':'com.ttt.eternity',
'user-agent':'Mozilla/5.0 (Linux; Android 10; SP300 Build/CMDCSP300; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.105 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/29.0)',
'Host':'api.sqpon.gq',
'Connection':'Keep-Alive'}
res=requests.get(url,headers=headers)
js=json.loads(res.content.decode())
filename = '/storage/emulated/0/TVBoxx/永恒国度/永恒国度APP源.json'
with open(filename,'w+') as f:
f.write(res.content.decode())
f.close()
basename= os.path.basename(filename)
#f=open(filename,'w')
pprint(js)
print(f'\n{basename} have saved.\n')
#3.永恒国度直播源
url='http://api.woaikoko.tk/iptv.site'
res=requests.get(url,headers=headers)
js=json.loads(res.content.decode())
filename = '/storage/emulated/0/TVBoxx/永恒国度/永恒国度直播源.json'
basename= os.path.basename(filename)
f=open(filename,'w')
f.write(res.content.decode())
f.close()
pprint(js)
print(f'\n{basename} have saved.\n')
def tm_test():
filename = '/storage/emulated/0/TVBoxx/时光机/精简版.json'
path = os.path.abspath(filename)
print(path)
dir, file = os.path.split(path)
# dir,file = os.path.w(file_path)
# print(dir,file)
# basename=os.path.basename(filename)
files = os.path.splitext(file)
newfile = os.path.join(dir, files[0] + '_get' + files[1])
print(newfile)
if not os.path.isfile(newfile):
f = open(newfile, 'w');
f.close()
# print(os.path.isfile(newfile))
f=open(filename,'r+')
data=json.load(f)
pprint(data['sites'])
#pprint(data)
f.close()
print('1.biubiut 源采集:')
zhuyejiekou=data['zhuyejiekou']
def run():
while True:
daily_words()
print('\n+-------- TVBox功能 --------+\n')
print('🇨🇳✨ 0.接口解密: ')
print('🇨🇳✨ 1.本地直播源 > 无效检测:')
print('🇨🇳✨ 2.NET直播源 > 无效检测: ')
print('🇨🇳✨ 3.More 单|多仓 地址获取:')
print('🇨🇳✨ 4.base64 转码:')
print('✨5.每日一言:')
print('✨6.直播源搜索:')
print('✨7.直播源搜索:')
print('✨8.斗鱼源搜索:')
print('✨9.get_api源:')
print('✨10.ZYPaler.app_add_json:')
print('11.下载文件')
print('12.永恒国度源get')
print('13.时光机CS')
enter = input('\npls enter your select:')
if enter == '0':jiekou_jm()
elif enter == '1':local_live_check()
elif enter == '2':net_live_check()
elif enter == '3':more_tv_adrs_get()
elif enter == '5':daily_words()
elif enter == '6':iptvsearch()
elif enter == '7':m3u_convert_to_txt('lives/music.m3u', 'lives/music.txt')
elif enter == '8':douyu_search()
elif enter == '9':get_api()
elif enter == '10':ZYPalerApp_add_json()
elif enter == '11':download_file()
elif enter == '12':yhdu_json()
elif enter == '13':tm_test()
else:print('enter error, pls retry again.')
if __name__ == '__main__':
run()