好友
阅读权限 10
听众
最后登录 1970-1-1
wapys
发表于 2022-11-22 08:39
本帖最后由 wapys 于 2022-12-5 13:33 编辑
貌似我这样的小白只会这个,
速度快,比之前那个要快很多
废话不多说, 上代码
[Python] 纯文本查看 复制代码
'''
异步协程下载小说
# ---------------------------------------------------------------------------------
# 支持站点:
# 八一中文网(81zw.com)
# 顶点小说(23usp.com)
#笔趣阁(bqg.org,qbiqu.com,52bqg.net等全部站点)
#天籁小说(xs.23sk.com)
# --------------------------------------------------------------------------------
'''
# 2.导入需要用的库文件
import requests,os,time
import asyncio
import aiohttp,aiofiles
from lxml import etree
# 1.准备网站,headers
URL = "https://www.81zw.com/book/73391/"
gbk = 'utf-8'
headers={
'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0'
}
if not os.path.exists('./缓存'): #创建缓存文件夹来存放抓取的每一章节文件
os.mkdir('./缓存')
# 一个返回网页文章列表的函数
def resp_html(url):
html = requests.get(url,headers=headers)
html.encoding = gbk
return html.text
def list_html(url):
html = resp_html(url)
tree=etree.HTML(html)
zjlist=tree.xpath('//dd//a/@href') #获取每一章的页面相对地址
title=tree.xpath('//h1/text()')[0] #获取小说名字
zj_List = []
for u in zjlist: # 组合链接地址
zjname=u.split('/')[-1].split('.')[0]
zj_List.append(int(zjname))
zj_List.sort() #章节排序
return title,zj_List
# 多次调用函数
def res_content(html):
tree=etree.HTML(html)
title=tree.xpath('//h1/text()')[0] #获取每一章名字
txtt=tree.xpath('//div[@id="content"]/text()') #获取每一章文本内容
txt = ""
for line in txtt: #保存章节内容到文本文件,循环保存每一行
txt = txt+"\n"+line
# xs = URL.split('/')[-2] 这个在 “23usp.com” 这个网站有用 .replace("kvpsd https://www.23usp.com/"+xs+"/ 天才一秒记住","")
#替换掉不用的页面内容
txt = txt.replace("https://www.81zw.com","").replace("网页版章节内容慢,请下载爱阅小说app阅读最新内容","").replace("网站即将关闭,下载爱阅app免费看最新内容","").replace("免费阅读。","")
return title,txt
async def asytxt(url):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=20,ssl=False)) as session :
async with session.get(url,headers=headers) as resp:
resp.encoding = gbk
html =await resp.text()
code = resp.status
return html,code
# 写一个异步返回text
async def asy_html(nums):
if os.path.exists('./缓存/{}.txt'.format(nums)): #这里是查重,如果有重复文件就不继续采集了
print(nums,"------ 已采集,执行下一个")
else:
url = URL+str(nums)+".html" # 组合网址
html,code = await asytxt(url)
if code == 200:
title,txt = res_content(html)
await downtxt(nums,title,txt)
else:
await asyncio.sleep(1) # 停顿3秒再执行一次
html,code = await asytxt(url)
if code == 200:
title,txt = res_content(html)
await downtxt(nums,title,txt)
else: # 采集不成功,就记录下来
with open('mistake.txt','a',encoding='utf-8') as f:
f.write(str(nums)+'\n')
print(url+' '*10+'-----已记录------第二次下载失败!')
#采集错误的记录在这个文件里
# 这个函数把错误网址在合并文件前再重新采集一遍
def mistake_txt():
if not os.path.exists('./mistake.txt'): #检测错误存储文件,要不没有错误时会报错
print("------漂亮------完美------\n -----你的程序没有出错!-----")
else:
with open('mistake.txt','r+',encoding='utf-8') as f:
ff = f.readlines()
ii = len(ff)
print("共{}条数据。".format(ii))
print("-"*30)
if ii>0:
i = 1
for line in ff:
print("下面采集第 {} 条数据。".format(i))
lines = URL+str(line)+".html"
text = resp_html(lines)
title,txt = res_content(text)
with open('./缓存/{}.txt'.format(line),'w',encoding='UTF-8') as f:
f.write('\n'+title+'\n\n'+txt) #保存章节名字到文本文件
print(title+' '*10+'下载成功')
time.sleep(1)
print("错误网址已经采集完毕!")
else:
print("你没有、没有出错网址!")
print("*"*30)
# 写一个下载文章的函数
async def downtxt(nums,file_name,txt):
async with aiofiles.open('./缓存/{}.txt'.format(nums),'w',encoding='UTF-8') as f:
await f.write(file_name+"\n\n"+txt+"\n\n")
print(file_name +"-"*20+" 已下载完成")
# 写一个合并小说的函数
def combine_txt(title,nums): #合并所有章节文件函数
with open('./小说/{}.txt'.format(title),'a',encoding='utf-8') as f:
for txt in nums: #循环打开缓存中每一章的内容保存到新的文件中
path='./缓存/{}.txt'.format(txt) #设置存放路径
content=open(path,'r',encoding='utf-8').read() #打开每章节文件
f.write(content)
os.remove(path) # 删除缓存的txt文件,调试时可以注释掉
print("已保存 <<"+title+">> 的所有章节!请开心阅读!")
# 写一个主函数
def main():
title,zjlist= list_html(URL) # 返回小说名称和文章列表
try:
F = False
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [asy_html(nums) for nums in zjlist]
loop.run_until_complete(asyncio.wait(tasks)) # 激活协程
loop.close()
print("*"*30+"")
#这里检查掉下的错误网址
mistake_txt()
except Exception as e :
F = True
print(e)
print("出现错误时就不执行合并")
if F == False :
# 这里合并文件
combine_txt(title,zjlist)
else:
print("未采集完毕,没有执行查重和合并,请重新运行一次!")
if __name__ == "__main__" :
start = time.time()
main()
end = time.time()
print(end - start, 's')
用下面这个吧!这个各种提示都有了! 我的有多无聊。。。。
[Python] 纯文本查看 复制代码
'''
异步协程下载小说
# ---------------------------------------------------------------------------------
# 支持站点:
# 八一中文网(81zw.com)
# 顶点小说(23usp.com)
#笔趣阁(bqg.org,qbiqu.com,52bqg.net等全部站点)
#天籁小说(xs.23sk.com)
# --------------------------------------------------------------------------------
'''
# 2.导入需要用的库文件
import requests,os,time,re
import asyncio
import aiohttp,aiofiles
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
import PySimpleGUI as sg
from fake_useragent import UserAgent
ua = UserAgent()
# 1.准备网站,headers
URL = "https://www.81zw.com/book/132118/"
if '23usp' in URL or '23sk' in URL: #转码,否则有乱码
gbk = 'gbk'
elif '81zw' in URL:
gbk = 'utf-8'
else:
gbk = 'GB2312'
if not os.path.exists('./缓存'): #创建缓存文件夹来存放抓取的每一章节文件
os.mkdir('./缓存')
if not os.path.exists('./mistake.txt'): #创建缓存文件夹来存放抓取的每一章节文件
with open("./mistake.txt","w", encoding='UTF-8'):
pass
# 一个返回网页文章列表的函数
def resp_html(url):
headers={
'user-agent':ua.random
}
html = requests.get(url,headers=headers)
html.encoding = gbk
return html.text
def list_html(url):
html = resp_html(url)
tree=etree.HTML(html)
zjlist=tree.xpath('//dd//a/@href') #获取每一章的页面相对地址
title=tree.xpath('//h1/text()')[0] #获取小说名字
zj_List = []
zjlist = set(zjlist)
for u in zjlist: # 组合链接地址
zjname=u.split('/')[-1].split('.')[0]
zj_List.append(int(zjname))
zj_List.sort() #章节排序
return title,zj_List
def th_cw(txt,xs):
# 这个列表是要替换成空的,可以按需添加删除
cw_list = ["kvpsd https://www.23usp.com/"+xs+"/ 天才一秒记住","八壹中文網","www.81zw.ćőm","八一中文网","网页版章节内容慢,请下载爱阅小说app阅读最新内容","网站即将关闭,下载爱阅app免费看最新内容","免费阅读。https://www.81zw.com"]
for cw in cw_list:
txt = txt.replace(cw,"")
# 这个列表是要替换成相关字符的,可以按需添加删除
txt = txt.replace("<br><br>","\n").replace("…","…").replace("”","”").replace("“","“").replace("‘","‘").replace("—","’").replace("’","’")
return txt
# 多次调用函数
def res_content(html):
tree=etree.HTML(html)
title=tree.xpath('//h1/text()')[0] #获取每一章名字
if "23usp" in html:
try:
txtt=re.findall('<div id="content">(.*?)kvpsd.*?天才一秒记住',html,re.S)[0]
except:
txtt=tree.xpath('//div[@id="content"]/text()') #获取每一章文本内容
elif "81zw" in html:
txtt = re.findall('<div id="content">(.*?)网页版章节内容慢,请下载爱阅小说app阅读最新内容',html,re.S)[0]
else:
txtt=tree.xpath('//div[@id="content"]/text()') #获取每一章文本内容
if isinstance(txtt, list):
for line in txtt: #保存章节内容到文本文件,循环保存每一行
txt = txt+"\n"+line
else:
txt = txtt
xs = URL.split('/')[-2] # 这个在 “23usp.com” 这个网站有用 .replace("kvpsd https://www.23usp.com/"+xs+"/ 天才一秒记住","")
#替换掉不用的页面内容
txt = th_cw(txt,xs)
return title,txt
async def asytxt(url):
headers={
'user-agent':ua.random
}
await asyncio.sleep(0.5)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=63,ssl=False)) as session :
async with session.get(url,headers=headers) as resp:
resp.encoding = gbk
html =await resp.text()
code = resp.status
return html,code
# 写一个异步返回text
async def asy_html(index,nums,n):
if not os.path.exists('./缓存/{}.txt'.format(nums)): #这里是查重,如果有重复文件就不继续采集了
url = URL+str(nums)+".html" # 组合网址
html,code = await asytxt(url)
if code == 200:
title,txt = res_content(html)
await downtxt(nums,title,txt)
else:
await asyncio.sleep(2) # 停顿2秒再执行一次
html,code = await asytxt(url)
if code == 200:
title,txt = res_content(html)
await downtxt(nums,title,txt)
else: # 采集不成功,就记录下来
with open('mistake.txt','a',encoding='utf-8') as f:
f.write(str(nums)+"\n")
print(nums)
print(url+' '*10+'-----已记录------第二次下载失败!')
if code == 200:
sg.one_line_progress_meter('小说采集中。。。', index+1, n, title)
await asyncio.sleep(0.1)# '''
# 用多线程采集内容页
def Thpool(ff):
line = ff.strip()
if not os.path.exists('./缓存/{}.txt'.format(line)): #这里是查重,如果有重复文件就不继续采集了
print("下面采集第‘ {} ’这条数据。".format(line),)
lines = URL+str(line)+".html"
text = resp_html(lines)
title,txt = res_content(text)
time.sleep(0.5)
with open('./缓存/{}.txt'.format(line),'w',encoding='UTF-8') as f:
f.write('\n'+title+'\n\n'+txt) #保存章节名字到文本文件
print(title+'>'*10+'下载成功')
time.sleep(0.5)
# 这个函数把错误网址在合并文件前再重新采集一遍
def mistake_txt():
if os.path.exists('./mistake.txt'): #检测错误存储文件,要不没有错误时会报错
with open('mistake.txt','r',encoding='utf-8') as f:
ff = f.readlines()
ii = len(ff)
print("*"*40+"\n本小说 共 {} 条错误章节。".format(ii))
print("*"*40)
if ii>0:
with ThreadPoolExecutor(20) as Pool: #使用线程池,设置20个线程,可修改
Pool.map(Thpool,ff)
print("错误网址已经采集完毕!")
else:
print("------漂亮------完美-------\n------你的程序没有出错-----\n")
print("*"*40)
# 写一个下载文章的函数
async def downtxt(nums,file_name,txt):
async with aiofiles.open('./缓存/{}.txt'.format(nums),'w',encoding='UTF-8') as f:
await f.write(file_name+"\n\n"+txt+"\n\n")
print(file_name +"-"*20+" 已下载完成")
# 写一个合并小说的函数
def combine_txt(title,nums): #合并所有章节文件函数
with open('./小说/{}.txt'.format(title),'a',encoding='utf-8') as f:
for txt in nums: #循环打开缓存中每一章的内容保存到新的文件中
path='./缓存/{}.txt'.format(txt) #设置存放路径
content=open(path,'r',encoding='utf-8').read() #打开每章节文件
f.write(content)
os.remove(path) # 删除缓存的txt文件,调试时可以注释掉
print("*"*40+"\n已保存 <<"+title+">> 的所有章节!请开心阅读!\n****************************************")
# 写一个主函数
def main():
title,zjlist= list_html(URL) # 返回小说名称和文章列表
nums = len(zjlist)
print("*"*40+"\n本小说《%s》总共%d章!"%(title,nums))
time.sleep(2)
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [asy_html(index,num,nums) for index,num in enumerate(zjlist)]
loop.run_until_complete(asyncio.wait(tasks)) # 激活协程
loop.close()
print("*"*40+"")
F = False
except Exception as e:
F = True
print("未采集完毕,没有执行查重和合并,请重新运行一次!")
print("出现错误时就不执行合并")
if F == False :
#这里检查掉下的错误网址,为避免出错,下面这个用单页采集
mistake_txt()
num_png = len(os.listdir("./缓存/")) # 读入文件夹
# 统计文件夹中的文件个数
print("---<<%s>>--已下载 %s 章!-----"%(title,num_png))
print("-----本小说《%s》总共%d章!-----"%(title,nums))
# 这里合并文件
if int(nums) == int(num_png) :
time.sleep(2)
combine_txt(title,zjlist)
with open('./mistake.txt','w',encoding='utf-8'): # 删除缓存的txt文件,调试时可以注释掉
pass
else:
print("采集出现错误,请查看采集的章节是否完整,并检查代码有没有错误:")
else:
print("采集出现错误,请查看:")
if __name__ == "__main__" :
start = time.time()
main()
end = time.time()
print(end - start, 's')
免费评分
查看全部评分