好友
阅读权限40
听众
最后登录1970-1-1
|
漁滒
发表于 2020-11-22 23:30
本帖最后由 漁滒 于 2020-11-27 12:52 编辑
其中的drive_id和authorization可以通过app或者网页抓包得到,file是当前目录下的mp4文件名
[Python] 纯文本查看 复制代码
import requests
import json
import os
import hashlib
def main():
file = ''
drive_id = ''
authorization = ''
filesize = os.path.getsize(file)
# 创建文件上传列表
url = 'https://api.aliyundrive.com/v2/file/create'
headers = {
'authorization': authorization,
'accept': 'application/json',
'content-type': 'application/json;charset=utf-8',
'accept-encoding': 'gzip',
'user-agent': 'okhttp/4.2.2'
}
user_meta = {
'hash': getsha1(file),
'size': filesize
}
part_info_list = []
datasize = filesize
if filesize > 5 * 1024 * 1024:
part_number = 1
while datasize > 5 * 1024 * 1024:
part_info_list.append({
"part_number": part_number,
"part_size": 5 * 1024 * 1024
})
part_number += 1
datasize -= 5 * 1024 * 1024
part_info_list.append({
"part_number": part_number,
"part_size": datasize
})
data = {
"check_name_mode": "auto_rename",
"content_type": "video/mp4",
"drive_id": drive_id,
"name": file,
"parent_file_id": "root",
'part_info_list': part_info_list,
"pre_hash": getchunksha1(file),
"size": filesize,
"type": "file",
"user_meta": json.dumps(user_meta, separators=(',', ':'))
}
data = json.dumps(data, separators=(',', ':'))
else:
part_info_list.append({
"part_number": 1,
"part_size": datasize
})
data = {
"check_name_mode": "auto_rename",
"content_type": "video/mp4",
"drive_id": drive_id,
"name": file,
"parent_file_id": "root",
'part_info_list': part_info_list,
"pre_hash": getchunksha1(file),
"size": filesize,
"type": "file",
"user_meta": json.dumps(user_meta, separators=(',', ':'))
}
data = json.dumps(data, separators=(',', ':'))
# 获取文件上传地址
response = requests.post(url, headers=headers, data=data)
if response.status_code == 409:
# 文件重复,进行秒传操作
print('文件已存在')
data = {
"check_name_mode": "auto_rename",
'content_hash': getcontenthash(file),
'content_hash_name': 'sha1',
"content_type": "video/mp4",
"drive_id": drive_id,
"name": file,
"parent_file_id": "root",
'part_info_list': part_info_list,
"size": filesize,
"type": "file",
"user_meta": json.dumps(user_meta, separators=(',', ':'))
}
data = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=headers, data=data).json()
file_id = response['file_id']
else:
print('创建预上传文件成功')
# 创建预上传文件成功
response = response.json()
# 分段上传文件
upload_id = response['upload_id']
file_id = response['file_id']
with open(file, 'rb') as f:
for each in response['part_info_list']:
upload_url = each['upload_url']
response = requests.put(upload_url, data=f.read(each['part_size']))
print(str(each['part_number'])+'/'+str(len(part_info_list)))
if response.status_code == 200:
print('分段上传成功')
# 发送上传完成
url = 'https://api.aliyundrive.com/v2/file/complete'
data = {
"content_type": "video/mp4",
"drive_id": drive_id,
"file_id": file_id,
"name": file,
"parent_file_id": "root",
"type": "file",
"upload_id": upload_id
}
data = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
print('文件上传成功')
# 获取视频链接
url = 'https://api.aliyundrive.com/v2/file/get'
data = {
"drive_id": drive_id,
"file_id": file_id,
"url_expire_sec": 1800 # 链接有效时间
}
data = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=headers, data=data).json()
# 在线直链
videourl = response['url']
print(videourl)
def getsha1(file):
lenght = os.path.getsize(file)
sha1 = hashlib.sha1()
if lenght <= 20480:
with open(file, 'rb') as f:
sha1.update(f.read())
else:
with open(file, 'rb') as f:
sha1.update(f.read(2048))
f.read((lenght // 2) - 2048)
sha1.update(f.read(1024))
f.read((lenght - 1024) - ((lenght // 2) + 1024))
sha1.update(f.read(1024))
sha1.update(str(lenght).encode())
return sha1.hexdigest()
def getchunksha1(file):
sha1 = hashlib.sha1()
with open(file, 'rb') as f:
sha1.update(f.read(1024))
return sha1.hexdigest()
def getcontenthash(file):
sha1 = hashlib.sha1()
with open(file, 'rb') as f:
while True:
chunk = f.read(1024 * 1024)
if chunk:
sha1.update(chunk)
else:
break
return sha1.hexdigest()
if __name__ == '__main__':
main()
团队版上传协议
其中的authorization可以通过app抓包得到,file是当前目录下的mp4文件名
[Python] 纯文本查看 复制代码
import requests
import os
import math
from urllib import parse
def main():
filename = ''
filesize = os.path.getsize(filename)
authorization = ''
# 获取个人ID
url = 'https://www.teambition.com/api/organizations/personal'
headers = {
'user-agent': 'Dart/2.8 (dart:io)',
'authorization': authorization
}
response = requests.get(url, headers=headers).json()
creatorId = response['_creatorId']
orgId = response['_id']
print('creatorId:', creatorId)
print('orgId:', orgId)
# 获取文件夹ID
data = {
'orgId': orgId,
'memberId': creatorId
}
url = 'https://pan.teambition.com/pan/api/spaces?'+parse.urlencode(data)
response = requests.get(url, headers=headers).json()[0] # 选择0是因为默认主目录,需要上传到其他文件夹自行设置
spaceId = response['spaceId']
rootId = response['rootId']
print('spaceId:', spaceId)
print('rootId:', rootId)
# 获取设备ID
data = {
'orgId': orgId
}
url = 'https://pan.teambition.com/pan/api/orgs/'+orgId+'?'+parse.urlencode(data)
response = requests.get(url, headers=headers).json()
driveId = response['data']['driveId']
print('driveId:', driveId)
# 获取上传密钥
data = {
'orgId': orgId,
'driveId': driveId,
'spaceId': spaceId
}
url = 'https://pan.teambition.com/pan/api/nodes/'+rootId+'?'+parse.urlencode(data)
response = requests.get(url, headers=headers).json()
ccpParentId = response['ccpFileId']
print('ccpParentId:', ccpParentId)
# 创建预上传文件
data = {
'checkNameMode': 'autoRename',
'infos': [{
'ccpParentId': ccpParentId,
'chunkCount': math.ceil(filesize / 10485760), # 每10MB为一个分块
'contentType': 'video/mp4',
'driveId': driveId,
'name': filename,
'size': filesize,
'type': 'file'
}],
'orgId': orgId,
'parentId': rootId,
'spaceId': spaceId
}
url = 'https://pan.teambition.com/pan/api/nodes/file'
response = requests.post(url, headers=headers, json=data)
if response.status_code == 201:
print('创建预上传文件成功')
response = response.json()[0]
uploadId = response['uploadId']
nodeId = response['nodeId']
ccpFileId = response['ccpFileId']
chunknumber = str(len(response['uploadUrl']))
print('uploadId:', uploadId)
print('nodeId:', nodeId)
print('ccpFileId:', ccpFileId)
i = 1
with open(filename, 'rb') as f:
for uploadUrl in response['uploadUrl']:
response = requests.put(uploadUrl, headers=headers, data=f.read(10485760))
print('上传进度 【 '+str(i) + '/' + chunknumber + ' 】')
i += 1
# 发送上传完成
data = {
'ccpFileId': ccpFileId,
'driveId': driveId,
'nodeId': nodeId,
'orgId': orgId,
'uploadId': uploadId
}
url = 'https://pan.teambition.com/pan/api/nodes/complete'
response = requests.post(url, headers=headers, json=data)
if response.status_code == 201:
print('上传成功')
data = {
'orgId': orgId,
'driveId': driveId,
'spaceId': spaceId
}
url = 'https://pan.teambition.com/pan/api/nodes/'+nodeId+'?'+parse.urlencode(data)
response = requests.get(url, headers=headers).json()
url = response['url']
print(url)
else:
print('上传失败')
else:
print('创建预上传文件失败')
if __name__ == '__main__':
main()
|
免费评分
-
查看全部评分
|