腾讯兔小巢是一款方便而且免费的反馈系统,而且开发团队很活跃一直在更新功能,但唯独一直没提供实时通知功能,于是自己用云函数写了一个。
可以支持主贴发布,主贴更新,回复发布,回复更新4种类型的通知推送。并且同步记录日志到日志库中方便检索
推送效果:
使用方法:- 在阿里云函数里增加一个http函数:运行环境选择python3,函数名称和触发器名称随便写,认证方式选择 anonymous,请求方式选择post
- 进入函数配置,增加2个环境变量:hook_url填写企业微信或钉钉机器人的webhook地址,product_id填写兔小巢的产品id
- 下载本仓库的代码并上传到云函数中
- 在自定义域名中增加一个域名路径,并填写到兔小巢的新反馈实时通知中。
- all done , enjoy it .
代码:
[Python] 纯文本查看 复制代码 # -*- coding: utf-8 -*-
import logging,json,urllib,requests,os
webhook = os.environ.get('hook_url')
product_id = os.environ.get('product_id')
def log(context): #写日志
logger = logging.getLogger()
logger.info(context)
return context
def send(content,url): #发送信息
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "markdown",
"markdown": {
"content": content,
}
}
r = requests.post(url, headers=headers, json=data)
return r.content
def handler(environ, start_response):
#获取回调内容
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
request_body = environ['wsgi.input'].read(request_body_size)
param = json.loads(request_body.decode('utf8'))
#设定通知变量
action = param['type']
content = param['payload']
tid = param['id']
if "post" in action:
post_user = content['post']['nick_name']
post_content=content['post']['content']
post_url=content['post']['post_url']
post_extra = content['post'].get('extra')
extra_content="\n"
if len(post_extra) > 0:
for k,v in post_extra.items():
extra_content+=">"+str(k)+":"+str(v)+"\n"
text="**"+action.replace('post', '主贴').replace('created', '创建').replace('updated', '更新')+"**\n\n["+post_user+":"+post_content+"]("+post_url+")"+extra_content
if "reply" in action:
reply_user = content['reply']['nick_name']
reply_content=content['reply']['content']
reply_url="https://support.qq.com/products/"+str(product_id)+"/post/"+str(content['reply']['f_title_id'])
text="**"+action.replace('reply', '回复').replace('created', '创建').replace('updated', '更新')+"**\n\n["+reply_user+":"+reply_content+"]("+reply_url+")"
#记录日志
log(text.replace('\n', '').replace('\r', ''))
#发送信息
response_body=send(text,webhook)
#响应回调
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [str(response_body).encode('UTF-8')]
代码仓库地址: sikros/txc-webhook (github.com)
|