import
os
import
aiofiles
import
aiohttp
import
requests
import
re
from
urllib.parse
import
urljoin
import
hashlib
import
binascii
from
Crypto.Cipher
import
AES
from
Crypto.Util.Padding
import
unpad
import
base64
import
time
headers
=
{
}
def
aes_ecb_decrypt(key, ciphertext_base64):
if
len
(key)
not
in
[
16
,
24
,
32
]:
raise
ValueError(
"Key length must be 16, 24, or 32 bytes"
)
ciphertext
=
base64.b64decode(ciphertext_base64)
cipher
=
AES.new(key, AES.MODE_ECB)
decrypted_padded
=
cipher.decrypt(ciphertext)
decrypted_text
=
unpad(decrypted_padded, AES.block_size)
return
decrypted_text
def
aes_cbc_decrypt(key, ciphertext, iv):
if
len
(key)
not
in
[
16
,
24
,
32
]:
raise
ValueError(
"Key length must be 16, 24, or 32 bytes"
)
cipher
=
AES.new(key, AES.MODE_CBC, iv)
decrypted_padded
=
cipher.decrypt(ciphertext)
decrypted_text
=
unpad(decrypted_padded, AES.block_size)
return
decrypted_text
def
md5_encrypt(input_string):
md5_hash
=
hashlib.md5()
md5_hash.update(input_string.encode(
'utf-8'
))
encrypted_string
=
md5_hash.hexdigest()
return
encrypted_string[:
16
]
def
get_signs(m3u8_url):
resp
=
requests.get(m3u8_url, headers
=
headers)
with
open
(
'test.m3u8'
,
'w'
, encoding
=
'utf-8'
) as f:
f.write(resp.text)
ts_lst
=
[]
with
open
(
'test.m3u8'
,
'r'
, encoding
=
'utf-8'
) as f:
for
line
in
f.readlines():
if
line.startswith(
'#'
):
if
'key'
in
line:
key_url
=
re.findall(r
'URI="(.*?)"'
, line)[
0
]
key
=
re.findall(r
'_keys/(.*?)"'
, line)[
0
]
iv
=
re.findall(r
'IV=(.*?)\n'
, line)[
0
]
iv
=
binascii.unhexlify(iv.replace(
'0x'
, "")).
hex
()[:
16
].encode(
'utf-8'
)
continue
else
:
ts_lst.append(urljoin(m3u8_url, line))
sign_url
=
key_url
+
'/signs'
resp_signs
=
requests.get(sign_url, headers
=
headers)
nonce
=
resp_signs.json()[
'nonce'
]
md5_result
=
md5_encrypt(nonce
+
key)
new_signs_url
=
key_url
+
'?nonce='
+
nonce
+
'&sign='
+
md5_result
resp_new_signs
=
requests.get(new_signs_url, headers
=
headers)
id_key
=
resp_new_signs.json()
data_base64
=
id_key[
'key'
]
ts_key
=
aes_ecb_decrypt(md5_result.encode(
'utf-8'
), data_base64)
return
ts_lst, ts_key, iv
def
merge_ts(path):
current_dir
=
os.getcwd()
new_dir
=
os.chdir(path)
all_entries
=
os.listdir(new_dir)
cmd
=
'copy /b '
+
'+'
.join(all_entries)
+
' '
+
'result.mp4'
r
=
os.system(cmd)
if
r
=
=
0
:
print
(
'合并成功'
)
else
:
print
(
'合并失败'
)
r
=
os.system(
'del *.ts'
)
os.chdir(current_dir)
r
=
os.system(
'del *.m3u8'
)
async
def
download_ts(ts, ts_key, iv, path):
file_name
=
path
+
'/'
+
ts.split(
'-'
)[
-
1
]
for
i
in
range
(
10
):
try
:
print
(
'开始下载'
, file_name)
async with aiohttp.ClientSession() as session:
async with session.get(ts) as resp:
ts_data
=
await resp.content.read()
ts_data
=
aes_cbc_decrypt(ts_key, ts_data, iv)
async with aiofiles.
open
(file_name,
'wb'
) as f:
await f.write(ts_data)
print
(
'下载完成'
, file_name)
break
except
Exception as e:
print
(file_name,
'开始重新下载。。'
, e)
continue
async
def
run_main():
while
True
:
m3u8_lst
=
get_m3u8_url()
for
m3u8_url
in
m3u8_lst:
path
=
str
(
int
(time.time()))
+
'ts'
if
not
os.path.exists(path):
os.makedirs(path)
ts_lst, ts_key, iv
=
get_signs(m3u8_url)
tasks
=
[asyncio.create_task(download_ts(ts, ts_key, iv, path))
for
ts
in
ts_lst]
await asyncio.gather(
*
tasks)
merge_ts(path)
def
get_m3u8_url():
lst_m3u8
=
[]
video_url
=
input
(
'请输入视频地址或者m3u8地址(输入多个m3u8地址以,分开):'
)
if
'm3u8'
in
video_url:
lst_m3u8
=
video_url.split(
','
)
return
lst_m3u8
id
=
video_url.split(
'activityId='
)[
1
].split(
'&'
)[
0
]
json_url
=
f
'https://s-file-2.ykt.cbern.com.cn/zxx/ndrv2/national_lesson/resources/details/{id}.json'
resp
=
requests.get(json_url)
json_data
=
resp.json()
resource
=
json_data[
'relations'
][
"national_course_resource"
]
for
i
in
range
(
len
(resource)):
if
i
%
5
=
=
0
:
m3u8_url
=
resource[i][
'ti_items'
][
0
][
'ti_storages'
][
0
]
lst_m3u8.append(m3u8_url)
return
lst_m3u8
if
__name__
=
=
'__main__'
:
print
(
'------------------仅做练习,请勿用于商业用途------------------'
)
print
(
'------------------仅做练习,请勿用于非法用途------------------'
)
print
(
'------------------仅做练习,请勿滥用造成不良后果自负------------'
)
asyncio.run(run_main())