from selenium import webdriver
import time,re
start_url = "https://www.jijikb.com/play/52825-0-1.html" #一共5集
def get_start_m3u8(url):
    """Load *url* in Chrome and return the text of its ``forbaiducache`` element.

    The page is rendered in a real browser (the original author notes that the
    home page runs a debugger check, which Selenium is used to get past). Two
    elements, ``details-button`` and ``proceed-link``, are clicked before the
    page source is read.
    NOTE(review): those ids look like Chrome's certificate-warning
    interstitial -- confirm against the live site.

    Args:
        url: Address of the play page to open.

    Returns:
        The first string captured between ``id="forbaiducache">`` and
        ``</div>`` in the rendered page source.

    Raises:
        IndexError: If the page source contains no ``forbaiducache`` element.
    """
    # -------------------------------------------------------------------
    # Disabled alternative kept for reference: headless Chrome with options.
    # chrome_options = webdriver.ChromeOptions()
    # # add a User-Agent
    # chrome_options.add_argument(
    #     'User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"'
    # )
    # # no visible window; required on Linux systems without a display
    # chrome_options.add_argument('--headless')
    # # run with the sandbox disabled
    # chrome_options.add_argument('--no-sandbox')
    # chrome_options.add_argument("--disable-gpu")
    # chrome_options.add_argument("--disable-dev-shm-usage")
    # # hide the automation switch so the webdriver property looks normal
    # chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    # driver = webdriver.Chrome(chrome_options=chrome_options)
    # -------------------------------------------------------------------
    driver = webdriver.Chrome()
    try:
        # Use the argument, not the module-level start_url, so callers can
        # scrape any episode page.
        driver.get(url)
        # find_element("id", ...) works on Selenium 3 and 4; the
        # find_element_by_id shortcut no longer exists in Selenium 4.3+.
        driver.find_element("id", "details-button").click()
        time.sleep(0.5)
        driver.find_element("id", "proceed-link").click()
        time.sleep(1)
        page_html = driver.page_source
        matches = re.findall(r'id="forbaiducache">(.*?)</div>', page_html)
        if not matches:
            raise IndexError(
                'no id="forbaiducache" element found in the page source of ' + str(url)
            )
        start_m3u8 = matches[0]
        print(start_m3u8)
        time.sleep(0.5)
        print(driver.title)
        return start_m3u8
    finally:
        # Always close the browser, even when a lookup above fails.
        driver.quit()
if __name__ == "__main__":
    # Executed as a script: scrape the default play page.
    get_start_m3u8(url=start_url)
# --------------------------------------------------------------------------------------------
import time
import requests,os
import urllib3
import urllib.request
from startm3u8 import get_start_m3u8
import asyncio
import aiofile
import aiohttp
# start_url = "https://www.jijikb.com/play/52825-0-1.html" #一共5集
secend_m3u8 = "https://vod4.buycar5.cn/20210617/DmV0P4zD/1000kb/hls/index.m3u8"
headers = {
'Referer': 'https://vod4.buycar5.cn/',
'host':'vod4.buycar5.cn',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
}
def response(url):
    """GET *url* with the module-level ``headers`` and TLS verification off.

    Args:
        url: Address to request.

    Returns:
        The ``requests`` response object when the status code is 200,
        otherwise ``None`` (after printing a notice).
    """
    # Requests made with verify=False emit InsecureRequestWarning; silence it
    # *before* sending the request so the first call does not warn either.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    rep = requests.get(url=url, headers=headers, timeout=20, verify=False)
    rep.encoding = rep.apparent_encoding
    if rep.status_code == 200:
        return rep
    print("----没有响应----")
    return None
def get_second_m3u8_url(url):
    """Download the first-level playlist and return the URL of the playlist it points to.

    The playlist text is saved to ``first.m3u8`` in the working directory.
    The first line that is neither blank nor a ``#`` tag is resolved against
    ``https://vod4.buycar5.cn`` and returned.
    NOTE(review): the host is hard-coded as in the original; resolving against
    *url* itself may be more appropriate -- confirm.

    Args:
        url: Address of the first-level ``.m3u8`` playlist.

    Returns:
        The absolute URL of the first playlist entry, or ``None`` when the
        playlist contains no entries.
    """
    from urllib.parse import urljoin

    rep = requests.get(url)
    print(rep)
    playlist_text = rep.text
    with open('first.m3u8', 'w', encoding='utf-8') as f:
        f.write(playlist_text)
    for line in playlist_text.splitlines():
        # Strip the line ending: a URL carrying "\n" is rejected by urlopen.
        entry = line.strip()
        if not entry or entry.startswith("#"):
            continue
        # urljoin yields host + entry for "/path" entries (same as the old
        # string concatenation) and leaves already-absolute URLs untouched.
        return urljoin("https://vod4.buycar5.cn/", entry)
    return None
def get_tc_url(resp):
    """Save the media playlist to ``secend.m3u8`` and return its segment entries.

    Args:
        resp: Raw bytes of the playlist.

    Returns:
        A list with every playlist line that is neither blank nor a ``#`` tag,
        in file order, with surrounding whitespace and line endings removed.
    """
    # The file is kept on disk; other code in this project reads it back by name.
    with open('secend.m3u8', 'wb') as f:
        f.write(resp)
    tc_urls = []
    with open('secend.m3u8', "r", encoding="utf-8") as r_f:
        for raw_line in r_f:
            # Drop the trailing newline so the entry is usable as a URL as-is.
            entry = raw_line.strip()
            if not entry or entry.startswith("#"):
                continue
            print(entry)
            tc_urls.append(entry)
    return tc_urls
async def mov_down(url,semaphore):
    """Download one segment into ``mov2/`` while holding *semaphore*.

    The file name is the last ``/``-separated component of *url*. A response
    with an HTTP error status is reported and not written to disk.
    NOTE(review): the shared ``headers`` dict pins the ``host`` header to one
    domain; segments served from another host may need it removed -- confirm.

    Args:
        url: Segment address; surrounding whitespace is ignored.
        semaphore: ``asyncio.Semaphore`` that bounds concurrent downloads.
    """
    # Playlist lines may still carry their line ending; request the clean URL.
    url = url.strip()
    tc_name = url.split('/')[-1]
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            print(tc_name,"---正在下载-----")
            async with session.get(url,headers=headers) as rep:
                print(rep.status)
                if rep.status >= 400:
                    # Do not store an error page as if it were video data.
                    print(tc_name, f"---download failed (HTTP {rep.status}), skipped---")
                    return
                payload = await rep.read()
        async with aiofile.async_open("mov2/"+tc_name,'wb') as p_f:
            print("-----正在存储------")
            await p_f.write(payload)
    print(tc_name,'----下载完成---')
"""
urllib.request.urlopen(url, data=None, [timeout, ])
传入的url就是你想抓取的地址;
data是指向服务器提交信息时传递的字典形式的信息,通常来说就是爬取需要登录的网址时传入的用户名和密码,可省略。
timeout参数指的是超时时间,也可省略。
"""
def main(page_url="https://www.jijikb.com/play/52825-0-1.html", concurrency=100):
    """Resolve the playlists for *page_url* and download every segment into ``mov2/``.

    Steps: scrape the first playlist URL from the play page, follow it to the
    second playlist, save that playlist, then fetch all listed segments
    concurrently. The elapsed time in seconds is printed at the end.

    Args:
        page_url: Play page to start from. The module never defined the
            ``start_url`` name the original body used, so the address is now a
            parameter with the previously commented-out value as its default.
        concurrency: Maximum number of simultaneous segment downloads.

    Raises:
        ValueError: If the first playlist contains no entry to follow.
    """
    start_time = time.time()
    os.makedirs("mov2", exist_ok=True)
    start_m3u8_url = get_start_m3u8(page_url)
    secend_m3u8_url = get_second_m3u8_url(start_m3u8_url)
    print(secend_m3u8_url)
    if not secend_m3u8_url:
        raise ValueError("first playlist has no entry pointing to a second playlist")
    # urllib is used here because the original author reported that the
    # equivalent requests.get call with the shared headers did not succeed.
    with urllib.request.urlopen(secend_m3u8_url) as playlist_response:
        resp = playlist_response.read()
    tc_urls = get_tc_url(resp)

    async def _download_all():
        # Create the semaphore inside the running loop and collect failures
        # instead of letting one bad segment abort the rest.
        semaphore = asyncio.Semaphore(concurrency)
        return await asyncio.gather(
            *(mov_down(url, semaphore) for url in tc_urls),
            return_exceptions=True,
        )

    # asyncio.wait() rejects an empty task list, so skip when nothing is listed.
    if tc_urls:
        results = asyncio.run(_download_all())
        for url, result in zip(tc_urls, results):
            if isinstance(result, Exception):
                print(url.strip(), "---download error---", repr(result))
    print(time.time()-start_time)


if __name__ == '__main__':
    main()
# --------------------------------------------------------------------------------------------
import asyncio
import os,re
import requests
import aiofiles
from Crypto.Cipher import AES
#pycryptodome模块
# key_url = "https://ts4.chinalincoln.com:9999/20210617/DmV0P4zD/1000kb/hls/key.key"
headers = {
'Referer': 'https://vod4.buycar5.cn/',
'host': 'vod4.buycar5.cn',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
}
def get_key_url():
    """Return the key URI declared in ``secend.m3u8``.

    Reads the whole playlist from the working directory, takes the first
    ``#EXT-X-KEY:METHOD=AES-128,URI="..."`` tag, prints its URI and returns it.

    Raises:
        IndexError: If the playlist has no such tag.
    """
    with open('secend.m3u8', 'r') as playlist:
        playlist_text = playlist.read()
    found = re.findall(r'#EXT-X-KEY:METHOD=AES-128,URI="(?P<key>.*?)"', playlist_text)
    key_url = found[0]
    print(key_url)
    return key_url
async def aio_dec(key):  # METHOD=AES-128
    """Decrypt every segment listed in ``secend.m3u8`` concurrently.

    Each playlist line that is neither blank nor a ``#`` tag is reduced to its
    last ``/``-separated component and handed to ``dec_ts`` together with *key*.
    Segments whose decryption raises are reported; the others still complete.

    Args:
        key: AES key bytes passed through to ``dec_ts``.
    """
    names = []
    print("-------1------")
    with open("secend.m3u8",'r') as f:
        for line in f:
            entry = line.strip()
            # Blank lines would otherwise turn into an empty file name.
            if not entry or entry.startswith("#"):
                continue
            name = entry.split('/')[-1]
            print(name)
            names.append(name)
    # asyncio.wait() raises ValueError on an empty collection; nothing to do.
    if not names:
        return
    results = await asyncio.gather(
        *(dec_ts(name, key) for name in names),
        return_exceptions=True,
    )
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            print(f"{name} decrypt failed: {result!r}")
async def dec_ts(name,key):
    """Decrypt ``mov2/<name>`` into ``mov2/temp_<name>`` and delete the source file.

    Args:
        name: File name of the encrypted segment inside ``mov2/``.
        key: AES key bytes for CBC decryption.
    """
    encrypted_path = f'mov2/{name}'
    decrypted_path = f'mov2/temp_{name}'
    # The IV is sixteen ASCII "0" characters, exactly as in the original.
    # NOTE(review): confirm this matches the IV the stream was encrypted
    # with; a wrong IV garbles the first 16 bytes of every segment.
    cipher = AES.new(key=key, mode=AES.MODE_CBC, IV=b"0000000000000000")
    async with aiofiles.open(encrypted_path, 'rb') as encrypted:
        ciphertext = await encrypted.read()  # whole encrypted segment
        print("-----2----")
        async with aiofiles.open(decrypted_path, 'wb') as decrypted:
            await decrypted.write(cipher.decrypt(ciphertext))  # store the plaintext
    os.remove(encrypted_path)
    print("-----3----")
    print(f'{name}处理完毕')
def merge_ts():
    """Concatenate the segments listed in ``secend.m3u8`` into ``movie.mp4``.

    Segment names are the last ``/``-separated component of every playlist
    line that is neither blank nor a ``#`` tag, kept in playlist order. For
    each name the decrypted file ``mov2/temp_<name>`` is used when it exists,
    otherwise ``mov2/<name>``. The result is written to ``movie.mp4`` in the
    working directory.

    This replaces the former ``copy /b`` shell call (Windows-only; the macOS
    counterpart would be ``cat 1.ts 2.ts > xxx.mp4``). That call joined the
    names without the ``+`` separator and pointed at files outside ``mov2/``.

    Raises:
        FileNotFoundError: If a listed segment is missing from ``mov2/``.
    """
    import shutil

    names = []
    with open('secend.m3u8',mode="r",encoding='utf-8') as p_f:
        for line in p_f:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            names.append(entry.split('/')[-1])
    if not names:
        # Nothing listed: do not create an empty output file.
        print("")
        return
    with open("movie.mp4", "wb") as merged:
        for name in names:
            decrypted_path = os.path.join("mov2", f"temp_{name}")
            plain_path = os.path.join("mov2", name)
            source_path = decrypted_path if os.path.exists(decrypted_path) else plain_path
            with open(source_path, "rb") as segment:
                # Stream in chunks so large segments are never fully in memory.
                shutil.copyfileobj(segment, merged)
    print("")
if __name__ == '__main__':
    # Script entry point: fetch the AES key named in secend.m3u8, then decrypt
    # every downloaded segment.
    key_url = get_key_url()
    # Take the key as raw bytes. Decoding it to text and re-encoding as UTF-8
    # alters any byte that is not plain ASCII and so changes the key.
    # NOTE(review): `headers` pins the host header to vod4.buycar5.cn; if the
    # key is served from a different host this header may need to be dropped.
    key = requests.get(url=key_url, headers=headers).content
    # key ='39f98d719dbdfbde'
    print(key)
    asyncio.run(aio_dec(key))