本帖最后由 magic0 于 2023-10-11 20:38 编辑
- 安装好python环境(自行百度)
- 安装以下python库
BeautifulSoup:pip3 install BeautifulSoup
requests:pip3 install requests
主要是利用BeautifulSoup对爬取的页面进行解析,这个每个网页不同要具体分析,由于tg的界面很简洁只需要获取所有img标签的图片链接就好,接下来就是把链接放在请求的方法里,
这里简单用了一下线程池,循环一次请求一次太慢了,循环的时候就把请求的任务放进线程池里排队,另外下载前会对文件进行一次校验避免重复下载,这边写了一个循环用来检测线程池的任务执行状况,
当线程池里的任务执行完成就开始对文件进行校验,校验主要的逻辑是通过检测文件占用大小和文件名是否存在
初次分享代码,有什么需要改进的可以提
[Python] 纯文本查看 复制代码 import re
import os
import requests
import urllib
from bs4 import BeautifulSoup
import time
import threading
from concurrent.futures import ThreadPoolExecutor
lock = threading.Lock()##线程锁
threadPool = ThreadPoolExecutor(max_workers=5)
constStr = 'https://telegra.ph'
## 网页解析
def SolutionHtml(urlpara):
try:
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
}
## url = urllib.parse.unquote(url)
requests.packages.urllib3.disable_warnings()#防止报错
context = requests.get(url=urlpara,headers = headers,verify=False)
bs = BeautifulSoup(context.text,'html.parser') # 网页解析
return bs
except Exception as ex:
return ex
## 创建文件夹
def CreFolder(url,filepath):
try:
bs = SolutionHtml(url)
folder = bs.h1.string
file = filepath + '/' +folder
if not os.path.exists(file):
os.mkdir(file)
print("成功创建文件夹:" + file)
else:
print("文件夹已存在")
return file
except(NameError) as e:
return e
# 下载函数
def download_file(url):
response = requests.get(url, stream=True)
return response.content
# 保存函数
def save_file(data, save_path):
with open(save_path, 'wb') as f:
f.write(data)
# 创建下载线程
def download_thread(file_url,save_path,file_name):
data = download_file(file_url)
with lock:
save_file(data, save_path+"/"+file_name)
time.sleep(0.5)##防止请求过于频繁
print(f"{file_name}下载完成")
## 下载图片
def DownPic(url,filepath):
try:
bs = SolutionHtml(url)
titleName = bs.h1.string
imgList = bs.find_all("img")
linkList = [constStr + item['src'] for item in imgList]
count = 1
tasks = []
for item in linkList:
path = filepath
fileName = titleName+"-"+str(count)+".jpg"
_file = path + "/" + fileName
if os.path.exists(_file) and os.path.getsize(_file) > 0:
count += 1
continue
else:
tasks.append(threadPool.submit(download_thread,item,path,fileName))
count += 1
while True:#
time.sleep(1)
if len(linkList) == count-1:
if(threadPool._work_queue.qsize() == 0):
while True:
time.sleep(0.5)#防止cpu占用过高
for future in tasks:
if future.done():
tasks.remove(future)
if len(tasks) == 0:
ValiFile(linkList,path,titleName)
threadPool.shutdown()
break
break
return "下载完成"
except NotImplementedError as e:
return e
# 校验文件
def ValiFile(urllist,filepath,filename):
count = 1
for item in urllist:
_file = filepath + "/" + filename+"-"+str(count)+".jpg"
if os.path.exists(_file) and os.path.getsize(_file) > 0:
count += 1
continue
if len(urllist) == count-1:
print("下载完成")
# 主函数
def Main():
url = input("请输入下载链接创建文件:")
filepath = "E:\\Game\\Images"
fileobj = CreFolder(url,filepath)
if fileobj is not None:
DownPic(url,fileobj)
# 脚本入口
if __name__ == "__main__":
Main() |