import base64
import re
import queue
import requests
from parsel import Selector
from io import BytesIO
from tqdm import tqdm
from rich.progress import Progress
import zipfile
from PIL import Image
import os
import urllib.parse
from selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.edge.options import Options as EdgeOptions
from concurrent.futures import ThreadPoolExecutor, as_completed
#**************************************打开浏览器读取网址****************************
def decode_change(encoded_text):
    """Reverse the printable-ASCII character shift, then base64-decode.

    Each character is shifted within the printable range 32..126
    (modulus 95); the de-shifted text is a base64 string that decodes
    to the real UTF-8 payload.
    """
    shifted = [chr((ord(ch) - 32 + 23) % (126 - 32 + 1) + 32) for ch in encoded_text]
    b64_text = "".join(shifted)
    return base64.b64decode(b64_text).decode('utf-8')
def get_opened_edge_tabs(driver):
    """Return the current URL of every open tab/window of *driver*.

    Note: reading a tab's URL requires switching to it, so this leaves
    the driver focused on the last window handle.
    """
    def url_of(handle):
        driver.switch_to.window(handle)
        return driver.current_url

    return [url_of(handle) for handle in driver.window_handles]
def filter_urls(urls):
    """Return only the URLs containing both (deobfuscated) marker substrings.

    Fix: the two decode_change() calls are loop-invariant; the original
    decoded the same two markers once per URL. They are now decoded once.
    """
    fi = decode_change("Mz|`6<}SB`&&")
    lo = decode_change("5@/YC,x&")
    return [url for url in urls if fi in url and lo in url]
def extract_aid_number(url):
    """Return the digit string following '-aid-' in *url*, or None if absent."""
    found = re.search(r'-aid-(\d+)', url)
    return found.group(1) if found else None
#***************************************第一板块结束*********************
#***************************************第二部分 下载文件并保存********************
# 下载文件并显示进度条
def download_to_memory(url, retries=3):
    """Download *url* into an in-memory buffer, showing a rich progress bar.

    Returns (buffer, file_name) on success, where file_name comes from the
    'n' query parameter of the URL (may be None); returns (None, None)
    after *retries* failed attempts.

    Fixes:
    - the progress task was created with start=False but start_task() was
      never called, so the bar never actually advanced on screen;
    - requests.get() had no timeout and could hang forever on a dead host.
    """
    with Progress() as progress:
        # Create the task unstarted; start it once the real size is known.
        task_id = progress.add_task("Downloading...", total=100, start=False)
        for attempt in range(retries):
            try:
                response = requests.get(url, stream=True, timeout=30)
                if response.status_code == 200:
                    total_size = int(response.headers.get('content-length', 0))
                    progress.update(task_id, total=total_size)
                    progress.start_task(task_id)  # begin timing/rendering
                    buffer = BytesIO()
                    for data in response.iter_content(1024):
                        buffer.write(data)
                        progress.update(task_id, advance=len(data))
                    buffer.seek(0)
                    # The server encodes the file name in the 'n' query parameter.
                    parsed_url = urllib.parse.urlparse(url)
                    file_name = urllib.parse.parse_qs(parsed_url.query).get('n', [None])[0]
                    return buffer, file_name
                print(f"Failed to download from {url}. Status code: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"Attempt {attempt + 1} failed: {e}")
    return None, None
# 从网页中提取链接
def extract_and_combine_urls(url, xpath):
    """Fetch *url*, select nodes via *xpath*, and return every nested <a>
    href prefixed with 'https:'.

    The site uses protocol-relative links ('//host/...'), hence the prefix.
    Returns [] on HTTP error or network failure.

    Fix: requests.get() had no timeout and could hang the whole script.
    """
    try:
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve the webpage. Status code: {response.status_code}")
            return []
        selector = Selector(response.text)
        base_url = "https:"
        combined_urls = []
        for element in selector.xpath(xpath).xpath('.//a'):
            href = element.xpath('@href').get()
            if href:
                combined_urls.append(base_url + href)
        return combined_urls
    except requests.exceptions.RequestException as e:
        print(f"Error retrieving webpage: {e}")
        return []
# 压缩图像
def compress_image(img, quality=85, max_resolution=(1024, 1024)):
    """Downscale *img* to fit *max_resolution* and re-encode it as JPEG.

    Returns the re-encoded image; on failure the (already thumbnailed,
    possibly RGB-converted) input image is returned instead.
    NOTE(review): thumbnail() mutates the caller's image in place.
    """
    img.thumbnail(max_resolution, Image.LANCZOS)
    with BytesIO() as scratch:
        try:
            if img.mode == 'RGBA':
                img = img.convert('RGB')  # JPEG cannot store an alpha channel
            img.save(scratch, format="JPEG", quality=quality)
            scratch.seek(0)
            return Image.open(scratch).copy()
        except Exception as e:
            print(f"图像压缩失败: {e}")
            return img
# 从内存中的ZIP文件生成HTML文件
def images_to_html_from_zip(zip_content, output_folder, output_name, compress=False, quality=85, max_resolution=(1024, 1024)):
    """Render every image inside an in-memory ZIP into a single standalone
    HTML file (images embedded as base64 JPEG data URIs).

    Parameters:
        zip_content: file-like object containing the ZIP archive.
        output_folder/output_name: destination '<folder>/<name>.html'.
        compress: when True, images go through compress_image() first.
        quality/max_resolution: JPEG quality and compress_image() bounds.

    Fix: PNG/GIF entries are often 'RGBA' or palette ('P') mode, which
    PIL cannot encode as JPEG; on the compress=False path the original
    crashed with "cannot write mode RGBA as JPEG". Non-RGB images are
    now converted to RGB before saving.
    """
    with zipfile.ZipFile(zip_content, 'r') as zip_ref:
        image_files = [f for f in zip_ref.namelist() if f.lower().endswith(('png', 'jpg', 'jpeg', 'bmp', 'gif'))]
        image_files.sort()  # keep page order deterministic
        output_html = os.path.join(output_folder, f'{output_name}.html')
        with open(output_html, 'w', encoding='utf-8') as html_file:
            html_file.write('''
<html>
<head>
<meta charset="UTF-8">
<title>Images</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; }
.image-container { margin-bottom: 20px; }
.image-container img { max-width: 100%; height: auto; }
.caption { margin-top: 10px; font-size: 18px; color: #555; }
</style>
</head>
<body>
''')
            total_images = len(image_files)
            for index, file in enumerate(image_files):
                with zip_ref.open(file) as img_file:
                    img_bytes = img_file.read()
                img = Image.open(BytesIO(img_bytes))
                if compress:
                    img = compress_image(img, quality, max_resolution)
                if img.mode != 'RGB':
                    img = img.convert('RGB')  # JPEG supports RGB only
                buffered = BytesIO()
                img.save(buffered, format="JPEG", quality=quality)
                img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
                html_file.write(f'''
<div class="image-container">
<img src="data:image/jpeg;base64,{img_base64}" alt="{file}">
<div class="caption">Image {index + 1} of {total_images}</div>
</div>
''')
            html_file.write('</body></html>')
    print(f"HTML文件已保存到 {output_html}")
# 多线程下载并处理文件
def process_download(url, xpath, output_folder):
    """Resolve the download links found on *url* and convert the first
    fetchable ZIP archive into an HTML gallery inside *output_folder*.

    Intended for the threaded download path.
    NOTE(review): original author flagged output problems when threaded.
    """
    links = extract_and_combine_urls(url, xpath)
    if not links:
        return
    zip_content, file_name = download_to_memory(links[0])
    # Fall back to the mirror link when the first download fails.
    if not zip_content and len(links) > 1:
        zip_content, file_name = download_to_memory(links[1])
    if zip_content:
        print("文件转换中")
        base_name = os.path.splitext(file_name)[0] if file_name else 'output'
        images_to_html_from_zip(zip_content, output_folder, base_name, compress=True)
#************************************第二部分结束**********************************
def main():
    """Interactive entry point.

    Launches Edge, waits for the user to finish browsing, collects the
    URLs of all open tabs, keeps those matching the (obfuscated) site
    markers, builds a download link from each tab's '-aid-' number, and
    converts every downloaded archive into an HTML gallery.
    """
    xpath = '/html/body/div[2]/div[2]'
    output_folder = 'output'  # TODO: make configurable
    os.makedirs(output_folder, exist_ok=True)

    # Configure and launch the Edge browser.
    edge_options = EdgeOptions()
    edge_options.use_chromium = True
    # TODO: adjust the msedgedriver path for the local machine.
    edge_service = EdgeService(executable_path='C:/Users/13816/Downloads/edgedriver_win32/msedgedriver.exe')
    driver = webdriver.Edge(service=edge_service, options=edge_options)
    # driver.get(decode_change("J1;xL-X_5{M{Mb}{KS*a5V7S"))
    try:
        # Block until the user confirms they are done browsing.
        user_input = input("浏览器已启动,请使用浏览器。输入 'yes' 以继续: ").strip().lower()
        while user_input != 'yes':
            user_input = input("输入 'yes' 以继续: ").strip().lower()

        print("读取已打开的标签页...")
        urls = get_opened_edge_tabs(driver)
        for url in urls:
            print(f'打开的的URL: {url}')

        filtered_urls = filter_urls(urls)
        download_queue = queue.Queue()
        if filtered_urls:
            for url in filtered_urls:
                print(f'符合条件的URL: {url}')
                aid_number = extract_aid_number(url)
                if aid_number:
                    # Download pages live at <obfuscated prefix><aid>.html.
                    fro = decode_change('J1;xL16~5b"{M{L^Mz|`6<}SBb"TK{M^K0"QC,yYKV;UN,yQJ@:]')
                    download_queue.put(f'{fro}{aid_number}.html')
        else:
            print("未能找到符合条件的URL")

        # Sequential download/convert loop. (A threaded variant using
        # ThreadPoolExecutor + process_download exists but had output
        # problems, so the work is done serially here.)
        while not download_queue.empty():
            url = download_queue.get()
            print("url", url)
            combined_urls = extract_and_combine_urls(url, xpath)
            if not combined_urls:
                continue
            zip_content, file_name = download_to_memory(combined_urls[0])
            if not zip_content and len(combined_urls) > 1:
                print("First link failed. Trying the second link...")
                zip_content, file_name = download_to_memory(combined_urls[1])
            if zip_content:
                print("文件转换中")
                output_name = os.path.splitext(file_name)[0] if file_name else 'output'
                images_to_html_from_zip(zip_content, output_folder, output_name, compress=True)
    finally:
        driver.quit()
# Fix: a stray '</blockquote>' markup fragment was fused onto the main()
# call, which is a syntax error.
if __name__ == '__main__':
    main()
此代码仅仅为学习娱乐使用,切勿非法传播使用。