xiaomingtt 发表于 2024-6-25 20:59

ESP32 & ESP8266 - micropython & C++调用钉钉群机器人群发消息

本帖最后由 xiaomingtt 于 2024-7-4 08:43 编辑

准备用ESP32-S3做个监控设备,监测未接入环动设备的运行情况,当设备出现问题可以通过企业微信机器人和钉钉群机器人报警。
用micropython做钉钉群机器人的时候出现了问题:钉钉机器人的安全设置为“加签”,官方给的DEMO中使用的python库在micropython中都没有,网上找了一圈也没找到micropython能用的钉钉机器人代码。
安全设置如果是关键词或IP地址,应该跟微信机器人一样比较简单,但这个机器人已经在很多设备上运行很久了,不想修改以前的代码,只能继续研究micropython。
研究了两天,终于弄出一个micropython能用的钉钉群机器人代码。
import urequests as requests
import json

def sendDingTalkMsg(msgAlert):
    """Send a text message through a DingTalk group robot that uses signing.

    The webhook URL is extended with a millisecond timestamp and an
    HMAC-SHA256 signature (base64, then URL-encoded) of
    ``"<timestamp>\\n<secret>"`` keyed with the secret, and the message is
    POSTed as JSON with ``isAtAll`` set.

    Args:
        msgAlert: Text placed in the ``text.content`` field of the message.

    Returns:
        True when the HTTP status is 200 and the reply's ``errcode`` is 0,
        otherwise False. The outcome is also printed.
    """
    import time
    import uhashlib
    import ubinascii

    webhook = 'https://oapi.dingtalk.com/robot/send?access_token=*****your_token*****'
    secret_key = '*****secret_key*****'

    # DingTalk expects a Unix timestamp in milliseconds. The added constant is
    # 946684800 (seconds between 1970-01-01 and 2000-01-01) minus 28800 (8 h).
    # NOTE(review): this presumably assumes time.time() counts from 2000-01-01
    # and the device clock holds UTC+8 local time - confirm for your board.
    timestamp = str(round((time.time() + 946656000) * 1000))

    def hmac_sha256(key, message):
        """Return the raw HMAC-SHA256 digest of ``message`` under ``key``."""
        block_size = 64

        # Keys longer than one block are hashed first; shorter keys are
        # zero-padded up to the block size.
        if len(key) > block_size:
            key = uhashlib.sha256(key).digest()
        if len(key) < block_size:
            key = key + b'\x00' * (block_size - len(key))

        # The pads must be the key XORed with the HMAC constants; leaving
        # them empty would degrade the result to a plain nested SHA-256.
        o_key_pad = bytes(b ^ 0x5C for b in key)
        i_key_pad = bytes(b ^ 0x36 for b in key)

        inner = uhashlib.sha256(i_key_pad + message).digest()
        return uhashlib.sha256(o_key_pad + inner).digest()

    def get_Hmac():
        """Sign ``"<timestamp>\\n<secret>"`` with the secret as the key."""
        secret_enc = secret_key.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret_key)
        return hmac_sha256(secret_enc, string_to_sign.encode('utf-8'))

    def is_alnum(char):
        return ('0' <= char <= '9') or ('a' <= char <= 'z') or ('A' <= char <= 'Z')

    def url_encode(s):
        """Percent-encode ``s``; only ASCII input is expected here."""
        result = []
        for char in s:
            if is_alnum(char) or char in '-_.~':
                result.append(char)
            elif char == ' ':
                result.append('+')
            else:
                result.append('%{:02X}'.format(ord(char)))
        return ''.join(result)

    # b2a_base64 appends a newline; drop it before encoding. Stripping
    # "%0A" afterwards with rstrip() would remove any trailing '%', '0'
    # or 'A' characters, not just that suffix.
    base64_data = ubinascii.b2a_base64(get_Hmac()).decode('utf-8').strip()
    sign = url_encode(base64_data)
    signed_webhook = '{}&timestamp={}&sign={}'.format(webhook, timestamp, sign)

    headers = {'content-type': "application/json"}
    body = {
        "msgtype": "text",
        "at": {
            "isAtAll": "true"
        },
        "text": {
            "content": msgAlert
        }
    }

    d = json.dumps(body)
    response = requests.post(signed_webhook, data=d.encode("utf-8"), headers=headers)
    try:
        # Read the body once and parse it from the string, so the same text
        # is still available for the failure message.
        status = response.status_code
        text = response.text
    finally:
        # Release the socket; open connections are scarce on these boards.
        response.close()

    ok = False
    if status == 200:
        try:
            ok = json.loads(text).get('errcode') == 0
        except (ValueError, AttributeError):
            ok = False

    if ok:
        print("钉钉发送成功")
    else:
        print("钉钉发送失败", text)
    return ok


有个比较坑的地方:micropython的时间戳起点(2000-01-01)跟python的起点(1970-01-01)不一样,所以要先加上两者的差值才能得到钉钉要求的Unix时间戳。以前不知道,盲目相信chatgpt,它一直告诉我要同步设备时间,直到搜索后才发现解决方法这么简单,被chatgpt浪费了半天时间。

rapid7 发表于 2024-7-23 09:56

真好,难得看到在这里玩乐鑫板子的。

xiaomingtt 发表于 2024-7-4 08:46

开发板又换成了ESP8266,mpy代码要改成C++。
#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>
#include <ArduinoJson.h>
#include <Arduino.h>
#include <Crypto.h>
#include <SHA256.h>
#include <WiFiUdp.h>
#include <NTPClient.h>

// Standard base64 alphabet, indexed by 6-bit value (used by base64_encode).
const char base64_chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                            "abcdefghijklmnopqrstuvwxyz"
                            "0123456789+/";

// Credentials of the access point the board joins in connectWiFi().
#define WIFI_SSID "F1"
#define WIFI_PASSWORD "123456789"

// Minimum time in milliseconds between two link checks in checkWiFiConnection().
const unsigned long connectInterval = 60000;
// millis() value recorded at the most recent link check.
unsigned long lastConnectAttemptTime = 0;
// Number of one-second polls connectWiFi() performs before giving up.
const int maxConnectAttempts = 15;

// Timestamp in seconds, refreshed in loop() and read by sendDingMSG().
unsigned long epochTime = 0;

WiFiUDP ntpUDP;
// NTP client with a time offset of 60*60*8 seconds (8 hours) and an update
// interval of 30*60*1000 milliseconds (30 minutes).
NTPClient timeClient(ntpUDP, "ntp7.aliyun.com", 60*60*8, 30*60*1000);

// One-time initialisation: serial port, WiFi link, then the NTP client.
void setup() {
    const unsigned long serialBaud = 2400;

    Serial.begin(serialBaud);
    connectWiFi();       // Blocks until connected or the attempt limit is hit.
    timeClient.begin();
}


// Main cycle: keep WiFi alive, refresh the clock, send a test message,
// then wait ten seconds.
void loop() {
    // Same value as the 60*60*8 offset given to the NTPClient constructor;
    // subtracting it removes that offset from the reported time again.
    const unsigned long ntpOffsetSeconds = 28800UL;
    const unsigned long pauseMs = 10000UL;

    checkWiFiConnection();
    timeClient.update();
    epochTime = timeClient.getEpochTime() - ntpOffsetSeconds;

    sendDingMSG("Hello World From ESP8266!");
    delay(pauseMs);
}

void connectWiFi() {
int attempt = 0;
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
while (WiFi.status() != WL_CONNECTED && attempt < maxConnectAttempts) {
    delay(1000);
    Serial.print(".");
    attempt++;
}

if (WiFi.status() == WL_CONNECTED) {
    Serial.println("WiFi Connected");
    Serial.print("IP Address: ");
    Serial.println(WiFi.localIP());
} else {
    Serial.println("WiFi Connection failed after max attempts");
}
}

void checkWiFiConnection() {
unsigned long currentTime = millis();

if (currentTime - lastConnectAttemptTime > connectInterval) {
    lastConnectAttemptTime = currentTime;
    if (WiFi.status() != WL_CONNECTED) {
      WiFi.disconnect(true);
      delay(1000);
      connectWiFi();
    } else {
      Serial.print(WiFi.localIP());
    }
}
}

void sendDingMSG(const char* msgAlert) {
    String webhook = "https://oapi.dingtalk.com/robot/send?access_token=3ae63bd51de7053486b32d4e2d2eeb176e4f52b8798fdbd680e4be61f78cec87";
    String secret_key = "SEC1306372b3167575940247b5fdc840fabda0fe28408b0e8c8dd232d29c70f2e9e";
    String stringToSign = String(epochTime) + "000\n" + secret_key;
    String sign = hmac_sha256_base64(secret_key, stringToSign);
    String url = webhook + "×tamp=" + String(epochTime) + "000&sign=" + sign;
   
if (WiFi.status() == WL_CONNECTED) {
    WiFiClientSecure client;
    client.setInsecure();
    HTTPClient http;
    http.begin(client,url);
    http.addHeader("Content-Type", "application/json");
   
    DynamicJsonDocument doc(2048);
    doc["msgtype"] = "text";
    JsonObject text = doc.createNestedObject("text");
    text["content"] = msgAlert;
    String httpRequestData;
   
    serializeJson(doc, httpRequestData);
   
    int httpResponseCode = http.POST(httpRequestData);
   
    if (httpResponseCode > 0) {
      String response = http.getString();
      Serial.println(httpResponseCode);
      Serial.println(response);
    } else {
      Serial.print("Error on sending POST: ");
      Serial.println(httpResponseCode);
    }
   
    http.end();
} else {
    Serial.println("Error in WiFi connection");
}
}


// Encodes `length` bytes starting at `input` as standard base64 with '='
// padding. Returns an empty String when there is nothing to encode.
String base64_encode(byte* input, int length) {
    String encoded;
    if (input == nullptr || length <= 0) {
        return encoded;
    }
    encoded.reserve(((length + 2) / 3) * 4);

    int i = 0;
    while (i < length) {
        // Consume up to three bytes per round; bytes past the end count as
        // zero. Each read must advance i, otherwise the loop never ends.
        uint32_t octet_a = i < length ? input[i++] : 0;
        uint32_t octet_b = i < length ? input[i++] : 0;
        uint32_t octet_c = i < length ? input[i++] : 0;

        uint32_t triple = (octet_a << 0x10) + (octet_b << 0x08) + octet_c;

        encoded += base64_chars[(triple >> 3 * 6) & 0x3F];
        encoded += base64_chars[(triple >> 2 * 6) & 0x3F];
        encoded += base64_chars[(triple >> 1 * 6) & 0x3F];
        encoded += base64_chars[(triple >> 0 * 6) & 0x3F];
    }

    // Overwrite the characters produced from the zero fill with '='.
    const int padding = (3 - length % 3) % 3;
    for (int j = 0; j < padding; j++) {
        encoded.setCharAt(encoded.length() - 1 - j, '=');
    }
    return encoded;
}

// Computes HMAC-SHA256 of `message` keyed with `key` and returns the
// 32-byte digest encoded as base64.
String hmac_sha256_base64(const String& key, const String& message) {
    const size_t block_size = 64;  // SHA-256 block size in bytes.
    const size_t hash_size = 32;   // SHA-256 digest size in bytes.

    // Block-sized key: hashed first when longer than a block, otherwise
    // copied and zero-padded.
    byte key_copy[block_size];
    memset(key_copy, 0, block_size);

    if (key.length() > block_size) {
        SHA256 sha256;
        sha256.update((const byte*)key.c_str(), key.length());
        sha256.finalize(key_copy, hash_size);
    } else {
        memcpy(key_copy, key.c_str(), key.length());
    }

    byte o_key_pad[block_size];
    byte i_key_pad[block_size];

    for (size_t i = 0; i < block_size; i++) {
        o_key_pad[i] = 0x5c ^ key_copy[i];
        i_key_pad[i] = 0x36 ^ key_copy[i];
    }

    // Inner hash: H(i_key_pad || message).
    SHA256 inner_sha256;
    inner_sha256.update(i_key_pad, block_size);
    inner_sha256.update((const byte*)message.c_str(), message.length());

    byte inner_hash[hash_size];
    inner_sha256.finalize(inner_hash, hash_size);

    // Outer hash: H(o_key_pad || inner_hash).
    SHA256 outer_sha256;
    outer_sha256.update(o_key_pad, block_size);
    outer_sha256.update(inner_hash, hash_size);

    byte hmac_code[hash_size];
    outer_sha256.finalize(hmac_code, hash_size);

    return base64_encode(hmac_code, hash_size);
}


rxxcy 发表于 2024-7-4 09:11

`被chatgpt浪费了很多时间`我不认可

爱生活520 发表于 2024-7-4 09:36

虽然看不懂,依然奥利给原创:lol

motto 发表于 2024-10-30 23:15

优秀啊,支持原创,码起来学习

ssmss 发表于 2024-11-1 15:39

优秀啊,支持原创

yakey 发表于 2024-11-1 15:42


优秀啊,支持原创,码起来学习

小白无常 发表于 2024-11-2 09:38

timestamp = str(round((time.time() + 946656000) * 1000)){:1_918:}
页: [1]
查看完整版本: ESP32 & ESP8266 - micropython & C++调用钉钉群机器人群发消息