[Python] 纯文本查看 复制代码
import requests
from lxml import etree
#这里是黄金的提醒价,低于此价格会@全体成员
au_remind_price=390.0
#这里是白银的提醒价,低于此价格会@全体成员
ag_remind_price=4900.0
robot_url=["这里存放你的钉钉/企业微信机器人链接,可使用列表存储多个,会同时发送"]
url="https://www.sge.com.cn/sjzx/yshqbg"
def requests_hold(url):
header={
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0",
"Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language":"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Connection":"keep-alive"
}
for i in range(0,10):
try:
r=requests.get(url=url,timeout=60,headers=header)
return r.text
except Exception:
pass
return False
def xpath_data(url):
"""
该函数用于从网页解析和整理黄金白银价格
"""
time_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[1]/p/span[2]/text()"
au9999_new_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[2]/td[2]/span/text()"
au9999_high_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[2]/td[3]/text()"
au9999_low_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[2]/td[4]/text()"
au9999_open_today_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[2]/td[5]/text()"
Au100g_new_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[4]/td[2]/span/text()"
Au100g_high_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[4]/td[3]/text()"
Au100g_low_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[4]/td[4]/text()"
Au100g_open_today_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[4]/td[5]/text()"
auTD_new_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[7]/td[2]/span/text()"
auTD_high_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[7]/td[3]/text()"
auTD_low_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[7]/td[4]/text()"
auTD_open_today_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[7]/td[5]/text()"
agTD_new_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[6]/td[2]/span/text()"
agTD_high_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[6]/td[3]/text()"
agTD_low_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[6]/td[4]/text()"
agTD_open_today_xpath="/html/body/div[6]/div/div[2]/div[2]/div[2]/div[2]/div/table/tr[6]/td[5]/text()"
data=requests_hold(url)
html = etree.HTML(data)
time_data=html.xpath(time_xpath)[0]
au9999_new=html.xpath(au9999_new_xpath)[0]
au9999_high=html.xpath(au9999_high_xpath)[0]
au9999_low=html.xpath(au9999_low_xpath)[0]
au9999_open_today=html.xpath(au9999_open_today_xpath)[0]
Au100g_new=html.xpath(Au100g_new_xpath)[0]
Au100g_high=html.xpath(Au100g_high_xpath)[0]
Au100g_low=html.xpath(Au100g_low_xpath)[0]
Au100g_open_today=html.xpath(Au100g_open_today_xpath)[0]
auTD_new=html.xpath(auTD_new_xpath)[0]
auTD_high=html.xpath(auTD_high_xpath)[0]
auTD_low=html.xpath(auTD_low_xpath)[0]
auTD_open_today=html.xpath(auTD_open_today_xpath)[0]
agTD_new=html.xpath(agTD_new_xpath)[0]
agTD_high=html.xpath(agTD_high_xpath)[0]
agTD_low=html.xpath(agTD_low_xpath)[0]
agTD_open_today=html.xpath(agTD_open_today_xpath)[0]
return {
"time":time_data,
"au9999_new":au9999_new,
"au9999_high":au9999_high,
"au9999_low":au9999_low,
"au9999_open_today":au9999_open_today,
"Au100g_new":Au100g_new,
"Au100g_high":Au100g_high,
"Au100g_low":Au100g_low,
"Au100g_open_today":Au100g_open_today,
"auTD_new":auTD_new,
"auTD_high":auTD_high,
"auTD_low":auTD_low,
"auTD_open_today":auTD_open_today,
"agTD_new":agTD_new,
"agTD_high":agTD_high,
"agTD_low":agTD_low,
"agTD_open_today":agTD_open_today
}
def check_price_data(price_data:dict):
"""
该函数用于判断要不要发起提醒和艾特全体成员
"""
global au_remind_price,ag_remind_price
au9999_new=float(price_data["au9999_new"])
au9999_high=float(price_data["au9999_high"])
au9999_low=float(price_data["au9999_low"])
au9999_open_today=float(price_data["au9999_open_today"])
Au100g_new=float(price_data["Au100g_new"])
Au100g_high=float(price_data["Au100g_high"])
Au100g_low=float(price_data["Au100g_low"])
Au100g_open_today=float(price_data["Au100g_open_today"])
auTD_new=float(price_data["auTD_new"])
auTD_high=float(price_data["auTD_high"])
auTD_low=float(price_data["auTD_low"])
auTD_open_today=float(price_data["auTD_open_today"])
agTD_new=float(price_data["agTD_new"])
agTD_high=float(price_data["agTD_high"])
agTD_low=float(price_data["agTD_low"])
agTD_open_today=float(price_data["agTD_open_today"])
remain={
"at":False,
"word":""
}
if au9999_new<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">au9999最新价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if au9999_high<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">au9999最高价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if au9999_low<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">au9999最低价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if au9999_open_today<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">au9999今开价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if Au100g_new<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">Au100g最新价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if Au100g_high<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">Au100g最高价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if Au100g_low<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">Au100g最低价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if Au100g_open_today<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">Au100g今开价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if auTD_new<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">auTD最新价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if auTD_high<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">auTD最高价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if auTD_low<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">auTD最低价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if auTD_open_today<=au_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">auTD今开价低于提醒价%.2f "%(au_remind_price)+"\n\n"
if agTD_new<=ag_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">agTD最新价低于提醒价%.2f "%(ag_remind_price)+"\n\n"
if agTD_high<=ag_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">agTD最高价低于提醒价%.2f "%(ag_remind_price)+"\n\n"
if agTD_low<=ag_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">agTD最低价低于提醒价%.2f "%(ag_remind_price)+"\n\n"
if agTD_open_today<=ag_remind_price:
remain["at"]=True
remain["word"]=remain["word"]+">agTD今开价低于提醒价%.2f "%(ag_remind_price)+"\n\n"
return remain
def send_word(robot_url:list,price_data:dict,remian:dict):
"""
该函数向钉钉机器人发送提醒消息
"""
au9999_word=">**Au9999**\n>>最新价**%s元** 最高价**%s元**\n\n>>最低价**%s元** 今开盘**%s元**\n\n"%(price_data["au9999_new"],price_data["au9999_high"],price_data["au9999_low"],price_data["au9999_open_today"])
Au100g_word=">**Au100g**\n>>最新价**%s元** 最高价**%s元**\n\n>>最低价**%s元** 今开盘**%s元**\n\n"%(price_data["Au100g_new"],price_data["Au100g_high"],price_data["Au100g_low"],price_data["Au100g_open_today"])
auTD_word=">**Au(T+D)**\n>>最新价**%s元** 最高价**%s元**\n\n>>最低价**%s元** 今开盘**%s元**\n\n"%(price_data["auTD_new"],price_data["auTD_high"],price_data["auTD_low"],price_data["auTD_open_today"])
agTD_word=">**Ag(T+D)**\n>>最新价**%s元** 最高价**%s元**\n\n>>最低价**%s元** 今开盘**%s元**\n\n"%(price_data["agTD_new"],price_data["agTD_high"],price_data["agTD_low"],price_data["agTD_open_today"])
remain_word=""
if remian["at"]:
remain_word="### 提醒信息\n "+remian["word"]
header={
"Content-Type":"application/json"
}
text="# %s\n "%(price_data["time"])+"## 黄金价格\n "+au9999_word+Au100g_word+auTD_word+"## 白银价格\n "+agTD_word+remain_word
if price_data["au9999_new"]=="0.0":
text="# %s\n "%(price_data["time"])+"## 今日无数据\n "
remian["at"]=False
dingtalk_body_data=[{
"msgtype": "markdown",
"markdown": {
"title":"贵金属提醒",
"text": text
},
"at": {
"atMobiles": [
],
"atUserIds": [
],
"isAtAll": remian["at"]
}
}]
weixin_qq_body=[{
"msgtype": "markdown",
"markdown": {
"content": text
}
}]
if remian["at"]:
weixin_qq_body.append({
"msgtype": "text",
"text": {
"content": "",
"mentioned_list":["@all"],
"mentioned_mobile_list":[]
}
})
for robot_url_now in robot_url:
if "dingtalk.com" in robot_url_now:
body_data=dingtalk_body_data
elif "weixin.qq.com" in robot_url_now:
body_data=weixin_qq_body
else:
print("%s 该url未能识别为钉钉机器人,或者为企微机器人。")
continue
for body_now in body_data:
for i in range(0,3):
try:
r=requests.post(url=robot_url_now,headers=header,timeout=60,json=body_now)
break
except Exception as err:
print(err)
def main(url,robot_url):
"""
主函数,调用其他函数
"""
price_data=xpath_data(url=url)
remain_data=check_price_data(price_data=price_data)
send_word(robot_url=robot_url,remian=remain_data,price_data=price_data)
main(url=url,robot_url=robot_url)