天空宫阙 发表于 2020-4-28 22:49

python asyncio + aiohttp 异步微博相册图片爬取

本帖最后由 天空宫阙 于 2020-12-5 22:15 编辑

微博图片链接的爬取还是用的requests在weibo_photo.py文件(需要加入自己登录后的cookie),最终生成result.json,不是重点不多讲。


用asyncio + aiohttp异步下载result.json中jpg格式的图片在weibo_photo_dl.py
安装完依赖pip3 install -r requirements.txt
后可以直接运行weibo_photo_dl.py
weibo_photo_dl.py代码
import aiohttp
import asyncio
import aiofiles
import json
import os
import time
from aiohttp import client_exceptions
from MYretry import *

loop = asyncio.get_event_loop()


class Async_download:
    def __init__(self):
      super().__init__()
      self.CONCURRENCY = 5
      self.session = None
      self.semaphore = asyncio.Semaphore(self.CONCURRENCY)
      self.save_path = 'result'
   
    """ 利用python装饰器实现请求的重试 """
    @MYretry(client_exceptions.ServerDisconnectedError)
    async def request(self, url):
      async with self.semaphore:
            # try:
            print('getting', url)
            # 添加ssl=False 防止SSLCertVerificationError
            async with self.session.get(url,ssl=False) as response:
                await asyncio.sleep(1)
                return await response.read()
            # except client_exceptions.ServerDisconnectedError:
            #   print('ServerDisconnectedError occurred while scraping ',url)

    def save_pic(self, name, content):
      # 同步的写入文件
      name = os.path.join(self.save_path, name)
      with open(name, 'wb') as f:
            f.write(content)
            f.close()

    async def save_pic2(self, name, content):
      """ 未知是否异步 """
      name = os.path.join(self.save_path, name)
      async with aiofiles.open(name, mode='wb') as f:
            await f.write(content)

    def get_url_list(self, path):
      url_list = []
      with open(path, 'r', encoding='utf-8') as f:
            res_json = json.loads(f.read())
            for card in res_json:
                if card['card_type'] == 47:
                  for pic in card['pics']:
                        if pic['pic_big'][-4:] == '.gif':
                            continue
                        url_list.append(pic['pic_big'])
            return url_list

    async def download_one(self, url, name):
      name = str(name).zfill(4)+'.jpg'
      content = await self.request(url)
      print('saved', name)
      if content:
            await self.save_pic2(name, content)

    async def main(self, url_list):
      self.session = aiohttp.ClientSession()
      # 添加任务一行写法
      tasks = [asyncio.ensure_future(self.download_one(
            url_list, i)) for i in range(len(url_list))]
      # tasks = []
      # for i in range(len(url_list)):
      #   tasks.append(asyncio.ensure_future(self.download_one(url_list,i)))
      await asyncio.gather(*tasks)
      await self.session.close()

    def run(self):
      if not os.path.exists(self.save_path):
            os.makedirs(self.save_path)
      url_list = self.get_url_list('result.json')
      loop.run_until_complete(self.main(url_list))


if __name__ == "__main__":
    start_time = time.time()
    spider = Async_download()
    spider.run()
    print('cost', time.time()-start_time)



MYretry.py对异常进行了简单的处理利用了装饰器
MYretry.py代码
import logging
from functools import wraps
import asyncio

log = logging.getLogger(__name__)

def MYretry(*exceptions, retries=3, cooldown=1, verbose=True):
    # https://www.cnblogs.com/killianxu/p/9821414.html
    """Decorate an async function to execute it a few times before giving up.
    Hopes that problem is resolved by another side shortly.

    Args:
      exceptions (Tuple) : The exceptions expected during function execution
      retries (int): Number of retries of function execution.
      cooldown (int): Seconds to wait before retry.
      verbose (bool): Specifies if we should log about not successful attempts.
    """

    def wrap(func):
      @wraps(func)
      async def inner(*args, **kwargs):
            retries_count = 0

            while True:
                try:
                  result = await func(*args, **kwargs)
                except exceptions as err:
                  retries_count += 1
                  message = "Exception during {} execution. " \
                              "{} of {} retries attempted".format(func, retries_count, retries)
                  if retries_count > retries:
                        verbose and log.exception(message)
                        # raise RetryExhaustedError(func.__qualname__, args, kwargs) from err
                        raise Exception('reached max trying times')
                  else:
                        verbose and log.warning(message)

                  if cooldown:
                        await asyncio.sleep(cooldown)
                else:
                  return result
      return inner
    return wrap

问题
1.只对client_exceptions.ServerDisconnectedError进行了处理
若在请求中出现其他异常可以添加到
@MYretry(client_exceptions.ServerDisconnectedError,otherexception)
2.不谈weibo_photo.py获取链接是同步方式,
单是weibo_photo_dl.py也有一种异步和同步代码杂糅的感觉,不知aiofiles这样用是否可以实现异步的文件写入
恳请大佬指点


完整代码下载链接
https://www.lanzouj.com/ic1tjyf




代码不具备通用性,当前的功能是批量下载鞠婧祎的微博相册图片(jpg),
如需采集其他用户需要改weibo_photo.py得到新的result.json文件


效果
经过简单的异常处理没有遗漏的
并发数CONCURRENCY为5我家的垃圾网络用了400s下载了1500多张图
(非并发情况下等一张图片的响应就要超过1s)可以说协程的效果还不错








dlytang 发表于 2020-4-28 23:18

感谢 好好学习下asyncio

处女-大龙猫 发表于 2020-4-28 23:37

大佬, 爬虫我也接触一些, 最近在趴淘宝的信息

pkni1230 发表于 2020-4-29 00:12

dlytang 发表于 2020-4-28 23:18
感谢 好好学习下asyncio

asyncio是什么

zucker 发表于 2020-4-29 10:00

你都没用最新的语法

天空宫阙 发表于 2020-4-29 12:07

zucker 发表于 2020-4-29 10:00
你都没用最新的语法

什么最新语法,海象运算符还是什么语法糖,请详述

天空宫阙 发表于 2020-4-29 12:09

处女-大龙猫 发表于 2020-4-28 23:37
大佬, 爬虫我也接触一些, 最近在趴淘宝的信息

淘宝也要登录,账号是个问题,用自己的账号。。。我可不想买东西的时候跳出验证码

处女-大龙猫 发表于 2020-4-29 12:13

天空宫阙 发表于 2020-4-29 12:09
淘宝也要登录,账号是个问题,用自己的账号。。。我可不想买东西的时候跳出验证码

对, 账号是关键, 一个人顶多也就四五个账号, 如果用户要是使用的话, 可以让py调用浏览器打开网页, 让他们扫码自己登陆. em

zucker 发表于 2020-4-29 12:58

天空宫阙 发表于 2020-4-29 12:07
什么最新语法,海象运算符还是什么语法糖,请详述

run(main)这种写法

天空宫阙 发表于 2020-12-5 22:15

添加ssl=False 防止SSLCertVerificationError
页: [1]
查看完整版本: python asyncio + aiohttp 异步微博相册图片爬取