本帖最后由 话痨司机啊 于 2022-7-22 03:57 编辑
@szsnk144864 有点问题,改了~
【成品下载地址】链接: https://pan.baidu.com/s/1Pz4C0m-SaoqdGz2XWTit6Q?pwd=d4q6 提取码: d4q6
今天不忙,无聊时随手写的,供大家学习~ 希望有大佬能帮我改进一下,我觉得代码还是有些不够好。
[Python] 纯文本查看 复制代码
# //div[@class="item_list infinite_scroll masonry"]/div[num]//a/img/@alt  wallpaper title
# //div[@class="item_list infinite_scroll masonry"]/div[num]//a/@href  wallpaper detail-page URL
# 440 pages in total: https://www.mmonly.cc/gqbz/list_41_[num].html, with num running from 1 to 440
# //div[@class="topmbx"]/a[last()]/text()  wallpaper category
# //div[@id="big-pic"]//a/img/@src  full-resolution image URL
import aiofiles
import aiohttp
import asyncio
import async_timeout
from collections import namedtuple
import time
import os
from rich.console import Console
from fake_useragent import UserAgent
from lxml import etree
from typing import List,Text
from datetime import datetime
import keyboard
from threading import Thread
# Shared rich console; every status message in this script is printed through it.
console = Console()
# Request headers built once at import time; the User-Agent value comes from
# fake_useragent's `random` attribute.
headers = {'User-Agent':UserAgent().random}
# One thumbnail entry from a list page: the detail-page URL and the wallpaper title.
Img_url_name = namedtuple('Img_url_name', ['img_url', 'img_name'])
# One detail page: the full-size image URL and the wallpaper category.
# The typename matches the variable name so repr() is accurate and pickle can
# resolve the class by name.
Img_big_url_type = namedtuple('Img_big_url_type', ['img_big_url', 'img_type'])
# Run flag: stays True while downloading should continue; the Esc listener
# sets it to False to request a stop.
sign = True
async def get_html(url) -> Text:
    """
    Fetch a page and return its body decoded as text.

    The request carries the module-level ``headers`` (random User-Agent),
    the same way ``save_img`` does, and is bounded by a 10 second timeout.

    :param url: address of the page to download.
    :return: the response body as a string.
    """
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(10):
            async with session.get(url, headers=headers) as resp:
                return await resp.text()
async def save_img(img_url,path,img_name) -> None:
    """
    Download one image and write it to ``path``/``img_name``.

    :param img_url: address of the image to download.
    :param path: existing directory the file is written into.
    :param img_name: file name (including extension) of the saved image.
    """
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(10):
            async with session.get(img_url, headers=headers) as resp:
                img = await resp.read()
    # os.path.join instead of a hard-coded backslash: portable, and avoids the
    # invalid "\{" escape sequence in the original f-string.
    file_path = os.path.join(path, img_name)
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(img)
    nowtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    console.print(f'[yellow]创建时间:{nowtime}\n保存路径:{os.path.join(os.getcwd(),path)}\n文件名称:{img_name}\n状态:[green]下载完成!\n[gray]提示:按esc退出')
    console.print('[blue]-'*50)
def get_img_url_name(resp_text) -> List:
    """
    Extract the detail-page URL and title of every thumbnail on a list page.

    The item containers are queried once and each is then searched relatively,
    so a page holding fewer (or more) than the usual 24 thumbnails is handled
    instead of raising IndexError.

    :param resp_text: HTML source of a list page.
    :return: list of ``Img_url_name`` records, in page order.
    """
    tree = etree.HTML(resp_text)
    items = tree.xpath('//div[@class="item_list infinite_scroll masonry"]/div')
    img_url_name = []
    for item in items:
        hrefs = item.xpath('.//a/@href')
        alts = item.xpath('.//a/img/@alt')
        # Skip containers that lack a link or a titled image.
        if hrefs and alts:
            img_url_name.append(Img_url_name(img_url=hrefs[0], img_name=alts[0]))
    return img_url_name
def get_big_img(resp_text) -> Img_big_url_type:
    """
    Pull the full-size image URL and the wallpaper category out of a detail page.

    The image is looked up under ``div#big-pic`` first and under ``div.photo``
    as a fallback. If neither lookup (or the category lookup) matches, the
    ``[0]`` index raises IndexError.

    :param resp_text: HTML source of a detail page.
    :return: an ``Img_big_url_type`` record.
    """
    tree = etree.HTML(resp_text)
    # An empty result list is falsy, so `or` falls through to the second layout.
    candidates = (
        tree.xpath('//div[@id="big-pic"]//a/img/@src')
        or tree.xpath('//div[@class="photo"]//p/img/@src')
    )
    img_big_url = candidates[0]
    img_type = tree.xpath('//div[@class="topmbx"]/a[last()]/text()')[0]
    return Img_big_url_type(img_big_url=img_big_url, img_type=img_type)
def mkdir(path) -> None:
    """
    Create the directory ``path`` (including missing parents) if it does not exist.

    ``exist_ok=True`` makes the call idempotent and removes the race between
    checking for the directory and creating it.

    :param path: directory to create.
    """
    os.makedirs(path, exist_ok=True)
def quit() -> None:
    """
    Block until the Esc key is read, then clear the global run flag.

    Any other key is ignored. Once Esc is seen, ``sign`` is set to False and a
    notice is printed, after which the function returns.
    """
    global sign
    # Keep reading keys until Esc arrives.
    while keyboard.read_key() != 'esc':
        pass
    sign = False
    console.print('[red]正在退出...')
def retry(exceptions=Exception, tries=3, delay=1):
    """
    Build a decorator that retries a function when it raises.

    :param exceptions: exception class (or tuple of classes) that triggers a retry;
        anything else propagates immediately.
    :param tries: total number of attempts, must be at least 1.
    :param delay: seconds to sleep between attempts (default 1).
    :return: a decorator; the wrapped function returns the first successful
        result or re-raises the exception of the last attempt.
    :raises ValueError: if ``tries`` is smaller than 1.
    """
    # Local import: functools is not part of the file's import block.
    import functools

    if tries < 1:
        # Otherwise the wrapper would return None without ever calling func.
        raise ValueError('tries must be at least 1')

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        # Out of attempts: re-raise with the original traceback.
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator
@retry()
def start() -> tuple:
    """
    Prompt for the first and last list page to download.

    Both answers must be whole numbers, with
    ``1 <= start page <= end page <= 440``. Invalid input prints a hint and
    raises, which lets the ``retry`` decorator ask again.

    :return: ``(start_page, end_page)`` as the stripped strings the user typed.
    :raises Exception: when an answer is not a valid page number.
    """
    max_page = 440  # the site has 440 list pages in total
    print('开始页码请不要输入0')
    start_page = input('请输入开始页码:').strip()
    # isdecimal() also rejects empty, negative and non-numeric input, which
    # would otherwise only fail later when the caller converts it with int().
    if not start_page.isdecimal() or not 1 <= int(start_page) <= max_page:
        console.print('[red]请输入正确的页码!')
        raise Exception('出错次数过多!')
    print('结束页码请不要超过440')
    end_page = input('请输入结束页码:').strip()
    if not end_page.isdecimal() or not int(start_page) <= int(end_page) <= max_page:
        console.print('[red]请输入正确的页码!')
        raise Exception('出错次数过多!')
    return start_page,end_page
async def _download_album(img_url_name) -> None:
    """Download every image of one album by walking its numbered detail pages."""
    resp_text_big_img = await get_html(img_url_name.img_url)
    # Detail pages are <base>.html, <base>_2.html, <base>_3.html, ...
    base_url = '.'.join(img_url_name.img_url.split(".")[0:-1])
    # The page number doubles as the image number in the saved file name.
    page_num = 1
    while sign:
        try:
            if page_num >= 2:
                resp_text_big_img = await get_html(f'{base_url}_{page_num}.html')
            img_big_url_type = get_big_img(resp_text_big_img)
            img_path = os.path.join('壁纸',img_big_url_type.img_type,img_url_name.img_name)
            mkdir(img_path)
            await save_img(img_big_url_type.img_big_url, img_path,f'{img_url_name.img_name}_{page_num}.jpg')
        except Exception:
            # A failure here (typically no image found on a page past the last
            # one) marks the end of the album. Catching Exception rather than
            # using a bare except lets KeyboardInterrupt and task cancellation
            # propagate.
            break
        page_num += 1


async def main() -> None:
    """
    Entry coroutine: ask for a page range and download every album in it.

    An Esc-key listener runs in a background thread; once it clears the global
    ``sign`` flag, no further list pages or albums are started.
    """
    # Daemon thread: the listener blocks on keyboard input and must not keep
    # the process alive after the downloads have finished.
    Thread(target=quit, daemon=True).start()
    start_page,end_page = start()
    start_time = time.time()
    for num in range(int(start_page),int(end_page)+1):
        if not sign:
            break
        url = f'https://www.mmonly.cc/gqbz/list_41_{num}.html'
        resp_text = await get_html(url)
        for img_url_name in get_img_url_name(resp_text):
            # Check the flag at this level too, so Esc stops the whole run
            # instead of only the album currently being downloaded.
            if not sign:
                break
            await _download_album(img_url_name)
    console.print(f'[red]全部下载完成! 耗时{time.time() - start_time:.2f}秒')
if __name__ == '__main__':
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop; calling get_event_loop() with no running loop is deprecated.
    asyncio.run(main())
以下为效果图
|