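# NOTE: pasted from a forum code block; the original first line was the forum's "plain-text view / copy code" toolbar text, not code.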
import requests, re, time, os
import urllib.parse
import urllib.request
from urllib.parse import quote
from lxml import etree
from Crypto.Cipher import AES
import aiohttp
import aiofiles
import asyncio
'''
Target site: 电影淘淘 (dytt), url = https://www.dytt.com/
Search url = https://www.dytt.com/vodsearch/-------------.html?wd={name}
'''
# Default HTTP headers sent with every request made by this script:
# a desktop Chrome user-agent string and a Referer set to the target site's home page.
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
'Referer': 'https://www.dytt.com/',
}
# 1. Run the search
def sou_suo(url):
    """Fetch the search-results page and return the URL of the first hit.

    Args:
        url: Full search URL on www.dytt.com.

    Returns:
        The absolute URL of the first search result, or ``None`` when the
        page could not be fetched after all retries or contains no result.
    """
    max_failures = 50
    failures = 0
    while True:
        try:
            rep = requests.get(url=url, headers=headers, timeout=30)
        except requests.RequestException:
            # Network-level failure: counted as a failed attempt below.
            rep = None

        if rep is not None and rep.status_code == 200:
            tree = etree.HTML(rep.text) if rep.text.strip() else None
            # NOTE: do not put a tbody step in the path; it makes the XPath match nothing.
            hrefs = [] if tree is None else tree.xpath(
                '//div[@class="tableListMain"]//tr/td[1]/a/@href')
            if not hrefs:
                # The page loaded but lists nothing: retrying cannot help.
                print("无法获得响应,请检查输入的电影或电视剧名称是否正确")
                return None
            next_url = 'https://www.dytt.com' + hrefs[0]
            print(next_url)
            return next_url

        # A non-200 status is a failed attempt too, so the loop always
        # sleeps and eventually terminates instead of spinning.
        failures += 1
        if failures > max_failures:
            print("无法获得响应,请检查输入的电影或电视剧名称是否正确")
            return None
        print(f"正在尝试{failures}连接")
        time.sleep(2)
def moive_urls(url):
    """List the playable entries found on a title's detail page.

    A series yields one entry per episode; a film yields its play page(s).

    Args:
        url: Absolute URL of the detail page (as returned by ``sou_suo``).

    Returns:
        A list of dicts with keys ``'moive_title'`` and ``'moive_url'``, or
        ``None`` when ``url`` is empty or the page could not be fetched.
    """
    if not url:
        # Nothing to fetch (the search step failed); retrying is pointless.
        print("无法获得响应,请检查输入的电影或电视剧名称是否正确")
        return None

    max_failures = 20
    failures = 0
    while True:
        try:
            rep = requests.get(url=url, headers=headers, timeout=30)
        except requests.RequestException:
            rep = None

        if rep is not None and rep.status_code == 200:
            tree = etree.HTML(rep.text) if rep.text.strip() else None
            # NOTE: do not put a tbody step in the path; it makes the XPath match nothing.
            li_lis = [] if tree is None else tree.xpath('//div[@class="bofangList"]//li')
            dex = len(li_lis)
            print(dex)
            if dex == 0 and tree is not None:
                # Fall back to the alternative list container.
                li_lis = tree.xpath('//div[@class="dybofangList"]//li')
            dics = []
            for li in li_lis:
                hrefs = li.xpath('./a/@href')
                titles = li.xpath('./a/text()')
                if not hrefs or not titles:
                    # Skip list items without a usable link instead of
                    # failing the whole listing.
                    continue
                dic = {
                    'moive_title': titles[0],
                    'moive_url': 'https://www.dytt.com' + hrefs[0],
                }
                dics.append(dic)
                print(dic)
            return dics

        # Non-200 responses count as failed attempts so the loop terminates.
        failures += 1
        if failures > max_failures:
            print("无法获得响应,请检查输入的电影或电视剧名称是否正确")
            return None
        print(f"正在尝试{failures}连接")
        time.sleep(2)
def xuan_zhe(dics, name, index=None):
    """Pick one entry from the episode list.

    Args:
        dics: List of dicts with keys ``'moive_title'`` and ``'moive_url'``.
        name: Title typed by the user; only used in the progress message.
        index: Optional zero-based position to select without prompting.
            When ``None`` (the default) the user is asked on stdin and
            re-prompted until a valid number is entered.

    Returns:
        A dict with keys ``'video_url'`` and ``'video_name'``.

    Raises:
        ValueError: ``dics`` is empty or ``None``.
        IndexError: an explicit ``index`` is outside ``0 .. len(dics) - 1``.
    """
    if not dics:
        raise ValueError("no entries to choose from")

    print(dics)
    if index is None:
        while True:
            print("请输入数字0,1,2,3..0为第一集以此类推")
            try:
                index = int(input("选择你要下载的集数:"))
            except ValueError:
                # Not a number: ask again instead of crashing.
                continue
            if 0 <= index < len(dics):
                break
    elif not 0 <= index < len(dics):
        raise IndexError(f"index {index} is out of range for {len(dics)} entries")

    chosen = dics[index]
    video_url = chosen['moive_url']
    video_name = chosen['moive_title']
    print(name + video_name, "正在进行请求响应")
    return {
        'video_url': video_url,
        'video_name': video_name,
    }
# -----------------------------------------------------------------------------------------------------------------------
# url = 'https://www.dytt.com/vod-play-id-155223-src-3-num-1.html'
"""
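Fetch the m3u8 playlist (some videos have one m3u8, some have two; some are encrypted, some are not).
URL pitfall: always use forward slashes (/). A backslash is treated as an escape character and the request gets no response.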
"""
def get_response(url):
    """GET ``url`` with the shared headers, retrying until it succeeds.

    Args:
        url: Absolute URL to fetch.

    Returns:
        The ``requests.Response`` of the first attempt that answered with
        status 200 (its ``encoding`` is set to the detected encoding), or
        ``None`` when ``url`` is empty or every attempt failed.
    """
    if not url:
        print("无法获得响应,请重新试试")
        return None

    max_failures = 50
    failures = 0
    while True:  # keep requesting until the server answers
        try:
            rep = requests.get(url=url, headers=headers, timeout=30)
        except requests.RequestException:
            rep = None

        if rep is not None and rep.status_code == 200:
            rep.encoding = rep.apparent_encoding
            return rep

        # Non-200 responses count as failed attempts too; otherwise the
        # loop would spin forever without sleeping.
        failures += 1
        if failures > max_failures:
            print("无法获得响应,请重新试试")
            return None
        print(f"正在尝试{failures}连接")
        time.sleep(2)
def _m3u8_entries(text):
    """Return the non-blank, non-comment lines (the URIs) of a playlist."""
    entries = []
    for raw in text.splitlines():
        entry = raw.strip()
        if entry and not entry.startswith("#"):
            entries.append(entry)
    return entries


def get_m3u8(response, path):
    """Resolve the play page to the list of media-segment URLs.

    The playlist URL is extracted from the play page, downloaded and saved
    as ``<path>/start_m3u8.txt``. If that playlist is a master playlist
    (it only points at another playlist), its first variant is downloaded
    and saved as ``<path>/second_m3u8.txt``.

    Args:
        response: Response of the play page; its ``text`` must contain the
            ``"url":"...","url_next"`` fragment.
        path: Existing directory in which the playlist copies are stored.

    Returns:
        A list of absolute segment URLs, in playlist order.

    Raises:
        ValueError: the play page has no playlist URL, or the playlist is empty.
        RuntimeError: a playlist could not be downloaded.
    """
    found = re.findall(r'"url":"(?P<start_m3u8_url>.*?)","url_next"', response.text)
    if not found:
        raise ValueError("play page does not contain an m3u8 url")
    # The URL is embedded JSON-escaped ("https:\/\/..."); drop the backslashes.
    start_m3u8_url = found[0].replace("\\", "")
    print(start_m3u8_url)

    rep_start_m3u8 = get_response(start_m3u8_url)
    if rep_start_m3u8 is None:
        raise RuntimeError(f"could not download playlist {start_m3u8_url}")
    print(rep_start_m3u8.status_code)
    start_text = rep_start_m3u8.text
    path1 = path + "/" + "start_m3u8.txt"
    with open(path1, 'w', encoding="utf-8") as f:
        f.write(start_text)

    urls = _m3u8_entries(start_text)
    if not urls:
        raise ValueError(f"playlist {start_m3u8_url} lists no entries")

    # A master playlist carries EXT-X-STREAM-INF tags and its entries are
    # playlists themselves. Counting entries is unreliable: a short video
    # has few segments, and a count of exactly 5 used to match no branch.
    first_path = urllib.parse.urlparse(urls[0]).path.lower()
    is_master = "#EXT-X-STREAM-INF" in start_text or first_path.endswith(".m3u8")
    if not is_master:
        return [urllib.parse.urljoin(start_m3u8_url, entry) for entry in urls]

    second_m3u8_url = urllib.parse.urljoin(start_m3u8_url, urls[0])
    print(second_m3u8_url)
    rep_second_m3u8 = get_response(second_m3u8_url)
    if rep_second_m3u8 is None:
        raise RuntimeError(f"could not download playlist {second_m3u8_url}")
    path2 = path + "/second_m3u8.txt"
    with open(path2, 'w', encoding="utf-8") as f:
        f.write(rep_second_m3u8.text)

    second_urls = []
    for line2 in _m3u8_entries(rep_second_m3u8.text):
        print(line2)
        # Segment URIs may be relative to the playlist; absolute ones pass
        # through urljoin unchanged.
        second_urls.append(urllib.parse.urljoin(second_m3u8_url, line2))
    return second_urls
async def movie_down(url, path):
    """Download one media segment into ``path``.

    The file is named after the last ``/``-separated component of ``url``.
    A failed request (connection error, timeout or HTTP error status) is
    retried a few times so that an error page is never saved as a segment.

    Args:
        url: Absolute URL of the segment.
        path: Existing directory that receives the file.

    Raises:
        aiohttp.ClientError, asyncio.TimeoutError: the last error, once
            every attempt has failed.
    """
    ts_name = url.split("/")[-1]
    path3 = path + "/" + ts_name
    attempts = 3
    last_error = None
    async with aiohttp.ClientSession() as session:
        for attempt in range(attempts):
            try:
                async with session.get(url, headers=headers) as rep3:
                    rep3.raise_for_status()
                    print(ts_name, "正在下载")
                    payload = await rep3.read()
            except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                last_error = err
                if attempt + 1 < attempts:
                    await asyncio.sleep(1)  # brief pause before retrying
                continue
            async with aiofiles.open(path3, "wb") as f3:
                await f3.write(payload)
            print(ts_name, "下载完成")
            return
    raise last_error
#-----------------------------------------------------------------------------------------------------------------------
# 1. Fetch the encryption key  2. Decrypt  3. Merge
def get_key(path):
    """Decrypt (when needed) and merge the segments downloaded into ``path``.

    The media playlist is ``<path>/second_m3u8.txt`` when it exists,
    otherwise ``<path>/start_m3u8.txt``. If the playlist declares an
    AES-128 key, the key is downloaded and the segments are decrypted in
    place before being merged.

    Args:
        path: Directory holding the saved playlist(s) and the segments.

    Raises:
        RuntimeError: the key could not be downloaded.
        ValueError: the playlist declares an encryption this script cannot handle.
    """
    path_m3u8 = path + "/second_m3u8.txt"
    path_start_m3u8 = path + "/start_m3u8.txt"
    # Choose the playlist explicitly rather than relying on an exception
    # from the first candidate to reach the second one.
    if os.path.exists(path_m3u8):
        playlist_path = path_m3u8
    elif os.path.exists(path_start_m3u8):
        playlist_path = path_start_m3u8
    else:
        return

    with open(playlist_path, "r", encoding="utf-8", errors="replace") as f_key:
        key_txt = f_key.read()

    key_urls = re.findall(r'#EXT-X-KEY:METHOD=AES-128,URI="(?P<key_url>.*?)"', key_txt)
    if key_urls:
        key_url = key_urls[0]
        print(key_url)
        key_rep = get_response(key_url)
        if key_rep is None:
            raise RuntimeError(f"could not download decryption key {key_url}")
        try:
            # AES_ts encodes the key as UTF-8, so decoding the raw bytes as
            # UTF-8 round-trips them exactly; .text uses a guessed encoding.
            key = key_rep.content.decode("utf_8")
        except UnicodeDecodeError:
            key = key_rep.text
        AES_ts(key, playlist_path)
    elif 'EXT-X-KEY' in key_txt and 'METHOD=NONE' not in key_txt:
        raise ValueError(f"unsupported EXT-X-KEY declaration in {playlist_path}")

    he_bing(playlist_path)
def AES_ts(key, path):
    """Decrypt, in place, every segment listed in an AES-128 playlist.

    Segment files are looked up next to the playlist, under the last
    ``/``-separated component of each playlist entry.

    Each segment gets a fresh CBC cipher: CBC keeps chaining state, so one
    cipher object must not be reused across independent files. The IV is
    the ``IV=0x...`` attribute of the preceding ``#EXT-X-KEY`` tag when
    present, otherwise the segment's media sequence number as a 16-byte
    big-endian integer (the HLS default).

    Args:
        key: The AES key, as ``bytes`` or as text that is encoded with UTF-8.
        path: Path of the saved media playlist.
    """
    key_bytes = bytes(key) if isinstance(key, (bytes, bytearray)) else key.encode("utf_8")
    directory = os.path.dirname(path)

    segments = []        # (file name, iv) in playlist order
    first_sequence = 0   # value of EXT-X-MEDIA-SEQUENCE, 0 when absent
    explicit_iv = None   # IV attribute of the most recent EXT-X-KEY tag
    with open(path, "r", encoding="utf-8", errors="replace") as f_1:
        for line in f_1:
            line = line.strip()
            if not line:
                continue
            if line.startswith("#EXT-X-MEDIA-SEQUENCE:"):
                try:
                    first_sequence = int(line.split(":", 1)[1])
                except ValueError:
                    first_sequence = 0
                continue
            if line.startswith("#EXT-X-KEY"):
                iv_match = re.search(r'IV=0[xX]([0-9A-Fa-f]+)', line)
                explicit_iv = (
                    bytes.fromhex(iv_match.group(1).zfill(32)) if iv_match else None
                )
                continue
            if line.startswith("#"):
                continue
            if explicit_iv is not None:
                iv = explicit_iv
            else:
                iv = (first_sequence + len(segments)).to_bytes(16, "big")
            segments.append((line.split("/")[-1], iv))

    for ts_name, iv in segments:
        path_ts = os.path.join(directory, ts_name)
        print(ts_name, "开始解密啦------")
        print(path_ts)
        with open(path_ts, "rb") as f_2:
            ts = f_2.read()
        # key and iv must both be bytes for the cipher.
        aes = AES.new(key=key_bytes, iv=iv, mode=AES.MODE_CBC)
        with open(path_ts, "wb") as f_3:
            f_3.write(aes.decrypt(ts))  # overwrite the file with the decrypted data
        print(ts_name, "解密完成")
def he_bing(path):  # merge
    """Concatenate the segments listed in a playlist into a single ``.mp4``.

    Segment files are read from the playlist's own directory, under the
    last ``/``-separated component of each playlist entry. The result is
    written to ``<directory>/<directory name>.mp4``; the merged segment
    files are deleted afterwards.

    The merge is a plain binary concatenation done in Python (what
    ``copy /b a.ts+b.ts out.mp4`` on Windows or ``cat a.ts b.ts > out.mp4``
    elsewhere would do), so it needs no shell, does not change the working
    directory and is not limited by the command-line length.

    Args:
        path: Path of the saved media playlist.

    Raises:
        ValueError: the playlist lists no segments.
        FileNotFoundError: a listed segment is missing; nothing is merged
            or deleted in that case.
    """
    lst = []
    with open(path, mode="r", encoding='utf-8') as p_f:
        for line in p_f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            lst.append(line.split('/')[-1])
    if not lst:
        raise ValueError(f"playlist {path} lists no segments")

    directory = os.path.dirname(path) or "."
    segment_paths = [os.path.join(directory, ts_name) for ts_name in lst]
    missing = [seg for seg in segment_paths if not os.path.exists(seg)]
    if missing:
        raise FileNotFoundError(f"missing segment files: {missing}")

    name3 = os.path.basename(os.path.abspath(directory))
    output_path = os.path.join(directory, f"{name3}.mp4")
    with open(output_path, "wb") as merged:
        for seg in segment_paths:
            with open(seg, "rb") as part:
                # Copy in 1 MiB chunks to keep memory use flat.
                for chunk in iter(lambda: part.read(1 << 20), b""):
                    merged.write(chunk)

    # The merge succeeded: remove the segments (each file once, even if the
    # playlist lists it more than once).
    for seg in dict.fromkeys(segment_paths):
        os.remove(seg)
    print("合并完毕")
#-----------------------------------------------------------------------------------------------------------------------
def main():
    """Interactive entry point: search, choose, download, decrypt and merge.

    Asks for a title, lets the user pick an entry, downloads all media
    segments concurrently into ``<title>/<entry name>/`` and then runs the
    decrypt-and-merge step. Stops early when an earlier step produced
    nothing to work with.
    """
    name = str(input("请输入需要下载的电影或电视剧名称:"))
    name1 = quote(name)
    start_url = f'https://www.dytt.com/vodsearch/-------------.html?wd={name1}'
    print(start_url)

    next_url = sou_suo(start_url)
    if not next_url:
        return
    dics = moive_urls(next_url)
    if not dics:
        return
    dic2 = xuan_zhe(dics, name)
    response = get_response(dic2['video_url'])
    if response is None:
        return

    path = name + "/" + dic2['video_name']
    os.makedirs(path, exist_ok=True)
    urls = get_m3u8(response, path)
    if not urls:
        print("未获取到任何视频分片地址")
        return

    async def download_all():
        # Cap the number of simultaneous downloads; starting every segment
        # at once opens one connection per segment.
        limiter = asyncio.Semaphore(16)

        async def bounded(ts_url):
            async with limiter:
                await movie_down(ts_url, path)

        results = await asyncio.gather(
            *(bounded(ts_url) for ts_url in urls), return_exceptions=True)
        # Give the closed connections a moment to shut down before the
        # event loop itself is closed.
        await asyncio.sleep(0.25)
        return results

    # asyncio.run creates and closes its own loop, so no module-level
    # ``loop`` variable is needed.
    results = asyncio.run(download_all())
    failed = [res for res in results if isinstance(res, BaseException)]
    if failed:
        print(f"有{len(failed)}个分片下载失败,已跳过解密合并")
        return

    print(path, '下载完成')
    print("开始进行解密合并程序")
    get_key(path)
    print("你可以开心的观看啦--")


if __name__ == '__main__':
    main()