import
asyncio
import
time
from
Crypto.Cipher
import
AES
import
requests
import
re
import
os
from
urllib.parse
import
urljoin
import
aiohttp
import
aiofiles
import
shutil
headers
=
{
'User-Agent'
:
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36'
}
def
get_m3u8_1(url):
resp
=
requests.get(url, headers
=
headers)
if
resp.status_code
=
=
200
:
pattern
=
re.
compile
(r
'link_pre":"(.*?)","url":"(.*?)"'
)
pattern_name
=
re.
compile
(r
'<title>《(.*?)》'
)
result
=
re.search(pattern, resp.text)
if
result:
m3u8_url
=
result.group(
2
)
m3u8_url
=
m3u8_url.replace(
'\\', '
')
else
:
raise
ValueError(
"No m3u8 URL found in the provided URL"
)
result_name
=
re.search(pattern_name, resp.text)
if
result_name:
name
=
result_name.group(
1
)
name
=
name.replace(
' '
, '')
else
:
raise
ValueError(
"No video name found in the provided URL"
)
os.makedirs(f
'{name}/encryption'
, exist_ok
=
True
)
os.makedirs(f
'{name}/decryption'
, exist_ok
=
True
)
with
open
(name
+
'/'
+
'1.m3u8'
,
'w'
) as f:
f.write(requests.get(m3u8_url, headers
=
headers).text)
return
name, m3u8_url
def
get_m3u8_2(name, m3u8_url):
path
=
f
'{name}/1.m3u8'
if
os.path.exists(path):
with
open
(path,
'r'
) as f:
m3u8_lst
=
f.readlines()
if
(
len
(m3u8_lst) <
10
):
for
line
in
m3u8_lst:
if
line.startswith(
'#'
):
continue
else
:
m3u8_url_2
=
urljoin(m3u8_url, line.strip().replace(
'\n'
, ''))
if
not
os.path.exists(f
'{name}/2.m3u8'
):
with
open
(f
'{name}/2.m3u8'
,
'w'
) as f:
f.write(requests.get(m3u8_url_2, headers
=
headers).text)
return
m3u8_url_2
else
:
with
open
(f
'{name}/2.m3u8'
,
'w'
) as f:
f.writelines(m3u8_lst)
return
m3u8_url
def
get_movie_lsts(name, m3u8_url_2):
with
open
(f
'{name}/movie_lsts.txt'
,
'w'
) as f1:
with
open
(f
'{name}/2.m3u8'
,
'r'
) as f:
for
line
in
f.readlines():
if
line.startswith(
'#'
):
if
'key'
in
line:
key
=
re.search(r
'URI="(.*?)"'
, line).group(
1
)
if
'IV'
in
line:
iv
=
line.split(
'='
)[
-
1
].replace(
'"'
, '').strip()
continue
else
:
f1.write(urljoin(m3u8_url_2, line.strip())
+
'\n'
)
with
open
(f
'{name}/key.key'
,
'wb'
) as f:
f.write(requests.get(urljoin(m3u8_url_2, key), headers
=
headers).content)
async
def
down_load(movie_url, sema, name):
file_path
=
f
'{name}/encryption/{movie_url.split("/")[-1]}'
async with sema:
for
i
in
range
(
10
):
try
:
print
(f
'{file_path}第{i + 1}次下载开始下载'
)
async with aiohttp.ClientSession() as session:
async with session.get(movie_url, headers
=
headers) as response:
content
=
await response.content.read()
async with aiofiles.
open
(file_path,
'wb'
) as f:
await f.write(content)
print
(f
'{file_path}下载成功'
)
break
except
Exception as e:
print
(f
'{file_path}第{i + 1}次下载失败正在重试下载,原因:{e}'
)
continue
async
def
main(name):
movie_lsts
=
get_movie_names(name)
tasks
=
[]
sema
=
asyncio.Semaphore(
100
)
for
movie_lst
in
movie_lsts:
tasks.append(down_load(movie_lst.strip().replace(
'\n'
, ''), sema, name))
await asyncio.gather(
*
tasks)
def
get_movie_names(name):
with
open
(f
'{name}/movie_lsts.txt'
,
'r'
) as f:
movie_lsts
=
f.readlines()
return
movie_lsts
async
def
decrypt_file(input_filename, output_filename, key, iv
=
None
):
cipher
=
AES.new(key, AES.MODE_CBC, iv)
try
:
async with aiofiles.
open
(input_filename,
'rb'
) as infile:
encrypted_data
=
await infile.read()
async with aiofiles.
open
(output_filename,
'wb'
) as outfile:
await outfile.write(cipher.decrypt(encrypted_data))
except
Exception as e:
print
(f
'{input_filename}解密失败,原因:应该是未加密的广告{e}'
)
async
def
file_lsts(name):
movie_lsts
=
get_movie_names(name)
with
open
(f
'{name}/key.key'
,
'rb'
) as f:
key
=
f.read()
tasks
=
[]
for
movie_name
in
movie_lsts:
movie_name
=
movie_name.split(
'/'
)[
-
1
].replace(
'\n'
, '')
input_filename
=
f
'{name}/encryption/{movie_name}'
output_filename
=
f
'{name}/decryption/{movie_name}'
tasks.append(asyncio.create_task(decrypt_file(input_filename, output_filename, key)))
await asyncio.gather(
*
tasks)
def
merge_movie(name):
temp
=
[]
n
=
1
now_path
=
os.getcwd()
lst_movies
=
get_movie_names(name)
path
=
f
'{name}/decryption'
os.chdir(path)
for
i
in
range
(
len
(lst_movies)):
file_name
=
lst_movies[i].replace(
'\n'
, '
').split('
/
')[
-
1
]
temp.append(file_name)
if
len
(temp)
=
=
20
:
cmd
=
f
'copy /b {"+".join(temp)} {n}.ts'
r
=
os.popen(cmd)
print
(r.read())
n
+
=
1
temp
=
[]
cmd
=
f
'copy /b {"+".join(temp)} {n}.ts'
r
=
os.popen(cmd)
print
(r.read())
last_temp
=
[]
for
i
in
range
(
1
, n
+
1
):
last_temp.append(f
'{i}.ts'
)
cmd
=
f
'copy /b {"+".join(last_temp)} {name}.mp4'
r
=
os.popen(cmd)
print
(r.read())
os.chdir(now_path)
print
(
'合并完成'
)
def
last_work(name):
det_file_path
=
f
'{name}/'
src_file_path
=
f
'{name}/decryption/{name}.mp4'
if
os.path.exists(det_file_path)
and
os.path.exists(src_file_path):
shutil.move(src_file_path, det_file_path)
del_dir(f
'{name}/decryption'
)
del_dir(f
'{name}/encryption'
)
try
:
os.remove(f
'{name}/key.key'
)
os.remove(f
'{name}/1.m3u8'
)
os.remove(f
'{name}/2.m3u8'
)
os.remove(f
'{name}/movie_lsts.txt'
)
except
Exception as e:
print
(e)
def
del_dir(directory_path):
try
:
shutil.rmtree(directory_path)
print
(f
"目录 {directory_path} 及其内容已删除"
)
except
FileNotFoundError:
print
(f
"目录 {directory_path} 不存在"
)
except
Exception as e:
print
(f
"删除目录时出错: {e}"
)
def
all_func(url):
try
:
name, m3u8_url
=
get_m3u8_1(url)
m3u8_url_2
=
get_m3u8_2(name, m3u8_url)
get_movie_lsts(name, m3u8_url_2)
asyncio.run(main(name))
asyncio.run(file_lsts(name))
merge_movie(name)
last_work(name)
print
(
'全部完成'
)
except
Exception as e:
print
(f
"运行过程中发生错误: {e}"
)
if
__name__
=
=
'__main__'
:
while
True
:
print
(
'http://www.ahljtj.com/list/1.html'
)
print
(
'只作为学习研究,请勿用于非法用途'
)
url
=
input
(
'请输入电影链接,理论上也支持电视剧(下完改名字不然会覆盖):'
)
all_func(url)