# Kuaishou (快手) user video batch downloader — fetches a user's feed via the
# GraphQL API and saves each video under folder_path/<author>/.
import json
import re
import os
import requests
import urllib.request
from multiprocessing import Pool
import time
# Kuaishou GraphQL endpoint queried for a user's video feed.
requestUrl = 'https://video.kuaishou.com/graphql'
# Root directory downloads are saved under (one sub-folder per author).
# Raw string: '\k' in the original 'F:\kuaishou' is an invalid escape
# (DeprecationWarning today, SyntaxError in a future Python).
folder_path = r'F:\kuaishou'
# Session cookie copied from a logged-in browser; required by the API.
cookie = ''
# Pagination cursor; '' requests the first page.
pcursor = ''
def post(userId, Cookie, pcursor):
    """POST the visionProfilePhotoList GraphQL query for one page of a user's feed.

    Args:
        userId:  Kuaishou user id whose videos are listed.
        Cookie:  session cookie string sent with the request.
        pcursor: pagination cursor ('' for the first page).

    Returns:
        The raw JSON response body as text (decode with json.loads).
    """
    data = {"operationName":"visionProfilePhotoList","variables":{"userId":userId,"pcursor":pcursor,"page":"profile"},"query":"query visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n type\n author {\n id\n name\n following\n headerUrl\n headerUrls {\n cdn\n url\n __typename\n }\n __typename\n }\n tags {\n type\n name\n __typename\n }\n photo {\n id\n duration\n caption\n likeCount\n realLikeCount\n coverUrl\n coverUrls {\n cdn\n url\n __typename\n }\n photoUrls {\n cdn\n url\n __typename\n }\n photoUrl\n liked\n timestamp\n expTag\n animatedCoverUrl\n stereoType\n videoRatio\n __typename\n }\n canAddComment\n currentPcursor\n llsid\n status\n __typename\n }\n hostName\n pcursor\n __typename\n }\n}\n"}
    headers = {
        'Host': 'video.kuaishou.com',
        'Connection': 'keep-alive',
        # NOTE: the original hard-coded 'Content-Length': '1261', which is wrong
        # whenever userId/pcursor change the payload size; requests computes the
        # correct length automatically, so it is omitted here.
        'accept': '*/*',
        'User-Agent': 'Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/89.0.4389.114Safari/537.36Edg/89.0.774.68',
        'content-type': 'application/json',
        'Origin': 'https://video.kuaishou.com',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'Referer': 'https://video.kuaishou.com/profile/' + userId,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cookie': Cookie,
    }
    # json= serializes the payload and sets a correct Content-Length for us.
    r = requests.post(requestUrl, json=data, headers=headers)
    r.encoding = 'UTF-8'
    return r.text
def down(feeds):
    """Download every video in *feeds* (one page of the GraphQL feed list).

    Files are saved as folder_path/<author>/<caption>.mp4; a video whose target
    file already exists is skipped. Captions are sanitized because they may
    contain characters that are illegal in Windows filenames (\\ / : * ? " < > |)
    or embedded newlines, which previously made open() fail.
    """
    for feed in feeds:
        author = feed['author']['name']
        # Strip characters Windows forbids in filenames, plus newlines.
        caption = re.sub(r'[\\/:*?"<>|\r\n]', '', feed['photo']['caption'])
        filename = caption + '.mp4'
        filepath = folder_path + '/' + author + '/'
        if not os.path.exists(filepath + filename):
            progressbar(feed['photo']['photoUrl'], filepath, filename)
            print(filename + ",下载完成")
        else:
            print(filename + ",已存在,跳过")
def url_response(url, filepath, filename):
    """Stream *url* to *filepath*/*filename* (plain download, no progress bar).

    NOTE(review): the original body was non-functional — it referenced the
    undefined names `response` and `total_length`, used the sibling function
    `progressbar` as if it were the third-party `progressbar` module, wrote to
    the directory path instead of the file, and downloaded one byte at a time.
    This rewrite keeps the same signature but actually performs the download.
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    # Newlines in captions are not valid in filenames.
    target = filepath + filename.replace("\n", "")
    r = requests.get(url, stream=True)
    with open(target, 'wb') as f:
        # 8 KiB chunks: chunk_size=1 (the original) is pathologically slow.
        for chunk in r.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
def progressbar(url, filepath, filename):
    """Download *url* to *filepath*+*filename*, printing a console progress bar.

    Args:
        url:      direct video URL.
        filepath: destination directory (created, with parents, if missing).
        filename: target file name; embedded newlines are stripped.
    """
    if not os.path.exists(filepath):
        # makedirs (not mkdir): folder_path itself may not exist yet.
        os.makedirs(filepath)
    start = time.time()
    response = requests.get(url, stream=True)
    size = 0
    chunk_size = 1024
    # .get with default: a missing Content-Length header used to raise
    # KeyError outside any try block.
    content_size = int(response.headers.get('content-length', 0))
    if response.status_code == 200:
        print('Start download,[File size]:{size:.2f} MB'.format(size=content_size / chunk_size / 1024))
        filename = filename.replace("\n", "")
        filepath = filepath + filename
        try:
            with open(filepath, 'wb') as file:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    size += len(data)
                    if content_size:  # guard: avoid ZeroDivisionError
                        print('\r' + '[下载进度]:%s%.2f%%' % ('>' * int(size * 50 / content_size), float(size / content_size * 100)), end=' ')
            end = time.time()
            print('Download completed!,times: %.2f秒' % (end - start))
        except OSError as e:
            # Best-effort like the original bare `except: pass`, but the
            # failure is no longer silent, and only I/O errors are swallowed.
            print('Download failed: %s' % e)
if __name__ == "__main__":
userIdList = ['']
for userId in userIdList:
links = []
while True:
result = post(userId,cookie,pcursor)
data = json.loads(result)
pcursor = data['data']['visionProfilePhotoList']['pcursor']
feeds = data['data']['visionProfilePhotoList']['feeds']
flen = len(feeds)
if flen == 0:
break
links.append(feeds)
for link in links:
down(link)