[Python] 纯文本查看 复制代码 import aiohttp
import loguru
import base64
import hashlib
import hmac
from typing import Union
from urllib.parse import quote_plus
async def hmac_sign(timestamp: Union[str, int], secret: str) -> str:
"""
hmac哈希函数
:param timestamp: 时间戳
:param secret: 安全码
:return: 哈希值
"""
string_to_sign = f'{timestamp}\n{secret}'
hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
sign = quote_plus(base64.b64encode(hmac_code))
return sign
PUSH_URL = "https://oapi.dingtalk.com/robot/send"
logger = loguru.logger
async def push_msg(msg: str, access_token: str, secret: str, receives: list = None, *, request: Request) -> dict:
"""
消息推送
:param msg: 消息
:param access_token: 系统颁发的token
:param secret: 安全码
:param receives: 收件人
:param request: FastApi的Request对象 可以忽略
:return:
"""
push_result = {
'status': False,
'msg': ''
}
timestamp = str(int(time.time() * 1000))
sign = await hmac_sign(timestamp, secret)
params = {
"access_token": access_token,
"timestamp": timestamp,
"sign": sign,
}
data = {
"msgtype": "text",
"text": {"content": msg},
"at": {
"atMobiles": receives if receives else [],
"isAtAll": True if not receives else False
}
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(PUSH_URL, params=params, json=data) as resp:
response_json = await resp.json()
except Exception as e:
logger.warning(f"事件: {request.ctx.event}, 推送消息异常: {e}")
else:
# {'errcode': 310000, 'errmsg': 'sign not match,
# more: [https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq]'}
# {"errcode":0,"errmsg":"ok"}
err_code = response_json.get('errcode', -1)
if err_code != 0:
push_result['msg'] = response_json.get('errmsg', '推送失败')
else:
push_result['status'] = True
push_result['msg'] = '推送成功'
return push_result |