xuyanhenry 发表于 2024-8-1 20:58

python批量下载网页现打开的漫画

本帖最后由 xuyanhenry 于 2024-8-1 21:02 编辑

声明本文内容为作者学习娱乐乘兴所写,大神请忽视;本文仅供学习交流,请勿用于商业和非法用途,否则由此产生的一切后果均与作者无关;若有侵权,请联系立即删除!最近有点想把一些先看的漫画存起来的想法,但批量下载比较繁琐,给地址有些不方便,于是想着能否通过代码直接访问浏览器(edge)现在已经打开的网页的网址,以简化输网址的过程(还是无记录的,不用删记录了)。此次制作所用网站有给下载链接,因此对于网页爬虫部分较为简单。对于下载下来的漫画,本人以前尝试用图片查看器,pdf等观看发现不太适应本人网页看多了的习惯,因此打算把下载的漫画保存为html格式(要pdf也可以,只需修改部分保存代码即可,代码可参考:https://www.52pojie.cn/forum.php?mod=viewthread&tid=1934574&highlight=%C2%FE%BB%AD%CD%BC%C6%AC)由于图片体积大,进行了压缩处理(可不压缩)
此代码使用方法:先下载EdgeDriver(确保与此edge同版本):https://developer.microsoft.com/en-us/microsoft-edge/tools/webdriver/?form=MA13LH#downloads然后运行程序,打开网页浏览把此网址对应需要下载的页面留下后在控制台输入yes后自动下载所有剩下的符合要求的网址的数据。此代码所使用网址在 ###   !   ###请阅读注释修改代码
<blockquote>import base64
import re
import queue
import requests
from parsel import Selector
from io import BytesIO
from tqdm import tqdm
from rich.progress import Progress
import zipfile
from PIL import Image
import os
import urllib.parse
from selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.edge.options import Options as EdgeOptions
from concurrent.futures import ThreadPoolExecutor, as_completed


#**************************************打开浏览器读取网址****************************


def decode_change(encoded_text):
    decrypted_text = ""
    for char in encoded_text:
      offset = ord(char) - 32
      original_code = (offset + 23) % (126 - 32 + 1) + 32
      decrypted_text += chr(original_code)
    decoded_bytes = base64.b64decode(decrypted_text)
    decoded_text = decoded_bytes.decode('utf-8')
    return decoded_text


def get_opened_edge_tabs(driver):
    # 获取所有窗口句柄
    window_handles = driver.window_handles
    urls = []


    # 遍历所有窗口句柄
    for handle in window_handles:
      driver.switch_to.window(handle)
      urls.append(driver.current_url)


    return urls


def filter_urls(urls):
    filtered_urls = []
    for url in urls:
      fi = decode_change("Mz|`6<}SB`&&")
      lo = decode_change("5@/YC,x&")
      # print("fi, lo", fi, lo)
      if fi in url and lo in url:
            filtered_urls.append(url)
    return filtered_urls


def extract_aid_number(url):
    match = re.search(r'-aid-(\d+)', url)
    if match:
      return match.group(1)
    return None


#***************************************第一板块结束*********************


#***************************************第二部分 下载文件并保存********************
# 下载文件并显示进度条
def download_to_memory(url, retries=3):
    with Progress() as progress:
      task_id = progress.add_task("Downloading...", total=100, start=False)# 初始化进度条但不自动启动
      for attempt in range(retries):
            try:
                response = requests.get(url, stream=True)
                if response.status_code == 200:
                  total_size = int(response.headers.get('content-length', 0))
                  progress.update(task_id, total=total_size)# 更新任务的总大小
                  block_size = 1024
                  buffer = BytesIO()
                  for data in response.iter_content(block_size):
                        buffer.write(data)
                        progress.update(task_id, advance=len(data))# 更新进度
                  buffer.seek(0)
                  parsed_url = urllib.parse.urlparse(url)
                  file_name = urllib.parse.parse_qs(parsed_url.query).get('n', )
                  return buffer, file_name# 返回文件内容和文件名
                else:
                  print(f"Failed to download from {url}. Status code: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"Attempt {attempt + 1} failed: {e}")
    return None, None


# 从网页中提取链接
def extract_and_combine_urls(url, xpath):
    try:
      response = requests.get(url)
      if response.status_code == 200:
            selector = Selector(response.text)
            elements = selector.xpath(xpath)
            base_url = "https:"
            combined_urls = []
            for element in elements.xpath('.//a'):
                href = element.xpath('@href').get()
                if href:
                  combined_urls.append(base_url + href)
            return combined_urls
      else:
            print(f"Failed to retrieve the webpage. Status code: {response.status_code}")
            return []
    except requests.exceptions.RequestException as e:
      print(f"Error retrieving webpage: {e}")
      return []


# 压缩图像
def compress_image(img, quality=85, max_resolution=(1024, 1024)):
    img.thumbnail(max_resolution, Image.LANCZOS)
    with BytesIO() as buffer:
      try:
            if img.mode == 'RGBA':
                img = img.convert('RGB')# 转换为RGB模式
            img.save(buffer, format="JPEG", quality=quality)
            buffer.seek(0)
            return Image.open(buffer).copy()
      except Exception as e:
            print(f"图像压缩失败: {e}")
            return img# 返回原图


# 从内存中的ZIP文件生成HTML文件
def images_to_html_from_zip(zip_content, output_folder, output_name, compress=False, quality=85, max_resolution=(1024, 1024)):
    with zipfile.ZipFile(zip_content, 'r') as zip_ref:
      image_files =
      image_files.sort()


      output_html = os.path.join(output_folder, f'{output_name}.html')


      with open(output_html, 'w', encoding='utf-8') as html_file:
            html_file.write('''
            <html>
            <head>
                <meta charset="UTF-8">
                <title>Images</title>
                <style>
                  body { font-family: Arial, sans-serif; text-align: center; }
                  .image-container { margin-bottom: 20px; }
                  .image-container img { max-width: 100%; height: auto; }
                  .caption { margin-top: 10px; font-size: 18px; color: #555; }
                </style>
            </head>
            <body>
            ''')


            total_images = len(image_files)
            for index, file in enumerate(image_files):
                with zip_ref.open(file) as img_file:
                  img_bytes = img_file.read()
                  img = Image.open(BytesIO(img_bytes))
                  
                  if compress:
                        img = compress_image(img, quality, max_resolution)


                  buffered = BytesIO()
                  img.save(buffered, format="JPEG", quality=quality)
                  img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
                  html_file.write(f'''
                  <div class="image-container">
                        <img src="data:image/jpeg;base64,{img_base64}" alt="{file}">
                        <div class="caption">Image {index + 1} of {total_images}</div>
                  </div>
                  ''')


            html_file.write('</body></html>')


      print(f"HTML文件已保存到 {output_html}")




      # 多线程下载并处理文件
def process_download(url, xpath, output_folder):                                 #多线程使用的但输出下载有问题
    combined_urls = extract_and_combine_urls(url, xpath)


    if combined_urls:
      zip_content, file_name = download_to_memory(combined_urls)
      if not zip_content and len(combined_urls) > 1:
            zip_content, file_name = download_to_memory(combined_urls)


      if zip_content:
            print("文件转换中")
            output_name = os.path.splitext(file_name) if file_name else 'output'
            images_to_html_from_zip(zip_content, output_folder, output_name, compress=True)




#************************************第二部分结束**********************************


def main():
    xpath = '/html/body/div/div'
    output_folder = 'output'                              #要改
    os.makedirs(output_folder, exist_ok=True)


    # 设置Edge浏览器选项
    edge_options = EdgeOptions()
    edge_options.use_chromium = True


    # 指定EdgeDriver路径
    edge_service = EdgeService(executable_path='C:/Users/13816/Downloads/edgedriver_win32/msedgedriver.exe')# 请确保这里的路径是正确的要改


    # 启动Edge浏览器
    driver = webdriver.Edge(service=edge_service, options=edge_options)
    #driver.get(decode_change("J1;xL-X_5{M{Mb}{KS*a5V7S"))                            ###   !###
   
    try:
      # 等待用户在控制台输入`yes`
      user_input = input("浏览器已启动,请使用浏览器。输入 'yes' 以继续: ").strip().lower()
      while user_input != 'yes':
            user_input = input("输入 'yes' 以继续: ").strip().lower()


      print("读取已打开的标签页...")


      urls = get_opened_edge_tabs(driver)
      for url in urls:
            print(f'打开的的URL: {url}')
            
      # 筛选符合条件的URL
      filtered_urls = filter_urls(urls)
      download_queue = queue.Queue()
      # 打印筛选后的URL和提取的-aid-后面的id
      if filtered_urls:
            for url in filtered_urls:
                print(f'符合条件的URL: {url}')
                aid_number = extract_aid_number(url)
                if aid_number:
                  # print(f'-aid-后面的数字: {aid_number}')
                  fro = decode_change('J1;xL16~5b"{M{L^Mz|`6<}SBb"TK{M^K0"QC,yYKV;UN,yQJ@:]')
                  download_url = f'{fro}{aid_number}.html'
                  # print(f'生成的下载链接: {download_url}')
                  download_queue.put(download_url)


      else:
            print("未能找到符合条件的URL")


      while not download_queue.empty():
            url = download_queue.get()
            print("url", url)
            combined_urls = extract_and_combine_urls(url, xpath)


            if combined_urls:
                zip_content, file_name = download_to_memory(combined_urls)
                if not zip_content and len(combined_urls) > 1:
                  print("First link failed. Trying the second link...")
                  zip_content, file_name = download_to_memory(combined_urls)


                if zip_content:
                  print("文件转换中")
                  output_name = os.path.splitext(file_name) if file_name else 'output'
                  images_to_html_from_zip(zip_content, output_folder, output_name, compress=True)# 将compress参数设为True或False以控制是否压缩图片




      # with ThreadPoolExecutor(max_workers=4) as executor:
      #   futures = []
      #   while not download_queue.empty():
      #         url = download_queue.get()
      #         futures.append(executor.submit(process_download, url, xpath, output_folder))


      #   for future in as_completed(futures):
      #         try:
      #             future.result()
      #         except Exception as e:
      #             print(f"下载或处理文件时出错: {e}")




    finally:
      driver.quit()


if __name__ == '__main__':
    main()</blockquote>
此代码仅仅为学习娱乐使用,且勿非法传播使用。
有两个问题想请教各位大佬,若使用在多线程同时下载几个数据那进度条该如何写,原本用tqdm库,但发现在多线程下载时会出问题控制台输出混乱,查阅后换了rich库,但还是没有整出来,请问该如何修改能够在同时下载多个文件时能有多个进度条分开显示进度呢。还有有没有办法不打开新的浏览器界面直接访问现已打开的网页网址的方法,selenium库得是调试状态的浏览器和win32process库查看进程好像都不行。

如有问题欢迎指正,谢谢。

longsui48 发表于 2024-8-9 14:03

用协程会好一点 你可以试试
页: [1]
查看完整版本: python批量下载网页现打开的漫画