一种内网服务器程序接收钉钉群机器人消息的方式
本帖最后由 duminghan23 于 2024-11-19 13:45 编辑基于钉钉群机器人的消息接收并处理
使用钉钉提供的stream方式进行连接钉钉服务器,当你再钉钉群里艾特对应的机器人后,此消息会被程序接收到,已经打印了详细的日志信息,可以根据日志信息进行二次开发。
```python
# !/usr/bin/env python
import argparse # 命令行参数的解析
import logging
from dingtalk_stream import AckMessage
import dingtalk_stream
# import requests,json
def get_instructions(info_text):
# 业务逻辑部分写在这里
return '输入有误。帮助文档\n'+\
'其他功能敬请期待...'
def setup_logger(): # 配置日志记录器,设置日志格式、输出到控制台,并设定日志级别为INFO。
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def define_options(): # 用户提供--client_id和--client_secret,这些通常是与钉钉应用相关的密钥
parser = argparse.ArgumentParser()
parser.add_argument(
'--client_id', dest='client_id', required=True,
help='app_key or suite_key from https://open-dev.digntalk.com'
)
parser.add_argument(
'--client_secret', dest='client_secret', required=True,
help='app_secret or suite_secret from https://open-dev.digntalk.com'
)
options = parser.parse_args()
return options
class EchoTextHandler(dingtalk_stream.ChatbotHandler): # 继承自dingtalk_stream.ChatbotHandler,这是一个异步处理类,用于处理从钉钉聊天机器人接收到的消息。收到消息后,会回复相同的内容,并返回确认状态。
def __init__(self, logger: logging.Logger = None):
super(dingtalk_stream.ChatbotHandler, self).__init__()
if logger:
self.logger = logger
async def process(self, callback: dingtalk_stream.CallbackMessage):
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
text = incoming_message.text.content.strip()
self.logger.info('mylog----callback.data:')
self.logger.info(callback.data)
self.logger.info('mylog----incoming_message:')
self.logger.info(incoming_message)
self.logger.info('mylog----text:')
self.logger.info(text)
# print('\n\n',callback.data,'\n\n',incoming_message,'\n\n',text)
resout = get_instructions(text)
self.logger.info('mylog----resout:')
self.logger.info(resout)
# print(resout)
self.reply_text(resout, incoming_message)
return AckMessage.STATUS_OK, 'OK'
def main():
logger = setup_logger()
# options = define_options()
# credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
credential = dingtalk_stream.Credential('钉钉应用授权--client_id', '钉钉应用授权--client_secret')
client = dingtalk_stream.DingTalkStreamClient(credential)
client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, EchoTextHandler(logger))
client.start_forever()
if __name__ == '__main__':
main()
``` client_id和client_secret如何获取? 学习一下,谢谢 学一下,谢谢 学习了,谢谢楼主分享 感谢大佬分享 wkkk0920 发表于 2024-11-19 15:39
client_id和client_secret如何获取?
这是钉钉的认证信息吧,钉钉开发者平台去搜呗 感谢分享,学习了。多谢!!1 感谢分享,学习了。多谢 厉害厉害,学习了
页:
[1]
2