话痨司机啊 发表于 2022-5-9 13:09

【异步爬虫】无聊的小爬虫,大家一起学习

本帖最后由 话痨司机啊 于 2022-7-22 03:57 编辑

@szsnk144864有点问题,改了~

【成品下载地址】链接: https://pan.baidu.com/s/1Pz4C0m-SaoqdGz2XWTit6Q?pwd=d4q6 提取码: d4q6

今天不忙,无聊随便写的,供大家学习吧~,希望有大佬帮我改进一下,我觉得还是代码有点。。那啥

# //div[@class="item_list infinite_scroll masonry"]/div//a/img/@alt 壁纸标题
# //div[@class="item_list infinite_scroll masonry"]/div//a/img/@href 壁纸图片地址
# 总页数440页 https://www.mmonly.cc/gqbz/list_41_.html 页数num从1开始到440
# //div[@class="topmbx"]/a/text() 壁纸类型
# //div[@id="big-pic"]//a/@href 壁纸高清地址


import aiofiles
import aiohttp
import asyncio
import async_timeout
from collections import namedtuple
import time
import os
from rich.console import Console
from fake_useragent import UserAgent
from lxml import etree
from typing import List,Text
from datetime import datetime
import keyboard
from threading import Thread

console = Console()
headers = {'User-Agent':UserAgent().random}
Img_url_name = namedtuple('Img_url_name', ['img_url', 'img_name'])
Img_big_url_type = namedtuple('Img_big_url_name', ['img_big_url', 'img_type'])
sign = True # 信号


async def get_html(url) -> Text:
    """
    获取网页源码
    """
    async with aiohttp.ClientSession() as session:
      async with async_timeout.timeout(10):
            async with session.get(url) as resp:         
                return await resp.text()



async def save_img(img_url,path,img_name) -> None:
    """
    保存图片
    """
    async with aiohttp.ClientSession() as session:
      async with async_timeout.timeout(10):
            async with session.get(img_url,headers=headers) as resp:
                img = await resp.read()
                async with aiofiles.open(f'{path}\{img_name}', 'wb') as f:
                  await f.write(img)
                  nowtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                  console.print(f'创建时间:{nowtime}\n保存路径:{os.path.join(os.getcwd(),path)}\n文件名称:{img_name}\n状态:下载完成!\n提示:按esc退出')
                  console.print('-'*50)

def get_img_url_name(resp_text) -> List:
    """
    获取缩略图页面的图片地址和图片名称
    """
    tree = etree.HTML(resp_text)
    # 每页有24张缩略图和网址
    img_url_name = /div[{num}]//a/@href'),
    img_name = tree.xpath(f'//div[@class="item_list infinite_scroll masonry"]/div[{num}]//a/img/@alt')) for num in range(1,25)]
    return img_url_name

def get_big_img(resp_text) -> Img_big_url_type:
    """
    获取详情页的高清图片地址和图片类型
    """
    tree = etree.HTML(resp_text)
    img_big_url = tree.xpath('//div[@id="big-pic"]//a/img/@src') if tree.xpath('//div[@id="big-pic"]//a/img/@src') else tree.xpath('//div[@class="photo"]//p/img/@src')
    img_big_url_type = Img_big_url_type(img_big_url=img_big_url,img_type=tree.xpath('//div[@class="topmbx"]/a/text()'))
    return img_big_url_type

def mkdir(path) -> None:
    """
    创建文件夹
    """
    isExists = os.path.exists(path)
    if not isExists:
      os.makedirs(path)

def quit() -> None:
    """
    退出
    """
    global sign
    while True:
      if keyboard.read_key()=='esc':
            sign = False
            console.print('正在退出...')
            break

def retry(exceptions=Exception, tries=3):
    """
    重试装饰器
    """
    def decorator(func):
      def wrapper(*args, **kwargs):
            _tries = tries
            while _tries > 0:
                try:
                  return func(*args, **kwargs)
                except exceptions as e:
                  _tries -= 1
                  if _tries == 0:
                        raise e
                  time.sleep(1)
      return wrapper
    return decorator

@retry()
def start() ->Text:
    """
    开始
    """
    print('开始页码请不要输入0')
    start_page = input('请输入开始页码:').strip()
    if start_page == '0':
      console.print('请输入正确的页码!')
      raise Exception('出错次数过多!')

    print('结束页码请不要超过440')
    end_page = input('请输入结束页码:').strip()
    if int(end_page) > 440:
      console.print('请输入正确的页码!')
      raise Exception('出错次数过多!')
    return start_page,end_page
   
async def main() -> None:
    """
    主函数
    """
    Thread(target=quit).start()
    start_page,end_page = start()
    start_time = time.time()
    for num in range(int(start_page),int(end_page)+1):
      # 网站共计440页
      url = f'https://www.mmonly.cc/gqbz/list_41_{num}.html'
      resp_text = await get_html(url)
      get_img_url_name_list = get_img_url_name(resp_text)
      for img_url_name in get_img_url_name_list:
            resp_text_big_img = await get_html(img_url_name.img_url)
            # 图片标号开始为1
            img_num = 1
            # 图片页码开始为2
            page_num = 1
            while sign:
                try:
                  if page_num >= 2:
                        resp_text_big_img = await get_html(next_img_big_url)
                  img_big_url_type = get_big_img(resp_text_big_img)
                  img_path = os.path.join('壁纸',img_big_url_type.img_type,img_url_name.img_name)
                  mkdir(img_path)
                  await save_img(img_big_url_type.img_big_url, img_path,f'{img_url_name.img_name}_{img_num}.jpg')
                  img_num += 1
                  page_num += 1
                  next_img_big_url = '.'.join(img_url_name.img_url.split(".")) + "_" + str(page_num) + ".html"
                except:
                  img_num = 1
                  break
      if sign == False:
            break
    console.print(f'全部下载完成! 耗时{time.time() - start_time:.2f}秒')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
以下为效果图







话痨司机啊 发表于 2022-7-22 03:52

本帖最后由 话痨司机啊 于 2022-7-22 03:55 编辑

yuehong 发表于 2022-7-21 23:37
貌似爬不了了,希望跟进
2022年7月22日 测试正常


Soc 发表于 2022-5-9 13:18

加油,共勉

a8876595123 发表于 2022-5-9 13:36

我在手机看不清楚图,这是爬什么图片的?

dph5199278 发表于 2022-5-9 13:38

谢谢分享,学习了

风/生/水/起 发表于 2022-5-9 14:16

拿来试试

huzp 发表于 2022-5-9 14:37

不错,这是phyon吗

photocs 发表于 2022-5-9 14:39

爬壁纸的啊

yanglixing 发表于 2022-5-9 14:43

很强 爬图片 学习了!!

wangqiang2015 发表于 2022-5-9 15:01

不错!可以学习一下。

空心 发表于 2022-5-9 15:15

成品exe有没有
页: [1] 2 3
查看完整版本: 【异步爬虫】无聊的小爬虫,大家一起学习