cokery 发表于 2024-8-18 16:14

python写的批量下载程序(源码)

自己用AI写的批量下载小程序,没有编译GUI,需要自己搭建Python环境,有心的小伙伴也可以帮忙编译一下,目的是实现批量url的下载,可以指定header、cookie、下载目录以及间隔时间,功能需求比较小众,感谢大家!





import tkinter as tk
from tkinter import filedialog, messagebox
import requests
from urllib.parse import urlparse
import time
import os
from tkinter import ttk
import logging
import threading
import queue

class BatchDownloaderApp(tk.Tk):
    def __init__(self):
      super().__init__()
      self.title("批量文件下载器")
      self.headers_options = {
            "Chrome (Default)": {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
            },
            "Firefox": {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0"
            },
            "Edge": {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.200"
            }
      }

      self.header_var = tk.StringVar(self)
      self.header_var.set("Chrome (Default)")# 默认值

      # GUI布局
      self.create_widgets()

      # 用于线程间通信的队列
      self.progress_queue = queue.Queue()

    def create_widgets(self):
      # 下拉选择框
      header_label = tk.Label(self, text="选择请求头 (Headers):")
      header_label.grid(row=0, column=0, sticky="w", padx=10, pady=5)

      header_menu = tk.OptionMenu(self, self.header_var, *self.headers_options.keys())
      header_menu.grid(row=0, column=1, sticky="we", padx=10, pady=5)

      # Cookie 输入
      cookie_label = tk.Label(self, text="输入Cookie (例如: cookie_name=cookie_value; other_cookie=other_value):")
      cookie_label.grid(row=1, column=0, sticky="w", padx=10, pady=5)

      self.cookie_text = tk.Text(self, height=8, width=30)
      self.cookie_text.grid(row=2, column=0, columnspan=2, sticky="we", padx=10, pady=5)

      # 用户输入框
      urls_label = tk.Label(self, text="请输入文件下载链接,每条链接一行:")
      urls_label.grid(row=3, column=0, sticky="w", padx=10, pady=5)

      self.urls_text = tk.Text(self, height=15, width=30)
      self.urls_text.grid(row=4, column=0, columnspan=2, sticky="we", padx=10, pady=5)

      # 保存位置选择
      save_location_label = tk.Label(self, text="选择保存位置:")
      save_location_label.grid(row=5, column=0, columnspan=2, sticky="w", padx=10, pady=5)

      self.save_location = tk.StringVar()
      self.save_location.set("./")# 默认保存位置

      save_location_entry = tk.Entry(self, textvariable=self.save_location, width=20)
      save_location_entry.grid(row=6, column=0, sticky="we", padx=10, pady=5)

      # 选择保存位置按钮
      def select_save_location():
            directory = filedialog.askdirectory()
            if directory:
                self.save_location.set(directory)

      select_save_location_button = tk.Button(self, text="选择目录", command=select_save_location)
      select_save_location_button.grid(row=6, column=1, sticky="we", padx=10, pady=5)

      # 间隔时间输入
      interval_label = tk.Label(self, text="输入下载间隔时间(秒):")
      interval_label.grid(row=7, column=0, sticky="w", padx=10, pady=5)

      self.interval_entry = tk.Entry(self, width=20)
      self.interval_entry.insert(0, "5")# 默认间隔时间为5秒
      self.interval_entry.grid(row=7, column=1, sticky="we", padx=10, pady=5)

      # 开始按钮
      start_button = tk.Button(self, text="开始下载", command=self.start_download)
      start_button.grid(row=8, column=0, columnspan=2, sticky="we", padx=10, pady=5)

      # 当前文件进度条
      self.current_file_progress = ttk.Progressbar(self, orient='horizontal', length=200, mode='determinate')
      self.current_file_progress.grid(row=9, column=0, columnspan=2, sticky="we", padx=10, pady=5)

      # 总体进度条
      self.overall_progress = ttk.Progressbar(self, orient='horizontal', length=200, mode='determinate')
      self.overall_progress.grid(row=10, column=0, columnspan=2, sticky="we", padx=10, pady=5)

      # 添加总体进度条的数值显示标签
      self.overall_progress_label = tk.Label(self, text="0%", font=("Arial", 10))
      self.overall_progress_label.grid(row=10, column=2, sticky="w", padx=10, pady=5)

      # 状态标签
      self.status_label = tk.Label(self, text="", fg="black")
      self.status_label.grid(row=11, column=0, columnspan=2, sticky="we", padx=10, pady=5)

    def get_headers(self):
      """获取选中的headers"""
      return self.headers_options

    def download_file_from_url(self, url, save_path, progress_var, headers=None, cookies=None):
      """
      下载文件并保存到指定路径
      """
      try:
            response = requests.get(url, stream=True, headers=headers, cookies=cookies)
            response.raise_for_status()

            total_length = response.headers.get('content-length')
            if total_length is None:# no content length header
                with open(save_path, "wb") as file:
                  for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
                self.progress_queue.put(f"文件 {save_path} 下载成功!")
            else:
                dl = 0
                total_length = int(total_length)
                with open(save_path, "wb") as file:
                  for chunk in response.iter_content(chunk_size=8192):
                        dl += len(chunk)
                        file.write(chunk)
                        done = int(50 * dl / total_length)
                        self.progress_queue.put((dl / total_length) * 100)
                self.progress_queue.put(f"文件 {save_path} 下载成功!")
      except requests.HTTPError as http_err:
            self.progress_queue.put(f"HTTP 错误: {http_err}")
      except Exception as err:
            self.progress_queue.put(f"发生错误: {err}")

    def batch_download_with_interval(self, urls, save_dir, headers=None, cookies=None, interval=5):
      """
      批量下载文件,并在每次下载后等待指定的时间间隔
      """
      if not os.path.exists(save_dir):
            os.makedirs(save_dir)

      total_files = len(urls)
      downloaded_files = 0

      for idx, url in enumerate(urls):
            self.progress_queue.put(f"正在下载文件 {idx + 1} / {total_files}")

            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)
            save_path = os.path.join(save_dir, filename)

            # 更新总体进度条
            overall_progress = (idx / total_files) * 100
            self.progress_queue.put(('overall', overall_progress))

            self.download_file_from_url(url, save_path, self.current_file_progress, headers, cookies)
            downloaded_files += 1
            time.sleep(interval)

      # 最后确保总体进度条达到100%
      self.progress_queue.put(('overall', 100))
      self.progress_queue.put("所有文件下载完成.")

    def start_download(self):
      """
      开始下载过程
      """
      urls = self.urls_text.get("1.0", tk.END).strip().split("\n")
      save_directory = self.save_location.get()
      interval_seconds = int(self.interval_entry.get())
      cookies = self.cookie_text.get("1.0", tk.END).strip()
      headers = self.get_headers()

      # 将Cookie字符串转换为字典
      cookies_dict = {}
      if cookies:
            cookie_pairs = cookies.split("; ")
            for pair in cookie_pairs:
                key, value = pair.split("=", 1)
                cookies_dict = value

      if save_directory:
            self.progress_queue.put("开始下载...")
            threading.Thread(target=self.batch_download_with_interval, args=(urls, save_directory, headers, cookies_dict, interval_seconds)).start()

    def process_queue(self):
      """
      从队列中获取消息并更新GUI
      """
      try:
            while True:
                item = self.progress_queue.get_nowait()
                if isinstance(item, str):
                  self.status_label.config(text=item, fg="green" if "成功" in item else "red")
                elif isinstance(item, float):
                  self.current_file_progress['value'] = item
                elif isinstance(item, tuple) and item == 'overall':
                  self.overall_progress['value'] = item
                  self.overall_progress_label.config(text=f"{item:.2f}%")
      except queue.Empty:
            pass

      # 递归调用自身以持续检查队列
      self.after(100, self.process_queue)

if __name__ == "__main__":
    app = BatchDownloaderApp()
    app.after(100, app.process_queue)# 启动队列处理器
    app.mainloop()

zlf2020999 发表于 2024-8-21 14:21

谢谢分享,请尽量搞简单一点,便于小白直接操作{:1_893:}

beiank 发表于 2024-9-13 11:00

使用的python的具体版本号是多少啊 :lol 谢谢楼主 我不想出现不兼容的结果
页: [1]
查看完整版本: python写的批量下载程序(源码)