好友
阅读权限10
听众
最后登录1970-1-1
|
自己写的一个boss直聘爬虫,没用框架,也没做登录cookie。试过代{过}{滤}理IP,但是免费的基本上没几个能用的。都是用自己IP爬取,会被封IP。
主要用了python的requests包、bs4包,mongdb数据库存储爬下来的数据,redis存链接。
主要包括五个模块:HTMLDownload(网页HTML内容下载)、URLManager(url管理)、HTMLAnalysis(HTML内容解析)、Control(调度模块)、Data(数据库连接等)。
一些说明:代码最上边的导入包语句,是我自己的其他包。其实我这是一个完整的boss、看准网爬虫+django做的界面展示数据。这是拿出来的boss爬虫部分。有些自己写的包可能没有,但是都是辅助部分,比如log啊,还有像db相关的其实就是连接本地redis或者mongdb库的代码。数据库插入修改这些代码里都有。大家自己加一下就是,都很简单。
如果大家有疑问的可以回帖问我。
第一次发自己写的代码,也不太习惯吾爱破解的编辑界面,请多多指教!
下面附详细代码
首先是URLManager模块:
[Python] 纯文本查看 复制代码
# coding=utf-8
import hashlib
from db import redisPool
from job.spider.log import logger
import logging
# logger = logger('URLManager')
class URLManager():
# url管理类。url集合存储在redis中
def __init__(self,site):
'''
初始化
'''
self.conn = redisPool.getRedis() # 初始化一个redis的连接
# self.new_url = self.conn.sadd()
self.new_url = site+'_new_urls'
self.old_url = site+'_old_urls'
def old_urls_size(self):
'''
获取已爬取链接的数量
:return:
'''
return self.conn.scard(self.old_url)
def add_new_url(self,url):
'''
向待爬取链接集合中增加新的待爬取链接
:param url: 单个链接
:return:
'''
if 'www.zhipin.com' in url:
logger = logging.getLogger('boss')
else:
logger = logging.getLogger('kanzhun')
logger.info('开始向['+self.new_url+']中添加待爬取url')
if not url:
return None
m = hashlib.md5()
m.update(url.encode('utf-8'))
url_md5 = m.hexdigest()
if not self.conn.sismember(self.new_url,url) and not self.conn.sismember(self.old_url,url_md5):
self.conn.sadd(self.new_url,url)
def add_new_urls(self, urls):
'''
向待爬取链接集合中增加新的待爬取链接集合
:param urls: 待爬取链接。集合
:return:
'''
# logger.info('开始向[' + self.new_url + ']中添加待爬取url')
if not urls:
return None
for url in urls:
self.add_new_url(url)
def has_new_url(self):
'''
是否还有待爬取的链接
:return:Boolean
'''
return self.conn.scard(self.new_url) != 0
def get_new_url(self):
'''
获取即将要爬取的新链接
:return:
'''
# logger.info('开始从[' + self.new_url + ']中获取待爬取url')
new_url = self.conn.spop(self.new_url)
# logger.info('还有'+str(self.conn.scard(self.new_url))+'条链接待爬取,当前待爬取链接'+new_url)
#引入hashlib库,将url进行md5转化,并只取中间128位,减少数据长度,节省内存。
m = hashlib.md5()
m.update(new_url.encode())
# logger.info('开始向[' + self.old_url + ']中添加已爬取url')
self.conn.sadd(self.old_url,m.hexdigest()[8:-8])
return new_url
HTMLDownload模块代码:
[Python] 纯文本查看 复制代码
#coding=utf-8
import requests,logging,re
from job.spider.spiderHelper import Proxy,get_agent
from job.spider.ProxyVaildate.proxyProvider import able_ip
from bs4 import BeautifulSoup
import logging
class HTMLDownload():
#下载html
def __init__(self):
# 获取一个随机ip
self.ip = able_ip()
# 获取一个随机agent
self.headers = get_agent()
#模拟生成代{过}{滤}理
self.proxies = {'http':'http://'+self.ip,
'https':'https://'+self.ip
}
def download(self,url):
'''
根据传进来的url下载对应页面
:param url:
:return:
'''
if 'www.zhipin.com' in url:
logger = logging.getLogger('boss')
else:
logger = logging.getLogger('kanzhun')
logger.info('开始下载当前url[' + str(url) + ']')
#测试ip是否可用
# res = requests.get(url,headers=self.headers,proxies=self.proxies,timeout=5)
res = requests.get(url,headers=self.headers,timeout=5)
if res.status_code == 200:
logger.info('下载当前url[' + str(url) + ']成功')
res.encoding = 'utf-8'
return res.text
logger.info('下载当前url['+url+']失败,状态码:'+str(res.status_code))
return None
def test_ip(self,test_ip):
'''
测试ip是否有效
:param ip:
:return:
'''
url = 'https://ip.cn/'
res = requests.get(url,headers=self.headers,proxies=self.proxies,time_out=5)
if res.status_code == 200:
res.encoding = 'utf-8'
soup = BeautifulSoup(res.text,'html.parser')
script = soup.find('div', 'container-fluid').find_next('script').string
ip = re.findall(r'\d+.\d+.\d+.\d+', script)[0]
print(ip)
if ip == test_ip:
print('测试通过')
return True
return False
HTMLAnalysis模块代码:
[Asm] 纯文本查看 复制代码
#coding=utf-8
from bs4 import BeautifulSoup
from abc import ABCMeta,abstractmethod
from urllib.parse import urljoin
import re
import logging
logger = logging.getLogger('boss')
job_info = {}
class HTMLAnalysis(metaclass=ABCMeta):
'''
HTML解析抽象类
'''
@abstractmethod
def parse(self, url, html):
pass
@abstractmethod
def getNewUrl(self, url, soup):
pass
@abstractmethod
def getNewData(self, url, soup):
pass
class JobHTMLAnalysis(HTMLAnalysis):
'''
boss直聘职位列表页面html的解析
'''
def parse(self,url,html):
'''
重写虚类的parse方法,解析html
:param url: 当前页面的url
:param html: 当前页面得到的text即html文件
:return:
'''
if not url or not html:
logger.info('传入参数不完整')
return set(),None
logger.info('开始解析职位列表页面')
soup = BeautifulSoup(html,'html.parser')
new_url = self.getNewUrl(url,soup)
new_data = self.getNewData(url,soup)
logger.info('解析职位列表页面html成功')
return new_url,new_data
def getNewUrl(self,url,soup):
'''
职位页面获取新url
:param url:
:param soup:
:return: 详情url
'''
logger.info('开始从职位列表页面解析新的url,即职位详情')
new_urls = set()
jobs = soup.find_all('div','job-primary')
for job in jobs:
href = job.find('div','info-primary').find('a')['href']
new_url = urljoin(url,href)
logger.info('解析到新的详情界面url:'+new_url)
new_urls.add(new_url)
logger.info('getNewURL方法执行完毕')
return new_urls
def getNewData(self,url,soup):
'''
职位页面获取职位相关数据
:param url:
:param soup:
:return:
'''
logger.info('开始获得职位列表界面的数据')
result = []
jobs = soup.find_all('div','job-primary')
city = soup.find('div','city-sel').find('span','label-text').find('b').string
for job in jobs:
jobId = job.find('div','info-primary').find('a')['data-jobid']
result.append(job.find('div','job-title').string) #获取职位名
result.append(job.find('span', 'red').string) #获取工资
result.append(job.find('div', 'company-text').find('a').string.replace('.','')) #获取公司名称
# 获取公司地址、经验、学历要求。在一个p标签内还有e标签,先找到p标签,再找到其子标签
response1 = job.find('div', 'info-primary').find('p').contents
result.append(response1[0]) #获取工作地点
result.append(response1[2]) #获取工作经验要求
result.append(response1[4]) #获取学历要求
# 获取公司行业、规模、融资情况
response2 = job.find('div', 'company-text').find('p').contents
result.append(response2[0]) #获取公司行业
if len(response2) < 5:
result.append('无融资信息') #公司融资情况
result.append(response2[2]) #公司规模
else:
result.append(response2[2])
result.append(response2[4])
# job_info['jobId'] = jobId
result.append(city)
job_info[jobId] = [i for i in result]
result.clear()
print(job_info[jobId])
logger.info('已经获得了职位列表界面的数据')
return job_info
class DetailHTMLAnalysis(HTMLAnalysis):
'''
职位详情界面HTML解析
'''
def parse(self,url,html):
if not url or not html:
logger.info('传入参数不完整')
return None,None
logger.info('开始解析职位详情页面')
soup = BeautifulSoup(html,'html.parser')
new_url = self.getNewUrl(url,soup)
new_data = self.getNewData(url,soup)
logger.info('解析职位详情页面html成功')
return new_url,new_data
def getNewUrl(self,url,soup):
return set()
def getNewData(self,url,soup):
'''
职位详情页面解析。目前就解析出工作年限和全部的工作要求
:param url:
:param soup:
:return:
'''
#工作年限要求
year = ''
if not url or not soup:
logger.info('参数错误')
return None
logger.info('开始获取职位详情数据'+url)
error = soup.find('div','error-content')
logger.info(error)
if soup.find('div','error-content'):
logger.info('工作详情界面不存在')
return None
job = soup.find('div','detail-op').find('a')['ka']
jobId = re.search(r'\d+',job).group(0)
details = soup.find('div', 'detail-content').find('div', 'text').contents
logger.info('获取了职位详情数据**'+str(len(details))+'***'+jobId)
for i in range(len(details)):
if i % 2 != 0:
continue
if '年' in details[i].strip():
if re.match(r'[1-9]', details[i].strip()) and '经验' in details[i].strip():
year = details[i].strip()
#details是一个列表,里边包含有tag和None,要过滤掉
detail = ''.join(i.strip() for i in details if type(i) is not 'bs4.element.Tag' and len(i)!=0).replace(' ','')
logger.info('当前获得的职位信息'+detail)
# logger.info('当前职位列表信息'+str(job_info[jobId]))
# logger.info(str(job_info[jobId].append(detail)))
# job_info[jobId].append(detail)
info = [jobId,year,detail]
return info
Data数据库操作代码:
[Python] 纯文本查看 复制代码
# 这是RedisPool文件,上边导入过的
#coding=utf-8
import redis
from pymongo import MongoClient
import logging
logger = logging.getLogger('django_console')
# host = '192.168.0.106'
host = '127.0.0.1'
portRedis = 6379
portMongo = 27017
db = 0
pool = redis.ConnectionPool(host=host,port=portRedis,decode_responses=True,password=123456)
def getRedis():
'''
获取一个新的redis连接
:return:
'''
conn = redis.Redis(connection_pool=pool)
return conn
def getMongo(database,collection):
'''
获取一个新的mongo连接
:param database:传入数据库名称
:param collections:集合名称
:return:
'''
logger.info('*************************'+database+' '+collection)
conn = MongoClient(host,portMongo)
db = conn[database]
logger.info('当前连接mongo库:'+database)
table = db[collection]
logger.info('当前集合:'+str(collection))
return table
def getMongoNocollection(database):
'''
生成一个新的mongo连接,不指定数据表
:param database:
:return:
'''
conn = MongoClient(host,portMongo)
db = conn[database]
return db
[Python] 纯文本查看 复制代码
# 这是调度部分会用到的DataClass文件
#coding=utf-8
import redis
from pymongo import MongoClient
from .redisPool import getRedis,getMongo,getMongoNocollection
class DataClass():
#对数据的操作方法
#
def __init__(self,database,*args):
'''
初始化redis和mongo的连接
'''
#生成redis的新连接
# self.connR = getRedis() redis直接用redisPool
#生成一个mongo的新连接,返回的是一个集合。这里不用
if args:
#这个连接时指定数据表
self.connM = getMongo(database,args[0])
#不指定数据表先建立连接
self.connM1 = getMongoNocollection(database)
# def
Control调度控制部分代码
[Asm] 纯文本查看 复制代码
#coding=utf-8
from job.spider.URLManager import URLManager
from job.spider.HTMLDownload import HTMLDownload
from job.spider.bossSpider.HTMLAnalysis import DetailHTMLAnalysis,JobHTMLAnalysis
from db import DataClass
import logging
import time,random
logger = logging.getLogger('boss')
class Control():
#爬虫调度管理
def __init__(self):
self.manager = URLManager('boss')
self.download = HTMLDownload()
self.job_analysis = JobHTMLAnalysis()
self.detail_analysis = DetailHTMLAnalysis()
self.db = DataClass.DataClass('bossDB','bossDB')
# self.query = 'python'
self.page = 1
def spider(self,root_url):
'''
爬虫调度方法
:param root_url:
:return:
'''
self.manager.add_new_url(root_url)
logger.info('添加根url成功'+root_url)
# if self.manager.old_urls_size() > 3:
# return None
#开始爬取
while self.manager.has_new_url():
logger.info('有待爬取url,开始爬取')
time.sleep(random.randint(5,10))
new_url = self.manager.get_new_url()
logger.info('即将开始下载['+str(new_url)+']的内容')
html = self.download.download(new_url)
if 'job_detail' not in new_url:
#判断是否是职位详情界面
new_urls,data = self.job_analysis.parse(new_url,html)
if data:
for k,v in data.items():
self.db.connM.update({'jobId':k},{'$set':{'job':v[0],'city':v[-1],'content':v}},True)
else:
#是职位详情界面 [jobId,year,detail]
new_urls,data = self.detail_analysis.parse(new_url,html)
logger.info(str(data))
if data:
self.db.connM.update({'jobId':data[0]},{'$addToSet':{'content':{'$each':[data[1],data[2]]}}},True)
self.manager.add_new_urls(new_urls)
logger.info('html解析完毕,开始存入数据')
logger.info('已经爬取了'+str(self.manager.old_urls_size())+'个链接')
# return data
def main(self,job,city):
'''
爬虫启动入口
:return:
'''
logger.info('爬虫启动')
#爬取十页
while self.page < 11:
root_url = 'https://www.zhipin.com/{}/?query={}&page={}&ka=page-{}'.format(city,job, self.page,self.page)
self.spider(root_url)
self.page += 1
if __name__ == '__main__':
spider = Control()
print('开始')
spider.main('python','深圳')
|
免费评分
-
查看全部评分
|