# ddrk 低端影视下载
import base64
import re
import time
from datetime import datetime
from pathlib import Path
from urllib import parse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from loguru import logger
from python_idm import download
logger.add('ddrk_log.log')
headers = {
'Host': 'ddys.tv',
'Connection': 'keep-alive',
'sec-ch-ua': '^\\^Chromium^\\^;v=^\\^104^\\^, ^\\^',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '^\\^Windows^\\^',
'DNT': '1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Referer': 'https://ddys.tv/',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
def encrpyt_string(paths,key):
"""ddrk加密函数"""
if len(key)>16:
return key
key = key.encode('utf8')
iv = '1529076118276120'.encode('utf8')
time_tuple = datetime.now().timetuple()
expires = int(time.mktime(time_tuple))*1000 + 600000
text = '{"path":"' + f'{paths}'+ f'","expire":{expires}' + '}'
text = pad(text.encode('utf8'),64,'pkcs7')
cryptor = AES.new(key=key,mode=AES.MODE_CBC,iv=iv)
ciphertext = cryptor.encrypt(text)
encryption_result = parse.quote(base64.b64encode(ciphertext).decode('utf8'))
return encryption_result
@logger.catch
def get_path(res):
'''获取加密参数'''
result = re.findall(r'"tracks":(.*?)}</script>',res.text,re.S)[0]
result = result.replace('\n','')
title = re.findall(r'<title>(.*?)</title>',res.text)[0]
datas = [(index,i.get('src0').replace(r'\/','/'),i.get('src1')) for index,i in enumerate(eval(result),start=1)]
return title,datas
@logger.catch
def get_mp4_url(session,ids):
'''获取MP4_URL'''
base_url = f'https://ddys.tv/getvddr/video?id={ids}&dim=1080P+&type=mix'
# cookies={'X_CACHE_KEY':'df43ecf509ad4237f04d9059dc1cc4d9'}
jsons = session.get(base_url)
logger.info(jsons.json())
return jsons.json().get('url')
def main(url):
'''主函数'''
session = requests.Session()
session.headers.clear()
session.headers.update(headers) # 注意header顺序,顺序不对,请求不到数据
response = session.get(url)
title,datas = get_path(response)
if len(datas)>1:
for index,path,key in datas:
mp4_url = get_mp4_url(session,encrpyt_string(path,key))
titles = title
titles = titles + f'_第{index}集' + '.mp4'
logger.info(f'开始下载{titles},链接为:{mp4_url}')
if mp4_url is not None:
download(mp4_url,thread_count=24,save_name=title)
else:
logger.warning('未获取到MP4URL')
else:
mp4_url = get_mp4_url(session,encrpyt_string(datas[0][1],datas[0][2]))
title = title + '.mp4'
logger.info(f'开始下载 -> {title},链接为:{mp4_url}')
if mp4_url is not None:
download(mp4_url,thread_count=24,save_name=title)
else:
logger.warning('未获取到MP4URL')
if __name__ == '__main__':
with open(Path('.').absolute().joinpath('ddrk_address.txt'),'r',encoding='utf8') as f:
urls = f.readlines()
for url in urls:
# url = "https://ddrk.me/school-tales-the-series/"
main(url)
[Python] 纯文本查看复制代码
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from itertools import tee
from alive_progress import alive_bar
def pairwise(iterable):
'''转换'''
# pairwise('ABCDEFG') --> AB BC CD DE EF FG
a, b = tee(iterable)
next(b, None)
return zip(a, b)
def calc_divisional_range(res,threads_count):
'''分区下载'''
capacity = int(res.headers['Content-Length'])
remainder = int(capacity%threads_count)
singular = int((capacity - remainder)/threads_count)
url_range = list(pairwise((range(0,capacity - remainder + singular,singular))))
url_range[-1] = (url_range[-1][0],capacity -1)
return url_range
def range_download(url,save_name, s_pos, e_pos,proxies=None):
'''下载函数'''
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = requests.get(url, headers=headers, stream=True,proxies=proxies)
with open(save_name, "rb+") as f:
f.seek(s_pos)
for chunk in res.iter_content(chunk_size=64*1024):
if chunk:
f.write(chunk)
def download(url,thread_count,save_name=None,proxies=None):
'''多线程调用下载'''
# url,thread_count = 'http://yue.cmvideo.cn:8080/depository_yqv/asset/zhengshi/5102/598/709/5102598709/media/5102598709_5010999563_56.mp4',16
if save_name is None:
save_name = url.split('/')[-1]
print(save_name,'下载中……')
res = requests.head(url,proxies=proxies)
divisional_ranges = calc_divisional_range(res,thread_count)
with open(save_name, "wb") as f:
pass
with ThreadPoolExecutor(max_workers=thread_count) as p,alive_bar(len(divisional_ranges)+1) as bar:
futures = []
for s_pos, e_pos in divisional_ranges:
# print(s_pos, e_pos)
futures.append(p.submit(range_download,url,save_name, s_pos, e_pos,proxies=proxies))
# 等待所有任务执行完毕
for f in as_completed(futures):
if f.done():
bar()
print(save_name,'下载完成!')
if __name__ == '__main__':
url = input('输入下载链接:')
thread_count = 16
download(url,thread_count)