# [Python] 纯文本查看 复制代码  -- forum copy-paste artifact; commented out so the file parses
import json
import os
import re
import time
import urllib.parse
import urllib.request
from multiprocessing import Pool

import requests
# --- Scrape configuration ---------------------------------------------------
# GraphQL endpoint the Kuaishou web client uses for video search.
requestUrl = 'https://video.kuaishou.com/graphql'
# Local folder downloads are saved under.  Raw string: 'F:\kuaishou' relied on
# '\k' not being a recognized escape, which is deprecated syntax.
folder_path = r'F:\kuaishou'
# Session cookie copied from a logged-in browser; it expires, so refresh it
# manually when requests start failing.
cookie = 'clientid=3; did=web_9161d9069f991dc552b7a3555f748271; client_key=65890b29; kpf=PC_WEB; kpn=KUAISHOU_VISION; didv=1617473948950; userId=711665271; kuaishou.server.web_st=ChZrdWFpc2hvdS5zZXJ2ZXIud2ViLnN0EqABaSsDgEVavwtvzo1Is5J8IPL6jUZEHg644HE4WE7Lt2-G8-FeLlyKOyejA9uNkO_88CiqtTZa98wUuyBkTEI_Oktvewn0B6uXBcnPzgIs5fT7Wr7lHckJsbkAoR5XDkkqkhWhOJgAnEXeu21VRCLl9YvkfZyt2qdMPDek9d6yck06B2eiLEfApi9mSKuxN-M5m2QG2NZ80VAAPAEhZaySiBoS5dGNQ2tN9j6L3QVO7fJXKiWdIiDoDc20YG0w7FWACxq236Fn9FjhUzCLSUrVsXpPsiBiuygFMAE; kuaishou.server.web_ph=cb462c47e56fd6baa52e4a24f0692e7a3ca8'
# Initial pagination cursor ('1' = first page).
pcursor = '1'
# Search session id issued by the server; presumably tied to the cookie above
# and to a specific search -- TODO confirm whether a stale id still works.
searchSessionId = 'MTRfNzExNjY1MjcxXzE2MTgwNjc2NDg3NTJf5b6u6IOWXzc0MTg'
def post(keyword, Cookie, pcursor):
    """POST a visionSearchPhoto GraphQL query and return the raw response text.

    Parameters
    ----------
    keyword : str
        Search term.
    Cookie : str
        Value sent as the ``Cookie`` header (a logged-in web session).
    pcursor : str
        Pagination cursor; '1' requests the first page.

    Returns
    -------
    str
        The response body decoded as UTF-8 (expected to be JSON).
    """
    data = {"operationName":"visionSearchPhoto","variables":{"keyword":keyword,"pcursor":pcursor,"page":"search","searchSessionId":searchSessionId},"query":"query visionSearchPhoto($keyword: String, $pcursor: String, $searchSessionId: String, $page: String, $webPageArea: String) {\n visionSearchPhoto(keyword: $keyword, pcursor: $pcursor, searchSessionId: $searchSessionId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n type\n author {\n id\n name\n following\n headerUrl\n headerUrls {\n cdn\n url\n __typename\n }\n __typename\n }\n tags {\n type\n name\n __typename\n }\n photo {\n id\n duration\n caption\n likeCount\n realLikeCount\n coverUrl\n photoUrl\n liked\n timestamp\n expTag\n coverUrls {\n cdn\n url\n __typename\n }\n photoUrls {\n cdn\n url\n __typename\n }\n animatedCoverUrl\n stereoType\n videoRatio\n __typename\n }\n canAddComment\n currentPcursor\n llsid\n status\n __typename\n }\n searchSessionId\n pcursor\n aladdinBanner {\n imgUrl\n link\n __typename\n }\n __typename\n }\n}\n"}
    # NOTE: the original hard-coded 'Content-Length': '1261', which is only
    # correct for one specific keyword and overrides the length requests
    # computes -- removed so requests sets it from the actual body.
    headers = {
        'Host': 'video.kuaishou.com',
        'Connection': 'keep-alive',
        'accept': '*/*',
        'User-Agent': 'Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/89.0.4389.114Safari/537.36Edg/89.0.774.68',
        'content-type': 'application/json',
        'Origin': 'https://video.kuaishou.com',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'Referer': 'https://video.kuaishou.com/search/video?searchKey=' + urllib.parse.quote(keyword),
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cookie': Cookie,
    }
    r = requests.post(requestUrl, data=json.dumps(data), headers=headers)
    r.encoding = 'UTF-8'
    return r.text
def down(feeds, keywork):
    """Download every video in *feeds* into ``folder_path/<keywork>/``.

    Files that already exist on disk are skipped silently.

    Parameters
    ----------
    feeds : list[dict]
        Feed entries from the search response; each must carry
        ``feed['photo']['caption']`` and ``feed['photo']['photoUrl']``.
    keywork : str
        Search keyword, used as the sub-folder name (name kept for
        backward compatibility with existing callers).
    """
    filepath = folder_path + '/' + keywork + '/'
    for feed in feeds:
        # Captions routinely contain characters that are illegal in
        # Windows file names (\ / : * ? " < > |) and newlines; replace
        # them so open() does not fail later.
        caption = re.sub(r'[\\/:*?"<>|\r\n]', '_', feed['photo']['caption'])
        filename = caption + '.mp4'
        if os.path.exists(filepath + filename):
            continue  # already downloaded, skip
        progressbar(feed['photo']['photoUrl'], filepath, filename)
        print(filename + ",下载完成")
def url_response(url, filepath, filename):
    """Stream *url* to ``filepath + filename`` in 8 KiB chunks (no progress UI).

    Rewritten: the original version was dead code that referenced undefined
    names (``response``, ``total_length``), used the sibling *function*
    ``progressbar`` as if it were the third-party progressbar *module*,
    opened the directory path instead of the file, and downloaded one byte
    at a time.

    Parameters
    ----------
    url : str
        Direct URL of the video file.
    filepath : str
        Destination directory, expected to end with a path separator.
    filename : str
        File name to write inside *filepath*.
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    r = requests.get(url, stream=True)
    try:
        with open(filepath + filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    finally:
        # Release the connection back to the pool even on error.
        r.close()
def progressbar(url, filepath, filename):
    """Download *url* to ``filepath + filename`` printing a console progress bar.

    Parameters
    ----------
    url : str
        Direct URL of the video file.
    filepath : str
        Destination directory, expected to end with a path separator;
        created (including parents) if missing.
    filename : str
        File name; embedded newlines are stripped before use.
    """
    # makedirs: the original os.mkdir failed whenever the parent (e.g.
    # F:\kuaishou) did not exist yet.
    os.makedirs(filepath, exist_ok=True)
    start = time.time()
    response = requests.get(url, stream=True)
    try:
        if response.status_code != 200:
            print('Download failed, HTTP status %d' % response.status_code)
            return
        chunk_size = 1024
        # .get with default: the original indexed the header before even
        # checking the status and raised KeyError when it was absent.
        content_size = int(response.headers.get('content-length', 0))
        print('Start download,[File size]:{size:.2f} MB'.format(size = content_size / chunk_size / 1024))
        filename = filename.replace("\n", "")
        filepath = filepath + filename
        size = 0
        try:
            with open(filepath, 'wb') as file:
                for data in response.iter_content(chunk_size = chunk_size):
                    file.write(data)
                    size += len(data)
                    if content_size:  # guard divide-by-zero on missing header
                        print('\r' + '[下载进度]:%s%.2f%%' % ('>' * int(size * 50 / content_size), float(size / content_size * 100)) ,end=' ')
            end = time.time()
            print('Download completed!,times: %.2f秒' % (end - start))
        except OSError as exc:
            # Narrowed from a bare `except: pass`, which silently swallowed
            # disk-full, permission and path errors.
            print('Download failed: %s' % exc)
    finally:
        # Release the connection back to the pool.
        response.close()
if __name__ == "__main__":
    keyWork = '微胖'
    links = []
    # First-page cursor as a string -- the API query declares $pcursor: String,
    # and the original passed the int 1 here.
    pcursor = '1'
    # Fetch up to 9 pages.  The original `for x in 1,10:` iterated over the
    # tuple (1, 10) -- exactly two passes -- instead of range(1, 10), and it
    # never advanced the cursor, so it refetched page 1 every time.
    for _ in range(1, 10):
        result = post(keyWork, cookie, pcursor)
        data = json.loads(result)
        search = data['data']['visionSearchPhoto']
        feeds = search['feeds']
        if not feeds:
            break  # no more results
        links.append(feeds)
        # Advance to the next page using the cursor returned by the server.
        pcursor = search['pcursor']
    for link in links:
        down(link, keyWork)