某新网盘mp4文件上传协议
本帖最后由 漁滒 于 2020-11-27 12:52 编辑。
其中的 drive_id 和 authorization 可以通过 app 或者网页抓包得到,file 是当前目录下的 mp4 文件名。
import requests
import json
import os
import hashlib
_PART_SIZE = 5 * 1024 * 1024  # bytes declared per upload part


def _build_part_info_list(filesize, part_size=_PART_SIZE):
    """Split *filesize* bytes into numbered parts of at most *part_size* bytes.

    Returns a list of ``{"part_number", "part_size"}`` dicts. There is always
    at least one entry; the last entry carries the remainder.
    """
    parts = []
    remaining = filesize
    part_number = 1
    while remaining > part_size:
        parts.append({
            "part_number": part_number,
            "part_size": part_size
        })
        part_number += 1
        remaining -= part_size
    parts.append({
        "part_number": part_number,
        "part_size": remaining
    })
    return parts


def main():
    """Upload a local mp4 file to the drive API and print its link.

    Fill in ``file``, ``drive_id`` and ``authorization`` before running.
    Steps: create the file entry, upload the parts (or, on HTTP 409, resend
    the request with the full content hash), complete the upload, then
    request the file info and print its ``url`` field.

    NOTE(review): the source listing had lost its indentation; the nesting
    below is reconstructed from the data flow (``file_id`` is set in both
    branches, ``upload_id`` only in the upload branch) - confirm.
    """
    file = ''
    drive_id = ''
    authorization = ''
    filesize = os.path.getsize(file)
    # Create the file entry and request the upload URLs.
    url = 'https://api.aliyundrive.com/v2/file/create'
    headers = {
        'authorization': authorization,
        'accept': 'application/json',
        'content-type': 'application/json;charset=utf-8',
        'accept-encoding': 'gzip',
        'user-agent': 'okhttp/4.2.2'
    }
    user_meta = {
        'hash': getsha1(file),
        'size': filesize
    }
    # The original built this payload twice (once per size branch) with
    # identical content; only the part list differed, and the helper covers
    # both cases.
    part_info_list = _build_part_info_list(filesize)
    data = {
        "check_name_mode": "auto_rename",
        "content_type": "video/mp4",
        "drive_id": drive_id,
        "name": file,
        "parent_file_id": "root",
        'part_info_list': part_info_list,
        "pre_hash": getchunksha1(file),
        "size": filesize,
        "type": "file",
        "user_meta": json.dumps(user_meta, separators=(',', ':'))
    }
    data = json.dumps(data, separators=(',', ':'))
    response = requests.post(url, headers=headers, data=data)
    if response.status_code == 409:
        # Duplicate file: repeat the create call with the full content hash
        # instead of the pre-hash (instant upload).
        print('文件已存在')
        data = {
            "check_name_mode": "auto_rename",
            'content_hash': getcontenthash(file),
            'content_hash_name': 'sha1',
            "content_type": "video/mp4",
            "drive_id": drive_id,
            "name": file,
            "parent_file_id": "root",
            'part_info_list': part_info_list,
            "size": filesize,
            "type": "file",
            "user_meta": json.dumps(user_meta, separators=(',', ':'))
        }
        data = json.dumps(data, separators=(',', ':'))
        response = requests.post(url, headers=headers, data=data).json()
        file_id = response['file_id']
    else:
        print('创建预上传文件成功')
        response = response.json()
        upload_id = response['upload_id']
        file_id = response['file_id']
        # Upload the file part by part, in the order the server listed them.
        with open(file, 'rb') as f:
            for each in response['part_info_list']:
                upload_url = each['upload_url']
                part_response = requests.put(upload_url, data=f.read(each['part_size']))
                print(str(each['part_number'])+'/'+str(len(part_info_list)))
                # Check every part, not just the last one, so a failed part
                # cannot be followed by a "complete" call.
                if part_response.status_code != 200:
                    print('分段上传失败')
                    return
        print('分段上传成功')
        # Tell the server that all parts have been sent.
        url = 'https://api.aliyundrive.com/v2/file/complete'
        data = {
            "content_type": "video/mp4",
            "drive_id": drive_id,
            "file_id": file_id,
            "name": file,
            "parent_file_id": "root",
            "type": "file",
            "upload_id": upload_id
        }
        data = json.dumps(data, separators=(',', ':'))
        response = requests.post(url, headers=headers, data=data)
        if response.status_code == 200:
            print('文件上传成功')
    # Fetch the file info to obtain the video link.
    url = 'https://api.aliyundrive.com/v2/file/get'
    data = {
        "drive_id": drive_id,
        "file_id": file_id,
        "url_expire_sec": 1800  # requested link lifetime, in seconds
    }
    data = json.dumps(data, separators=(',', ':'))
    response = requests.post(url, headers=headers, data=data).json()
    # Direct link to the uploaded file.
    videourl = response['url']
    print(videourl)
def getsha1(file):
    """Return the sampled SHA-1 hex digest used as the ``user_meta`` hash.

    Files of at most 20480 bytes are hashed in full. Larger files are
    sampled: the first 2048 bytes, 1024 bytes starting at the midpoint
    (``length // 2``) and the last 1024 bytes. The decimal file length is
    then appended to the hashed data.

    NOTE(review): the source listing had lost its indentation; the length
    suffix is assumed to apply to both the small-file and sampled cases.

    Args:
        file: Path of the file to hash.

    Returns:
        The hexadecimal SHA-1 digest string.
    """
    length = os.path.getsize(file)
    sha1 = hashlib.sha1()
    with open(file, 'rb') as f:
        if length <= 20480:
            sha1.update(f.read())
        else:
            sha1.update(f.read(2048))
            # Seek rather than read-and-discard: the skipped span can be
            # half of a multi-gigabyte video.
            f.seek(length // 2)
            sha1.update(f.read(1024))
            f.seek(length - 1024)
            sha1.update(f.read(1024))
    sha1.update(str(length).encode())
    return sha1.hexdigest()
def getchunksha1(file):
    """Return the SHA-1 hex digest of the first 1024 bytes of *file*.

    If the file is shorter than 1024 bytes, whatever is available is hashed.
    """
    with open(file, 'rb') as stream:
        head = stream.read(1024)
    return hashlib.sha1(head).hexdigest()
def getcontenthash(file):
    """Return the SHA-1 hex digest of the whole content of *file*.

    The file is read in 1 MiB blocks so it never has to fit in memory.
    """
    digest = hashlib.sha1()
    with open(file, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(1024 * 1024), b''):
            digest.update(block)
    return digest.hexdigest()
# Run the upload only when this file is executed as a script.
if __name__ == '__main__':
    main()
团队版上传协议
其中的 authorization 可以通过 app 抓包得到,file 是当前目录下的 mp4 文件名。
import requests
import os
import math
from urllib import parse
def _first_item(payload):
    """Return ``payload[0]`` when *payload* is a list, else *payload* itself."""
    if isinstance(payload, list):
        return payload[0]
    return payload


def main():
    """Upload a local mp4 file through the Teambition pan API and print its URL.

    Fill in ``filename`` and ``authorization`` before running. Steps: look up
    the personal organization, the space and root folder, the drive id and
    the parent folder id; create the file node; PUT the file in 10 MB chunks
    to the returned upload URLs; complete the upload; then fetch the node
    and print its ``url`` field.
    """
    chunk_size = 10485760  # 10 MB per chunk, used for both chunkCount and reads
    filename = ''
    filesize = os.path.getsize(filename)
    authorization = ''
    # Look up the personal organization.
    url = 'https://www.teambition.com/api/organizations/personal'
    headers = {
        'user-agent': 'Dart/2.8 (dart:io)',
        'authorization': authorization
    }
    response = requests.get(url, headers=headers).json()
    creatorId = response['_creatorId']
    orgId = response['_id']
    print('creatorId:', creatorId)
    print('orgId:', orgId)
    # Look up the space and its root folder.
    data = {
        'orgId': orgId,
        'memberId': creatorId
    }
    url = 'https://pan.teambition.com/pan/api/spaces?'+parse.urlencode(data)
    # The original comment says entry 0 is chosen because it is the default
    # home directory (pick another entry to upload elsewhere), but the listing
    # indexed the response as a dict. Take entry 0 when a list is returned.
    response = _first_item(requests.get(url, headers=headers).json())
    spaceId = response['spaceId']
    rootId = response['rootId']
    print('spaceId:', spaceId)
    print('rootId:', rootId)
    # Look up the drive id.
    data = {
        'orgId': orgId
    }
    url = 'https://pan.teambition.com/pan/api/orgs/'+orgId+'?'+parse.urlencode(data)
    response = requests.get(url, headers=headers).json()
    driveId = response['data']['driveId']
    print('driveId:', driveId)
    # Look up the parent folder id used for the upload.
    data = {
        'orgId': orgId,
        'driveId': driveId,
        'spaceId': spaceId
    }
    url = 'https://pan.teambition.com/pan/api/nodes/'+rootId+'?'+parse.urlencode(data)
    response = requests.get(url, headers=headers).json()
    ccpParentId = response['ccpFileId']
    print('ccpParentId:', ccpParentId)
    # Create the file node and request the upload URLs.
    data = {
        'checkNameMode': 'autoRename',
        'infos': [{
            'ccpParentId': ccpParentId,
            'chunkCount': math.ceil(filesize / chunk_size),
            'contentType': 'video/mp4',
            'driveId': driveId,
            'name': filename,
            'size': filesize,
            'type': 'file'
        }],
        'orgId': orgId,
        'parentId': rootId,
        'spaceId': spaceId
    }
    url = 'https://pan.teambition.com/pan/api/nodes/file'
    response = requests.post(url, headers=headers, json=data)
    if response.status_code != 201:
        print('创建预上传文件失败')
        return
    print('创建预上传文件成功')
    # NOTE(review): 'infos' is sent as a list, so the reply may be a list
    # too; unwrap entry 0 in that case - confirm against the live API.
    response = _first_item(response.json())
    uploadId = response['uploadId']
    nodeId = response['nodeId']
    ccpFileId = response['ccpFileId']
    uploadUrls = response['uploadUrl']
    chunknumber = str(len(uploadUrls))
    print('uploadId:', uploadId)
    print('nodeId:', nodeId)
    print('ccpFileId:', ccpFileId)
    with open(filename, 'rb') as f:
        for i, uploadUrl in enumerate(uploadUrls, start=1):
            chunk_response = requests.put(uploadUrl, headers=headers, data=f.read(chunk_size))
            print('上传进度 【 '+str(i) + '/' + chunknumber + ' 】')
            # The original ignored the PUT result; stop here so a failed
            # chunk is not followed by a "complete" call.
            if not chunk_response.ok:
                print('上传失败')
                return
    # Tell the server that all chunks have been sent.
    data = {
        'ccpFileId': ccpFileId,
        'driveId': driveId,
        'nodeId': nodeId,
        'orgId': orgId,
        'uploadId': uploadId
    }
    url = 'https://pan.teambition.com/pan/api/nodes/complete'
    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 201:
        print('上传成功')
        # Fetch the node to read its URL.
        data = {
            'orgId': orgId,
            'driveId': driveId,
            'spaceId': spaceId
        }
        url = 'https://pan.teambition.com/pan/api/nodes/'+nodeId+'?'+parse.urlencode(data)
        response = requests.get(url, headers=headers).json()
        url = response['url']
        print(url)
    else:
        print('上传失败')
# Run the upload only when this file is executed as a script.
if __name__ == '__main__':
    main()
小樱 发表于 2020-11-23 00:26
能透露下有什么作用吗?做成客户端来使用还是?
目前主要作用是批量上传和批量下载。
有办法用 PHP 自动获取 authorization 吗?两小时失效一次,很蛋疼。
https://www.52zhibo.me/aliyun/ 感谢分享{:1_893:} 感谢分享,就差邀请码了 感谢分享 HLT 发表于 2020-11-22 23:43
感谢分享,就差邀请码了
论坛里面有不少了为啥还缺呀 感谢分享 现在都没收到内测码,最后一批的 这个有什么应用 那个网盘好吗,? 能透露下有什么作用吗?做成客户端来使用还是?