从ubibot获取消息后发送到钉钉
本帖最后由 liyitong 于 2024-7-12 17:38 编辑ubibot通过无线WiFi配置设备,可全方位实时采集并监测农业作物的生长环境,环境参数主要包括农业大棚或园中的温湿度、光照、PH值等。
简而言之,就是一个温度等数据的监控器,可以通过网络把数据上传。可以通过一个App查看具体数据,但是不喜欢在手机上安装不常用的APP。
于是研究了一下接口,于是有了这篇文章。
首先看一下数据获取的网址:
http://api.ubibot.cn/channels/5375/feeds?end=2024-07-12 15:59:19&field=field1&start=2024-07-12 15:44:19&timezone=Asia/Shanghai&token_id=64e6b44e17944ba9834b879f33517bd0
其中end=2024-07-12 15:59:19&field=field1&start=2024-07-12 15:44:19很显然就是要获取的时间段。
这里就注意到有个token_id,但是用了几天之后提示token_id error,那么这个token_id肯定是有有效期的,于是去找token_id的生成方法。
登陆页:https://console.ubibot.cn
登陆接口:https://api.ubibot.cn/accounts/login
输入用户名和密码后,post方式提交以下数据:{
"expire_in_seconds": "2592000",
"password": "e266f3804bc46f8eec8290b247712490860f8fc367c21c91254d4d34891efe14",
"password_type": "sha256",
"username": "-------@qq.com"
}expire_in_seconds是登陆信息的有效期。
password显然是个密文。
password_type只是参考,并非完全可信。
username是登陆的用户名。
在浏览器的调试器中搜索password,全部出现在https://console.ubibot.cn/scripts/scripts.js?v=1.7.4.11文件中。var H = this;
var c = {
username: s.username(),
password: sha256.hex(H.form.password),
password_type: "sha256",
expire_in_seconds: 2592e3
};
t.login(null, c).$promise.then(function(c) {
a.stepPsd = b
}sha256未发现自定义函数,也没从别处import,所以就是原生态sha256算法。找个在线网站测试一下:
完全一致。所以接下就是用Python重现这个过程。既然是原生态sha256,直接用hashlib库调用吧。import hashlib
def sha256_hash(s):
return hashlib.sha256(s.encode('utf-8')).hexdigest()
print(sha256_hash("------"))执行结果:e266f3804bc46f8eec8290b247712490860f8fc367c21c91254d4d34891efe14
拿到token_id后,去获取数据就行了。
得到数据后,再把内容发送到钉钉或者微信即可。
发送到钉钉,参考:https://www.52pojie.cn/thread-1861278-1-1.html
综上所述,完整代码如下:from datetime import datetime, timezone, timedelta
import requests, json
import hashlib
def sha256_hash(s):
return hashlib.sha256(s.encode('utf-8')).hexdigest()
def get_token(user,pwd):
login_url=r"https://api.ubibot.cn/accounts/login"
shuju={
"expire_in_seconds": "2592000",
"password": sha256_hash(pwd),
"password_type": "sha256",
"username": user
}
jieguo=requests.post(url=login_url, data=shuju).json()
print("token_id:",jieguo['token_id'])
print("过期时间:",jieguo['expire_time'])
return jieguo['token_id']
def send_message(xiaoxi: str, quantizhaohuan=0):
dingding_url = r"https://oapi.dingtalk.com/robot/send?access_token=76b01a6c051-----------158e6cd3b3ef948631be776c1a90b5d16d457bb3"
tou = {"Content-Type": "application/json; charset=utf-8"}
neirong = {"msgtype": "text",
"text": {"content": xiaoxi},
"at": {
"atMobiles": [],
"isAtAll": quantizhaohuan# 为1则@所有人
}
}
messagebody = json.dumps(neirong)
print(messagebody)
result = requests.post(url=dingding_url, data=messagebody, headers=tou)
print(result.text)
def make_message(token:str):
# 定义UTC+8时区
tz_utc_8 = timezone(timedelta(hours=8))
# 获取东8区的当前日期和时间
now_utc_8 = datetime.now(tz_utc_8)
before_15fenzhong = now_utc_8 + timedelta(minutes=-15)
# 打印年月日
jieshu = now_utc_8.strftime('%Y-%m-%d %H:%M:%S')
kaishi = before_15fenzhong.strftime('%Y-%m-%d %H:%M:%S')
wangzhi = "http://api.ubibot.cn/channels/5375/feeds?end=" + jieshu + "&field=field1&" \
"start=" + kaishi + "&timezone=Asia/Shanghai&token_id="+token
print(wangzhi)
jieguo = requests.get(url=wangzhi).json()
if 'error' in jieguo.values():
print(jieguo['errorCode'] + ":" + jieguo['desp'])
print('每个小时最多只能获取数据5次')
jg = "每个小时最多只能获取数据5次"
else:
for i in jieguo['feeds']:
print(i['created_at'].split('+').replace('T', ' '), " %.1f℃" % i['field1'])
wendu = "%.1f℃" % i['field1']
jg = "温度:%s (%s)" % (wendu, jieshu)
print(jg)
send_message(jg, 0)
if __name__=='__main__':
yonghuming = "-----@qq.com"
mima = "-----"
token=get_token(yonghuming,mima)
make_message(token)然后把代码丢到github上,设置一个action,设定好cron时间,每天坐等即可。
这个平台怎么绑定设备呢?好奇 已注册。 好东西,感谢分享,继续完善
页:
[1]