ZY.Zhang
These notes are based on a video tutorial from Bilibili.
I. Web Crawler Basics
1. What is a crawler
A crawler (spider): a program that simulates a browser going online and grabs data from the internet.
2. Is crawling legal?
Is a crawler legal or illegal?
- It is not forbidden by law
- But it does carry legal risk
- Benign crawlers vs. malicious crawlers
The risks a crawler brings show up in two areas:
- The crawler interferes with the normal operation of the site being visited
- The crawler grabs specific types of data or information protected by law
How to stay out of trouble while writing and using crawlers:
- Keep optimizing your program so it does not disturb the normal operation of the sites you visit
- When using or distributing scraped data, review what you collected; if it touches user privacy, trade secrets or other sensitive content, stop crawling or distributing it immediately.
3. A closer look at crawlers
Classification of crawlers by usage scenario:
- General-purpose crawler: an important component of search-engine crawling systems; it fetches entire pages.
- Focused crawler: built on top of the general-purpose crawler; it fetches specific parts of a page.
- Incremental crawler: monitors a site for data updates and only fetches the newly published data.
The crawler's spear and shield:
The robots.txt protocol: a "gentlemen's agreement" that states which data on a site may be crawled and which may not.
Example: www.taobao.com/robots.txt (a quick programmatic check is sketched below)
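As a quick aside (not from the video), a minimal sketch of checking a site's robots.txt with the standard library; the Taobao URL simply reuses the example above, and network access is assumed.
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://www.taobao.com/robots.txt')   # the example site mentioned above
rp.read()                                         # download and parse the rules
# ask whether a generic user agent ('*') may fetch a given path
print(rp.can_fetch('*', 'https://www.taobao.com/some/path'))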
4. The HTTP & HTTPS protocols
(1) HTTP
Concept: a form of data exchange between server and client.
Common request headers:
- User-Agent: identity of the request carrier
- Connection: whether to close or keep the connection after the request completes
Common response headers:
- Content-Type: the type of data the server returns to the client (see the sketch below)
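A minimal sketch of where these headers show up in practice; httpbin.org is an assumed demo endpoint, not part of the notes.
import requests

headers = {
    'User-Agent': 'Mozilla/5.0',   # identity of the request carrier
    'Connection': 'close',         # drop the connection once the request completes
}
response = requests.get('https://httpbin.org/get', headers=headers)
print(response.headers.get('Content-Type'))   # data type the server responded with
print(response.json()['headers'])             # httpbin echoes back the headers we sent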
(2) HTTPS
Concept: the secure hypertext transfer protocol
(3) Encryption schemes
II. The requests Module
1. First blood with requests
The requests module: a Python module for network requests; powerful, simple, convenient and highly efficient.
Purpose: simulate a browser sending requests.
How to use it (the requests coding workflow): specify the URL → send the request → get the response data → persist the data.
Environment setup: pip install requests
Hands-on coding:
import requests
if __name__ == '__main__':
url = 'https://www.sogou.com/'
response = requests.get(url = url)
page_text = response.text
print(page_text)
with open('./sogou.html','w',encoding = 'utf-8') as fp:
fp.write(page_text)
print('爬取数据结束!')
2. requests consolidation: case studies
(1) A simple web-page collector
'''UA detection: a portal's server checks the User-Agent of each incoming request.
If the carrier identifies itself as a certain browser, the request is treated as a normal one;
if the carrier is not browser-based, the request is treated as abnormal (a crawler)
and the server is very likely to reject it.'''
import requests
if __name__ == '__main__':
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://www.sogou.com/web'
kw = input('Enter a word:')
param ={
'query':kw
}
response = requests.get(url = url,params = param,headers = headers)
page_text = response.text
fileName = kw + '.html'
with open(fileName,'w',encoding ='utf-8') as fp:
fp.write(page_text)
print(fileName,'保存成功!!')
(2) Cracking Baidu Translate
- a POST request (carries parameters)
- the response data is JSON
import requests
import json
if __name__ == '__main__':
post_url = 'https://fanyi.baidu.com/sug'
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
word = input('Enter a word:\n')
data = {
'kw':word
}
response = requests.post(url = post_url,data = data,headers = headers)
dict_obj = response.json()
print(dict_obj)
fileName = word + '.json'
fp = open(fileName,'w',encoding='utf-8')
json.dump(dict_obj,fp = fp,ensure_ascii = False)
print('Over!')
(3) Douban movies
import requests
import json
if __name__ == '__main__':
url = 'https://movie.douban.com/j/chart/top_list'
param = {
'type':'24',
'interval_id':'100:90',
'action':'',
'start':'0',
'limit':'20'
}
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
response = requests.get(url = url,params = param,headers = headers)
list_data = response.json()
fp = open('./douban.json','w',encoding = 'utf-8')
json.dump(list_data,fp = fp,ensure_ascii = False)
print('Over!')
3. Homework: KFC restaurant lookup
import requests
import json
if __name__ == '__main__':
post_url = 'https://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
keyword = input('请输入要查询的城市:')
data ={
'cname': '',
'pid': '',
'keyword': keyword,
'pageindex': '1',
'pageSize': '10'
}
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
response = requests.post(url = post_url, data = data, headers = headers)
page = response.json()
for dict in page['Table1']:
StoreName = dict['storeName']
address = dict['addressDetail']
print('StoreName:' + StoreName, 'address:' + address + '\n')
4. Comprehensive exercise: the NMPA (药监总局)
import requests
import json
if __name__ == '__main__':
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
id_list = []
all_data_list = []
url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsList'
for page in range(1, 11):
page = str(page)
data = {
'on': 'true',
'page': page,
'pageSize': '15',
'productName': '',
'conditionType': '1',
'applyname': '',
'applysn': '',
}
json_ids = requests.post(url=url, headers=headers, data=data).json()
for dic in json_ids['list']:
id_list.append(dic['ID'])
post_url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsById'
for id in id_list:
data = {
'id': id
}
json_detail = requests.post(url=post_url, data=data, headers=headers).json()
all_data_list.append(json_detail )
all_data_list.append('---------------------------------------------------------')
fp = open('./allData.json', 'w', encoding='utf-8')
json.dump(all_data_list, fp=fp, ensure_ascii=False, indent= True)
print('Over!')
III. Data Parsing
1. Data parsing overview
- Focused crawling: crawl the specified content within a page.
- Coding workflow: 1. specify the URL → 2. send the request → 3. get the response data → 4. parse the data → 5. persist the data
- Data-parsing techniques:
- regular expressions
- bs4 parsing
- xpath parsing (the main focus)
- How data parsing works: the local text content to be parsed is stored between tags or in a tag's attributes, so:
- locate the specified tag
- extract (parse) the data stored in the tag or in its attributes
2. Scraping image data: regular expressions
| Function | Description |
| --- | --- |
| re.search() | Scans a string for the first position matching the regex; returns a match object |
| re.match() | Matches the regex from the start of the string; returns a match object |
| re.findall() | Scans the string and returns all matching substrings as a list |
| re.split() | Splits the string at the regex matches; returns a list |
| re.finditer() | Scans the string and returns an iterator whose elements are match objects |
| re.sub() | Replaces every regex match in the string; returns the resulting string |

| Flag | Description |
| --- | --- |
| re.I | Case-insensitive matching |
| re.L | Locale-dependent matching |
| re.M | Multi-line matching; affects ^ and $ |
| re.S | Makes . match any character, including newlines |
| re.U | Interprets characters according to Unicode; affects \w, \W, \b, \B |
| re.X | Allows a more flexible layout so the regex is easier to read |
Common regular expressions
Single characters:
. : any character except newline
[ ] : e.g. [aoe], [a-w]; matches any single character in the set
\d : digit, [0-9]
\D : non-digit
\w : digit, letter, underscore or Chinese character
\W : the opposite of \w
\s : any whitespace character (space, tab, form feed, ...), equivalent to [ \f\n\r\t\v]
\S : non-whitespace
Quantifiers:
* : any number of times, >= 0
+ : at least once, >= 1
? : optional, 0 or 1 time
{m} : exactly m times, e.g. o{3}
{m,} : at least m times
{m,n} : between m and n times
Anchors:
$ : ends with ...
^ : starts with ...
Grouping:
(ab)
Greedy mode: .*
Non-greedy (lazy) mode: .*?
re.I : ignore case
re.M : multi-line matching
re.S : single-line (dot-all) matching
re.sub(pattern, replacement, string)
'''正则练习'''
import re
key = "javapythonc++php"
re.findall('python', key)[0]
key = "<html><h1><hello world><h1></html>"
re.findall('<h1>(.*)<h1>', key)[0]
string = '我喜欢身高为170的女孩'
re.findall('\d+', string)
#提取出http://和https://
key = 'http://www.baidu.com and https://boob.com'
re.findall('https?://', key)
#提取出hello
key = 'lalala<hTml><hello></HtMl>hahah' #输出<hTml><hello></HtMl>
re.findall('<[Hh][Tt][mM][lL]>(.*)</[Hh][Tt][mM][lL]>', key)
#提取出hit.
key = 'bobo@hit.edu.com' #想要匹配到hit
re.findall('h.*?\.', key)
#匹配sas和saas
key = 'sasa and sas and saaas'
re.findall('sa{1,2}s', key)
import requests
if __name__ == '__main__':
url = 'https://pic.qiushibaike.com/system/pictures/12409/124098453/medium/YNPHJQC101MS31E1.jpg'
img_data = requests.get(url = url).content
with open('./qiutu.jpg', 'wb') as fp:
fp.write(img_data)
3. Regex parsing case study
'''<div class="thumb">
<a href="/article/124098472" target="_blank">
<img src="//pic.qiushibaike.com/system/pictures/12409/124098472/medium/HSN2WWN0TP1VUPNG.jpg" alt="糗事#124098472" class="illustration" width="100%" height="auto">
</a>
</div>'''
import re
import os
import requests
if __name__ == '__main__':
if not os.path.exists('./qiutuLibs'):
os.mkdir('./qiutuLibs')
url = 'https://www.qiushibaike.com/imgrank/'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
page_text = requests.get(url=url, headers=headers).text
ex = '<div class="thumb">.*?<img src="(.*?)" alt=.*?</div>'
img_src_list = re.findall(ex, page_text, re.S)
print(img_src_list)
for src in img_src_list:
src = 'https:' + src
img_data = requests.get(url = src, headers = headers).content
img_name = src.split('/')[-1]
imgPath = './qiutuLibs/' + img_name
with open(imgPath, 'wb') as fp:
fp.write(img_data)
print(img_name, '下载成功!')
import re
import os
import requests
if __name__ == '__main__':
if not os.path.exists('./qiutuLibs'):
os.mkdir('./qiutuLibs')
url = 'https://www.qiushibaike.com/imgrank/page/%d/'
for pageNum in range(1, 11):
new_url = format(url % pageNum)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
page_text = requests.get(url=new_url, headers=headers).text
ex = '<div class="thumb">.*?<img src="(.*?)" alt=.*?</div>'
img_src_list = re.findall(ex, page_text, re.S)
print(img_src_list)
for src in img_src_list:
src = 'https:' + src
img_data = requests.get(url = src, headers = headers).content
img_name = src.split('/')[-1]
imgPath = './qiutuLibs/' + img_name
with open(imgPath, 'wb') as fp:
fp.write(img_data)
print(img_name, '下载成功!')
4. bs4 parsing overview
5. bs4 parsing in detail
- How to instantiate a BeautifulSoup object:
- import it: from bs4 import BeautifulSoup
- instantiate the object by either:
- (1) loading the data of a local HTML document into it, or
- (2) loading page source fetched from the internet into it.
- Methods and attributes provided for data parsing:
- soup.tagName: returns the first occurrence of the tagName tag in the document
- soup.find(tagName): equivalent to soup.tagName; can also locate by attribute
- soup.find_all(): returns all tags that match
- select('some selector (id, class, tag ... selector)'): returns a list; supports hierarchical selectors
- Getting the text between tags:
- soup.a.text / string / get_text()
- text / get_text(): returns all the text content inside a tag
- string: returns only the text directly under the tag
- Getting an attribute value:
- soup.a['href']
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>测试bs4</title>
</head>
<body>
<div>
<p>百里守约</p>
</div>
<div class="song">
<p>李清照</p>
<p>王安石</p>
<p>苏轼</p>
<p>柳宗元</p>
<a title="赵匡胤" target="_self">
<span>this is span</span>
宋朝是最强大的王朝,不是军队的强大,而是经济很强大,国民都很有钱</a>
<a href="" class="du">总为浮云能蔽日,长安不见使人愁</a>
<img src="http://www.baidu.com/meinv.jpg" alt="" />
</div>
<div class="tang">
<ul>
<li><a title="qing">清明时节雨纷纷,路上行人欲断魂,借问酒家何处有,牧童遥指杏花村</a></li>
<li><a title="qin">秦时明月汉时关,万里长征人未还,但使龙城飞将在,不教胡马度阴山</a></li>
<li><a alt="qi">岐王宅里寻常见,崔九堂前几度闻,正是江南好风景,落花时节又逢君</a></li>
<li><a class="du">杜甫</a></li>
<li><a class="du">杜牧</a></li>
<li><b>杜小月</b></li>
<li><i>度蜜月</i></li>
<li><a id="feng">凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘</a></li>
</ul>
</div>
</body>
</html>
from bs4 import BeautifulSoup
if __name__ == '__main__':
fp = open('./test.html', 'r', encoding='utf-8')
soup = BeautifulSoup(fp, 'lxml')
print(soup.a)
print(soup.div)
print(soup.find('div'))
print(soup.find('div', class_='song'))
print(soup.find_all('a'))
print(soup.select('.tang'))
print(soup.select('.tang > ul > li > a')[0])
print(soup.select('.tang > ul a')[0])
print(soup.select('.tang > ul a')[0].text)
print(soup.select('.tang > ul a')[0].get_text())
print(soup.select('.tang > ul a')[0].string)
print(soup.select('.tang > ul a')[0]['href'])
6. bs4 parsing in practice
import requests
from bs4 import BeautifulSoup
if __name__ == '__main__':
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://www.shicimingju.com/book/sanguoyanyi.html'
response = requests.get(url = url, headers = headers)
response.encoding = 'utf-8'
page_text = response.text
soup = BeautifulSoup(page_text, 'lxml')
li_list = soup.select('.book-mulu > ul > li')
fp = open('./sanguo.txt', 'w', encoding = 'utf-8')
for li in li_list:
title = li.a.string
detail_url ='http://www.shicimingju.com' + li.a['href']
detail_response = requests.get(url = detail_url, headers = headers)
detail_response.encoding = 'utf-8'
detail_page_text = detail_response.text
detail_soup = BeautifulSoup(detail_page_text, 'lxml')
div_tag = detail_soup.find('div', class_ = 'chapter_content')
content = div_tag.text
fp.write(title + ':' + content + '\n')
print(title, '爬取成功!')
7. xpath parsing basics
- xpath parsing: the most commonly used, most convenient and efficient parsing method; highly general.
- How xpath parsing works:
- (1) instantiate an etree object and load the page source to be parsed into it;
- (2) call the object's xpath method with an xpath expression to locate tags and capture content.
- Environment setup: pip install lxml (the lxml parser)
- How to instantiate an etree object: from lxml import etree
- (1) load the source of a local HTML document into the etree object: etree.parse(filePath)
- (2) load source fetched from the internet into the object: etree.HTML(page_text)
- xpath('xpath expression'):
- / means starting from the root node, or one level of hierarchy
- // means multiple levels, or locating from anywhere in the document
- attribute positioning: tag[@attrName="attrValue"]
- index positioning: tag[@attrName="attrValue"]/p[3]; note that indexing starts at 1
- getting text: /text() returns the text directly inside the tag; //text() returns all text inside the tag, direct or not
- getting an attribute: /@attrName, e.g. img/@src
from lxml import etree
if __name__ == "__main__":
tree = etree.parse('test.html')
r = tree.xpath('//div[@class="song"]/img/@src')
print(r)
8. xpath in practice: 58.com second-hand housing
import requests
from lxml import etree
if __name__ == '__main__':
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://bj.58.com/ershoufang/'
page_text = requests.get(url = url,headers = headers).text
tree = etree.HTML(page_text)
div_list = tree.xpath('//section[@class="list"]/div')
fp = open('58.txt','w',encoding = 'utf-8')
for div in div_list:
title = div.xpath('./a/div[2]//h3/text()')[0]
fp.write(title + '\n\n')
print('---------------Over!------------------')
9. xpath parsing cases
(1) Downloading 4K images
import requests
from lxml import etree
import os
if __name__ == "__main__":
url = 'http://pic.netbian.com/4kmeinv/'
headers = {
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
}
response = requests.get(url=url, headers=headers)
page_text = response.text
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@class="slist"]/ul/li')
if not os.path.exists('./picLibs'):
os.mkdir('./picLibs')
for li in li_list:
img_src = 'http://pic.netbian.com'+li.xpath('./a/img/@src')[0]
img_name = li.xpath('./a/img/@alt')[0]+'.jpg'
img_name = img_name.encode('iso-8859-1').decode('gbk')
img_data = requests.get(url=img_src, headers=headers).content
img_path = 'picLibs/'+img_name
with open(img_path, 'wb') as fp:
fp.write(img_data)
print(img_name, '下载成功!!!')
print('------------------------OVER!---------------------------------')
(2) Scraping the names of cities nationwide
import requests
from lxml import etree
if __name__ == '__main__':
'''headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://www.aqistudy.cn/historydata/'
page_text = requests.get(url=url,headers=headers).text
tree = etree.HTML(page_text)
#数据解析
hot_li_list = tree.xpath('//div[@class="bottom"]/ul/li')
all_city_names = []
#解析热门城市名字
for li in hot_li_list:
hot_city_names = li.xpath('./a/text()')[0]
all_city_names.append(hot_city_names)
#解析全部城市名字:
city_names_list = tree.xpath('.//div[@class="bottom"]/ul/div[2]/li')
for li in city_names_list:
city_name = li.xpath('./a/text()')[0]
all_city_names.append(city_name)
print(all_city_names,len(all_city_names))'''
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://www.aqistudy.cn/historydata/'
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
a_list = tree.xpath('//div[@class="bottom"]/ul/li/a | //div[@class="bottom"]/ul/div[2]/li/a ')
all_city_names = []
for a in a_list:
a_name = a.xpath('./text()')[0]
all_city_names.append(a_name)
print(all_city_names, len(all_city_names))
10. xpath homework: scraping free résumé templates from sc.chinaz.com
import os
import requests
from lxml import etree
if __name__ == '__main__':
if not os.path.exists('./jianli'):
os.mkdir('./jianli')
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://sc.chinaz.com/jianli/free_%d.html'
page = int(input('您一共想要爬取多少页:'))
for pageNum in range(1, page + 1):
if pageNum == 1:
new_url = 'https://sc.chinaz.com/jianli/free.html'
else:
new_url = format(url%pageNum)
page_text = requests.get(url = new_url, headers = headers).text
tree = etree.HTML(page_text)
url_div_list = tree.xpath('//*[@id="container"]/div')
for detail_url in url_div_list:
detail_url = 'https:' + detail_url.xpath('./a/@href')[0]
detail_page_text = requests.get(url = detail_url, headers =headers).text
tree = etree.HTML(detail_page_text)
name = tree.xpath('//h1/text()')[0].encode('iso-8859-1').decode('utf-8')
download_url = tree.xpath('//*[@id="down"]/div[2]/ul/li[1]/a/@href')[0]
file_path = 'jianli/' + name + '.rar'
download_content = requests.get(url = download_url, headers = headers).content
with open(file_path, 'wb') as fp:
fp.write(download_content)
print(name, '下载完成')
print('-------------------------------OVER!---------------------------------------')
IV. CAPTCHAs
1. Introduction to CAPTCHA recognition
The love-hate relationship between CAPTCHAs and crawlers:
- Anti-crawling mechanism: CAPTCHAs. Recognize the data in the CAPTCHA image in order to simulate the login operation.
Ways to recognize a CAPTCHA:
2. Workflow of a code-solving platform
<!--While the author was studying, the Yundama platform had already shut down, so Chaojiying (超级鹰) is used instead. Similar code-solving platforms can be found with a quick search.-->
- Register: a user-center account
- Log in: with the user-center account
- Check your balance and make sure you have enough credits (first-time users get 1000 free credits by binding WeChat; otherwise a small top-up, even 1 yuan, is enough)
- Create a software ID (bottom-left of the user center)
- Download the sample code (under the development docs)
from lxml import etree
import requests
from hashlib import md5
class Chaojiying_Client(object):
def __init__(self, username, password, soft_id):
self.username = username
password = password.encode('utf8')
self.password = md5(password).hexdigest()
self.soft_id = soft_id
self.base_params = {
'user': self.username,
'pass2': self.password,
'softid': self.soft_id,
}
self.headers = {
'Connection': 'Keep-Alive',
'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
}
def PostPic(self, im, codetype):
"""
im: 图片字节
codetype: 题目类型 参考 http://www.chaojiying.com/price.html
"""
params = {
'codetype': codetype,
}
params.update(self.base_params)
files = {'userfile': ('ccc.jpg', im)}
r = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files, headers=self.headers)
return r.json()
def ReportError(self, im_id):
"""
im_id:报错题目的图片ID
"""
params = {
'id': im_id,
}
params.update(self.base_params)
r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php', data=params, headers=self.headers)
return r.json()
def tranformImgCode(imgPath,imgType):
chaojiying = Chaojiying_Client('此处是账户', '此处是密码', '此处是软件ID')
im = open(imgPath, 'rb').read()
return chaojiying.PostPic(im,imgType)['pic_str']
print(tranformImgCode('./a.jpg',1902))
3. CAPTCHA recognition for gushiwen.cn
session = requests.Session()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://so.gushiwen.cn/user/login.aspx?from=http://so.gushiwen.cn/user/collect.aspx'
page_text = session.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
img_src = 'https://so.gushiwen.org' + tree.xpath('//*[@id="imgCode"]/@src')[0]
img_data = session.get(img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
fp.write(img_data)
code_text = tranformImgCode('./code.jpg', 1902)
print(code_text)
login_url = 'https://so.gushiwen.cn/user/login.aspx?from=http%3a%2f%2fso.gushiwen.cn%2fuser%2fcollect.aspx'
data = {
'__VIEWSTATE': 'f1ECt6+6MPtdTZMJtYOYS/7ww2d/DPy9t8JQcIt1QuOneLTbNQuYqPcCjZNbDAbfb9vj3k6f0M7EKTf0YqElM1k1A5ELwyTvUzBii+9LDRBbIMmc/jb0DJPsYfI=',
'__VIEWSTATEGENERATOR': 'C93BE1AE',
'from': 'http://so.gushiwen.cn/user/collect.aspx',
'email': '账号',
'pwd': '密码',
'code': code_text,
'denglu': '登录',
}
page_text_login = session.post(url=login_url, headers=headers, data=data).text
with open('./gushiwen.html', 'w', encoding='utf-8') as fp:
fp.write(page_text_login)
V. Advanced requests
1. Simulated login: workflow
Simulated login: crawl user-specific information belonging to certain users.
Requirement: simulate logging in to renren.com
- Clicking the login button sends a POST request
- The POST request carries the login information entered beforehand (username, password, CAPTCHA, ...)
- CAPTCHA: changes dynamically on every request
2. Simulated login to renren.com
import requests
from lxml import etree
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'http://www.renren.com/SysHome.do'
page_text = requests.get(url = url,headers = headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(url = code_img_src,headers = headers).content
with open('./code.jpg','wb') as fp:
fp.write(code_img_data)
login_url = ' '
data = {
}
response = requests.post(url = login_url,headers = headers,data = data)
print(response.status_code)
login_page_text = response.text
with open('./renren.html','w',encoding = 'utf-8') as fp:
fp.write(login_page_text)
'''视频UP主的源代码'''
from CodeClass import YDMHttp
import requests
from lxml import etree
def getCodeText(imgPath,codeType):
username = 'bobo328410948'
password = 'bobo328410948'
appid = 6003
appkey = '1f4b564483ae5c907a1d34f8e2f2776c'
filename = imgPath
codetype = codeType
timeout = 20
result = None
if (username == 'username'):
print('请设置好相关参数再测试')
else:
yundama = YDMHttp(username, password, appid, appkey)
uid = yundama.login();
print('uid: %s' % uid)
balance = yundama.balance();
print('balance: %s' % balance)
cid, result = yundama.decode(filename, codetype, timeout);
print('cid: %s, result: %s' % (cid, result))
return result
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'http://www.renren.com/SysHome.do'
page_text = requests.get(url=url,headers=headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(url=code_img_src,headers=headers).content
with open('./code.jpg','wb') as fp:
fp.write(code_img_data)
result = getCodeText('code.jpg',1000)
print(result)
login_url = 'http://www.renren.com/ajaxLogin/login?1=1&uniqueTimestamp=2019431046983'
data = {
'email': 'www.zhangbowudi@qq.com',
'icode': result,
'origURL': 'http://www.renren.com/home',
'domain': 'renren.com',
'key_id': '1',
'captcha_type': 'web_login',
'password': '06768edabba49f5f6b762240b311ae5bfa4bcce70627231dd1f08b9c7c6f4375',
'rkey': '1028219f2897941c98abdc0839a729df',
'f':'https%3A%2F%2Fwww.baidu.com%2Flink%3Furl%3Dgds6TUs9Q1ojOatGda5mVsLKC34AYwc5XiN8OuImHRK%26wd%3D%26eqid%3D8e38ba9300429d7d000000035cedf53a',
}
response = requests.post(url=login_url,headers=headers,data=data)
print(response.text)
print(response.status_code)
3. Simulated login: cookie handling
session = requests.Session()
'''Setting the Cookie manually in the headers (not recommended):
headers = {
'Cookie': 'xxxx'
}'''
detail_url = 'http://www.renren.com/976279344/profile'
detail_page_test = session.get(url = detail_url,headers = headers).text
with open('bobo.html','w',encoding = 'utf-8' ) as fp:
fp.write(detail_page_test)
4. Proxies: theory
- Proxy: used to get around IP-ban anti-crawling mechanisms.
- What is a proxy? A proxy server.
- What proxies do:
- break through the limits placed on your own IP
- hide your real IP and keep it safe from attacks
- Related websites:
- Types of proxy IPs:
- http: can only be applied to URLs using the http protocol
- https: can only be applied to URLs using the https protocol
- Anonymity levels of proxy IPs:
- transparent: the server knows the request used a proxy and also knows the real IP
- anonymous: the server knows a proxy was used but not the real IP
- elite (high anonymity): the server knows neither that a proxy was used nor the real IP
5. Using a proxy in a crawler
import requests
url = 'http://www.baidu.com/s?wd=ip'
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
page_text = requests.get(url = url, headers = headers, proxies = {"http": "http://124.205.155.153:9090"}).text
with open('ip.html', 'w', encoding = 'utf-8') as fp:
fp.write(page_text)
VI. High-Performance Asynchronous Crawlers
1. Overview of asynchronous crawling
- Synchronous: different program units completing a task coordinate with each other through some form of communication while executing. For example, when a shopping system updates stock it uses a row lock as the signal, forcing the update requests to queue up and run in order, so the stock update is synchronous. In short, synchronous means ordered.
- Asynchronous: different program units can complete a task without coordinating with each other along the way; unrelated program units can be asynchronous. For example, a crawler downloading web pages: once the scheduler has called the downloader it can move on to other tasks without staying in touch with that download. Downloading and saving different pages are unrelated operations that need no mutual notification, and the moment each one finishes is not fixed. In short, asynchronous means unordered.
- Goal: use asynchrony in the crawler to achieve high-performance data scraping.
import requests
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
urls = [
'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14667.rar',
'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14665.rar',
'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14648.rar'
]
def get_content(url):
print('正在爬取:', url)
response = requests.get(url=url, headers=headers)
if response.status_code == 200:
return response.content
def parse_content(content):
print('响应数据的长度为:', len(content))
for url in urls:
content = get_content(url)
parse_content(content)
2. Multithreading and multiprocessing
Ways to build an asynchronous crawler:
- Multithreading / multiprocessing (not recommended; a sketch follows this list):
- benefit: a separate thread or process can be opened for each blocking operation, so the blocking operations run asynchronously
- drawback: threads and processes cannot be opened without limit
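For contrast, a minimal sketch (not from the video) of opening one thread per blocking task; it shows the speed-up, and also why threads cannot be opened without limit, since every task would cost one thread.
import time
import threading

def get_page(name):
    print('downloading:', name)
    time.sleep(2)                  # simulated blocking IO
    print('finished:', name)

name_list = ['xiaozi', 'aa', 'bb', 'cc']
start_time = time.time()
threads = [threading.Thread(target=get_page, args=(name,)) for name in name_list]
for t in threads:
    t.start()
for t in threads:
    t.join()                       # wait for every thread to finish
print('%d second' % (time.time() - start_time))   # ~2s instead of ~8s serially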
3. Thread pools and process pools
- Thread pool / process pool (use in moderation):
- benefit: lowers the frequency with which the system creates and destroys threads/processes, and therefore the system overhead.
- drawback: the number of threads or processes in the pool has an upper bound.
4. Basic use of a thread pool
import time
def get_page(str):
print('正在下载:',str)
time.sleep(2)
print('下载成功:',str)
name_list = ['xiaozi','aa','bb','cc']
start_time = time.time()
for i in range(len(name_list)):
get_page(name_list[i])
end_time = time.time()
print('%d second' % (end_time-start_time))
import time
from multiprocessing.dummy import Pool
start_time = time.time()
def get_page(str):
print('正在下载:', str)
time.sleep(2)
print('下载成功:', str)
name_list = ['xiaozi','aa','bb','cc']
pool = Pool(4)
pool.map(get_page, name_list)
end_time = time.time()
print(end_time - start_time)
5. Thread pool case study
import requests
import os
from multiprocessing.dummy import Pool
from lxml import etree
import random
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
if __name__ == '__main__':
if not os.path.exists('./video'):
os.mkdir('./video')
url = 'https://www.pearvideo.com/category_5'
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
urls = []
for li in li_list:
detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
name = li.xpath('./div/a/div[2]/text()')[0] + '.mp4'
detail_page_text = requests.get(url=detail_url, headers=headers).text
detail_tree = etree.HTML(detail_page_text)
name = detail_tree.xpath('//*[@id="detailsbd"]/div[1]/div[2]/div/div[1]/h1/text()')[0]
str_ = str(li.xpath('./div/a/@href')[0]).split('_')[1]
ajax_url = 'https://www.pearvideo.com/videoStatus.jsp?'
params = {
'contId': str_,
'mrd': str(random.random())
}
ajax_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
'Referer': 'https://www.pearvideo.com/video_' + str_
}
dic_obj = requests.get(url=ajax_url, params=params, headers=ajax_headers).json()
video_url = dic_obj["videoInfo"]['videos']["srcUrl"]
video_true_url = ''
s_list = str(video_url).split('/')
for i in range(0, len(s_list)):
if i < len(s_list) - 1:
video_true_url += s_list[i] + '/'
else:
ss_list = s_list[i].split('-')
for j in range(0, len(ss_list)):
if j == 0:
video_true_url += 'cont-' + str_ + '-'
elif j == len(ss_list) - 1:
video_true_url += ss_list[j]
else:
video_true_url += ss_list[j] + '-'
dic = {
'name': name,
'url': video_true_url
}
urls.append(dic)
def get_video_data(dic):
urll = dic['url']
data = requests.get(url=urll, headers=headers).content
path = './video/' + dic['name'] + '.mp4'
print(dic['name'], '正在下载.......')
with open(path, 'wb') as fp:
fp.write(data)
print(dic['name']+ '.mp4', '下载成功!')
pool = Pool(4)
pool.map(get_video_data, urls)
pool.close()
pool.join()
6. Coroutine concepts review
7. Coroutine operations review
import asyncio
async def request(url):
print('正在请求的url是',url)
print('请求成功,',url)
return url
c = request('www.baidu.com')
def callback_func(task):
print(task.result())
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
task.add_done_callback(callback_func)
loop.run_until_complete(task)
8. Multi-task asynchronous coroutines
import time
import asyncio
async def request(url):
print('正在下载',url)
await asyncio.sleep(2)
print('下载完毕',url)
start = time.time()
urls =[
'www.baidu.com',
'www.sougou.com',
'www.goubanjia.com'
]
stasks = []
for url in urls:
c = request(url)
task = asyncio.ensure_future(c)
stasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(stasks))
print(time.time()-start)
9. Why the aiohttp module is needed
import requests
import asyncio
import time
start = time.time()
urls = [
'http://127.0.0.1:1080/bobo',
'http://127.0.0.1:1080/jay',
'http://127.0.0.1:1080/tom'
]
async def get_page(url):
print('正在下载', url)
response = requests.get(url = url)
print('下载完毕', response.text)
tasks = []
for url in urls:
c = get_page(url)
task = asyncio.ensure_future(c)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:', end-start)
10. aiohttp + multi-task coroutines: an asynchronous crawler
import asyncio
import time
import aiohttp
start = time.time()
urls = [
'http://www.baidu.com',
'http://www.sougou.com',
'http://www.taobao.com'
]
async def get_page(url):
async with aiohttp.ClientSession() as session:
async with await session.get(url) as response:
page_text = await response.text()
tasks = []
for url in urls:
c = get_page(url)
task = asyncio.ensure_future(c)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:', end-start)
VII. Handling Dynamically Loaded Data
1. Introduction to selenium
2. First taste of selenium
The selenium workflow:
- Environment setup: pip install selenium
- Download the driver program for your browser (Chrome is used here)
from selenium import webdriver
from lxml import etree
from time import sleep
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('http://scxk.nmpa.gov.cn:81/xk/')
page_text = bro.page_source
tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="gzlist"]/li')
for li in li_list:
name = li.xpath('./dl/@title')[0]
print(name)
sleep(5)
bro.quit()
3. Other selenium automation operations
from selenium import webdriver
from time import sleep
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://www.taobao.com/')
search_input = bro.find_element_by_id('q')
search_input.send_keys('iphone')
bro.execute_script('window.scrollTo(0,document.body.scrollHeight)')
sleep(2)
btn = bro.find_element_by_css_selector('.btn-search')
btn.click()
bro.get('https://baidu.com/')
sleep(2)
bro.back()
sleep(2)
bro.forward()
sleep(5)
bro.quit()
4. Handling iframes + action chains
selenium and iframes:
- If the element to locate lives inside an iframe tag, you must call switch_to.frame(id)
- Action chains (dragging):
- from selenium.webdriver import ActionChains
- instantiate an action chain object: action = ActionChains(bro)
- click_and_hold(div): click and hold
- move_by_offset(x, y)
- perform(): execute the action chain immediately
- action.release(): release the action chain object
from selenium import webdriver
from time import sleep
from selenium.webdriver import ActionChains
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-example-droppable')
bro.switch_to.frame('iframeResult')
div = bro.find_element_by_id('draggable')
action = ActionChains(bro)
action.click_and_hold(div)
for i in range(5):
action.move_by_offset(11, 0).perform()
sleep(0.3)
action.release()
bro.quit()
5. Simulated login to Qzone with selenium
from selenium import webdriver
from time import sleep
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://qzone.qq.com/')
bro.switch_to.frame('login_frame')
a_tag = bro.find_element_by_id('switcher_plogin')
a_tag.click()
userName_tag = bro.find_element_by_id('u')
password_tag = bro.find_element_by_id('p')
sleep(1)
userName_tag.send_keys('QQ号码')
password_tag.send_keys('QQ密码')
sleep(1)
btn = bro.find_element_by_id('login_button')
btn.click()
sleep(3)
bro.quit()
6. Headless browser + detection evasion
from selenium import webdriver
from time import sleep
from selenium.webdriver.chrome.options import Options
from selenium.webdriver import ChromeOptions
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])
bro = webdriver.Chrome(executable_path='./chromedriver.exe', chrome_options=chrome_options,options=option)
bro.get('https://www.baidu.com')
print(bro.page_source)
sleep(2)
bro.quit()
7. Basic use of Chaojiying
Chaojiying: https://www.chaojiying.com/about.html
- Register: as a regular user
- Log in: as a regular user
- Check your credits: top up if needed
- Software ID: create one
- Download the sample code
8. Simulated login to 12306
Workflow:
- open the login page with selenium
- take a screenshot of the page selenium currently has open
- crop the CAPTCHA image region out of the screenshot
- use Chaojiying to recognize the CAPTCHA image (it returns click coordinates)
import requests
from hashlib import md5
class Chaojiying_Client(object):
def __init__(self, username, password, soft_id):
self.username = username
password = password.encode('utf8')
self.password = md5(password).hexdigest()
self.soft_id = soft_id
self.base_params = {
'user': self.username,
'pass2': self.password,
'softid': self.soft_id,
}
self.headers = {
'Connection': 'Keep-Alive',
'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
}
def PostPic(self, im, codetype):
"""
im: 图片字节
codetype: 题目类型 参考 http://www.chaojiying.com/price.html
"""
params = {
'codetype': codetype,
}
params.update(self.base_params)
files = {'userfile': ('ccc.jpg', im)}
r = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files, headers=self.headers)
return r.json()
def ReportError(self, im_id):
"""
im_id:报错题目的图片ID
"""
params = {
'id': im_id,
}
params.update(self.base_params)
r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php', data=params, headers=self.headers)
return r.json()
from selenium import webdriver
import time
from PIL import Image
from selenium.webdriver import ActionChains
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
})
bro.get('https://kyfw.12306.cn/otn/resources/login.html')
bro.maximize_window()
time.sleep(1)
zhanghao_tag = bro.find_element_by_class_name('login-hd-account')
zhanghao_tag.click()
time.sleep(1)
bro.save_screenshot('aa.png')
code_img_ele = bro.find_element_by_class_name('touclick-wrapper')
location = code_img_ele.location
print('location:', location)
size = code_img_ele.size
print('size:', size)
rangle = (location['x']*1.25, location['y']*1.25, (location['x']+size['width'])*1.25, (location['y']+size['height'])*1.25)
i = Image.open('./aa.png')
code_img_name = './code.png'
frame = i.crop(rangle)
frame.save(code_img_name)
time.sleep(3)
chaojiying = Chaojiying_Client('超级🦅账号', '超级🦅密码', '软件ID')
im = open('code.png', 'rb').read()
print(chaojiying.PostPic(im, 9004)['pic_str'])
result = chaojiying.PostPic(im, 9004)['pic_str']
all_list = []
if '|' in result:
list_1 = result.split('|')
count_1 = len(list_1)
for i in range(count_1):
xy_list = []
x = int(list_1[i].split(',')[0])
y = int(list_1[i].split(',')[1])
xy_list.append(x)
xy_list.append(y)
all_list.append(xy_list)
else:
x = int(result.split(',')[0])
y = int(result.split(',')[1])
xy_list = []
xy_list.append(x)
xy_list.append(y)
all_list.append(xy_list)
print(all_list)
for l in all_list:
x = l[0]
y = l[1]
ActionChains(bro).move_to_element_with_offset(code_img_ele, x/1.25, y/1.25).click().perform()
time.sleep(1)
bro.find_element_by_id('J-userName').send_keys('12306账号')
time.sleep(1)
bro.find_element_by_id('J-password').send_keys('12306密码')
time.sleep(1)
bro.find_element_by_id('J-login').click()
time.sleep(5)
bro.quit()
VIII. The scrapy Framework
1. First look at scrapy
2. Basic usage of scrapy
Basic usage of the scrapy framework:
- Environment setup:
- mac or linux: pip install scrapy
- windows:
- Create a project:
scrapy startproject xxxPro
cd xxxPro
- Create a spider file in the spiders subdirectory:
scrapy genspider spiderName www.xxx.com
- Run the project: scrapy crawl spiderName
import scrapy
class FirstSpider(scrapy.Spider):
name = 'first'
start_urls = ['https://www.baidu.com/', 'https://www.sogou.com/']
def parse(self, response):
print(response)
3. Data parsing in scrapy
import scrapy
class QiubaiSpider(scrapy.Spider):
name = 'qiubai'
start_urls = ['https://www.qiushibaike.com/text/']
def parse(self, response):
div_list = response.xpath('//div[@id="col1 old-style-col1"]/div')
for div in div_list:
author = div.xpath('./div[1]/a[2]/h2/text()')[0].extract()
content = div.xpath('./a[1]/div/span//text()').extract()
content = ''.join(content)
print(author,content)
break
4. Persistent storage via a terminal command
Persistent storage in scrapy:
- Based on a terminal command:
- requirement: only the return value of the parse method can be stored to a local text file (a sketch follows this list)
- note: only these file types are supported: json, jsonlines, jl, csv, xml, marshal, pickle
- command: scrapy crawl xxx -o filePath
- benefit: concise, efficient, convenient
- drawback: quite limited (data can only go into files with the listed extensions)
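A minimal sketch of what parse has to look like for this command to work; the XPath and field names below are illustrative assumptions, not taken from the notes.
# run with:  scrapy crawl qiubai -o ./qiubai.csv
import scrapy

class QiubaiSpider(scrapy.Spider):
    name = 'qiubai'
    start_urls = ['https://www.qiushibaike.com/text/']

    def parse(self, response):
        all_data = []
        for div in response.xpath('//div[@class="article"]'):        # illustrative XPath
            author = div.xpath('.//h2/text()').extract_first()
            content = ''.join(div.xpath('.//span//text()').extract())
            all_data.append({'author': author, 'content': content})  # one dict per record
        return all_data        # the -o export stores the RETURN VALUE of parse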
5. Persistent storage via pipelines
Pipeline-based storage:
- Coding workflow:
- parse the data
- define the matching fields in the item class
- pack the parsed data into an item object
- submit the item object to the pipeline for persistent storage
- in the pipeline class's process_item, persist the data carried by the received item object
- enable the pipeline in the settings file
- Benefit:
Interview question: store the scraped data both to a local file and to a database. How? (a pipelines sketch follows this list)
- one pipeline class in the pipelines file corresponds to one storage target
- the item submitted by the spider is only received by the first pipeline class that runs
- return item in process_item passes the item on to the next pipeline class to be run
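A minimal pipelines.py sketch for the interview question above; the class names are illustrative and the database part is only a placeholder, not working code from the notes.
# pipelines.py -- one class per storage target; enable both in settings.py
class FilePipeline(object):
    def open_spider(self, spider):
        self.fp = open('./data.txt', 'w', encoding='utf-8')

    def process_item(self, item, spider):
        self.fp.write(item['author'] + ':' + item['content'] + '\n')
        return item                     # hand the item on to the next pipeline class

    def close_spider(self, spider):
        self.fp.close()

class MysqlPipeline(object):            # placeholder: a real version would hold a pymysql connection
    def process_item(self, item, spider):
        # INSERT the item into the database here
        return item

# settings.py
# ITEM_PIPELINES = {
#     'xxxPro.pipelines.FilePipeline': 300,    # smaller number = runs first
#     'xxxPro.pipelines.MysqlPipeline': 301,
# }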
6. Full-site data crawling
Spider-based full-site crawling: crawl the page data of every page number under a given section of a website.
- Task: crawl the names of the celebrity photo sets on the xiaohua site (521609.com)
- Implementation options:
- add the url of every page to the start_urls list (not recommended)
- send the requests manually yourself (recommended)
'''------------校花网xiaohua.py----------------'''
import scrapy
class XiaohuaSpider(scrapy.Spider):
name = 'xiaohua'
start_urls = ['http://www.521609.com/tuku/mxxz/']
url = 'http://www.521609.com/tuku/mxxz/index_%d.html'
page_num = 2
def parse(self, response):
li_list = response.xpath('/html/body/div[4]/div[3]/ul/li')
for li in li_list:
img_name = li.xpath('./a/p/text()').extract_first()
print(img_name)
if self.page_num <= 28:
new_url = format(self.url%self.page_num)
self.page_num += 1
yield scrapy.Request(url=new_url,callback=self.parse)
'''---------------校花网pipelines.py--------------------'''
class XiaohuaproPipeline(object):
def process_item(self, item, spider):
return item
'''----------------校花网settings.py部分代码---------------------------'''
ROBOTSTXT_OBEY = False
LOG_LEVEL = 'ERROR'
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
7. The five core components
The five core components:
- Spiders: define the crawling logic, parse responses, and produce items and new requests.
- Engine (Scrapy Engine): controls the data flow between all the other components and triggers events.
- Scheduler (Scheduler): receives requests from the engine, de-duplicates and queues them, and returns them when the engine asks for the next one.
- Downloader (Downloader): fetches the page data and hands it to the engine, which passes it on to the Spider.
- Item Pipeline (Item Pipeline): processes the entities the spider extracts from pages; after a page is parsed and the data stored into an item, the item is sent through the pipeline, processed in a fixed order, and finally written to a local file or a database.
8. Passing data between requests (request meta)
- When to use: when the data to crawl and parse is not all on the same page (deep crawling).
- Task: crawl job titles and job descriptions from BOSS直聘.
import scrapy
from bossPro.items import BossproItem
class BossSpider(scrapy.Spider):
name = 'boss'
start_urls = ['https://www.zhipin.com/c100010000/?page=1&ka=page-1']
url = 'https://www.zhipin.com/c100010000/?page=%d'
page_num = 2
def parse_detail(self,response):
item = response.meta['item']
job_desc = response.xpath('//*[@id="main"]/div[3]/div/div[2]/div[2]/div[1]/div//text()').extract()
job_desc = ''.join(job_desc)
print(job_desc)
item['job_desc'] = job_desc
yield item
def parse(self, response):
li_list = response.xpath('//*[@id="main"]/div/div[2]/ul/li')
for li in li_list:
item = BossproItem()
job_name = li.xpath('.//div/div[1]/div[1]/div/div[1]/span[1]/a/text()').extract_first()
item['job_name'] = job_name
print(job_name)
detail_url = 'https://www.zhipin.com' + li.xpath('.//div/div[1]/div[1]/div/div[1]/span[1]/a/@href').extract_first()
yield scrapy.Request(detail_url,callback=self.parse_detail,meta={'item':item})
if self.page_num <= 5:
new_url = format(self.url%self.page_num)
self.page_num += 1
yield scrapy.Request(new_url,callback=self.parse)
9. Image crawling with scrapy
Scraping image data with ImagesPipeline:
'''----------------爬取站长素材高清图片 img.py-----------------------'''
import scrapy
from imgsPro.items import ImgsproItem
class ImgSpider(scrapy.Spider):
name = 'img'
start_urls = ['http://sc.chinaz.com/tupian/']
def parse(self, response):
div_list = response.xpath('//div[@id="container"]/div')
for div in div_list:
src = 'https:' + div.xpath('./div/a/img/@src2').extract_first()
item = ImgsproItem()
item['src'] = src
yield item
'''----------------------爬取站长素材高清图片 pipelines.py---------------------------'''
from scrapy.pipelines.images import ImagesPipeline
import scrapy
class imgsPileLine(ImagesPipeline):
def get_media_requests(self, item, info):
yield scrapy.Request(item['src'])
def file_path(self, request, response=None, info=None):
imgName = request.url.split('/')[-1]
return imgName
def item_completed(self, results, item, info):
return item
'''---------------------------------爬取站长素材高清图片 items.py-----------------------------'''
import scrapy
class ImgsproItem(scrapy.Item):
src = scrapy.Field()
'''------------------------------爬取站长素材高清图片 setting.py部分代码-------------------'''
IMAGES_STORE = './imgs_ZYZhang'
ITEM_PIPELINES = {
'imgsPro.pipelines.imgsPileLine': 300,
}
LOG_LEVEL = 'ERROR'
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'
ROBOTSTXT_OBEY = False
10. Middleware
- Downloader middleware:
- position: between the engine and the downloader
- role: intercept every request and response in the whole project, in batch
- intercepting requests (a sketch follows this list):
- UA spoofing: process_request
- proxy IPs: process_exception, return request
- intercepting responses: tamper with the response data / response object (see the NetEase News case below)
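A middlewares.py sketch of the two request-interception points named above; the UA strings and proxy addresses are placeholders, not working values.
# middlewares.py -- UA spoofing in process_request, proxy switching in process_exception
import random

class MiddleproDownloaderMiddleware(object):
    user_agent_list = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
    ]                                   # pool of User-Agent strings (placeholder)
    PROXY_http = ['127.0.0.1:8888']     # placeholder proxies
    PROXY_https = ['127.0.0.1:8888']

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agent_list)
        return None                     # let the request continue to the downloader

    def process_exception(self, request, exception, spider):
        # the request failed (e.g. the IP was blocked): retry it through a proxy
        if request.url.startswith('https'):
            request.meta['proxy'] = 'https://' + random.choice(self.PROXY_https)
        else:
            request.meta['proxy'] = 'http://' + random.choice(self.PROXY_http)
        return request                  # resubmit the repaired request to the scheduler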
11. NetEase News
Task: crawl the news data (title and content) from NetEase News
- from the NetEase News homepage, parse out the detail-page URLs of several sections (verified: not dynamically loaded)
- inside each section, the news titles are loaded dynamically (dynamic loading)
- parse each news item's detail-page URL, fetch the detail page source, and parse out the news content
'''-------------------------------网易新闻 wangyi.py------------------------'''
import scrapy
from selenium import webdriver
from wangyiPro.items import WangyiproItem
class WangyiSpider(scrapy.Spider):
name = 'wangyi'
start_urls = ['https://news.163.com/']
models_urls = []
def __init__(self):
self.bro = webdriver.Chrome(executable_path='F:\PythonProjects\爬虫\动态加载数据处理\chromedriver.exe')
def parse(self, response):
li_list = response.xpath('//*[@id="index2016_wrap"]/div[1]/div[2]/div[2]/div[2]/div[2]/div/ul/li')
alist = [3,4,6,7,8]
for index in alist:
model_url = li_list[index].xpath('./a/@href').extract_first()
self.models_urls.append(model_url)
for url in self.models_urls:
yield scrapy.Request(url,callback=self.parse_model)
def parse_model(self,response):
div_list = response.xpath('/html/body/div/div[3]/div[4]/div[1]/div/div/ul/li/div/div')
for div in div_list:
title = div.xpath('./div/div[1]/h3/a/text()').extract_first()
new_detail_url = div.xpath('./div/div[1]/h3/a/@href').extract_first()
item = WangyiproItem()
item['title'] = title
yield scrapy.Request(url=new_detail_url, callback=self.parse_detail, meta={'item': item})
def parse_detail(self,response):
content = response.xpath('//*[@id="content"]/div[2]//text()').extract()
content = ''.join(content)
item = response.meta['item']
item['content'] = content
yield item
def closed(self, spider):
self.bro.quit()
'''-------------------------------网易新闻 pipelines.py-----------------------------------'''
class WangyiproPipeline(object):
def process_item(self, item, spider):
print(item)
return item
'''-------------------------------网易新闻 middlewares.py-------------------------'''
from scrapy import signals
from scrapy.http import HtmlResponse
from time import sleep
class WangyiproDownloaderMiddleware(object):
def process_request(self, request, spider):
return None
def process_response(self, request, response, spider):
bro = spider.bro
if request.url in spider.models_urls:
bro.get(request.url)
sleep(3)
page_text = bro.page_source
new_response = HtmlResponse(url=request.url, body=page_text, encoding='utf-8', request=request)
return new_response
else:
return response
def process_exception(self, request, exception, spider):
pass
'''-----------------------------网易新闻 setting.py部分代码---------------------------------'''
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
ROBOTSTXT_OBEY = False
DOWNLOADER_MIDDLEWARES = {
'wangyiPro.middlewares.WangyiproDownloaderMiddleware': 543,
}
ITEM_PIPELINES = {
'wangyiPro.pipelines.WangyiproPipeline': 300,
}
LOG_LEVEL = 'ERROR'
12. Full-site crawling with CrawlSpider
CrawlSpider: a subclass of Spider
- Ways to crawl a whole site:
- based on Spider: send the requests manually
- based on CrawlSpider
- Using CrawlSpider:
- create a project
- cd XXX
- create a CrawlSpider-based spider file:
scrapy genspider -t crawl xxx www.xxxx.com
- link extractor (LinkExtractor): extracts links according to the given rule (allow="regex")
- rule parser (Rule): parses the pages behind the extracted links with the given callback
- Task: crawl the ticket number, title and content of posts on the Sunshine Hotline (阳光热线) site
- Analysis: the data to crawl is not all on the same page
- one link extractor extracts all the page-number links
- another link extractor extracts all the complaint detail-page links
'''---------------------阳光问政 sun.py---------------------------'''
'''The site's page source has changed since the video; it is recommended to set follow=False for a first run, otherwise your IP is easily banned. If interested, tweak the rules and add a proxy before crawling for real.'''
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from sunPro.items import SunproItem, DetailItem
class SunSpider(CrawlSpider):
name = 'sun'
start_urls = ['http://wz.sun0769.com/political/index/politicsNewest?id=1&page=']
link = LinkExtractor(allow=r'id=1&page=\d+')
link_detail = LinkExtractor(allow=r'index\?id=\d+')
rules = (
Rule(link, callback='parse_item', follow=False),
Rule(link_detail, callback='parse_detail')
)
def parse_item(self, response):
li_list = response.xpath('/html//div[2]/div[3]/ul[2]/li')
for li in li_list:
new_num = li.xpath('./span[1]/text()').extract_first()
new_title = li.xpath('./span[3]/a/text()').extract_first()
item = SunproItem()
item['title'] = new_title
item['new_num'] = new_num
yield item
def parse_detail(self,response):
new_id = response.xpath('/html//div[3]/div[2]/div[2]/div[1]/span[4]/text()').extract_first().strip().replace("\r\n", "").replace(" ", "")
new_content = response.xpath('/html//div[3]/div[2]/div[2]/div[2]/pre/text()').extract()
new_content = ''.join(new_content)
item = DetailItem()
item['content'] = new_content
item['new_id'] = new_id
yield item
'''-------------------------------pipelines.py------------------------------'''
class SunproPipeline(object):
def process_item(self, item, spider):
if item.__class__.__name__ == 'DetailItem':
print(item['new_id'],item['content'])
else:
print(item['new_num'],item['title'])
return item
'''---------------------------items.py----------------------'''
import scrapy
class SunproItem(scrapy.Item):
title = scrapy.Field()
new_num = scrapy.Field()
class DetailItem(scrapy.Item):
new_id = scrapy.Field()
content = scrapy.Field()
13. Distributed crawling: overview and setup
Distributed crawler:
- Concept: set up a distributed cluster and have it crawl one shared set of resources together.
- Purpose: raise the efficiency of data crawling.
How is distribution implemented?
- install the scrapy-redis component
- native scrapy cannot do distributed crawling on its own; it has to work together with scrapy-redis.
Why can't native scrapy be distributed?
- its scheduler cannot be shared by a cluster
- its pipeline cannot be shared by a cluster
What scrapy-redis provides:
- a scheduler and a pipeline that can be shared, plugged into the native scrapy framework.
scrapy-redis implementation flow (a configuration sketch follows):
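A minimal sketch of the usual scrapy-redis wiring (pip install scrapy-redis is assumed; the spider name, queue name and redis address are placeholders).
# spider file: inherit from RedisCrawlSpider and replace start_urls with a shared queue
from scrapy_redis.spiders import RedisCrawlSpider

class FbsSpider(RedisCrawlSpider):
    name = 'fbs'
    redis_key = 'fbsQueue'          # every node pulls its start URLs from this redis list
    # rules = (...)                 # link-extraction rules as in a normal CrawlSpider

    def parse_item(self, response):
        pass                        # parse and yield items as usual

# settings.py -- shared pipeline, dedup filter and scheduler provided by scrapy-redis
# ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 400}
# DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
# SCHEDULER_PERSIST = True          # keep the queue and fingerprints after the crawl ends
# REDIS_HOST = '127.0.0.1'          # address of the shared redis server (placeholder)
# REDIS_PORT = 6379
# kick the cluster off from redis-cli:  lpush fbsQueue http://www.xxx.com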
14. Incremental crawlers
- Concept: monitor a website for data updates and crawl only the newly published data.
- Analysis (the dedup core is sketched after this list):
- specify a start url
- get the other page-number links with CrawlSpider
- request the other page links through a Rule
- from each page's source, parse out the url of every movie detail page
- core: check whether a detail-page url has been requested before
- store the urls of the detail pages that have already been crawled
- store them in a redis set
- request the detail page and parse out the movie's name and synopsis
- persist the data
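A minimal sketch of the dedup core described above, using a redis set; the host, key name and URLs are placeholders.
from redis import Redis

conn = Redis(host='127.0.0.1', port=6379)

def is_new(detail_url):
    # sadd returns 1 only when the URL was not in the set yet
    return conn.sadd('movie_detail_urls', detail_url) == 1

for url in ['http://example.com/movie/1', 'http://example.com/movie/1']:
    if is_new(url):
        print('new data, crawl it:', url)     # request + parse + persist here
    else:
        print('already crawled, skip:', url)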
IX. Supplement: Asynchronous Programming
Why cover this?
- this part is not easy to learn (async / non-blocking, asyncio)
- async topics and frameworks keep growing: tornado, fastapi, django 3.x asgi and aiohttp all use async to improve performance
How is it covered?
- part 1: coroutines
- part 2: asynchronous programming with the asyncio module
- part 3: practical cases
1. Coroutines
Coroutines are not provided by the computer; they are created by programmers.
A coroutine, also called a micro-thread, is a user-space context-switching technique. In short, it switches execution between code blocks within a single thread.
def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()
Several ways to implement coroutines:
- greenlet, an early module
- the yield keyword
- the asyncio decorator (Python 3.4+)
- the async / await keywords (Python 3.5+)
(1) Coroutines with greenlet
pip install greenlet
from greenlet import greenlet
def func1():
print(1)
gr2.switch()
print(2)
gr2.switch()
def func2():
print(3)
gr1.switch()
print(4)
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()
(2) The yield keyword
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
(3) The asyncio decorator
==Switches automatically when IO blocks==
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2)
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
(4) The async / await keywords (recommended)
import asyncio
async def func1():
print(1)
await asyncio.sleep(2)
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2. Why coroutines matter
Within a single thread, when an IO wait is hit the thread does not sit idle; it uses the idle time to do other work.
Task: download three images (network IO)
'''普通的request方式'''
import requests
def download_image(url):
print('开始下载:', url)
response = requests.get(url)
print('下载完成')
file_name = url.rsplit('-')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
'https://pic.netbian.com/uploads/allimg/210228/010301-1614445381005c.jpg',
'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
]
for item in url_list:
download_image(item)
'''使用aiohttp模块下载 协程方式'''
import aiohttp
import asyncio
import time
start = time.time()
async def fetch(session, url):
print('发送请求:', url)
async with session.get(url, verify_ssl = False) as response:
content = await response.content.read()
file_name = url.rsplit('-')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
print('下载完成', url)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
'https://pic.netbian.com/uploads/allimg/210228/010301-1614445381005c.jpg',
'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(time.time() - start)
3. Asynchronous programming
(1) The event loop
Concept: think of it as an endless loop that keeps checking for and running certain code.
# pseudocode
task_list = [task1, task2, task3, ...]
while True:
runnable, finished = check all tasks in task_list and return the "runnable" and "finished" ones
for task in runnable:
run the ready task
for task in finished:
remove it from task_list
stop the loop once every task in task_list is finished
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
(2) Quick start
Coroutine function: a function defined with async def.
Coroutine object: the object obtained by calling a coroutine function.
async def func():
pass
result = func()
<!--Note: calling the coroutine function creates a coroutine object, but the function body does not run!-->
<!--To run the code inside the coroutine function, the coroutine must be handed to an event loop.-->
import asyncio
async def func():
print('快来打我吧!')
result = func()
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
(3) The await keyword
await <awaitable object> (a coroutine object, Future object or Task object)
'''示例一'''
import asyncio
async def func():
print('来玩呀')
response = await asyncio.sleep(2)
print('结束', response)
asyncio.run(func())
'''示例二'''
import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
response = await others()
print('IO请求结束,结果为:', response)
asyncio.run(func())
'''示例三'''
import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
response1 = await others()
print('IO请求结束,结果为:', response1)
response2 = await others()
print('IO请求结束,结果为:', response2)
asyncio.run(func())
<!--await simply waits until the awaited object has produced its result before continuing.-->
(4) Task objects
Task objects (official documentation)
Their main purpose is to add multiple tasks to the event loop.
Tasks are used to schedule coroutines concurrently. Creating a Task with asyncio.create_task(coroutine object) lets the coroutine join the event loop and wait to be scheduled. Besides asyncio.create_task(), the lower-level loop.create_task() or ensure_future() functions can also be used; manually instantiating Task objects is not recommended.
<!--Note: asyncio.create_task() was added in Python 3.7; before 3.7 use the lower-level asyncio.ensure_future() instead.-->
'''示例1'''
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
async def main():
print('main开始')
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
print('main结束')
ret1 = await task1
ret2 = await task2
print(ret1,ret2)
asyncio.run(main())
'''示例2'''
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
async def main():
print('main开始')
task_list = [
asyncio.create_task(func(), name='n1'),
asyncio.create_task(func(), name='n2')
]
print('main结束')
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run(main())
'''示例3'''
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
task_list = [
func(),
func()
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
(5) asyncio.Future objects
asyncio.Future (official documentation)
Task inherits from Future, and the way a Task handles await results is built on Future objects.
'''示例1'''
import asyncio
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
await fut
asyncio.run(main())
'''示例2'''
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('666')
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
await loop.create_task(set_after(fut))
data = await fut
print(data)
asyncio.run(main())
(6) concurrent.futures.Future objects
concurrent.futures (official documentation)
The object used when implementing asynchronous operations with a process pool or a thread pool.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
pool = ThreadPoolExecutor(max_workers= 5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
Real code often mixes the two. For example, a CRM project might be 80% coroutine-based async programming plus MySQL; where the MySQL driver does not support async, a thread or process pool handles that part asynchronously.
import time
import asyncio
import concurrent.futures
def func1():
time.sleep(2)
return "SB"
async def main():
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)
asyncio.run(main())
(7) Case: asyncio + a module that does not support async
import requests
import asyncio
async def download_image(url):
print('开始下载', url)
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下载完成')
file_name = url.rsplit('-')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
'https://pic.netbian.com/uploads/allimg/200910/200207-1599739327e5a8.jpg',
'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
(8) Asynchronous iterators
What is an asynchronous iterator?
An object that implements the __aiter__() and __anext__() methods. __anext__() must return an awaitable object. async for consumes the awaitables returned by the asynchronous iterator's __anext__() method until it raises StopAsyncIteration. Introduced by PEP 492.
What is an asynchronously iterable object?
An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method. Introduced by PEP 492.
import asyncio
class Reader(object):
'''自定义异步迭代器 (同时也是一部可迭代对象)'''
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run( func() )
(9) Asynchronous context managers
An object that controls the environment inside an async with statement by defining the __aenter__() and __aexit__() methods.
import asyncio
class AsyncContextManager:
def __init__(self, conn=None):
self.conn = conn
async def do_something(self):
return 666
async def __aenter__(self):
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run(func())
4. uvloop
uvloop is a drop-in replacement for the asyncio event loop, and its event loop is faster than the default asyncio one.
pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(...)
<!--Note: uvicorn, an ASGI server, uses uvloop internally by default.-->
5. Practical cases
(1) Asynchronous redis
When operating redis from Python code, connecting, operating and disconnecting are all network IO.
pip install aioredis
import asyncio
import aioredis
async def execute(address, password):
print('开始执行', address)
redis = await aioredis.create_redis(address, password = password)
await redis.hmset_dict('car', key1 = 1, key2 = 2, key3 = 3)
result = await redis.hgetall('car', encoding = 'utf-8')
print(result)
redis.close()
await redis.wait_closed()
print('结束', address)
asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
'''示例2'''
import asyncio
import aioredis
async def execute(address, password):
print('开始执行', address)
redis = await aioredis.create_pool(address, password = password)
await redis.hmset_dict('car', key1 = 1, key2 = 2, key3 = 3)
result = await redis.hgetall('car', encoding = 'utf-8')
print(result)
redis.close()
await redis.wait_closed()
print('结束', address)
task_list =[
execute('redis://47.93.4.197:6379','root!2345'),
execute('redis://47.93.4.198:6379','root!2345')
]
asyncio.run(asyncio.wait(task_list))
(2) Asynchronous MySQL
pip3 install aiomysql
'''示例1'''
import asyncio
import aiomysql
async def execute():
conn = await aiomysql.connect(host='127.0.0.1', port= 3306, user = 'root', password = '123',db= 'mysql')
cur = await conn.cursor()
await cur.execute('SELECT Host,User FROM user')
result = await cur.fetchall()
print(result)
await cur.close()
conn.close()
asyncio.run(execute())
'''示例2'''
import asyncio
import aiomysql
async def execute(host, password):
print('开始', host)
conn = await aiomysql.connect(host = host, port= 3306, user = 'root', password = password,db= 'mysql')
cur = await conn.cursor()
await cur.execute('SELECT Host,User FROM user')
result = await cur.fetchall()
print(result)
await cur.close()
conn.close()
print('结束', host)
task_list =[
execute('47.93.4.197:6379','root!2345'),
execute('47.93.4.198:6379','root!2345')
]
asyncio.run(asyncio.wait(task_list))
(3) The FastAPI framework
pip3 install fastapi
pip3 install uvicorn
'''示例'''
import uvicorn
import asyncio
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def index():
'''普通操作接口'''
return{"message":"Hello World"}
if __name__ == '__main__':
uvicorn.run("luffy:app",host= '127.0.0.1',port= 5000, log_level= 'info')
'''示例2'''
from aioredis import Redis
import uvicorn
import aioredis
import asyncio
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionPool('redis://47.193.14.198:6379', password= 'root123', minsize = 1 , maxsize = 10)
@app.get("/")
def index():
'''普通操作接口'''
return{"message":"Hello World"}
@app.get('/red')
async def red():
print('请求来了')
await asyncio.sleep(3)
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
await redis.hmset_dict('car',key1 = 1,key2 = 2,key3 =3)
result = await redis.hgetall('car', encoding ='utf-8')
print(result)
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("脚本名:app",host= '127.0.0.1',port= 5000, log_level= 'info')
(4) An asynchronous crawler
pip3 install aiohttp
'''使用aiohttp模块下载 协程方式'''
import aiohttp
import asyncio
async def fetch(session, url):
print('发送请求:', url)
async with session.get(url, verify_ssl = False) as response:
text = await response.text()
file_name = url.rsplit('-')[-1]
print('得到结果:', url , len(text))
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://python.org',
'https://www.baidu.com',
'https://www.pythonav.com'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
done, pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())