I just learned the basics of Python web scraping and wrote a simple multi-threaded crawler for the novel 斗破苍穹. The focus is on multi-threading and the use of a concurrency utility class. Pointers from the experts here are very welcome.
utils code
from threading import Condition
"""
工具类库
"""
class CountDownLatch:
def __init__(self, count):
self.count = count
self.condition = Condition()
def wait(self):
try:
self.condition.acquire()
while self.count > 0:
self.condition.wait()
finally:
self.condition.release()
def countDown(self):
try:
self.condition.acquire()
self.count -= 1
self.condition.notifyAll()
finally:
self.condition.release()
def getCount(self):
return self.count
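For reference, here is a minimal standalone sketch of how the latch behaves on its own; the worker function and the sleep delays are invented for the demo:

import time
import threading
from utils import CountDownLatch

latch = CountDownLatch(2)

def worker(n):
    time.sleep(n)       # pretend to do some work
    latch.countDown()   # signal that this worker is done

for n in (1, 2):
    threading.Thread(target=worker, args=(n,)).start()

latch.wait()            # blocks until both workers have counted down
print('all workers finished')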
python code
import logging
import os
import time
import threading
import requests
from bs4 import BeautifulSoup
from utils import CountDownLatch
"""
多线程爬取斗破苍穹小说
"""
# ====================== Initialization
# Number of worker threads
THREAD_NUM = 10
countDownLatch = CountDownLatch(THREAD_NUM)
# Set the logging level
logging.basicConfig(level=logging.INFO)
# Request headers with a browser User-Agent
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
# Domain of the novel site
DOMAIN = 'http://www.xbiquge.la'
# Directory where the downloaded txt files are stored
DIR_NAME = './斗罗大陆小说'
if not os.path.exists(DIR_NAME):
    os.makedirs(DIR_NAME)
# ======================= Utility functions
def timer(func):
    """Decorator that logs how long the wrapped function takes."""
    def inner(*args, **kwargs):
        start = time.time()
        ret = func(*args, **kwargs)
        end = time.time()
        logging.info(' {} took {} s'.format(func.__name__, end - start))
        return ret  # pass the wrapped function's result through
    return inner
# ======================= Business functions
def get_html(url):
    """
    Fetch the HTML content of a url.
    @return str  html text content
    """
    # Pass the browser headers (defined above but previously unused) and a timeout
    html = requests.get(url, headers=headers, timeout=10).content.decode('utf-8')
    return html
def get_chapater_url_list():
    """
    Get the list of chapter urls of the novel.
    @return list  chapter urls
    """
    url = "http://www.xbiquge.la/1/1710/"
    html = get_html(url)
    soup = BeautifulSoup(html, 'lxml')
    logging.debug("Chapter-list html of the novel:")
    logging.debug(soup.prettify())
    # Extract the chapter links
    a_tag_list = soup.select('#list')[0].select('a')
    logging.debug("Extracted chapter a tags:")
    logging.debug('{} a tags in total, first: {}, last: {}'.format(
        len(a_tag_list), a_tag_list[0], a_tag_list[-1]))
    a_tag_url_list = []
    # Collect the href of every a tag (map() returns an iterator in Python 3, so a plain loop is used)
    for tag in a_tag_list:
        a_tag_url_list.append(tag['href'])
    logging.debug('Finished extracting urls from the a tags')
    return a_tag_url_list
def crawling_article_and_write_file(url_list, name):
    """
    Crawl the chapter pages and write their content to a file.
    @param url_list  chapter urls of the novel
    @param name      output file name
    """
    file_name = os.path.join(DIR_NAME, name)
    try:
        with open(file_name, 'w', encoding='utf-8') as fp:
            for url in url_list:
                html = get_html(DOMAIN + url)
                soup = BeautifulSoup(html, 'lxml')
                # Chapter title
                title = soup.select('div.bookname')[0].h1.text
                # Chapter body
                article = soup.select('div#content')[0].text
                # Write both to the file
                fp.writelines(title)
                fp.writelines(article)
    finally:
        # Count down even if this thread fails, otherwise main() would hang in wait()
        countDownLatch.countDown()
def allocate_task(chapter_list, num):
    """
    Split the work among the threads.
    @param chapter_list  chapter urls
    @param num           number of threads
    @return list         threads with their tasks assigned
    """
    logging.info('{} urls in total, preset thread count: {}, assigning tasks to threads as follows'.format(
        len(chapter_list), num))
    threads = []
    n = len(chapter_list)  # number of urls
    step = n // num
    if num > n:
        logging.info('Fewer tasks than preset threads, please lower the thread count')
        return threads
    for i in range(num):
        start = i * step
        # The last thread takes whatever remains after the even split
        end = start + step if (i != num - 1) else n
        task_url_list = chapter_list[start:end]
        # Create a thread for this slice of urls
        file_name = '{}-斗破苍穹{}-{}'.format(i, start, end)
        t = threading.Thread(target=crawling_article_and_write_file,
                             args=(task_url_list, file_name))
        logging.info('thread {} handles urls {}-{}, {} urls in total'.format(
            i, start, end, len(task_url_list)))
        threads.append(t)
    return threads
def merge_file():
    global THREAD_NUM, DIR_NAME
    order_map = {}
    file_list = os.listdir(DIR_NAME)
    # Map each part file to its numeric order: key = thread index, value = file name.
    # Parsing the prefix before '-' (instead of name[0]) also works with more than
    # 10 threads and skips a previously merged 斗罗大陆.txt on re-runs.
    for name in file_list:
        prefix = name.split('-')[0]
        if prefix.isdigit():
            order_map[prefix] = name
    logging.info('Found {} part files to merge, directory listing:'.format(len(order_map)))
    logging.info(file_list)
    merge_file_path = os.path.join(DIR_NAME, '斗罗大陆.txt')
    with open(merge_file_path, 'w', encoding='utf-8') as fpw:
        for i in range(THREAD_NUM):
            # Look up the part file written by thread i
            fname = order_map[str(i)]
            with open(os.path.join(DIR_NAME, fname), 'r', encoding='utf-8') as fpr:
                while True:
                    buffer = fpr.read(4096)
                    if not buffer:
                        break
                    fpw.write(buffer)
            fpw.write("\n")
    logging.info('Merge finished, merged file: {}'.format(merge_file_path))
@timer
def main():
    global countDownLatch, THREAD_NUM
    chapter_list = get_chapater_url_list()
    threads = allocate_task(chapter_list, THREAD_NUM)
    if len(threads) == 0:
        return
    logging.info(" Worker threads starting...")
    for t in threads:
        t.start()
    # Block until every worker has called countDown()
    countDownLatch.wait()
    logging.info(' All tasks finished...')
    logging.info(' Merging the chapter files')
    merge_file()


if __name__ == "__main__":
    main()
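As a side note: since main() already keeps the threads list, the same wait-for-all behavior could also be achieved without the latch by joining the threads. A minimal sketch against the code above:

for t in threads:
    t.start()
# join() blocks until that thread terminates, so after this loop every
# worker has finished -- equivalent to countDownLatch.wait() here
for t in threads:
    t.join()

The latch is still the more general tool (a worker can count down without exiting), but for this start-then-wait pattern join() keeps the code shorter.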