js20184 发表于 2021-11-11 10:16

Pythons实现向dingding发送消息

本帖最后由 js20184 于 2021-11-11 15:58 编辑

使用Python 可以向钉钉发送消息,可以将任务插在程序之中,可以进行报警、异常提醒等动作

引用的程序包
import json
import requests
import logging

def send_ding_msg(webhook, msg):
    "发送text类型钉钉消息通知,webhook需要使用dingding机器人的唯一webhook,msg 传入你需要写的内容"

    # 构造钉钉消息头,必须制定utf-8编码
    headers = {"Content-Type": "application/json;charset=utf-8"}
    # 构造钉钉消息体,拷贝text消息格式
    msg_body = {
      "msgtype": "text",
      "text": {"content": msg},
      "at": {
            # 整体注释只发消息不@人
            # 指定需要@的人
            # "atMobiles": [
            #   "156xxxx8827"
            # ],
            # @所有人
            # "isAtAll": True
      }
    }
    # 向hook地址发送post请求
    try:
      res = requests.post(webhook, data=json.dumps(msg_body), headers=headers)
      res = res.json()
    except Exception as e:
      logging.error("请求异常: %s" % e)
      res = None

    return res



简单的一个消息发送任务,大家可以一起来讨论

js20184 发表于 2021-11-13 10:28

wahaha020 发表于 2021-11-12 10:47
这不是一个完整Demo吧,完整的贴一下啊,,,参数如传,,,

# 这个webhook值在钉钉群里创建机器人即可看到
    webhook = "https://oapi.dingtalk.com/robot/send?access_token=XXXXXXXXXXXXX"
    # 这个参数是你需要发送的内容
    msg = "时间: 我是测试机器人"
    # 将参数传入函数中
    send_ding_msg(webhook, msg)

DarkHC 发表于 2021-11-15 14:11

import aiohttp
import loguru


import base64
import hashlib
import hmac
from typing import Union
from urllib.parse import quote_plus


async def hmac_sign(timestamp: Union, secret: str) -> str:
    """

    hmac哈希函数
    :param timestamp: 时间戳
    :param secret: 安全码
    :return: 哈希值
    """
    string_to_sign = f'{timestamp}\n{secret}'

    hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
    sign = quote_plus(base64.b64encode(hmac_code))

    return sign

PUSH_URL = "https://oapi.dingtalk.com/robot/send"
logger = loguru.logger

async def push_msg(msg: str, access_token: str, secret: str, receives: list = None, *, request: Request) -> dict:
    """
    消息推送
    :param msg: 消息
    :param access_token:系统颁发的token
    :param secret: 安全码
    :param receives: 收件人
    :param request: FastApi的Request对象 可以忽略
    :return:
    """
    push_result = {
      'status': False,
      'msg': ''
    }
    timestamp = str(int(time.time() * 1000))

    sign = await hmac_sign(timestamp, secret)

    params = {
      "access_token": access_token,
      "timestamp": timestamp,
      "sign": sign,
    }

    data = {
      "msgtype": "text",
      "text": {"content": msg},
      "at": {
            "atMobiles": receives if receives else [],
            "isAtAll": True if not receives else False
      }
    }

    try:
      async with aiohttp.ClientSession() as session:
            async with session.post(PUSH_URL, params=params, json=data) as resp:
                response_json = await resp.json()
    except Exception as e:
      logger.warning(f"事件: {request.ctx.event}, 推送消息异常: {e}")
    else:
      # {'errcode': 310000, 'errmsg': 'sign not match,
      # more: '}
      # {"errcode":0,"errmsg":"ok"}
      err_code = response_json.get('errcode', -1)
      if err_code != 0:
            push_result['msg'] = response_json.get('errmsg', '推送失败')
      else:
            push_result['status'] = True
            push_result['msg'] = '推送成功'
      return push_result

zyy22664488 发表于 2021-11-11 12:04

感谢分享正好需要

molinchz 发表于 2021-11-11 12:53

大佬,请问下 try这里是什么 意思 ,能解答 下吗谢谢

suhaoyue 发表于 2021-11-11 13:21

感谢分享,钉钉用起来的人太多太多了

sz090955 发表于 2021-11-11 13:45

没太看出和dingding的关联,似乎就是一个post请求

joivyhnman 发表于 2021-11-11 13:55

molinchz 发表于 2021-11-11 12:53
大佬,请问下 try这里是什么 意思 ,能解答 下吗谢谢

意思就是正常 就按照这个发,如果不正常就报错。 try 和下面的 except 相对应。

Bug!4u. 发表于 2021-11-11 14:57

大佬,要贴就贴完整的嘛,这算啥~

JuncoJet 发表于 2021-11-11 15:03

一看代码就不完整,webhook呢

js20184 发表于 2021-11-11 15:55

Bug!4u. 发表于 2021-11-11 14:57
大佬,要贴就贴完整的嘛,这算啥~

兄弟这就是完整的一个函数,直接往里面传参数就可以啦,很简单的

js20184 发表于 2021-11-11 15:55

JuncoJet 发表于 2021-11-11 15:03
一看代码就不完整,webhook呢

这就是一个函数,直接传对应的参数就可以啦{:1_937:}
页: [1] 2 3
查看完整版本: Pythons实现向dingding发送消息