# Source listing copied from a web page (original header: "[Python] view as plain text / copy code").
import asyncio
import os
import re
import sqlite3
import threading
import time
import tkinter as tk
import webbrowser
from tkinter import scrolledtext, simpledialog, messagebox, Toplevel, ttk

import aiohttp
import requests_cache
from aiohttp.client_exceptions import ClientError
from bs4 import BeautifulSoup
from msedge.selenium_tools import Edge, EdgeOptions
from selenium import webdriver
from webdriver_manager.microsoft import EdgeChromiumDriverManager
# Phrases whose presence in a downloaded page marks the link as dead. They
# are matched verbatim against page text, so they stay in the language of
# the target sites.
default_invalid_keywords = ["文件不存在", "已被删除", "Not Found", "404", "拒绝连接", "无法访问", "取消分享", "文件打不开"]
# Maps each keyword to the tk.BooleanVar that says whether it is enabled;
# filled in by initialize_keywords() and edited from the settings window.
keyword_vars = {}
# Pages whose text is shorter than this many characters are treated as
# "blank" or "unresponsive".
BLANK_PAGE_THRESHOLD = 200
# Set to True by cancel_checking() to request that a running check stops.
cancel_check = False
# On-disk cache location (in the user's home directory).
cache_db_path = os.path.join(os.path.expanduser("~"), "link_cache")
try:
    requests_cache.install_cache(cache_db_path, backend='sqlite', expire_after=3600)
except (OSError, sqlite3.Error) as e:
    # Catch the standard-library errors a file-backed SQLite cache can
    # raise (unwritable directory, locked or corrupt database). Only names
    # that are guaranteed to exist are used here, so evaluating the handler
    # can never itself fail with AttributeError and mask the real problem.
    print(f"Failed to initialize cache: {e}")
    cache_db_path = None
# Browser configuration: Chromium-based Edge, started without a visible
# window and without GPU acceleration.
options = EdgeOptions()
options.use_chromium = True
for _edge_flag in ('headless', 'disable-gpu'):
    options.add_argument(_edge_flag)
# The single shared WebDriver; created lazily by initialize_driver() and
# reset to None by close_driver().
driver = None
def initialize_driver():
    """Create the shared Edge WebDriver on first use; later calls do nothing."""
    global driver
    if driver is not None:
        return
    driver_path = EdgeChromiumDriverManager().install()
    driver = Edge(driver_path, options=options)
def close_driver():
    """Quit the shared WebDriver, if one was started, and forget it."""
    global driver
    if driver is None:
        return
    driver.quit()
    driver = None
# A link is "http://" or "https://" followed by a run of non-whitespace.
_LINK_PATTERN = re.compile(r'https?://\S+')
# Sentence punctuation (ASCII and full-width) that commonly follows a URL in
# running text and is not meant to be part of it.
_TRAILING_PUNCTUATION = ".,;:!?'\">。,;:!?、》」』”’】"
# Closing brackets are dropped only when the URL has no matching opener, so
# links such as ".../Foo_(bar)" keep their parentheses.
_BRACKET_PAIRS = {")": "(", "]": "[", "}": "{", ")": "("}


def _trim_link(link):
    """Strip trailing punctuation and unbalanced closing brackets from *link*."""
    while link:
        last = link[-1]
        if last in _TRAILING_PUNCTUATION:
            link = link[:-1]
        elif last in _BRACKET_PAIRS and link.count(last) > link.count(_BRACKET_PAIRS[last]):
            link = link[:-1]
        else:
            break
    return link


def extract_links(text):
    """Extract all http(s) links from the given text.

    Args:
        text: Arbitrary text that may contain URLs.

    Returns:
        The links in order of appearance (duplicates kept). Punctuation that
        merely follows a link in a sentence ("see https://a.b/c.") is not
        included, and matches with nothing after the scheme are skipped.
    """
    links = []
    for match in _LINK_PATTERN.finditer(text):
        link = _trim_link(match.group(0))
        # "https://..." trims down to a bare scheme; that is not a link.
        if not link.endswith("://"):
            links.append(link)
    return links
async def check_link_validity(link, mode):
    """Check whether a link is valid according to the selected mode.

    Args:
        link: URL to test.
        mode: "快速筛选" runs quick_check, "正常筛选" runs normal_check, and
            any other value runs detailed_check.

    Returns:
        The result of the selected check, or False when the request fails
        with an aiohttp client error or times out.
    """
    try:
        # quick_check/normal_check/detailed_check use the aiohttp response
        # API (`async with`, `.status`, `await .text()`), so they must be
        # handed an aiohttp session. The context manager also guarantees
        # the session is closed once the check finishes.
        async with aiohttp.ClientSession() as session:
            if mode == "快速筛选":
                return await quick_check(session, link)
            if mode == "正常筛选":
                return await normal_check(session, link)
            return await detailed_check(session, link)  # 仔细筛选
    except (ClientError, asyncio.TimeoutError):
        return False
async def quick_check(session, link):
    """Report whether a HEAD request for *link* (5 s timeout) ends below HTTP 400.

    Redirects are followed; only the final status code is inspected.
    """
    request = session.head(link, allow_redirects=True, timeout=5)
    async with request as response:
        reachable = response.status < 400
    return reachable
async def normal_check(session, link):
    """Validate *link* with a GET request (10 s timeout) plus content checks.

    The link is rejected when the final status is 400 or above, when the
    page text contains any currently enabled keyword, or when the text is
    shorter than BLANK_PAGE_THRESHOLD characters.
    """
    async with session.get(link, allow_redirects=True, timeout=10) as response:
        if response.status >= 400:
            return False
        body = await response.text()
    # Only keywords whose checkbox is ticked take part in the scan.
    enabled_keywords = [kw for kw, var in keyword_vars.items() if var.get()]
    has_dead_marker = any(kw in body for kw in enabled_keywords)
    return not has_dead_marker and len(body) >= BLANK_PAGE_THRESHOLD
# There is exactly one shared WebDriver, but many links are checked
# concurrently. This lock makes each load/read cycle atomic so one check
# can never read the page another check navigated to.
_driver_lock = threading.Lock()


def _ensure_driver():
    """Start the shared WebDriver if it is not running yet (blocking)."""
    with _driver_lock:
        initialize_driver()


def _render_page_source(link):
    """Load *link* in the shared browser and return the rendered HTML (blocking)."""
    with _driver_lock:
        driver.get(link)
        time.sleep(5)  # Give some time for the page to fully load
        rendered = driver.page_source
        driver.delete_all_cookies()
        return rendered


async def detailed_check(session, link):
    """Check link validity with advanced techniques including JavaScript rendering.

    Runs the same status, keyword and length tests as normal_check (with a
    30 s timeout), then loads the page in the headless browser and requires
    the rendered HTML to contain a <title> or <h1> element.

    Args:
        session: aiohttp-style client session used for the initial GET.
        link: URL to test.

    Returns:
        True when every test passes; False otherwise, including when the
        browser raises while loading the page.
    """
    async with session.get(link, allow_redirects=True, timeout=30) as response:
        if response.status >= 400:
            return False
        page_content = await response.text()
    active_keywords = [kw for kw, var in keyword_vars.items() if var.get()]
    if any(keyword in page_content for keyword in active_keywords):
        return False
    if len(page_content) < BLANK_PAGE_THRESHOLD:
        return False
    # Use Selenium for JavaScript rendering. Selenium calls block, so they
    # run in a worker thread to keep the event loop (and the other checks)
    # moving while the browser works.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _ensure_driver)
    try:
        rendered = await loop.run_in_executor(None, _render_page_source, link)
    except Exception as e:
        print(f"Selenium error: {e}")
        return False
    return check_html_structure(rendered)
def check_html_structure(content):
    """Return True when *content* parses to HTML containing a <title> or an <h1>."""
    parsed = BeautifulSoup(content, 'html.parser')
    if parsed.title is not None:
        return True
    return parsed.h1 is not None
def open_link(event):
    """Open the clicked line of a text widget in the browser if it is a link.

    The whole line under the mouse pointer is read; it is opened only when
    it starts with 'http'.
    """
    text_widget = event.widget
    # Text indices look like "line.column"; only the line number is needed.
    line_text, _, _ = text_widget.index(tk.CURRENT).partition('.')
    line_no = int(line_text)
    clicked = text_widget.get(f"{line_no}.0", f"{line_no}.end")
    if not clicked.startswith('http'):
        return
    webbrowser.open(clicked)
async def check_links():
    """Check every link found in the input box and display the outcome.

    Links are extracted from text_area and checked concurrently in the mode
    selected by mode_var (at most 10 at a time in small-thread mode, 100
    otherwise). When finished, the valid/invalid lists are shown, or a
    cancellation notice if cancel_check was set during the run.

    This coroutine is driven by asyncio.run() from a button callback, which
    blocks Tk's mainloop. It therefore pumps Tk events itself so the window
    keeps repainting and the cancel button stays clickable.
    """
    global cancel_check
    cancel_check = False
    check_button.config(state=tk.DISABLED)
    cancel_button.config(state=tk.NORMAL)
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, "正在检查链接,请稍候...\n")
    result_area.config(state=tk.DISABLED)
    progress_var.set(0)

    links = extract_links(text_area.get("1.0", tk.END))
    mode = mode_var.get()
    total_links = len(links)
    valid_links = []
    invalid_links = []
    completed = 0
    # Becomes False if the user closes the window while a check is running;
    # after that no widget may be touched.
    window_open = True

    # Limit the number of concurrent tasks
    semaphore = asyncio.Semaphore(10 if small_threads_var.get() else 100)

    def mark_window_closed():
        nonlocal window_open
        global cancel_check
        window_open = False
        cancel_check = True

    async def check_link(link):
        nonlocal completed
        async with semaphore:
            # Links still waiting for a slot are skipped once cancellation
            # is requested; checks already in flight run to completion.
            if cancel_check:
                return
            is_valid = await check_link_validity(link, mode)
        (valid_links if is_valid else invalid_links).append(link)
        # Progress follows the number of finished checks. Tasks complete
        # out of order, so the submission index would make the bar jump.
        completed += 1
        if window_open:
            try:
                progress_var.set(int(completed / total_links * 100))
            except tk.TclError:
                mark_window_closed()

    async def pump_ui_events():
        while True:
            try:
                root.update()
            except tk.TclError:
                # The application was destroyed underneath us.
                mark_window_closed()
                return
            await asyncio.sleep(0.05)

    pump = asyncio.create_task(pump_ui_events())
    try:
        await asyncio.gather(*(check_link(link) for link in links))
        if not window_open:
            return
        if cancel_check:
            show_cancel_message()
        else:
            show_results(valid_links, invalid_links)
    finally:
        pump.cancel()
        # Restore the buttons even when a check raised, so the tool stays
        # usable after an error.
        if window_open:
            check_button.config(state=tk.NORMAL)
            cancel_button.config(state=tk.DISABLED)
def show_results(valid_links, invalid_links):
    """Replace the result area's content with the valid and invalid link lists.

    Valid links are tagged "link" and drawn blue and underlined; invalid
    links are plain text.

    Args:
        valid_links: Links that passed the check.
        invalid_links: Links that failed the check.
    """
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    # The "link" tag only styles the text. Clicks are already handled by the
    # <Button-1> binding on result_area itself; Tk runs tag bindings in
    # addition to widget bindings, so also binding open_link to the tag made
    # every click on a valid link open it twice.
    result_area.tag_configure("link", foreground="blue", underline=True)
    result_area.insert(tk.INSERT, "有效链接:\n")
    for link in valid_links:
        result_area.insert(tk.INSERT, f"{link}\n", "link")
    result_area.insert(tk.INSERT, "\n无效链接:\n")
    for link in invalid_links:
        result_area.insert(tk.INSERT, f"{link}\n")
    result_area.config(state=tk.DISABLED)
def clear_text():
    """Empty the input box and the result area, and reset the progress bar."""
    text_area.delete("1.0", tk.END)
    # The result area is kept read-only, so unlock it just long enough to
    # wipe it.
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.config(state=tk.DISABLED)
    progress_var.set(0)
def cancel_checking():
    """Raise the module-level cancel_check flag."""
    global cancel_check
    cancel_check = True
def show_cancel_message():
    """Replace the result area's content with the cancellation notice."""
    notice = "操作已取消。\n"
    # Unlock the read-only result area, swap its content, lock it again.
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, notice)
    result_area.config(state=tk.DISABLED)
def open_settings():
    """Open the keyword settings window.

    The window shows one checkbox per entry in keyword_vars, laid out eight
    per row. Right-clicking a checkbox asks for confirmation and then
    removes that keyword; a button adds new keywords (enabled by default).
    """
    columns_per_row = 8

    settings_window = Toplevel(root)
    settings_window.title("设置筛选词")
    tk.Label(settings_window, text="筛选词列表:").pack()
    keywords_frame = tk.Frame(settings_window)
    keywords_frame.pack(padx=10, pady=10)

    def remove_keyword(keyword):
        # Drop the keyword and redraw; unknown keywords are ignored.
        if keyword not in keyword_vars:
            return
        del keyword_vars[keyword]
        update_keywords_display()

    def confirm_remove_keyword(keyword):
        # Ask before deleting so a stray right-click is harmless.
        confirmed = messagebox.askokcancel("确认删除", f"你确定要删除筛选词 '{keyword}' 吗?")
        if confirmed:
            remove_keyword(keyword)

    def update_keywords_display():
        # Rebuild the checkbox grid from scratch to mirror keyword_vars.
        for stale in keywords_frame.winfo_children():
            stale.destroy()
        for position, (keyword, var) in enumerate(keyword_vars.items()):
            grid_row, grid_col = divmod(position, columns_per_row)
            checkbox = tk.Checkbutton(keywords_frame, text=keyword, variable=var)
            checkbox.grid(row=grid_row, column=grid_col, padx=5, pady=5, sticky='w')
            # kw=keyword freezes the current keyword for this checkbox.
            checkbox.bind("<Button-3>", lambda e, kw=keyword: confirm_remove_keyword(kw))

    def add_keyword():
        # Prompt for a keyword; empty input and duplicates are ignored.
        new_keyword = simpledialog.askstring("添加筛选词", "请输入新的筛选词:", parent=settings_window)
        if not new_keyword or new_keyword in keyword_vars:
            return
        keyword_vars[new_keyword] = tk.BooleanVar(value=True)
        update_keywords_display()

    add_keyword_button = tk.Button(settings_window, text="添加筛选词", command=add_keyword)
    add_keyword_button.pack(pady=5)
    update_keywords_display()
    save_button = tk.Button(settings_window, text="关闭", command=settings_window.destroy)
    save_button.pack(pady=5)
def initialize_keywords():
    """Register every default keyword in keyword_vars, enabled."""
    for default_keyword in default_invalid_keywords:
        keyword_vars[default_keyword] = tk.BooleanVar(value=True)
# Build the main window: text input and controls in the left frame, the
# result list in the right frame.
root = tk.Tk()
root.title("链接有效性检查工具")
main_frame = tk.Frame(root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
left_frame = tk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
right_frame = tk.Frame(main_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
# Input box: the text that extract_links() scans for URLs.
tk.Label(left_frame, text="请输入包含链接的文本:").pack()
text_area = scrolledtext.ScrolledText(left_frame, wrap=tk.WORD, width=50, height=20)
text_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
# Add mode selection. The radio-button values are the exact strings that
# check_link_validity() compares against.
mode_var = tk.StringVar(value="正常筛选")
tk.Label(left_frame, text="选择筛选模式:").pack()
modes = ["快速筛选", "正常筛选", "仔细筛选"]
for mode in modes:
    rb = tk.Radiobutton(left_frame, text=mode, variable=mode_var, value=mode)
    rb.pack(anchor='w')
# Add small threads option: when ticked, check_links() limits itself to 10
# concurrent checks instead of 100.
small_threads_var = tk.BooleanVar(value=False)
small_threads_checkbutton = tk.Checkbutton(left_frame, text="启用小线程模式", variable=small_threads_var)
small_threads_checkbutton.pack(anchor='w')
# asyncio.run() drives check_links() to completion inside the button callback.
check_button = tk.Button(left_frame, text="检查链接", command=lambda: asyncio.run(check_links()))
check_button.pack(pady=5)
# Starts disabled; check_links() enables it for the duration of a run.
cancel_button = tk.Button(left_frame, text="取消", command=cancel_checking, state=tk.DISABLED)
cancel_button.pack(pady=5)
clear_button = tk.Button(left_frame, text="清除文本", command=clear_text)
clear_button.pack(pady=5)
settings_button = tk.Button(left_frame, text="设置", command=open_settings)
settings_button.pack(pady=5)
# Result area: kept read-only (state=DISABLED) except while it is being
# rewritten.
tk.Label(right_frame, text="结果:").pack()
result_area = scrolledtext.ScrolledText(right_frame, wrap=tk.WORD, width=50, height=20)
result_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
result_area.config(state=tk.DISABLED)
# A left click anywhere in the result area goes to open_link, which opens
# the clicked line when it starts with 'http'.
result_area.bind("<Button-1>", open_link)
# Add progress bar; progress_var holds the percentage (0-100) set by
# check_links().
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(left_frame, orient="horizontal", length=400, mode="determinate", variable=progress_var)
progress_bar.pack(pady=5)
# Initialize keyword variables (needs the Tk root created above).
initialize_keywords()
# Ensure WebDriver is closed on exit
def on_closing():
    """Handle the window-close request: quit the WebDriver, then destroy the window."""
    try:
        close_driver()
    finally:
        # Destroy the window even when quitting the browser raises;
        # otherwise the close button would appear to do nothing.
        root.destroy()


root.protocol("WM_DELETE_WINDOW", on_closing)
# Run the main loop
root.mainloop()