Pythons实现向dingding发送消息
本帖最后由 js20184 于 2021-11-11 15:58 编辑使用Python 可以向钉钉发送消息,可以将任务插在程序之中,可以进行报警、异常提醒等动作
引用的程序包
import json
import requests
import logging
def send_ding_msg(webhook, msg):
"发送text类型钉钉消息通知,webhook需要使用dingding机器人的唯一webhook,msg 传入你需要写的内容"
# 构造钉钉消息头,必须制定utf-8编码
headers = {"Content-Type": "application/json;charset=utf-8"}
# 构造钉钉消息体,拷贝text消息格式
msg_body = {
"msgtype": "text",
"text": {"content": msg},
"at": {
# 整体注释只发消息不@人
# 指定需要@的人
# "atMobiles": [
# "156xxxx8827"
# ],
# @所有人
# "isAtAll": True
}
}
# 向hook地址发送post请求
try:
res = requests.post(webhook, data=json.dumps(msg_body), headers=headers)
res = res.json()
except Exception as e:
logging.error("请求异常: %s" % e)
res = None
return res
简单的一个消息发送任务,大家可以一起来讨论 wahaha020 发表于 2021-11-12 10:47
这不是一个完整Demo吧,完整的贴一下啊,,,参数如传,,,
# 这个webhook值在钉钉群里创建机器人即可看到
webhook = "https://oapi.dingtalk.com/robot/send?access_token=XXXXXXXXXXXXX"
# 这个参数是你需要发送的内容
msg = "时间: 我是测试机器人"
# 将参数传入函数中
send_ding_msg(webhook, msg) import aiohttp
import loguru
import base64
import hashlib
import hmac
from typing import Union
from urllib.parse import quote_plus
async def hmac_sign(timestamp: Union, secret: str) -> str:
"""
hmac哈希函数
:param timestamp: 时间戳
:param secret: 安全码
:return: 哈希值
"""
string_to_sign = f'{timestamp}\n{secret}'
hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
sign = quote_plus(base64.b64encode(hmac_code))
return sign
PUSH_URL = "https://oapi.dingtalk.com/robot/send"
logger = loguru.logger
async def push_msg(msg: str, access_token: str, secret: str, receives: list = None, *, request: Request) -> dict:
"""
消息推送
:param msg: 消息
:param access_token:系统颁发的token
:param secret: 安全码
:param receives: 收件人
:param request: FastApi的Request对象 可以忽略
:return:
"""
push_result = {
'status': False,
'msg': ''
}
timestamp = str(int(time.time() * 1000))
sign = await hmac_sign(timestamp, secret)
params = {
"access_token": access_token,
"timestamp": timestamp,
"sign": sign,
}
data = {
"msgtype": "text",
"text": {"content": msg},
"at": {
"atMobiles": receives if receives else [],
"isAtAll": True if not receives else False
}
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(PUSH_URL, params=params, json=data) as resp:
response_json = await resp.json()
except Exception as e:
logger.warning(f"事件: {request.ctx.event}, 推送消息异常: {e}")
else:
# {'errcode': 310000, 'errmsg': 'sign not match,
# more: '}
# {"errcode":0,"errmsg":"ok"}
err_code = response_json.get('errcode', -1)
if err_code != 0:
push_result['msg'] = response_json.get('errmsg', '推送失败')
else:
push_result['status'] = True
push_result['msg'] = '推送成功'
return push_result 感谢分享正好需要 大佬,请问下 try这里是什么 意思 ,能解答 下吗谢谢 感谢分享,钉钉用起来的人太多太多了 没太看出和dingding的关联,似乎就是一个post请求 molinchz 发表于 2021-11-11 12:53
大佬,请问下 try这里是什么 意思 ,能解答 下吗谢谢
意思就是正常 就按照这个发,如果不正常就报错。 try 和下面的 except 相对应。 大佬,要贴就贴完整的嘛,这算啥~ 一看代码就不完整,webhook呢 Bug!4u. 发表于 2021-11-11 14:57
大佬,要贴就贴完整的嘛,这算啥~
兄弟这就是完整的一个函数,直接往里面传参数就可以啦,很简单的 JuncoJet 发表于 2021-11-11 15:03
一看代码就不完整,webhook呢
这就是一个函数,直接传对应的参数就可以啦{:1_937:}