好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 xiangwu820 于 2021-3-8 11:16 编辑
第一次在论坛发帖,请各位大神多多关照!本人纯粹小白一枚,由于工作需要想收集气象资料,未能找到很好的资料收集途径,无奈只能自己逐天收集!在此感谢论坛、感谢某度的资料,写了个可以满足使用的天气爬虫。内容包括:日期、AQI最高值、最高气温、最低气温、最大相对湿度、最大风力、总降水量、风频率,以及过去24小时逐小时的时间、角度、风向、风力等。
目标网站:中国天气网
软件支持:Chrome 和 ChromeDriver。抓取的内容为中国天气网过去24小时的整点天气实况。
抓取内容:过去24小时整点天气实况
抓取内容
目标页面的URL由一个9位数字的代码来确定所访问的乡镇,因此需要准备一份所需资料的乡镇清单,并与中国天气网中的代码一一对应。清单文件每行的格式为"地名<Tab>代码"。
URL
地名
由于数据是通过JS加载的,所以采用模拟浏览器的方法(selenium)获取数据,再用BeautifulSoup对网页进行分析。
[Asm] 纯文本查看 复制代码 import requests
from bs4 import BeautifulSoup
import traceback
import re
from selenium import webdriver
import time,datetime
import os
import psutil
读取前面准备的乡镇清单,并存放在townDict字典中备用
def getTownInfo(townList, townIdList, townDict, dNamePath):
    """Load the town list file into the caller-supplied containers.

    Every usable line of the file has the form ``name<TAB>code``; any
    further tab-separated columns are ignored.

    Args:
        townList: list that receives the town names, in file order.
        townIdList: list that receives the matching codes, in file order.
        townDict: dict that receives ``name -> code`` entries.
        dNamePath: path of the UTF-8 encoded town list file.

    Lines that contain no tab (including blank lines) are skipped instead
    of aborting the whole load with an IndexError.
    """
    # "utf-8-sig" also reads plain UTF-8, and drops the byte-order mark some
    # editors prepend, which would otherwise be glued to the first name.
    with open(dNamePath, "r", encoding="utf-8-sig") as f:
        for line in f:
            lineList = line.split("\t")
            if len(lineList) < 2:
                continue
            townName = lineList[0]
            townId = lineList[1].replace("\n", "")
            townList.append(townName)
            townIdList.append(townId)
            townDict[townName] = townId
获取页面资源函数,为确保页面加载完整,将页面拖动到最后并停留5秒
def getPageSource(weatherUrl, html, wid=None):
    """Return the rendered HTML of one town's weather page.

    Args:
        weatherUrl: leading part of the page URL; the page address is
            ``weatherUrl + wid + ".shtml"``.
        html: ignored on input; kept only so existing positional calls
            keep working. The page source is returned instead.
        wid: town code to fetch. When omitted, a module-level ``wid`` is
            used if one exists, which is the lookup the function performed
            before this parameter was added.

    Returns:
        The page source after scrolling to the bottom and waiting 5 seconds.

    Raises:
        NameError: if ``wid`` is not given and no module-level ``wid`` exists.
    """
    if wid is None:
        try:
            wid = globals()["wid"]
        except KeyError:
            raise NameError("name 'wid' is not defined") from None
    url = weatherUrl + wid + ".shtml"
    driver = webdriver.Chrome(executable_path="chromedriver.exe")
    try:
        driver.get(url)
        # Scroll to the bottom and pause so the JS-loaded content can finish.
        driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
        time.sleep(5)
        html = driver.page_source
    finally:
        # Always shut the browser down, even when loading the page failed.
        driver.quit()
    return html
天气数据收集函数
# Matches from a tag's first ">" up to the "<" of its closing tag.
_TAG_TEXT_RE = re.compile(r">.+[</]+?")


def _newHeadlessChrome():
    """Start a headless Chrome session with driver console logging disabled."""
    options = webdriver.ChromeOptions()
    options.add_experimental_option('excludeSwitches', ['enable-logging'])
    options.headless = True
    return webdriver.Chrome(executable_path="chromedriver.exe", options=options)


def _renderPage(driver, url):
    """Load ``url``, scroll to the bottom, wait 5 s and return the page source.

    The browser is shut down in every case so that no Chrome/chromedriver
    process is left behind when loading fails.
    """
    try:
        driver.get(url)
        driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
        time.sleep(5)
        return driver.page_source
    finally:
        driver.quit()


def _tagText(tag):
    """Return the text found between a tag's opening and closing markup."""
    return _TAG_TEXT_RE.findall(str(tag))[0].strip(">").strip("</")


def _parseWeatherPage(html):
    """Build one output row (without the date) from a rendered weather page.

    The row holds the six summary values, two empty placeholder columns and
    then four values per hourly wind record.
    """
    import ast
    import json

    soup = BeautifulSoup(html, "html.parser")
    weatherInfo = soup.find('div', attrs={'class': 'weather_zdsk'})
    air = _tagText(soup.find_all('p', attrs={'class': 'air'})[0])
    tem = _tagText(soup.find_all('p', attrs={'class': 'tem on'})[0])
    humidity = _tagText(soup.find_all('p', attrs={'class': 'humidity'})[0])
    wind = _tagText(soup.find_all('p', attrs={'class': 'wind'})[0])
    rain = _tagText(soup.find_all('p', attrs={'class': 'rain'})[0])

    # The first <script> inside the block carries the hourly records as an
    # object literal; take everything from the first "{" to the last "}".
    script = str(weatherInfo.find_all('script')[0]).replace(" ", "")
    payload = re.findall(r"{.+}", script)[0]
    # SECURITY: this text comes from the remote page and used to be passed to
    # eval(), which would execute whatever the page contained. It is parsed
    # as data instead: JSON first, then a Python literal as a fallback.
    try:
        observed = json.loads(payload)
    except ValueError:
        observed = ast.literal_eval(payload)

    row = [
        air.split(":")[1],
        tem.split(",")[0].split(":")[1],
        tem.split(",")[1].split(":")[1],
        humidity.split(":")[1],
        wind.split(":")[1],
        rain.split(":")[1],
        "",  # unnamed spacer column of the header
        "",  # wind-frequency column, not filled in by the scraper
    ]
    for hourly in observed["od"]["od2"]:
        # od21 = time, od23 = angle, od24 = wind direction, od25 = wind force
        row.extend(hourly[key] for key in ("od21", "od23", "od24", "od25"))
    return row


def _appendWeatherRow(wid, wName, row):
    """Append today's row to the town's file, writing the header if it is empty."""
    dirpath = "./weatherData/"
    fpath = "./weatherData/{}.txt".format(wid + wName)
    os.makedirs(dirpath, exist_ok=True)
    with open(fpath, "a", encoding="utf-8") as f:
        if os.path.getsize(fpath) == 0:
            f.write("日期,AQI最高值,最高气温,最低气温,最大相对湿度,最大风力,总降水量,,风频率" + ",时间,角度,风向,风力" * 25 + "\n")
        f.write(time.strftime('%Y-%m-%d', time.localtime()) + ",")
        f.write("".join("{},".format(value) for value in row))
        f.write("\n")


def getWeatherInfo(townList, townIdList, failTownList, weatherUrl):
    """Scrape every listed town and append one row per town to its data file.

    Args:
        townList: town names.
        townIdList: town codes, parallel to ``townList``.
        failTownList: list that receives the name of every town whose data
            was empty or could not be collected; it is modified in place.
        weatherUrl: leading part of the page URL; ``<code>.shtml`` is appended.

    Returns:
        The same ``failTownList`` object.

    A failure for one town is recorded and the run continues with the next
    town. Ctrl-C and interpreter exit are no longer swallowed, and a browser
    that cannot be started still raises to the caller.
    """
    emptyMsg = "\r收集{:^10}天气数据为空,当前进度: {:>6.2f}%,第{:^5}项,共{:^5}项"
    okMsg = "\r正在收集{:^10}天气数据,当前进度: {:>6.2f}%,第{:^5}项,共{:^5}项"
    errorMsg = "\r收集{:^10}天气数据错误,当前进度: {:>6.2f}%,第{:^5}项,共{:^5}项"

    print("开始收集天气数据……")
    total = len(townList)
    for count, (wName, wid) in enumerate(zip(townList, townIdList), start=1):
        url = weatherUrl + wid + ".shtml"
        driver = _newHeadlessChrome()
        try:
            html = _renderPage(driver, url)
            if html == "":
                failTownList.append(wName)
                status = emptyMsg
            else:
                _appendWeatherRow(wid, wName, _parseWeatherPage(html))
                status = okMsg
        except Exception:
            # Best effort by design: the town is queued for a retry and no
            # traceback is printed, which keeps the one-line progress intact.
            failTownList.append(wName)
            status = errorMsg
        print(status.format(wName, count * 100 / total, count, total), end="")
    print("\n完成天气数据收集……")
    return failTownList
主程序函数
def main():
    """Collect the weather data of every listed town, retrying failures.

    The town list is read from ``dNameLIst.txt`` in the current directory.
    Towns that fail are collected again, up to five more rounds. When a
    retry round clears all failures, a success line is appended to
    ``./failTownLog.txt``; when failures remain after the last round, the
    names of the remaining towns are appended instead. Nothing is logged
    when the first pass has no failures.
    """
    maxRetries = 5
    townList = []
    townIdList = []
    townDict = {}
    failTownList = []
    # Built with os.path.join: the old literal ".\dNameLIst.txt" relied on
    # the invalid escape "\d" and only worked as a path on Windows.
    dNamePath = os.path.join(".", "dNameLIst.txt")
    getTownInfo(townList, townIdList, townDict, dNamePath)
    # Leading part of the China Weather town page address.
    weatherUrl = "http://forecast.weather.com.cn/town/weather1dn/"
    getWeatherInfo(townList, townIdList, failTownList, weatherUrl)
    if not failTownList:
        return

    for attempt in range(1, maxRetries + 1):
        retryNames = failTownList
        retryIds = [townDict[name] for name in retryNames]
        # Fresh list, so this round only reports its own failures.
        failTownList = []
        print("\n正在第{}次重采集失败的乡镇的天气数据……".format(attempt))
        getWeatherInfo(retryNames, retryIds, failTownList, weatherUrl)
        if not failTownList:
            with open("./failTownLog.txt", "a", encoding="utf-8") as fF:
                failTownTime = time.strftime("%Y-%m-%d", time.localtime())
                fF.write("{},所有乡镇数据收集完成!\n".format(failTownTime))
            return

    # Every retry round still left failures: record them and give up.
    failTownStr = ",".join(failTownList)
    with open("./failTownLog.txt", "a", encoding="utf-8") as fF:
        failTownTime = time.strftime("%Y-%m-%d", time.localtime())
        fF.write("{},以下乡镇数据收集失败:{}。\n".format(failTownTime, failTownStr))
    print("\n重采集次数大于{}次,退出重采集!".format(maxRetries))
获取CPU使用情况函数
[Asm] 纯文本查看 复制代码 #CPU使用情况
def monitorSystem(logfile=None):
    """Record the current CPU and memory usage percentages.

    Args:
        logfile: path of a text file to append the record to. When it is
            None the record is printed to stdout instead; previously the
            default value crashed inside ``open(None)``.

    The record has the form ``YYYY-mm-dd HH:MM:SS cpu:<n>%, mem:<n>%``.
    """
    # NOTE(review): per the psutil documentation, cpu_percent() without an
    # interval measures since the previous call, and the first call in a
    # process returns a meaningless 0.0.
    cpuper = psutil.cpu_percent()
    memper = psutil.virtual_memory().percent
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = '{} cpu:{}%, mem:{}%\n'.format(ts, cpuper, memper)
    if logfile is None:
        print(record, end="")
        return
    with open(logfile, "a", encoding="utf-8") as logHandle:
        logHandle.write(record)
定时开始收集天气数据函数
[Asm] 纯文本查看 复制代码 #定时启动
def runAuto(h, m):
    """Run ``main()`` once a day at ``h``:``m``, forever.

    Args:
        h: hour of the daily run; an empty string selects 8.
        m: minute of the daily run; an empty string selects 0.

    Each loop pass prints a banner and appends a CPU/memory record to
    ``./logfile.txt``. Then it does one of three things:

    * inside the scheduled minute it runs ``main()`` and waits 60 seconds;
    * before the scheduled time it sleeps until that minute starts;
    * after the scheduled time it sleeps until the next midnight, and the
      following pass schedules the new day.

    Sleep lengths are computed to the second. The earlier code slept whole
    hours after a missed run, so it woke at 00:MM rather than 00:00. For a
    run scheduled in hour 0 that could skip the run on every following day.
    """
    # Accept numeric strings such as "8" as well as ints.
    h = 8 if h == "" else int(h)
    m = 0 if m == "" else int(m)
    targetSeconds = h * 3600 + m * 60
    while True:
        print("\n*********天气抓取程序运行中……,请勿关闭本窗口!——联系人:吴翔*********")
        now = datetime.datetime.now()
        monitorSystem(logfile="./logfile.txt")
        if now.hour == h and now.minute == m:
            print("现在是{}点{}分,开始执行程序……".format(h, m))
            main()
            # Leave the scheduled minute so the job is not started twice.
            time.sleep(60)
            continue
        secondsToday = now.hour * 3600 + now.minute * 60 + now.second
        if secondsToday < targetSeconds:
            print("尚未到程序执行时间,程序执行时间为{}点{}分".format(h, m))
            sleepTime = targetSeconds - secondsToday
        else:
            print("已超过程序执行时间,程序执行时间为{}点{}分".format(h, m))
            sleepTime = 24 * 3600 - secondsToday
        time.sleep(sleepTime)
设置开始收集数据时间
[Asm] 纯文本查看 复制代码 #hStr = eval(input("请输入获取数据的时间(时):"))
#mStr = eval(input("请输入获取数据的时间(分):"))
# Daily start time of the collection run: hour and minute.
hStr = 8
mStr = 0
# Start the endless scheduler only when executed as a script, so importing
# this module (for example to call main() directly) does not block forever.
if __name__ == "__main__":
    runAuto(h=hStr, m=mStr)
|
|