import os
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin

import chardet
import requests
from lxml import etree
from requests.adapters import HTTPAdapter

# A small pool of desktop User-Agent strings to rotate through.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
]

MAX_RETRIES = 10  # attempts per request before giving up
TIMEOUT = 5       # per-request timeout, in seconds

def get_random_user_agent():
    # Not truly random: rotates through the pool based on the current second.
    return USER_AGENTS[int(time.time()) % len(USER_AGENTS)]

# threading.local() must live at module scope; creating it inside get_session()
# would build a fresh object on every call and defeat per-thread session reuse.
thread_local = threading.local()


def get_session():
    # One requests.Session per thread, with a pooled adapter mounted for both schemes.
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
        adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=MAX_RETRIES)
        thread_local.session.mount('http://', adapter)
        thread_local.session.mount('https://', adapter)
    return thread_local.session

def decode_content(response):
    # Guess the encoding with chardet; fall back to common Chinese encodings.
    detected = chardet.detect(response.content)
    encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'gb18030']
    # Guard against detected['encoding'] being None, which would raise TypeError.
    if detected['encoding'] and detected['confidence'] >= 0.7:
        try:
            return response.content.decode(detected['encoding'], errors='strict')
        except UnicodeDecodeError:
            pass
    for enc in encodings:
        try:
            return response.content.decode(enc, errors='strict')
        except UnicodeDecodeError:
            continue
    # Last resort: lossy decode ('utf-8' if chardet detected nothing at all).
    return response.content.decode(detected['encoding'] or 'utf-8', errors='replace')

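# Rough usage sketch (hypothetical URL): when chardet's confidence is low, a
# GBK-encoded page should be picked up by the 'gbk' entry in the fallback list.
#   resp = requests.get('https://example.com/some-gbk-page')
#   text = decode_content(resp)
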
def fetch_url(url, headers):
    session = get_session()
    for attempt in range(MAX_RETRIES):
        try:
            response = session.get(url, headers=headers, timeout=TIMEOUT)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException:
            # Re-raise on the final attempt; otherwise back off briefly and retry.
            if attempt == MAX_RETRIES - 1:
                raise
            time.sleep(1)

def get_chaptercontent(chapter_url, index):
    headers = {
        'User-Agent': get_random_user_agent(),
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        'Referer': chapter_url
    }
    all_content = []
    title = ""
    # A chapter may span several pages; follow the "下一页" ("next page") link
    # until it runs out. The link text must stay in Chinese to match the site.
    while chapter_url:
        try:
            response = fetch_url(chapter_url, headers)
            html = decode_content(response)
            selector = etree.HTML(html)
            title_elements = selector.xpath(title_elements_xpath)
            contents = selector.xpath(contents_xpath)
            all_content.extend([content.strip() for content in contents if content.strip()])
            title = title_elements[0].strip() if title_elements else ""
            # Strip pagination markers such as "1/3" from the title.
            title = re.sub(r'(\s*\d+\s*/\s*\d+\s*)', '', title).strip()
            next_page = selector.xpath('//a[contains(text(), "下一页")]/@href')
            if next_page and next_page[0] != "javascript:":
                chapter_url = urljoin(chapter_url, next_page[0])
            else:
                chapter_url = None
        except Exception as e:
            print(f"Error while fetching chapter {title}: {e}")
            break
    if not title or not all_content:
        print(f"Failed to fetch chapter {index}")
        return (index, None, "")
    chaptercontent = "\n ".join(all_content)
    return (index, title, chaptercontent.strip())

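# Minimal smoke test, as a sketch (the chapter URL below is hypothetical; the
# *_xpath globals from the __main__ block must be defined first, since
# get_chaptercontent reads them as module-level names):
#   title_elements_xpath = '//h1/text()'
#   contents_xpath = '//div[@id="content"]/p/text()'
#   print(get_chaptercontent('https://www.xbqg06.com/373303/1.html', 1)[1])
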
def download_chapters(base_url, max_threads):
    headers = {'User-Agent': get_random_user_agent()}
    all_chapter_links = []
    book_name = None
    first_directory_page = True
    # Walk the table-of-contents pages, collecting chapter links from each.
    while base_url:
        try:
            response = fetch_url(base_url, headers)
            html = decode_content(response)
            selector = etree.HTML(html)
            if first_directory_page:
                book_name = selector.xpath(book_name_xpath)[0].strip()
                print(f'\nDownloading novel: "{book_name}"\n')
                first_directory_page = False
            chapter_links = selector.xpath(chapter_links_xpath)[chapter_links_start_number:]
            all_chapter_links.extend(urljoin(base_url, href) for href in chapter_links)
            # If the table of contents is paginated via an <option> dropdown,
            # locate the currently selected page and advance to the next one.
            if directory_pages_xpath and current_page_option_xpath:
                directory_pages = [(urljoin(base_url, option.attrib['value']), option.text)
                                   for option in selector.xpath(directory_pages_xpath)]
                current_page_option = selector.xpath(current_page_option_xpath)
                if current_page_option:
                    current_page_value = urljoin(base_url, current_page_option[0].attrib['value'])
                    current_page_text = current_page_option[0].text
                    print(f'Current table-of-contents page: {current_page_text}')
                    current_page_index = [page[0] for page in directory_pages].index(current_page_value)
                    if current_page_index + 1 < len(directory_pages):
                        base_url = directory_pages[current_page_index + 1][0]
                    else:
                        base_url = None
                else:
                    print("No selected table-of-contents page found; stopping the crawl.")
                    break
            else:
                break
        except Exception as e:
            print(f"Error while fetching the table of contents: {e}")
            break
    if not book_name:
        print("Could not get the book title; check the URL and page structure.")
        return False
    # Output goes to a '我的小说' ("My Novels") folder under the working directory.
    save_dir = os.path.join(os.getcwd(), '我的小说')
    os.makedirs(save_dir, exist_ok=True)
    output_path = os.path.join(save_dir, f'{book_name}.txt')
    chapters = []
    failed_chapters = []
    def write_to_file():
        # Restore reading order: chapters were appended as threads completed.
        chapters.sort(key=lambda x: x[0])
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                # input_url is the module-level global set in the __main__ block.
                f.write(f'\n\nTitle: {book_name}\n\nURL: {input_url}\n\n\n')
                for idx, title, content in chapters:
                    f.write(f"{title}\n\n{content}\n\n")
            if failed_chapters:
                print(f"\nThese chapters failed to download: {failed_chapters}")
            print(f'\n"{book_name}" download finished')
            return True
        except Exception as e:
            print(f"Error while writing the file: {e}")
            return False
    success = True
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Map each future back to its chapter index so a failure can be
        # recorded even when future.result() raises (where `index` would
        # otherwise be unbound or stale).
        future_to_index = {executor.submit(get_chaptercontent, link, idx): idx
                           for idx, link in enumerate(all_chapter_links, 1)}
        for future in as_completed(future_to_index):
            try:
                index, title, content = future.result()
                if title and content:
                    chapters.append((index, title, content))
                    print(f"Finished chapter: {title}")
                else:
                    failed_chapters.append(index)
            except Exception as e:
                print(f"Error while processing a chapter: {e}")
                failed_chapters.append(future_to_index[future])
                success = False
    if not write_to_file():
        success = False
    return success

if __name__ == "__main__":
    # Site-specific configuration: XPaths tuned to the default target site.
    # Adapt these to scrape a different site (see the note at the end of the file).
    default_url = 'https://www.xbqg06.com/373303/'
    book_name_xpath = '//h1/text()'
    chapter_links_xpath = '(//ul[@class="section-list fix"])[2]/li/a/@href'
    chapter_links_start_number = 0
    title_elements_xpath = '//h1/text()'
    contents_xpath = '//div[@id="content"]/p/text()'
    directory_pages_xpath = '//option'
    current_page_option_xpath = '//option[@selected="selected"]'

    input_url = input(f"Enter the novel's table-of-contents URL (default {default_url}): ") or default_url
    while True:
        threads_input = input("Enter the number of worker threads (1-100, default 20): ") or "20"
        if threads_input.isdigit() and 1 <= int(threads_input) <= 100:
            max_threads = int(threads_input)
            break
        print("Invalid input; please enter an integer between 1 and 100")

    start_time = time.time()
    success = download_chapters(base_url=input_url, max_threads=max_threads)
    elapsed = time.time() - start_time
    if success:
        print(f"Total time: {elapsed:.2f}s")
    else:
        print("An error occurred during the download")
    input('Done. The novel is saved in the "我的小说" folder. Press Enter to exit!')
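
# Note on adapting this script to another site: behavior is driven entirely by
# the XPath settings in the __main__ block (book_name_xpath, chapter_links_xpath,
# title_elements_xpath, contents_xpath, directory_pages_xpath,
# current_page_option_xpath). Point them at the matching elements of the new
# site's pages; set directory_pages_xpath to '' if the table of contents is not
# paginated (the crawl then stops after one page), and keep the "下一页" link
# text in get_chaptercontent in sync with the site's own "next page" label.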