ESP32 & ESP8266 - micropython & C++调用钉钉群机器人群发消息
本帖最后由 xiaomingtt 于 2024-7-4 08:43 编辑。准备用 ESP32-S3 做个监控设备,监测未接入环动设备的运行情况;当设备出现问题时,可以通过企业微信机器人和钉钉群机器人报警。
用 MicroPython 做钉钉群机器人的时候出现了问题:钉钉机器人的安全设置为“加签”,官方给的 DEMO 中使用的 Python 库在 MicroPython 中都没有,网上找了一圈也没找到 MicroPython 能用的钉钉机器人代码。
安全设置如果是关键词或 IP 地址,应该跟企业微信机器人一样比较简单;但这个机器人已经在很多设备上运行很久了,不想修改以前的代码,只能继续研究 MicroPython。
研究了两天,终于弄出一个micropython能用的钉钉群机器人代码。
import urequests as requests
import json
def sendDingTalkMsg(msgAlert):
    """Send a text message to a DingTalk group robot that uses signed requests.

    The webhook URL is extended with a millisecond ``timestamp`` and a
    ``sign`` parameter. ``sign`` is the URL-encoded Base64 form of
    HMAC-SHA256(secret, "<timestamp>\\n<secret>"). HMAC is implemented by hand
    because this code only relies on ``uhashlib.sha256``.

    Args:
        msgAlert: Text placed in the ``text.content`` field of the message.

    Returns:
        None. The outcome is reported with ``print``.
    """
    import time
    import uhashlib
    import ubinascii

    webhook = 'https://oapi.dingtalk.com/robot/send?access_token=*****your_token*****'
    secret_key = '*****secret_key*****'

    # The device clock does not count from 1970, so an offset is added before
    # converting to milliseconds. 946656000 == 946684800 - 28800, i.e. the
    # 1970->2000 epoch difference minus 8 hours.
    # NOTE(review): the 8 h part presumably compensates for an RTC that holds
    # UTC+8 local time -- confirm against how the device clock is set.
    timestamp = str(round((time.time() + 946656000) * 1000))

    def hmac_sha256(key, message):
        """Return the raw 32-byte HMAC-SHA256 of ``message`` under ``key``."""
        block_size = 64
        # Keys longer than one block are replaced by their digest ...
        if len(key) > block_size:
            key = uhashlib.sha256(key).digest()
        # ... and every key is zero-padded to exactly one block.
        if len(key) < block_size:
            key = key + b'\x00' * (block_size - len(key))
        # The padded key must be XOR-ed into both pads; with empty pads the
        # result is a plain double hash that never involves the secret.
        o_key_pad = bytes([b ^ 0x5C for b in key])
        i_key_pad = bytes([b ^ 0x36 for b in key])
        inner = uhashlib.sha256(i_key_pad + message).digest()
        return uhashlib.sha256(o_key_pad + inner).digest()

    def get_Hmac():
        """Sign "<timestamp>\\n<secret>" with the secret as the HMAC key."""
        secret_enc = secret_key.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret_key)
        return hmac_sha256(secret_enc, string_to_sign.encode('utf-8'))

    def is_alnum(char):
        return ('0' <= char <= '9') or ('a' <= char <= 'z') or ('A' <= char <= 'Z')

    def url_encode(s):
        """Percent-encode ``s`` for use as a query value (space becomes '+')."""
        result = []
        for char in s:
            if is_alnum(char) or char in '-_.~':
                result.append(char)
            elif char == ' ':
                result.append('+')
            else:
                result.append('%{:02X}'.format(ord(char)))
        return ''.join(result)

    digest = get_Hmac()
    # b2a_base64 appends a newline; drop it before encoding. Removing the
    # encoded "%0A" afterwards with rstrip() would strip a *set* of characters
    # ('%', '0', 'A'), not a suffix.
    base64_data = ubinascii.b2a_base64(digest).decode('utf-8').strip()
    sign = url_encode(base64_data)

    # The webhook already carries "?access_token=...", so further parameters
    # are appended with '&'.
    signed_webhook = '{}&timestamp={}&sign={}'.format(webhook, timestamp, sign)
    headers = {'content-type': "application/json"}
    body = {
        "msgtype": "text",
        "at": {
            "isAtAll": "true"
        },
        "text": {
            "content": msgAlert
        }
    }
    payload = json.dumps(body)
    response = requests.post(signed_webhook, data=payload.encode("utf-8"), headers=headers)
    try:
        status = response.status_code
        reply_text = response.text
    finally:
        # Release the underlying socket even if reading the reply fails.
        response.close()

    try:
        errcode = json.loads(reply_text).get('errcode')
    except (ValueError, AttributeError):
        # Reply was not a JSON object; treat it as a failure and print it.
        errcode = None

    if status == 200 and errcode == 0:
        print("钉钉发送成功")
    else:
        print("钉钉发送失败", reply_text)
有个比较坑的地方:MicroPython(ESP32 上)的时间戳起点是 2000-01-01,而 Python 的起点是 1970-01-01,两者不一样,所以要先给 time.time() 加上一个偏移量,再换算成毫秒。以前不知道这一点,盲目相信 ChatGPT,它一直告诉我要同步设备时间,直到搜索后才发现解决方法这么简单,被 ChatGPT 浪费了半天时间。 | 回复:真好,难得看到在这里玩乐鑫板子的。 | 楼主:开发板又换成了 ESP8266,mpy 代码要改成 C++。
#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>
#include <ArduinoJson.h>
#include <Arduino.h>
#include <Crypto.h>
#include <SHA256.h>
#include <WiFiUdp.h>
#include <NTPClient.h>
// Base64 alphabet indexed by 6-bit value; read by base64_encode().
const char base64_chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
// Wi-Fi credentials passed to WiFi.begin() in connectWiFi().
#define WIFI_SSID "F1"
#define WIFI_PASSWORD "123456789"
// Minimum millis() distance between two connection checks in checkWiFiConnection().
const unsigned long connectInterval = 60000;
// millis() value recorded at the last connection check.
unsigned long lastConnectAttemptTime = 0;
// Upper bound on the one-second polls connectWiFi() performs before giving up.
const int maxConnectAttempts = 15;
// Timestamp in seconds, written by loop() and read by sendDingMSG().
unsigned long epochTime = 0;
// UDP transport handed to the NTP client below.
WiFiUDP ntpUDP;
// NOTE(review): the third argument is presumably the time offset in seconds (8 h)
// and the fourth the update interval in ms (30 min) -- confirm against the
// NTPClient constructor. loop() subtracts 28800 from the reported time.
NTPClient timeClient(ntpUDP, "ntp7.aliyun.com", 60*60*8, 30*60*1000);
// One-time initialisation: serial console, Wi-Fi association, NTP client.
void setup()
{
    Serial.begin(2400);   // console baud rate used by all Serial output below
    connectWiFi();        // blocks until connected or attempts are exhausted
    timeClient.begin();   // start the NTP client
}
// Main cycle: keep Wi-Fi alive, refresh the clock, send one message, wait.
void loop()
{
    checkWiFiConnection();
    timeClient.update();

    // Take the time reported by the NTP client and subtract 28800 s (8 h)
    // before storing it for sendDingMSG().
    const unsigned long reportedSeconds = timeClient.getEpochTime();
    epochTime = reportedSeconds - 28800;

    sendDingMSG("Hello World From ESP8266!");

    delay(10000);  // pause 10 s between messages
}
// Starts a Wi-Fi connection and polls once per second, printing a dot per
// poll, until the link is up or maxConnectAttempts polls have elapsed.
// The outcome is reported on the serial console.
void connectWiFi()
{
    WiFi.begin(WIFI_SSID, WIFI_PASSWORD);

    for (int tries = 0;
         WiFi.status() != WL_CONNECTED && tries < maxConnectAttempts;
         ++tries) {
        delay(1000);
        Serial.print(".");
    }

    // Failure path first, so the success path reads straight through.
    if (WiFi.status() != WL_CONNECTED) {
        Serial.println("WiFi Connection failed after max attempts");
        return;
    }

    Serial.println("WiFi Connected");
    Serial.print("IP Address: ");
    Serial.println(WiFi.localIP());
}
// Rate-limited link check: at most once per connectInterval milliseconds,
// either print the current IP (link up) or drop and re-establish the
// connection (link down).
void checkWiFiConnection()
{
    const unsigned long now = millis();

    // Not yet time for the next check.
    if (now - lastConnectAttemptTime <= connectInterval) {
        return;
    }
    lastConnectAttemptTime = now;

    if (WiFi.status() == WL_CONNECTED) {
        Serial.print(WiFi.localIP());
        return;
    }

    WiFi.disconnect(true);
    delay(1000);
    connectWiFi();
}
void sendDingMSG(const char* msgAlert) {
String webhook = "https://oapi.dingtalk.com/robot/send?access_token=3ae63bd51de7053486b32d4e2d2eeb176e4f52b8798fdbd680e4be61f78cec87";
String secret_key = "SEC1306372b3167575940247b5fdc840fabda0fe28408b0e8c8dd232d29c70f2e9e";
String stringToSign = String(epochTime) + "000\n" + secret_key;
String sign = hmac_sha256_base64(secret_key, stringToSign);
String url = webhook + "×tamp=" + String(epochTime) + "000&sign=" + sign;
if (WiFi.status() == WL_CONNECTED) {
WiFiClientSecure client;
client.setInsecure();
HTTPClient http;
http.begin(client,url);
http.addHeader("Content-Type", "application/json");
DynamicJsonDocument doc(2048);
doc["msgtype"] = "text";
JsonObject text = doc.createNestedObject("text");
text["content"] = msgAlert;
String httpRequestData;
serializeJson(doc, httpRequestData);
int httpResponseCode = http.POST(httpRequestData);
if (httpResponseCode > 0) {
String response = http.getString();
Serial.println(httpResponseCode);
Serial.println(response);
} else {
Serial.print("Error on sending POST: ");
Serial.println(httpResponseCode);
}
http.end();
} else {
Serial.println("Error in WiFi connection");
}
}
// Encodes `length` bytes starting at `input` as standard Base64 with '='
// padding, using the file-level base64_chars alphabet.
//
// input:  bytes to encode; may be null only when length <= 0.
// length: number of bytes to read from input.
// Returns the encoded text, or an empty String when there is nothing to encode.
String base64_encode(byte* input, int length) {
  String encoded;
  if (input == nullptr || length <= 0) {
    return encoded;
  }
  encoded.reserve(((length + 2) / 3) * 4);

  int i = 0;
  while (i < length) {
    // Consume up to three bytes per round; missing trailing bytes read as 0.
    // Each read must advance i, otherwise the loop never terminates.
    uint32_t octet_a = i < length ? input[i++] : 0;
    uint32_t octet_b = i < length ? input[i++] : 0;
    uint32_t octet_c = i < length ? input[i++] : 0;
    uint32_t triple = (octet_a << 0x10) + (octet_b << 0x08) + octet_c;
    encoded += base64_chars[(triple >> 3 * 6) & 0x3F];
    encoded += base64_chars[(triple >> 2 * 6) & 0x3F];
    encoded += base64_chars[(triple >> 1 * 6) & 0x3F];
    encoded += base64_chars[(triple >> 0 * 6) & 0x3F];
  }

  // Overwrite the characters produced from zero fill bytes with '='.
  const int padding = (3 - length % 3) % 3;
  for (int j = 0; j < padding; j++) {
    encoded.setCharAt(encoded.length() - 1 - j, '=');
  }
  return encoded;
}
// Computes HMAC-SHA256 of `message` under `key` and returns the 32-byte MAC
// Base64-encoded via base64_encode().
//
// key:     secret; its bytes are used as-is (hashed first if longer than 64).
// message: data to authenticate.
String hmac_sha256_base64(const String& key, const String& message) {
  const size_t block_size = 64;  // SHA-256 block size in bytes
  const size_t hash_size = 32;   // SHA-256 digest size in bytes

  // Normalise the key to exactly one block: hash it if it is longer than a
  // block, then zero-pad on the right.
  byte key_copy[block_size];
  memset(key_copy, 0, block_size);
  if (key.length() > block_size) {
    SHA256 sha256;
    sha256.update((const byte*)key.c_str(), key.length());
    sha256.finalize(key_copy, hash_size);
  } else {
    memcpy(key_copy, key.c_str(), key.length());
  }

  // Derive the outer and inner pads by XOR-ing each key byte.
  byte o_key_pad[block_size];
  byte i_key_pad[block_size];
  for (size_t i = 0; i < block_size; i++) {
    o_key_pad[i] = 0x5c ^ key_copy[i];
    i_key_pad[i] = 0x36 ^ key_copy[i];
  }

  // inner = H(i_key_pad || message)
  SHA256 inner_sha256;
  inner_sha256.update(i_key_pad, block_size);
  inner_sha256.update((const byte*)message.c_str(), message.length());
  byte inner_hash[hash_size];
  inner_sha256.finalize(inner_hash, hash_size);

  // mac = H(o_key_pad || inner)
  SHA256 outer_sha256;
  outer_sha256.update(o_key_pad, block_size);
  outer_sha256.update(inner_hash, hash_size);
  byte hmac_code[hash_size];
  outer_sha256.finalize(hmac_code, hash_size);

  return base64_encode(hmac_code, hash_size);
}
`被chatgpt浪费了很多时间`我不认可 虽然看不懂,依然奥利给原创:lol 优秀啊,支持原创,码起来学习 优秀啊,支持原创
优秀啊,支持原创,码起来学习 timestamp = str(round((time.time() + 946656000) * 1000)){:1_918:}
页:
[1]