好友
阅读权限10
听众
最后登录1970-1-1
|
楼主|
无字情书
发表于 2022-4-26 16:41
加了疫情的取值给你参考下我的
# 群
# -*- coding: utf-8 -*-
# python 3.8
import re
import requests
import time
import hmac
import hashlib
import datetime
import base64
import urllib.parse
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
# 设置浏览器隐藏
chrome_options = Options()
chrome_options.add_argument('--headless')
driver = webdriver.Chrome(options=chrome_options)
# 获取今日天气
def tianqi(url):
res1 = urllib.request.urlopen(url)
date = res1.read().decode("utf8")
pattern = re.compile(r'value="(.+?)" /')
res2 = re.findall(pattern, date)
return res2[1]
# 取具体地点天气,取当前时间
def weather():
weather1 = tianqi("http://www.weather.com.cn/weather1d/101230509.shtml") # 101230509 城市代码
now_time = datetime.datetime.now().strftime('%Y-%m-%d')
return weather1[9:], now_time
# 疫情取值函数
def epidemic(url: 'str', Locally_added: 'str', a: 'int', Asymptomatic: 'str', b: 'int', Cumulative: 'str', c: 'int',
receivetime: 'str', d: 'int', isquit: 'int'):
driver.get(url)
a = a
b = b
c = c
d = d
num = driver.find_element(By.XPATH, "//*[text()='新型冠状病毒肺炎']").get_attribute("class")
num = num[5:14] # _1-1-341_,页面属性值是动态,先取标签属性,用于下方取值
Locally_added = Locally_added[:-6] + num + Locally_added[-6:]
Asymptomatic = Asymptomatic[:-6] + num + Asymptomatic[-6:]
Cumulative = Cumulative[:-6] + num + Cumulative[-6:]
receivetime = receivetime[:5] + num + receivetime[5:]
new_diagnosis = Locally_added
Locally_added = driver.find_elements(By.CLASS_NAME, Locally_added)[a].text
Asymptomatic = driver.find_elements(By.CLASS_NAME, Asymptomatic).text
Cumulative = driver.find_elements(By.CLASS_NAME, Cumulative)[c].text
if d == 1:
receivetime = driver.find_element(By.CLASS_NAME, receivetime).text[:20]
total = receivetime + '--泉州' + '\n' '新增本土:' + Locally_added + ' 新增无症状:' + Asymptomatic + ' 现存确诊:' + Cumulative
else:
new_diagnosis = driver.find_elements(By.CLASS_NAME, new_diagnosis)[0].text
total = '\n' + '国内疫情' + '\n' + '【新增确诊:' + new_diagnosis + ' 新增本土:' + Locally_added + ' 新增无症状:' + Asymptomatic + ' 现存确诊:' + Cumulative + '】' + '\n' + '数据来源:https://t.hk.uy/aVkb'
if isquit == 1:
time.sleep(1)
driver.quit()
return total
try:
# 取本地
local_epidemic_situation = epidemic(
'https://voice.baidu.com/act/newpneumonia/newpneumonia/?from=osari_aladin_banner&city=' # 泉州地址
'%E7%A6%8F%E5%BB%BA-%E6%B3%89%E5%B7%9E'
, 'ProvinceSummary3aIcdg'
, 0
, 'ProvinceSummary19j0la'
, 0
, 'ProvinceSummaryF8LjRz'
, 0
, 'Virus32Y_aO'
, 1
, 0)
# 取国内
domestic_epidemic_situation = epidemic(
'https://voice.baidu.com/act/newpneumonia/newpneumonia/?from=osari_aladin_banner' # 首页地址
, 'VirusSummarySix2ZJJBJ'
, 1
, 'VirusSummarySix2ZJJBJ'
, 3
, 'VirusSummarySix2ZJJBJ'
, 4
, 'Virus32Y_aO'
, 0
, 1)
send_text = local_epidemic_situation + domestic_epidemic_situation + '\n' # 消息合并
except:
send_text = '疫情信息获取失败,请检查' + '\n' # 异常抛出
# 早报获取函数
def news():
url = "https://www.163.com/dy/media/T1603594732083.html"
rsp = requests.get(url)
html = etree.HTML(rsp.text)
today_url = html.xpath("//h2[@class='media_article_title']/a/@href")[0]
rsp = requests.get(today_url)
html = etree.HTML(rsp.text)
news_list = html.xpath("//div[@class='post_body']/p[2]//text()")
news_time = html.xpath("//html/@data-publishtime")
news_time = str(news_time[0])
news_time = news_time[:10]
news_list = news_list[2:]
test = '\n'.join(news_list)
test = test.replace('[公众号:365资讯简报]', '') #替换早报广告
if test.find('【微语】') != -1:
text = test.replace('【微语】', '\n' + '\n' + '鸡汤来咯:')
return news_time, text
else:
return news_time, test
#合并发送消息
def send():
send1 = send_text + weather()[1] + ' ' + weather()[0] + '\n' + news()[1]
return send1
#合并发送消息
send = send()
send1 = send_text + '\n' + weather()[1] + ' ' + weather()[0] + '\n' + '未更新今日新闻'
# 钉钉发送
def ding(send):
timestamp = str(round(time.time() * 1000))
secret = 'SEC65752ca25b84d92f23a6d86c2111672a'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = 'https://oapi.dingtalk.com/robot/send?access_token=4920226b86e6f08ec4d6f7×tamp={}&sign={}'.format(
timestamp, sign)
headers = {'Content-Type': 'application/json'}
json = {"msgtype": "text",
"text": {
"content": send
},
"at": {
"atMobiles": [
"", # 机器人发送信息@的用户
],
"isAtAll": False # 是否@所有人
}
}
requests.post(url=url, headers=headers, json=json)
def main():
if weather()[1] == news()[0]:
ding(send)
print('发送成功' + ' ' + weather()[1])
else:
ding(send1)
print('未更新今日新闻')
if __name__ == '__main__':
main()
|
|