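# NOTE: pasted from a forum code viewer ("[Python] view as plain text / copy code"); this caption is not part of the program.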
import os
import re
import threading
import time
from tkinter import filedialog
from urllib import parse, request
import requests
import tkinter as tk
from tkinter import messagebox
from bs4 import BeautifulSoup
from lxml import etree
from concurrent.futures import ThreadPoolExecutor, as_completed
# Search endpoint that receives the POSTed search keyword.
url = "https://www.88xiaoshuo.net/search.html"

# Browser-like request headers sent with the HTTP requests.
headers = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8,"
        "application/signed-exchange;v=b3;q=0.7"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Pragma": "no-cache",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/122.0.6261.95 Safari/537.36"
    ),
}
class NovelDownloader:
    """Search 88xiaoshuo.net for novels and save a chosen one as a text file.

    The methods read the module-level ``url`` and ``headers`` settings and
    the ``entry`` / ``listbox`` widgets, and report to the user through
    tkinter dialogs and ``print``.
    """

    # Site root used to build and resolve chapter-list and chapter links.
    BASE_URL = 'https://www.88xiaoshuo.net'
    # Seconds to wait for any single HTTP request.
    REQUEST_TIMEOUT = 10
    # Path separators, characters Windows rejects in file names, and
    # control characters; each is replaced when building a file name.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

    def __init__(self):
        # Name of the book most recently written by write_chapters_to_file.
        self.markbook = ''
        # Number of chapters fetched concurrently by download_chapters.
        self.max_workers = 5

    def fetch_books(self):
        """Search for the text in ``entry`` and list the hits in ``listbox``.

        Each hit is shown as ``"<book name> (ID: <id>)"``. Any failure is
        reported in an error dialog instead of being raised.
        """
        search_key = entry.get()
        payload = {'searchkey': search_key}
        data = parse.urlencode(payload).encode('utf-8')
        # Send the same browser-like headers as the other requests.
        req = request.Request(url, data=data, headers=headers, method='POST')
        try:
            # The context manager closes the connection even when reading
            # or decoding fails; the timeout keeps a stalled server from
            # hanging the worker thread forever.
            with request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as response:
                html = response.read().decode('utf-8')
            matches = re.findall(r'<a target="_blank" href="/Partlist/(\d+)/">(.*?)</a>', html)
            listbox.delete(0, tk.END)
            for partlist_id, book_name in matches:
                listbox.insert(tk.END, f"{book_name} (ID: {partlist_id})")
        except Exception as e:
            messagebox.showerror("错误", f"获取书籍信息时发生错误:{e}")

    def fetch_chapters(self):
        """Start downloading the book currently selected in ``listbox``.

        Shows a warning when nothing is selected. The download itself runs
        in a daemon thread via ``fetch_chapter_content``.
        """
        selected = listbox.curselection()
        if not selected:
            messagebox.showwarning("未选择", "请选择一本小说!")
            return
        selected_text = listbox.get(selected[0])
        # Anchor at the end so digits inside the book title cannot be
        # mistaken for the ID suffix appended by fetch_books.
        id_match = re.search(r'\(ID: (\d+)\)$', selected_text)
        if id_match is None:
            messagebox.showerror("错误", f"无法解析所选条目:{selected_text}")
            return
        threading.Thread(
            target=self.fetch_chapter_content,
            args=(id_match.group(1),),
            daemon=True,
        ).start()

    def fetch_chapter_content(self, partlist_id):
        """Fetch the chapter list of book *partlist_id* and download it.

        Any failure is reported in an error dialog instead of being raised.
        """
        partlist_url = f'{self.BASE_URL}/Partlist/{partlist_id}/'
        try:
            response = requests.get(partlist_url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            # Fail loudly on HTTP errors instead of parsing an error page.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            chapter_links = soup.select('dd a')
            self.download_chapters(chapter_links, base_url=partlist_url)
        except Exception as e:
            messagebox.showerror("错误", f"获取章节信息时发生错误:{e}")

    def download_chapters(self, chapter_links, base_url=None):
        """Download every chapter in *chapter_links* and write the book.

        Args:
            chapter_links: Link elements exposing ``get('href')``.
            base_url: URL that relative hrefs are resolved against;
                defaults to the site root.

        The user is asked for a target directory. Chapters are fetched
        concurrently but written in their original order.
        """
        if base_url is None:
            base_url = self.BASE_URL + '/'
        # Ignore anchors that carry no target at all.
        hrefs = [href for href in (link.get('href') for link in chapter_links) if href]
        if not hrefs:
            messagebox.showwarning("未找到章节", "未找到可下载的章节!")
            return
        save_directory = filedialog.askdirectory()
        if not save_directory:
            messagebox.showwarning("未选择目录", "请选择保存目录!")
            return
        messagebox.showinfo("下载提示", "开始下载章节,请稍候...")
        # Pre-sized so results can be stored by index as they complete,
        # which keeps the chapters in order regardless of finish time.
        chapter_contents = [None] * len(hrefs)
        failed = 0
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_index = {
                executor.submit(
                    self.get_chapter_content,
                    # urljoin handles root-relative, relative and absolute hrefs.
                    parse.urljoin(base_url, href),
                    index,
                ): index
                for index, href in enumerate(hrefs)
            }
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    result = future.result()
                except Exception as e:
                    failed += 1
                    print(f'下载章节时发生错误:{e}')
                    continue
                if result:
                    chapter_contents[index] = result
                else:
                    failed += 1
                    print(f"第 {index + 1} 章下载失败或无内容。")
        self.write_chapters_to_file(chapter_contents, save_directory)
        # Do not claim complete success when some chapters are missing.
        if failed:
            messagebox.showwarning("下载完成", f"下载完成,但有 {failed} 章下载失败。")
        else:
            messagebox.showinfo("下载完成", "所有章节下载完成!")

    def get_chapter_content(self, chapter_url, index):
        """Download and parse one chapter page.

        Args:
            chapter_url: Absolute URL of the chapter page.
            index: Zero-based chapter position, used only in log output.

        Returns:
            ``(bookname, chaptername, chaptercontent)``, or ``None`` when
            the request fails or the page lacks a book or chapter title.
        """
        try:
            print(f"正在下载章节 {index + 1}:{chapter_url}")  # debug output
            response = requests.get(chapter_url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            # apparent_encoding may be None; a wrong guess should cost a
            # few replacement characters, not the whole chapter.
            encoding = response.apparent_encoding or 'utf-8'
            html = response.content.decode(encoding, errors='replace')
            selector = etree.HTML(html)
            bookname = selector.xpath('//a[@id="bookname"]/strong/text()')
            chaptername = selector.xpath('//div[@class="zhangjieming"]/h1/text()')
            if not bookname or not chaptername:
                print(f'无法找到书名或章节名称,跳过章节 {index + 1}')
                return None
            bookname = bookname[0].strip()
            chaptername = chaptername[0].strip()
            # Remove every "<1-5 digits>." marker from the chapter title.
            chaptername = re.sub(r'\d{1,5}\.', '', chaptername)
            contents = selector.xpath('//div[@id="content"]/p/text()')
            chaptercontent = "\n".join(
                stripped for stripped in (content.strip() for content in contents) if stripped
            )
            return (bookname, chaptername, chaptercontent)
        except Exception as e:
            print(f'下载章节时发生错误:{e}')
            return None

    @classmethod
    def _safe_filename(cls, name):
        """Return *name* with characters unsafe in a file name replaced."""
        cleaned = cls._UNSAFE_FILENAME_CHARS.sub('_', name).strip(' .')
        return cleaned or 'untitled'

    def write_chapters_to_file(self, chapter_contents, save_directory):
        """Write the downloaded chapters to ``<save_directory>/<book>.txt``.

        Args:
            chapter_contents: Sequence of ``(bookname, chaptername, text)``
                tuples in reading order; falsy entries (failed chapters)
                are skipped.
            save_directory: Directory that receives the text file.

        The file is started afresh on every call, so downloading the same
        book again replaces the old file instead of appending a second
        copy. ``self.markbook`` is set to the last book name written.
        """
        current_book = None
        out = None
        try:
            for chapter_content in chapter_contents:
                if not chapter_content:
                    continue
                bookname, chaptername, chaptertext = chapter_content
                if bookname != current_book:
                    # A different book starts: close the previous file and
                    # begin a new one with its title header.
                    if out is not None:
                        out.close()
                    book_path = os.path.join(
                        save_directory, self._safe_filename(bookname) + '.txt'
                    )
                    out = open(book_path, 'w', encoding='utf-8')
                    out.write(f'\n\n书名:{bookname}\n\n')
                    current_book = bookname
                    self.markbook = bookname
                out.write(f'\n{chaptername}\n{chaptertext}\n\n')
                print(f'已保存章节:{chaptername}')
        finally:
            if out is not None:
                out.close()
# Downloader instance used by the on_search / on_download button callbacks.
novel_downloader = NovelDownloader()
def on_search():
    """Run the book search on a daemon thread."""
    worker = threading.Thread(
        target=novel_downloader.fetch_books,
        daemon=True,
    )
    worker.start()
def on_download():
    """Run the chapter download for the selected book on a daemon thread."""
    worker = threading.Thread(
        target=novel_downloader.fetch_chapters,
        daemon=True,
    )
    worker.start()
# Build the user interface.
_input_font = ("Arial", 14)
_widget_font = ("Arial", 12)

root = tk.Tk()
root.title("小说下载器")

# Padded container that holds every widget and grows with the window.
frame = tk.Frame(root, padx=10, pady=10)
frame.pack(fill=tk.BOTH, expand=True)

# Search keyword input.
entry = tk.Entry(frame, font=_input_font)
entry.pack(pady=5, fill=tk.X)

search_button = tk.Button(
    frame,
    text="搜索",
    command=on_search,
    font=_widget_font,
)
search_button.pack(pady=5)

# Search results; one line per book.
listbox = tk.Listbox(frame, font=_widget_font)
listbox.pack(pady=5, fill=tk.BOTH, expand=True)

download_button = tk.Button(
    frame,
    text="下载",
    command=on_download,
    font=_widget_font,
)
download_button.pack(pady=10)

root.mainloop()