import requests
import re
from loguru import logger
from lxml import etree
from python_idm import download
from pathlib import Path
from H_m3u8DL import m3u8download
logger.add('omofun_log.log')
@logger.catch
def requests_functions(url):
res = requests.get(url)
return res.text
@logger.catch
def regular_functions(string,text):
return re.findall(string,text)[0]
def title_filter(title: str):
"""
转为Windows合法文件名
"""
lst = ['\r','\n', '\\', '/', ':', '*', '?', '"', '<', '>', '|']
for key in lst:
title = title.replace(key,'-')
if len(title) > 60:
title = title[:60]
return title.strip()
def get_mp4_url(url):
text = requests_functions(url)
et = etree.HTML(text)
title = et.xpath('//title/text()')[0]
string = r'(?i)player_aaaa=(.*?)</script><script'
player_aaaa_text = regular_functions(string,text)
player_aaaa_json = eval(player_aaaa_text)
ids = player_aaaa_json.get('id')
_url = player_aaaa_json.get('url')
if 'omofun' in _url:
php_url = f'https://omofun.tv/addons/dp/player/index.php?key=0&id={ids}&from=omo&url={_url}'
else:
php_url = f'https://omofun.tv/addons/dp/player/index.php?key=0&id={ids}&from=mp4&url={_url}'
text = requests_functions(php_url)
string = r'(?i)window.location.href=(.*?);</script>'
target_result = regular_functions(string,text)
target_url = "https://omofun.tv" + eval(target_result)
text = requests_functions(target_url)
string = r'(?ms)var config.{3}\{(.*?)\}'
target_result = regular_functions(string,text)
_mp4_json = '{' + target_result.replace('\n','').replace(' ','') + '}}}'
mp4_url = eval(_mp4_json).get('url')
return mp4_url,title
def downloads(url):
mp4_url,title = get_mp4_url(url)
logger.info(f'{title},下载地址:{mp4_url}')
title = title_filter(title) + '.mp4'
if 'mp4' in mp4_url.split('.')[-1]:
title = Path('.').joinpath(title)
download(url=mp4_url,thread_count=24,save_name=title)
elif 'm3u8' in mp4_url.split('.')[-1]:
m3u8download(mp4_url,title=title,work_dir='./video_download')
if __name__ == "__main__":
with open(Path('.').joinpath('omofun_address.txt'),'r',encoding='utf8') as f:
urls = f.readlines()
for url in urls:
downloads(url)
[Python] 纯文本查看复制代码
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from itertools import tee
from alive_progress import alive_bar
def pairwise(iterable):
'''转换'''
# pairwise('ABCDEFG') --> AB BC CD DE EF FG
a, b = tee(iterable)
next(b, None)
return zip(a, b)
def calc_divisional_range(res,threads_count):
'''分区下载'''
capacity = int(res.headers['Content-Length'])
remainder = int(capacity%threads_count)
singular = int((capacity - remainder)/threads_count)
url_range = list(pairwise((range(0,capacity - remainder,singular))))
url_range[-1] = (url_range[-1][0],capacity-1)
return url_range
def range_download(url,save_name, s_pos, e_pos,proxies=None):
'''下载函数'''
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = requests.get(url, headers=headers, stream=True,proxies=proxies)
with open(save_name, "rb+") as f:
f.seek(s_pos)
for chunk in res.iter_content(chunk_size=64*1024):
if chunk:
f.write(chunk)
def download(url,thread_count,save_name=None,proxies=None):
'''多线程调用下载'''
# url,thread_count = 'http://yue.cmvideo.cn:8080/depository_yqv/asset/zhengshi/5102/598/709/5102598709/media/5102598709_5010999563_56.mp4',16
if save_name is None:
save_name = url.split('/')[-1]
print(save_name,'下载中……')
res = requests.head(url,proxies=proxies)
divisional_ranges = calc_divisional_range(res,thread_count)
with open(save_name, "wb") as f:
pass
with ThreadPoolExecutor(max_workers=thread_count) as p,alive_bar(len(divisional_ranges)+1) as bar:
futures = []
for s_pos, e_pos in divisional_ranges:
# print(s_pos, e_pos)
futures.append(p.submit(range_download,url,save_name, s_pos, e_pos,proxies=proxies))
# 等待所有任务执行完毕
for f in as_completed(futures):
if f.done():
bar()
print(save_name,'下载完成!')
if __name__ == '__main__':
url = input('输入下载链接:')
thread_count = 16
download(url,thread_count)