# [Python] source listing (header left over from the forum's "plain-text view / copy code" widget)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import PriorityQueue
from urllib.parse import quote
import requests
from alive_progress import alive_bar
from prettytable import PrettyTable
# Shared HTTP session; every request made by this script goes through it.
session = requests.Session()

# Browser-like default headers attached to the shared session.
headers = {
    'Connection': 'keep-alive',
    'Cache-Control': 'max-age=0',
    'Upgrade-Insecure-Requests': '1',
    'Origin': 'https://www.shuquge.com',
    'Content-Type': 'application/x-www-form-urlencoded',
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/99.0.4844.84 Safari/537.36'
    ),
    'Accept': (
        'text/html,application/xhtml+xml,application/xml;q=0.9,'
        'image/avif,image/webp,image/apng,*/*;q=0.8,'
        'application/signed-exchange;v=b3;q=0.9'
    ),
    'Referer': 'https://www.shuquge.com/',
}
session.headers.update(headers)

# Proxy configuration. Leave both entries as None when no proxy is used.
# (Search online for details on setting up a proxy.)
#
# SOCKS proxy example:
# proxy = {
#     'http': 'socks5://127.0.0.1:10808',
#     'https': 'socks5://127.0.0.1:10808'
# }
#
# HTTP / HTTPS proxy example:
# proxy = {
#     'http': 'http://127.0.0.1:10809',
#     'https': 'http://127.0.0.1:10809'
# }
proxy = {'http': None, 'https': None}
# Look up novels on the site by the keyword the user typed
def find_by_name(name):
    """Search the site for novels matching *name*.

    Posts the keyword to ``{host}/search.php`` through the shared ``session``
    (``host``, ``session`` and ``proxy`` are module-level globals) and makes
    at most three attempts.

    Args:
        name: Search keyword entered by the user.

    Returns:
        The response of the first attempt answered with HTTP 200, with its
        encoding set to the detected one, or ``None`` if all attempts fail.
    """
    # The body is sent pre-encoded, matching the form Content-Type header
    # configured on the session.
    payload = f"s=6445266503022880974&searchkey={quote(name)}"
    # Every attempt counts towards the limit. Previously only exceptions
    # advanced the counter, so a non-200 reply looped forever without delay.
    for _ in range(3):
        try:
            response = session.post(f'{host}/search.php', data=payload, proxies=proxy, timeout=5)
        except requests.RequestException:
            # Network-level failure (timeout, connection error, ...): retry.
            continue
        if response.status_code == 200:
            response.encoding = response.apparent_encoding
            return response
    return None
# Parse the search result page with a regex, print title / category / author
# as a table, then return the URL and title of the novel the user selects
def parse_html(html):
    """Let the user pick one novel from a search result page.

    Reads the module-level globals ``name`` (the search keyword, used to drop
    hits whose title does not contain it) and ``host`` (prefix for the
    returned URL).

    Args:
        html: Response object of the search request (its ``text`` attribute
            is parsed), or a falsy value when the search failed.

    Returns:
        ``(novel_url, novel_name)`` for the selected novel, or
        ``(None, None)`` when there is nothing to choose from. A 2-tuple is
        returned on every path so callers can always unpack the result.

    Raises:
        SystemExit: If the user types ``quit`` at the selection prompt.
    """
    if not html:
        print('没有返回结果!')
        return None, None

    reg = r'<a href="(.*?)">(.*?)</a>.*?<div class="cat">分类:(.*?)</div>.*?<div class="author">作者:(.*?)</div>'
    # Each hit is (relative url, title, category, author); keep only the
    # titles that actually contain the searched keyword.
    novel_list = [novel for novel in re.findall(reg, html.text) if name in novel[1]]
    if not novel_list:
        # Also covers "hits found but all filtered out", which used to print
        # an empty table and then prompt for an index that could never match.
        print('对不起,没有找到您要的书!请输入其它书名')
        return None, None

    tb = PrettyTable()
    tb.field_names = ["序号", "小说名", "分类", "作者"]
    for number, novel in enumerate(novel_list, start=1):
        tb.add_row([number, novel[1], novel[2], novel[3]])
    print(tb)

    print('请输入你想爬取的小说序号(1、2、3...等)或输入"quit"退出程序:')
    while True:
        choice = input()
        if choice == 'quit':
            raise SystemExit
        try:
            number = int(choice)
        except ValueError:
            # Non-numeric input is treated like an out-of-range number
            # instead of crashing the program.
            number = 0
        if 1 <= number <= len(novel_list):
            break
        print('没有您需要的小说序号,请检查后重新输入序号!')

    novel_path, novel_name = novel_list[number - 1][:2]
    print('您即将下载小说 <<' + novel_name + '>>')
    return host + novel_path, novel_name
# Fetch the list of all chapters of a novel
def get_all_novel_chapter(novel_url):
    """Download a novel's index page and extract its chapter links.

    Uses the module-level ``session`` and ``proxy`` and makes at most three
    attempts.

    Args:
        novel_url: URL of the novel's index page.

    Returns:
        A list of ``(chapter_url, chapter_title)`` tuples in page order, or
        ``None`` if no attempt returned HTTP 200.
    """
    chapter_reg = '<dd><a href="(.*?)">(.*?)</a></dd>'
    # Chapter hrefs are appended to the index URL with any 'index.html'
    # removed.
    base_url = novel_url.replace('index.html', '')
    # Every attempt counts towards the limit. Previously only exceptions
    # advanced the counter, so a non-200 reply looped forever.
    for _ in range(3):
        try:
            response = session.get(novel_url, proxies=proxy, timeout=5)
        except requests.RequestException:
            continue
        if response.status_code != 200:
            continue
        response.encoding = response.apparent_encoding
        # The first 12 links are skipped - presumably a "latest chapters"
        # teaser that duplicates entries of the full list; TODO confirm.
        links = re.findall(chapter_reg, response.text)[12:]
        return [(base_url + href, title) for href, title in links]
    return None
# Download a single chapter
def download_novel_chapter(novel_chapter_url, novel_chapter_name, index):
    """Download one chapter and put it on the module-level ``Queue``.

    Makes at most ten attempts. On success ``[index, title + "\\n" + text]``
    is put on ``Queue``; if every attempt fails nothing is queued and the
    chapter will be missing from the output.

    Args:
        novel_chapter_url: URL of the chapter page.
        novel_chapter_name: Chapter title, written before the chapter text.
        index: Position of the chapter in the book, used as the queue
            priority so chapters are read back in order.
    """
    content_reg = r'<div id="content" class="showtxt">(.*?)</div>'
    # Every attempt counts towards the limit. Previously only exceptions
    # advanced the counter, so a non-200 reply looped forever.
    for _ in range(10):
        try:
            response = session.get(novel_chapter_url, proxies=proxy, timeout=5)
        except requests.RequestException:
            continue
        if response.status_code != 200:
            continue
        response.encoding = response.apparent_encoding
        match = re.search(content_reg, response.text, re.S | re.M | re.I)
        if match is None:
            # Page without a content block (e.g. an error page): retry.
            continue
        # NOTE(review): the pasted source replaced ' ' with ' ' (a no-op);
        # presumably the first argument was '&nbsp;' before the forum
        # rendered it. Both spellings of a non-breaking space are handled.
        chapter_content = match.group(1).replace('&nbsp;', ' ').replace('\xa0', ' ')
        filter_content = novel_chapter_content_filter(chapter_content)
        Queue.put([index, novel_chapter_name + "\n" + filter_content])
        return
# Clean up chapter text: drop line-break tags and injected advertisements
def novel_chapter_content_filter(content):
    """Return *content* with ``<br .../>`` tags and site adverts removed.

    Two passes are applied in order: first every ``<br ... />`` tag is
    deleted, then every ``http...html`` link and the site's
    "remember this domain" advertisement line.

    Args:
        content: Raw chapter text extracted from the page.

    Returns:
        The cleaned chapter text.
    """
    line_break = re.compile('<br.*?/>')
    advert = re.compile(r'http.*?html|请记住本书首发域名.*?。书趣阁_笔趣阁手机版阅读网址:.*')
    without_breaks = line_break.sub('', content)
    return advert.sub('', without_breaks)
# Script entry point: search -> pick a novel -> download all chapters -> save.
# The code stays at module level on purpose: ``host``, ``name`` and ``Queue``
# are read as globals by the helper functions above.
if __name__ == '__main__':
    host = "https://www.shuquge.com"

    # Keep asking for a title until the user has selected a novel.
    while True:
        print('请输入你需要查询的小说书名:')
        name = input()
        results = find_by_name(name)
        # Tolerate a bare None from parse_html so unpacking cannot fail.
        novel_url, novel_name = parse_html(results) or (None, None)
        if novel_url is not None:
            break

    novel_chapter_list = get_all_novel_chapter(novel_url)
    if not novel_chapter_list:
        # The chapter list could not be fetched (None) or is empty; stop
        # here instead of crashing on len(None) below.
        print('未能获取小说章节列表,程序退出!')
        raise SystemExit(1)

    print('请输入爬取线程数 范围(1-256) 不建议设置太大!!!:')
    while True:
        try:
            thread_num = int(input())
        except ValueError:
            # Non-numeric input is handled like an out-of-range number.
            thread_num = 0
        if 1 <= thread_num <= 256:
            break
        print("输入错误,请重新输入爬取线程数")

    # Priority queue keyed by chapter index, so chapters come back in order
    # regardless of which download finishes first.
    Queue = PriorityQueue()
    with alive_bar(len(novel_chapter_list), title="download", force_tty=True) as bar, \
            ThreadPoolExecutor(max_workers=thread_num) as executor:
        all_task = [
            executor.submit(
                download_novel_chapter,
                novel_chapter_url=chapter_url,
                novel_chapter_name=chapter_name,
                index=index,
            )
            for index, (chapter_url, chapter_name) in enumerate(novel_chapter_list)
        ]
        # as_completed only yields finished futures; advance the bar once each.
        for _ in as_completed(all_task):
            bar()
    print('小说爬取完毕......')

    path = ""
    while True:
        num = input("请按照提示键入小说保存的目录\n[1] 自定义保存目录 \n[2] 保存至内置默认目录\n")
        if num == "1":
            path = input("请输入您想要保存的目录:\n")
            if not os.path.isdir(path):
                print('该目录不存在,已返回上一级!!!')
                continue
            # Save into a 'book' sub-directory of the chosen directory.
            path = os.path.join(path, 'book')
            os.makedirs(path, exist_ok=True)
            break
        if num == "2":
            # Default directory: <home>/Downloads/Documents. For the
            # Administrator account on Windows this is the previously
            # hard-coded C:\Users\Administrator\Downloads\Documents.
            path = os.path.join(os.path.expanduser('~'), 'Downloads', 'Documents')
            os.makedirs(path, exist_ok=True)
            break
        print("输入错误 请输入数字 1 或 2")

    # Drain the queue in priority (chapter) order into a single text file.
    with open(os.path.join(path, f'{novel_name}.txt'), 'w', encoding='utf-8') as f:
        while not Queue.empty():
            f.write(Queue.get()[1])
    print("小说写入完成,以保存至-->{} 目录!!!".format(path))
    print("程序将在5秒后结束")
    time.sleep(5)