Python: scraping novels from the 平板电子书 ebook site
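A small end-to-end script: it searches the site for a novel by name, lets you pick one of the matches, first tries the txt download link the site itself provides, and if that fails falls back to scraping every chapter concurrently with aiohttp/asyncio, packing the chapters into a single txt and stripping the site's promotional text afterwards.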
"""@Time : 2023/1/5 15:53
@AuThor: FanSL
@file : 2023-1-5平板电子书下载小说.py
"""
import os
import shutil
import re
from lxml import etree
from tqdm import trange
import aiohttp
import asyncio
import time
from bs4 import BeautifulSoup
import requests
# Print the list of matching novels (name_url)
def print_name_url(name_url, count):
    for num, key in zip(range(1, count + 1), name_url):
        print(str(num) + ":{0: <20s}{1: <20s}{2: <40s}".format(key[0], key[2], key[3]))
# Search by book name, return name_url (list of matches) and count (its length)
def get_name_url(url, book_name):
    search_url = url + "modules/article/search.php?searchkey=" + book_name
    with requests.get(search_url) as response:
        response.encoding = "utf-8"
        source_code = response.text
    # The date-validation part of the original pattern was garbled in the post
    # (its [0-9] character classes were swallowed); a plain YYYY-MM-DD match is
    # used here instead, which is enough for the search-result page.
    obj = re.compile(
        r'《<a href="(?P<url>.*?)">(?P<name>.*?)</a>》.*?\d{4}-\d{2}-\d{2}'
        r'\s(?P<type>.*?)小说</div>.*?最新章节:.*?>(?P<lastchapter>.*?)</a>',
        re.S)
    res = obj.finditer(source_code)
    name_url = []
    count = 0
    for i in res:
        name_url.append([i.group("name"), i.group("url"),
                         i.group("type"), i.group("lastchapter")])
        count = count + 1
    return name_url, count  # name_url: list of matches, count: its length
# Take the chosen book id and return main_url (the chapter-list page)
def get_main_url(url, book_id, name_url):
    novel_intro_url = name_url[int(book_id) - 1][1]
    with requests.get(novel_intro_url) as response:
        response.encoding = "utf-8"
        intro_code = response.text
    intro_code = etree.HTML(intro_code)
    href = intro_code.xpath("/html/body/div/div/div/p/a/@href")[0]
    main_url = url + href
    return main_url
# Turn the download-page URL into the direct txt download URL
def get_download_url(download_page_url, book_name):
    download_url = (download_page_url.rstrip('.html').replace('xiazai', '').replace('www', 'txt')
                    + '/' + book_name + '.txt')
    return download_url
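# Illustration of the rewrite above (hypothetical URLs, only the string surgery is real):
#   download page : http://www.qiuyelou.com/xiazai12345.html
#   direct txt    : http://txt.qiuyelou.com/12345/书名.txt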
# Fetch main_url and return a detail dict for the book
def get_detail(url, main_url):
    detail = {}
    with requests.get(main_url) as response:
        response.encoding = "utf-8"
        main_code = response.text
    soup = BeautifulSoup(main_code, "html.parser")
    download_page_url = soup.find("div", class_="info").find("a", class_="txt")["href"]
    author = soup.find("span", class_="author").text
    intro = soup.find("div", class_="intro").text.strip("\n")
    book_name = soup.find("h1").text
    download_url = get_download_url(download_page_url, book_name)
    detail["book_name"] = book_name
    detail["author"] = author
    detail["intro"] = intro
    detail["main_url"] = main_url
    detail["download_url"] = download_url
    return detail
# Try the txt download link provided by the site itself
def download_novel_by_ori_url(download_path, book_name, download_url):
    with requests.get(download_url) as download:
        if download.status_code == 200:
            with open(download_path + book_name + ".txt", "wb") as f:
                f.write(download.content)
            return True
        return False
# Create the folder if it does not exist; empty it if it does
def RemoveDir(filepath):
    if not os.path.exists(filepath):
        os.mkdir(filepath)
    else:
        shutil.rmtree(filepath)
        os.mkdir(filepath)
# Fetch the chapter list and return a dict of {chapter name: chapter url}
def get_chapters(main_url):
    chapters = {}
    with requests.get(main_url) as request:
        request.encoding = 'utf-8'
        soup = BeautifulSoup(request.text, 'lxml')
    chapters_temp = soup.find('div', class_='list').find('dl').find_all('a')
    for i, chapter in zip(range(1, len(chapters_temp) + 1), chapters_temp):
        chapters[str(i) + '-' + chapter.text] = main_url + chapter.get('href')
    return chapters
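# Shape of the returned dict (hypothetical values; the keys double as temp file names):
#   {'1-第一章 ...': 'http://www.qiuyelou.com/<book>/<chapter>.html', ...}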
# Write the detail dict and chapter list to information.txt; return the logs path
def write_detail_chapters(temp_path, novel_name, detail, chapters):
    if not os.path.exists(temp_path):
        os.mkdir(temp_path)
    RemoveDir(temp_path + novel_name)
    RemoveDir(temp_path + novel_name + '/logs')
    with open(temp_path + novel_name + '/' + 'information.txt', 'w', encoding='utf-8') as f:
        for key, value in detail.items():
            f.write(key + ':' + value + '\n')
        f.write('\n')
        for key, value in chapters.items():
            f.write(key + ':' + value + '\n')
    return temp_path + novel_name + '/logs/'
# Write one chapter dict to logs_path/<chapter_name>.txt
def write_logs_(logs_path, chapter_name, chapter):
    with open(logs_path + chapter_name + '.txt', 'w', encoding='utf-8') as f:
        for key, value in chapter.items():
            f.write(key + '\n')
            f.write(value.replace("\n ", " ").replace(" ", "\n ") + '\n')
# novel_pack
def novel_pack(download_path, logs_path, detail, chapters):
    with open(download_path + detail['book_name'] + '.txt', mode='w', encoding='utf-8') as book_file:
        for key, tq in zip(chapters.keys(), trange(len(chapters), desc='正在打包')):
            with open(logs_path + key + '.txt', mode='r', encoding='utf-8') as chapter_file:
                book_file.write(chapter_file.read())
            book_file.write('\n')
# Strip the site's promotional text from the finished book
def remove_impurities(download_path, book_name):
    novel_path = download_path + book_name + '.txt'
    with open(novel_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    with open(novel_path, 'w', encoding='utf-8') as f:
        for line in lines:
            result = re.search('txt下载地址.*?谢谢您的支持!!', line)
            if result is not None:
                impurities = result.group()
                line = line.replace(impurities, ' ')
            f.write(line)
# Download one chapter asynchronously and write it to the logs folder
async def download_chapter(logs_path, chapter_name, chapter_href, session):
    chapter = {}
    async with session.get(chapter_href) as request:
        request_text = await request.text(encoding='utf-8')
    soup = BeautifulSoup(request_text, 'html.parser')
    chapter_content = soup.find('div', class_='content').text
    chapter[chapter_name] = chapter_content
    write_logs_(logs_path, chapter_name, chapter)
# Build one task per chapter and download them all concurrently
async def aio_download(logs_path, chapters):
    tasks = []
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=64, ssl=False)) as session:
        for chapter_name, chapter_href in chapters.items():
            tasks.append(asyncio.create_task(download_chapter(logs_path, chapter_name, chapter_href, session)))
        await asyncio.wait(tasks)
# Fallback path: download the novel chapter by chapter with async coroutines
def aio_main_download(download_path, temp_path, detail):
    chapters = get_chapters(detail['main_url'])
    logs_path = write_detail_chapters(temp_path, detail['book_name'], detail, chapters)
    asyncio.run(aio_download(logs_path, chapters))
    novel_pack(download_path, logs_path, detail, chapters)
# main function
def main(url, download_path, temp_path):
    os.makedirs(download_path, exist_ok=True)  # make sure the download folder exists
    book_name = input("请输入小说书名:")
    name_url, count = get_name_url(url, book_name)
    print_name_url(name_url, count)
    book_id = input("请输入下载的小说序号:")
    main_url = get_main_url(url, book_id, name_url)
    detail = get_detail(url, main_url.replace("com//", "com/"))
    print(detail)
    print("开始下载--------------------------------")
    start_time = time.time()
    is_over = download_novel_by_ori_url(download_path, detail['book_name'], detail['download_url'])
    # is_over = False
    if not is_over:
        print("原网站提供的txt下载失败,开始异步协程下载。")
        aio_main_download(download_path, temp_path, detail)
    end_time = time.time()
    print("下载完成--------------------------------")
    print(f"耗时:{round(end_time - start_time, 2)} s")
    delete_logs = input("是否保留(1)日志文件:")
    if delete_logs != '1':
        logs_path = temp_path + detail['book_name'] + '/'
        RemoveDir(logs_path)
        os.removedirs(logs_path)
    print("开始去杂--------------------------------")
    remove_impurities(download_path, detail['book_name'])
    print("去杂完成--------------------------------")
# main
if __name__ == '__main__':
    url = "http://www.qiuyelou.com/"
    download_path = "小说下载/"
    temp_path = "小说下载/temp/"
    # --------------------------------------------
    main(url, download_path, temp_path)
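Usage: run the script, type the book name when prompted, then the index of the result you want. The finished txt lands in 小说下载/; per-chapter temp files sit under 小说下载/temp/<book name>/logs/ and are deleted unless you answer 1 at the final prompt.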