好友
阅读权限20
听众
最后登录1970-1-1
|
qzwsa
发表于 2024-12-9 15:51
本帖最后由 qzwsa 于 2024-12-9 16:52 编辑
工作需要查阅钉钉的审批数据开发的一个小工具,本工具只是保存审批的数据为json,如有其他的用途需要按需开发,小众工具用得到的可能比较少。并且代码核心内容不多,主要是在处理可视化部分,纯分享。
1、使用前提必须配置有钉钉的应用
2、获取到应用的相关参数:AppKey、AppSecret、AgentID、UserID
3、mac中开发后打包后的程序在windows10运行正常
4、下载地址:https://wwzm.lanzoub.com/ir5GW2hnfe1i
5、会同步发布在52和csdn
6、运行界面:
7、dingtalk_approval.py代码
[Python] 纯文本查看 复制代码 import requests
import time
import json
from typing import Dict, Any, List
from datetime import datetime, timedelta
class DingTalkApproval:
def __init__(self):
self.app_key = ""
self.app_secret = ""
self.agent_id = ""
self.user_id = ""
self.access_token = None
self.token_expires = 0
self.base_url = "https://oapi.dingtalk.com"
def get_access_token(self) -> str:
"""获取访问令牌"""
if self.access_token and time.time() < self.token_expires:
return self.access_token
url = f"{self.base_url}/gettoken"
params = {
"appkey": self.app_key,
"appsecret": self.app_secret
}
response = requests.get(url, params=params)
result = response.json()
if result.get("errcode") == 0:
self.access_token = result["access_token"]
self.token_expires = time.time() + 7000
return self.access_token
else:
raise Exception(f"获取access_token失败: {result}")
def get_visible_process_forms(self, userid: str) -> List[Dict[str, Any]]:
"""获取企业所有可管理的表单"""
url = f"{self.base_url}/topapi/process/template/manage/get"
params = {
"access_token": self.get_access_token()
}
data = {
"userid": userid,
"page_size": 100,
"page_start": 0
}
all_forms = []
while True:
response = requests.post(url, params=params, json=data)
result = response.json()
if result.get("errcode") == 0:
result_data = result.get("result", {})
if isinstance(result_data, list):
for form in result_data:
if form.get("flow_title"):
formatted_form = {
"name": form.get("flow_title"),
"process_code": form.get("process_code", ""),
"create_time": form.get("gmt_modified", "未知"),
"status": "ENABLED" if not form.get("is_new_process") else "DISABLED"
}
all_forms.append(formatted_form)
break
else:
forms = result_data.get("process_template_list", [])
for form in forms:
if form.get("flow_title"):
formatted_form = {
"name": form.get("flow_title"),
"process_code": form.get("process_code", ""),
"create_time": form.get("gmt_modified", "未知"),
"status": "ENABLED" if not form.get("is_new_process") else "DISABLED"
}
all_forms.append(formatted_form)
if not result_data.get("next_cursor"):
break
data["page_start"] = data["page_start"] + data["page_size"]
else:
raise Exception(f"获取审批表单列表失败: {result}")
# 对表单列表按创建时间排序
all_forms.sort(key=lambda x: x.get('create_time', '0')) # 按时间升序排序
return all_forms
def get_process_instance_ids(self, process_code: str, start_time: int = None, end_time: int = None, callback=None) -> List[str]:
"""获取审批实例ID列表"""
url = f"{self.base_url}/topapi/processinstance/listids"
params = {
"access_token": self.get_access_token()
}
# 确保时间戳在合理范围内
current_time = int(datetime.now().timestamp() * 1000)
if not start_time:
start_time = current_time - (7 * 24 * 60 * 60 * 1000) # 7天前
if not end_time:
end_time = current_time
# 限制时间范围
if end_time > current_time:
end_time = current_time
all_ids = []
next_cursor = 0
page_count = 0
while True:
page_count += 1
data = {
"process_code": process_code,
"start_time": start_time,
"end_time": end_time,
"size": 20, # 每页大小
"cursor": next_cursor,
"userid": "" # 可选参数,不填表示查询所有用户
}
# 回调通知进度
if callback:
callback(f"正在获取第 {page_count} 页数据(每页20条)...")
response = requests.post(url, params=params, json=data)
result = response.json()
if result.get("errcode") == 0:
result_data = result.get("result", {})
instance_ids = result_data.get("list", [])
all_ids.extend(instance_ids)
# 回调通知当前获取的数量
if callback:
callback(f"已获取 {len(all_ids)} 条数据")
# 检查是否有更多数据
if not result_data.get("next_cursor"):
break
next_cursor = result_data.get("next_cursor")
else:
raise Exception(f"获取审批实例ID列表失败: {result}")
# 回调通知最终结果
if callback:
callback(f"获取完成,总共 {len(all_ids)} 条数据,共查询 {page_count} 页")
return all_ids
def get_process_instance_detail(self, instance_id: str) -> Dict[str, Any]:
"""获取单个审批实例详情"""
url = f"{self.base_url}/topapi/processinstance/get"
params = {
"access_token": self.get_access_token()
}
data = {
"process_instance_id": instance_id
}
response = requests.post(url, params=params, json=data)
result = response.json()
if result.get("errcode") == 0:
return result.get("process_instance", {})
else:
raise Exception(f"获取审批实例详情失败: {result}")
def main():
try:
dingtalk = DingTalkApproval()
# 确保必须先设置验证参数
if not all([dingtalk.app_key, dingtalk.app_secret, dingtalk.agent_id, dingtalk.user_id]):
print("请先设置必要的验证参数!")
return
print("\n1. 获取企业可管理的表单列表:")
forms = dingtalk.get_visible_process_forms(dingtalk.user_id)
print(f"找到 {len(forms)} 个审批表单")
print("\n可用的审批表单列表 (按创建时间排序,最新的在最后):")
print("-" * 50)
for index, form in enumerate(forms, 1):
create_time = form.get('create_time', '未知')
# 格式化时间显示
if create_time != '未知':
try:
# 尝试将时间字符串转换为更友好的格式
dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
create_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
print(f"{index}. 表单名称: {form.get('name')}")
print(f" 表单编码: {form.get('process_code')}")
print(f" 创建时间: {create_time}")
print(f" 是否启用: {'是' if form.get('status') == 'ENABLED' else '否'}")
print("-" * 50)
while True:
try:
choice = int(input("\n请输入要查询的表单序号(1-{0}): ".format(len(forms))))
if 1 <= choice <= len(forms):
selected_form = forms[choice - 1]
break
else:
print("无效的选择��请重新输入")
except ValueError:
print("请输入有效的数字")
# 让用户输入开始和结束时间
while True:
try:
start_date = input("\n请输入开始时间(格式:2023-01-01): ")
end_date = input("请输入结束时间(格式:2023-12-31): ")
# 转换日期字符串为datetime对象,使用北京时间
start_datetime = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
end_datetime = datetime.strptime(f"{end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
# 确保时间范围有效
current_datetime = datetime.now()
if start_datetime > current_datetime:
print("开始时间不能晚于当前时间,请重新输入")
continue
if start_datetime > end_datetime:
print("开始时间必须早于或等于结束时间,请重新输入")
continue
# 转换为毫秒级时间戳
start_time = int(start_datetime.timestamp() * 1000)
end_time = int(end_datetime.timestamp() * 1000)
# 确保时间戳在合理范围内
current_time = int(current_datetime.timestamp() * 1000)
if end_time > current_time:
end_time = current_time
print(f"\n调试信息:")
print(f"开始时间: {start_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"结束时间: {end_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"开始时间戳: {start_time}")
print(f"结束时间戳: {end_time}")
print(f"当前时间戳: {current_time}")
break
except ValueError:
print("请输入正确的日期格式,例如:2023-01-01")
print(f"\n开始查询 '{selected_form.get('name')}' 从 {start_date} 到 {end_date} 的审批记录...")
instance_ids = dingtalk.get_process_instance_ids(
selected_form.get('process_code'),
start_time=start_time,
end_time=end_time
)
print(f"找到 {len(instance_ids)} 个审批实例")
for instance_id in instance_ids:
try:
instance_detail = dingtalk.get_process_instance_detail(instance_id)
print("\n审批实例详情:")
print(f"实例ID: {instance_id}")
print(f"标题: {instance_detail.get('title')}")
print(f"状态: {instance_detail.get('status')}")
print(f"发起人: {instance_detail.get('originator_userid')}")
print(f"发起时间: {instance_detail.get('create_time')}")
print(f"完成时间: {instance_detail.get('finish_time')}")
print("-" * 30)
with open(f"approval_detail_{instance_id}.json", "w", encoding="utf-8") as f:
json.dump(instance_detail, f, ensure_ascii=False, indent=2)
except Exception as detail_error:
print(f"获取实例 {instance_id} 详情失败: {str(detail_error)}")
with open(f"approval_forms_{dingtalk.user_id}.json", "w", encoding="utf-8") as f:
json.dump(forms, f, ensure_ascii=False, indent=2)
print(f"\n所有表单信息已保存到 approval_forms_{dingtalk.user_id}.json")
except Exception as e:
print(f"发生错误: {str(e)}")
if __name__ == "__main__":
main()
8、dingtalk_approval_gui.py代码
[Python] 纯文本查看 复制代码 import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import json
import os
from datetime import datetime, timedelta
from dingtalk_approval import DingTalkApproval
import threading
from tkcalendar import DateEntry
import concurrent.futures
from queue import Queue
import time
import queue
class DingTalkApprovalGUI:
def __init__(self, root):
self.root = root
self.root.title("钉钉审批数据查询工具_52破解_qzwsa")
self.root.geometry("1500x800")
self.root.resizable(True, True)
# 配置文件路径
self.config_file = "dingtalk_config.json"
# 初始化变量,从配置文件加载
self.load_config()
self.dingtalk = None
self.forms = []
self.selected_form = None
# 添加用于线程间通信的队列
self.result_queue = Queue()
# 添加用于控制并发数的变量
self.max_workers = 2
# 创建主框架并配置网格权重
self.main_frame = ttk.Frame(root)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 配置主框架的网格
self.main_frame.grid_columnconfigure(0, weight=2) # 左列占比更大
self.main_frame.grid_columnconfigure(1, weight=3) # 中列占比最大
self.main_frame.grid_columnconfigure(2, weight=0) # 右列不自动扩展
self.main_frame.grid_rowconfigure(0, weight=1) # 使行也可伸展
# 创建左中右三列框架
self.left_frame = ttk.Frame(self.main_frame)
self.middle_frame = ttk.Frame(self.main_frame)
self.right_frame = ttk.Frame(self.main_frame)
# 使用grid布局并启用拉伸
self.left_frame.grid(row=0, column=0, sticky='nsew', padx=5)
self.middle_frame.grid(row=0, column=1, sticky='nsew', padx=5)
self.right_frame.grid(row=0, column=2, sticky='nsew', padx=5)
# 配置各个框架的网格权重
self.left_frame.grid_rowconfigure(4, weight=1) # 表单列表区域可伸展
self.middle_frame.grid_rowconfigure(0, weight=1) # 结果区域可伸展
self.right_frame.grid_rowconfigure(0, weight=1) # JSON预览区域可伸展
# 创建各个区域
self.create_connection_area()
self.create_form_list_area()
self.create_date_selection_area()
self.create_query_area()
self.create_result_area()
self.create_json_preview_area()
def load_config(self):
"""从配置文件加载设置"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
self.app_key = config.get('app_key', '')
self.app_secret = config.get('app_secret', '')
self.agent_id = config.get('agent_id', '')
self.user_id = config.get('user_id', '')
else:
self.app_key = ""
self.app_secret = ""
self.agent_id = ""
self.user_id = ""
except Exception as e:
print(f"加载配置文件失败: {e}")
self.app_key = ""
self.app_secret = ""
self.agent_id = ""
self.user_id = ""
def save_config(self):
"""保存配置到文件"""
try:
config = {
'app_key': self.app_key,
'app_secret': self.app_secret,
'agent_id': self.agent_id,
'user_id': self.user_id
}
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
messagebox.showerror("错误", f"保存配置失败:{str(e)}")
return False
def create_connection_area(self):
# 创建连接配置区域
conn_frame = ttk.LabelFrame(self.left_frame, text="连接配置")
conn_frame.pack(fill=tk.X, pady=5)
# 创建顶部按钮框架
top_frame = ttk.Frame(conn_frame)
top_frame.grid(row=0, column=0, columnspan=3, sticky='e', padx=5, pady=2)
# 添加帮助按钮
help_button = ttk.Button(top_frame, text="使用说明", command=self.show_help)
help_button.pack(side=tk.RIGHT)
# 创建输入框和标签
fields = [
("AppKey:", self.app_key, "app_key_entry"),
("AppSecret:", self.app_secret, "app_secret_entry"),
("AgentID:", str(self.agent_id) if self.agent_id else "", "agent_id_entry"),
("UserID:", self.user_id, "user_id_entry")
]
for i, (label, default, name) in enumerate(fields):
ttk.Label(conn_frame, text=label).grid(row=i+1, column=0, padx=5, pady=2, sticky='e')
entry = ttk.Entry(conn_frame, width=50)
entry.insert(0, default)
entry.grid(row=i+1, column=1, padx=5, pady=2, sticky='w')
setattr(self, name, entry)
# 创建连接按钮
ttk.Button(conn_frame, text="连接并获取表单", command=self.connect_and_get_forms).grid(
row=len(fields)+1, column=0, columnspan=2, pady=10)
def create_form_list_area(self):
# 创建表单列表区域
form_frame = ttk.LabelFrame(self.left_frame, text="审批表单列表")
form_frame.pack(fill=tk.BOTH, expand=True, pady=5)
# 使form_frame持网格伸展
form_frame.grid_rowconfigure(0, weight=1)
form_frame.grid_columnconfigure(0, weight=1)
# 创建表单列表
columns = ("编号", "表单名称", "表单编码", "创建时间", "状态")
self.form_tree = ttk.Treeview(form_frame, columns=columns, show="headings")
self.form_tree.grid(row=0, column=0, sticky='nsew')
# 设置列题和宽度
for col in columns:
self.form_tree.heading(col, text=col)
width = 100 if col != "表单名称" else 200
self.form_tree.column(col, width=width)
# 添加滚动条
scrollbar = ttk.Scrollbar(form_frame, orient="vertical", command=self.form_tree.yview)
scrollbar.grid(row=0, column=1, sticky='ns')
self.form_tree.configure(yscrollcommand=scrollbar.set)
# 绑定点击事件
self.form_tree.bind('<<TreeviewSelect>>', self.on_form_select)
def on_form_select(self, event):
"""处理表单列表选择事件"""
selection = self.form_tree.selection()
if selection:
selected_index = int(self.form_tree.item(selection[0])['values'][0]) - 1
selected_form = self.forms[selected_index]
try:
# 尝试找并读取对应的JSON文件
forms_file = None
# 查找最新的文件夹
folders = [d for d in os.listdir() if os.path.isdir(d) and d.startswith("审批数据_")]
if folders:
# 按文件夹名称排序,获取最新的
latest_folder = sorted(folders)[-1]
forms_file = os.path.join(latest_folder, f"approval_forms_{self.user_id}.json")
if forms_file and os.path.exists(forms_file):
with open(forms_file, 'r', encoding='utf-8') as f:
forms_data = json.load(f)
# 查找对应的表单数据
form_data = next(
(form for form in forms_data
if form.get('process_code') == selected_form.get('process_code')),
selected_form
)
# 显示JSON预览
self.format_json_with_highlighting(form_data)
else:
# 如果找不到文件接显示当前表单数据
self.format_json_with_highlighting(selected_form)
except Exception as e:
self.json_preview.delete(1.0, tk.END)
self.json_preview.insert(tk.END, f"读取表单数据失败: {str(e)}")
def create_date_selection_area(self):
# 创建日期选择区域
date_frame = ttk.LabelFrame(self.left_frame, text="查询时间范围")
date_frame.pack(fill=tk.X, pady=5)
# 开始日期
ttk.Label(date_frame, text="开始日期:").pack(side=tk.LEFT, padx=5)
self.start_date_entry = DateEntry(date_frame, width=12, background='darkblue',
foreground='white', borderwidth=2,
date_pattern='yyyy-mm-dd')
self.start_date_entry.pack(side=tk.LEFT, padx=5)
# 结束日期
ttk.Label(date_frame, text="结束日期:").pack(side=tk.LEFT, padx=5)
self.end_date_entry = DateEntry(date_frame, width=12, background='darkblue',
foreground='white', borderwidth=2,
date_pattern='yyyy-mm-dd')
self.end_date_entry.pack(side=tk.LEFT, padx=5)
# 设置默认日期:始日期为当天期前119天,结束日期为当天
today = datetime.now()
start_date = today - timedelta(days=119)
self.start_date_entry.set_date(start_date)
self.end_date_entry.set_date(today)
# 绑定开始日期变化事件
self.start_date_entry.bind("<<DateEntrySelected>>", self.update_end_date)
def update_end_date(self, event=None):
"""当开始日期变化时,自动更新结束日期为开始日期后的119天"""
try:
# 获取开始日期
start_date = self.start_date_entry.get_date()
# 计算结束日期(开始日期+119天)
end_date = start_date + timedelta(days=119)
# 如果计算的结束日期超过今天,则使用今天作为结束日期
today = datetime.now().date()
if end_date > today:
end_date = today
# ���新结束日期
self.end_date_entry.set_date(end_date)
except Exception as e:
messagebox.showerror("错误", f"更新结束日期失败: {str(e)}")
def create_query_area(self):
# 创建查询按钮区域
query_frame = ttk.Frame(self.left_frame)
query_frame.pack(fill=tk.X, pady=5)
# 创一行包含并发线程数和查询按钮
control_frame = ttk.Frame(query_frame)
control_frame.pack(fill=tk.X, pady=5)
# 左侧放置并发线程数控制
thread_control = ttk.Frame(control_frame)
thread_control.pack(side=tk.LEFT, padx=5)
ttk.Label(thread_control, text="并发线程数:").pack(side=tk.LEFT)
# 创建并初始化变量
self.thread_var = tk.StringVar(value='2')
self.thread_spinbox = ttk.Spinbox(
thread_control,
from_=1,
to=20,
width=5,
textvariable=self.thread_var
)
self.thread_spinbox.pack(side=tk.LEFT, padx=5)
# 右侧放置查询按钮
self.query_button = ttk.Button(control_frame, text="查询审批数据", command=self.start_query)
self.query_button.pack(side=tk.RIGHT, padx=5)
# 进度显示
self.progress_var = tk.StringVar(value="就绪")
self.progress_label = ttk.Label(query_frame, textvariable=self.progress_var)
self.progress_label.pack(pady=5)
def create_result_area(self):
# 创建结果显示区域
result_frame = ttk.LabelFrame(self.middle_frame, text="查询结果")
result_frame.pack(fill=tk.BOTH, expand=True, pady=5)
# 使result_frame支持网格伸展
result_frame.grid_rowconfigure(0, weight=1)
result_frame.grid_columnconfigure(0, weight=1)
# 创建文本区域
self.result_text = scrolledtext.ScrolledText(result_frame)
self.result_text.grid(row=0, column=0, sticky='nsew', padx=5, pady=5)
# 绑定点击事件
self.result_text.tag_configure("instance_id", foreground="blue", underline=1)
self.result_text.tag_bind("instance_id", "<Button-1>", self.show_json_preview)
def create_json_preview_area(self):
# 创建JSON预览区域
preview_frame = ttk.LabelFrame(self.right_frame, text="JSON预览")
preview_frame.pack(fill=tk.BOTH, expand=True, pady=5)
# 创建控制区域
control_frame = ttk.Frame(preview_frame)
control_frame.pack(fill=tk.X, padx=5, pady=2)
# 字体大小调整
ttk.Label(control_frame, text="字体大小:").pack(side=tk.LEFT, padx=2)
self.json_font_size = tk.IntVar(value=10)
font_spinbox = ttk.Spinbox(
control_frame,
from_=8,
to=72,
width=5,
textvariable=self.json_font_size,
command=self.update_json_font_size
)
font_spinbox.pack(side=tk.LEFT, padx=5)
# 宽度调整
ttk.Label(control_frame, text="宽度:").pack(side=tk.LEFT, padx=2)
self.json_width = tk.IntVar(value=80)
width_spinbox = ttk.Spinbox(
control_frame,
from_=40,
to=200,
width=5,
textvariable=self.json_width,
command=self.update_json_width
)
width_spinbox.pack(side=tk.LEFT, padx=5)
# 创建文本区域框架
text_frame = ttk.Frame(preview_frame)
text_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=2)
# 创建文本区域和滚动条
self.json_preview = tk.Text(text_frame, wrap=tk.NONE)
self.json_preview.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 添加垂直滚动条
v_scrollbar = ttk.Scrollbar(text_frame, orient="vertical", command=self.json_preview.yview)
v_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 添加水平滚动条
h_scrollbar = ttk.Scrollbar(preview_frame, orient="horizontal", command=self.json_preview.xview)
h_scrollbar.pack(side=tk.BOTTOM, fill=tk.X)
# 配置文本框的滚动
self.json_preview.configure(
yscrollcommand=v_scrollbar.set,
xscrollcommand=h_scrollbar.set
)
# 配置JSON预览的初始字体和颜色
self.update_json_preview_style()
# 创建标签用于语法高亮
self.json_preview.tag_configure("key", foreground="#9cdcfe")
self.json_preview.tag_configure("string", foreground="#ce9178")
self.json_preview.tag_configure("number", foreground="#b5cea8")
self.json_preview.tag_configure("boolean", foreground="#569cd6")
# 记录初始宽度
self.initial_width = 400
self.right_frame.configure(width=self.initial_width)
self.right_frame.grid_propagate(False)
def update_json_preview_style(self):
"""更新JSON预览区域的样式"""
self.json_preview.configure(
font=('Consolas', self.json_font_size.get()),
bg='#1e1e1e', # 深色背景
fg='#d4d4d4', # 浅色文本
insertbackground='white', # 光标颜色
width=self.json_width.get() # 设置宽度
)
def update_json_font_size(self):
"""更新JSON预览区域的字体大小"""
try:
# 保存当前滚动位置
current_pos = self.json_preview.yview()
# 更新字体
self.update_json_preview_style()
# 恢复滚动位置
self.json_preview.yview_moveto(current_pos[0])
except Exception as e:
messagebox.showerror("错误", f"更新字体大��失败: {str(e)}")
def update_json_width(self):
"""更新JSON预览区域的宽度"""
try:
# 获取当前宽度
current_width = self.json_width.get()
# 计算新的框架宽度(每个字大约8像素)
new_frame_width = current_width * 8
# 更新框架宽度
self.right_frame.configure(width=new_frame_width)
# 更新文本样式
self.update_json_preview_style()
except Exception as e:
messagebox.showerror("错误", f"更新宽度失败: {str(e)}")
def connect_and_get_forms(self):
try:
# 获取输入的配置
self.app_key = self.app_key_entry.get().strip()
self.app_secret = self.app_secret_entry.get().strip()
self.agent_id = int(self.agent_id_entry.get().strip())
self.user_id = self.user_id_entry.get().strip()
# 创建DingTalkApproval实例并设置参数
self.dingtalk = DingTalkApproval()
self.dingtalk.app_key = self.app_key
self.dingtalk.app_secret = self.app_secret
self.dingtalk.agent_id = self.agent_id
self.dingtalk.user_id = self.user_id
# 获取表单列表
self.forms = self.dingtalk.get_visible_process_forms(self.user_id)
# 保存配置
if self.save_config():
print("配置已保存")
# 清空现有表单列表
for item in self.form_tree.get_children():
self.form_tree.delete(item)
# 显示表单列表
for i, form in enumerate(self.forms, 1):
create_time = form.get('create_time', '未知')
if create_time != '未知':
try:
dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
create_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
status = '是' if form.get('status') == 'ENABLED' else '否'
self.form_tree.insert("", tk.END, values=(
i,
form.get('name'),
form.get('process_code'),
create_time,
status
))
messagebox.showinfo("成功", f"成功获取 {len(self.forms)} 个审批表单")
except Exception as e:
messagebox.showerror("错误", f"连接失败: {str(e)}")
def start_query(self):
# 检查是否已设置验证参数
if not all([self.dingtalk.app_key, self.dingtalk.app_secret,
self.dingtalk.agent_id, self.dingtalk.user_id]):
messagebox.showerror("错误", "请先设置并保存验证参数!")
return
try:
# 获取选中的表单
selection = self.form_tree.selection()
if not selection:
messagebox.showwarning("警告", "请先选择一个审批表单!")
return
# 获取日期范围
start_date = self.start_date_entry.get()
end_date = self.end_date_entry.get()
# 转换日期
start_datetime = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
end_datetime = datetime.strptime(f"{end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
# 检查日期有效
if start_datetime > datetime.now():
messagebox.showwarning("警告", "开始时间不能晚于当前时间!")
return
if start_datetime > end_datetime:
messagebox.showwarning("警告", "开始时间必须早于结束时间!")
return
# ���取选中表单的信息
selected_index = int(self.form_tree.item(selection[0])['values'][0]) - 1
self.selected_form = self.forms[selected_index]
# 禁用查询按钮
self.query_button.configure(state='disabled')
# 在新线程中执行查询
thread = threading.Thread(target=self.execute_query, args=(start_datetime, end_datetime))
thread.daemon = True
thread.start()
except ValueError:
messagebox.showerror("错", "请输入正确的日期格式(YYYY-MM-DD)!")
def process_instance(self, instance_data):
"""处理单个实例的详情获取和保存"""
i, instance_id = instance_data
try:
instance_detail = self.dingtalk.get_process_instance_detail(instance_id)
# 保存详情文件
file_path = os.path.join(self.current_save_folder, f"approval_detail_{instance_id}.json")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(instance_detail, f, ensure_ascii=False, indent=2)
# 将结果放入列
self.result_queue.put({
'success': True,
'index': i,
'instance_id': instance_id,
'detail': instance_detail
})
except Exception as e:
# 将错误信息放入队列
self.result_queue.put({
'success': False,
'index': i,
'instance_id': instance_id,
'error': str(e)
})
def update_ui(self):
"""更新UI显示的方法"""
try:
while True:
try:
result = self.result_queue.get_nowait()
if result['success']:
# 显示成功的实例详情
instance_detail = result['detail']
# 先插入实例编号
self.result_text.insert(tk.END, f"\n实例 {result['index']}:\n")
# 插入实例ID并添加标签
id_text = f"实例ID: {result['instance_id']}"
self.result_text.insert(tk.END, id_text + "\n")
# 计算标签位置
start_pos = self.result_text.index("end-2c linestart")
end_pos = f"{start_pos}+{len(id_text)}c"
# 添加可点击标签
self.result_text.tag_add("instance_id", start_pos, end_pos)
# 插入其他信息
self.result_text.insert(tk.END,
f"标题: {instance_detail.get('title')}\n"
f"状态: {instance_detail.get('status')}\n"
f"发起人: {instance_detail.get('originator_userid')}\n"
f"发起时间: {instance_detail.get('create_time')}\n"
f"完成时间: {instance_detail.get('finish_time')}\n"
f"{'-' * 50}\n"
)
else:
self.log_message(f"获取实例 {result['instance_id']} 详情失败: {result['error']}")
self.result_text.see(tk.END)
self.result_queue.task_done()
except queue.Empty:
break
if not self.result_queue.empty() or self.tasks_pending:
self.root.after(100, self.update_ui)
else:
self.progress_var.set("查询完成")
self.query_button.configure(state='normal')
except Exception as e:
self.log_message(f"更新界面失败: {str(e)}")
def execute_query(self, start_datetime, end_datetime):
try:
# 更新并发线程数
try:
self.max_workers = int(self.thread_spinbox.get())
except ValueError:
self.max_workers = 2
# 创建保存文件夹
folder_name = f"审批数据_{start_datetime.strftime('%Y%m%d')}_{end_datetime.strftime('%Y%m%d')}"
self.current_save_folder = os.path.join(os.getcwd(), folder_name)
os.makedirs(self.current_save_folder, exist_ok=True)
# 转换为毫秒级时间戳
start_time = int(start_datetime.timestamp() * 1000)
end_time = int(end_datetime.timestamp() * 1000)
self.result_text.delete(1.0, tk.END)
self.log_message("开始查询审批实例...")
self.log_message(f"使用 {self.max_workers} 个并发线程")
self.log_message(f"查询时间范围: {start_datetime.strftime('%Y-%m-%d')} 至 {end_datetime.strftime('%Y-%m-%d')}")
self.log_message(f"保存路径: {self.current_save_folder}")
self.log_message("正在获取审批实例列表...\n")
# 获取审批实例ID列表,添加回调函数
instance_ids = self.dingtalk.get_process_instance_ids(
self.selected_form.get('process_code'),
start_time=start_time,
end_time=end_time,
callback=self.log_message # 传入日志函数作为回调
)
self.log_message("\n开始获取详细数据...")
# 使用线程池处理实例详情
self.tasks_pending = True
processed_count = 0
error_count = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
futures = [
executor.submit(self.process_instance, (i, instance_id))
for i, instance_id in enumerate(instance_ids, 1)
]
# 更新进度显示
total = len(futures)
for i, future in enumerate(concurrent.futures.as_completed(futures), 1):
self.progress_var.set(f"正在处理: {i}/{total}")
try:
future.result()
processed_count += 1
except Exception as e:
error_count += 1
self.log_message(f"处理任务时出错: {e}")
# 每处理10个实例更新一次统计信息
if i % 10 == 0 or i == total:
self.log_message(
f"处理进度: {i}/{total} "
f"(成功: {processed_count}, 失败: {error_count})"
)
self.tasks_pending = False
# 保存表单信息
forms_file = os.path.join(self.current_save_folder, f"approval_forms_{self.user_id}.json")
with open(forms_file, "w", encoding="utf-8") as f:
json.dump(self.forms, f, ensure_ascii=False, indent=2)
self.log_message("\n处理完成统计:")
self.log_message(f"总计处理: {total} 个实例")
self.log_message(f"成功处理: {processed_count} 个")
self.log_message(f"处理失败: {error_count} 个")
self.log_message(f"数据保存在: {self.current_save_folder}")
# 启动UI更新,显示详细数据
self.root.after(100, self.update_ui)
except Exception as e:
self.log_message(f"\n查询过程出错: {str(e)}")
messagebox.showerror("错误", f"查询失败: {str(e)}")
self.query_button.configure(state='normal')
def format_json_with_highlighting(self, json_str):
"""格式化JSON并添加语法高亮"""
self.json_preview.delete(1.0, tk.END)
def format_nested_json(value):
"""处理嵌套的JSON字符串"""
try:
# 尝试解析可能是JSON字符串的值
if isinstance(value, str) and (value.startswith('[') or value.startswith('{')):
# 处理转义字符
value = value.replace('\\"', '"').replace('\\\\', '\\')
parsed = json.loads(value)
return json.dumps(parsed, ensure_ascii=False, indent=2)
except:
pass
return value
try:
# 如果输入是字符串,先解析成Python对象
if isinstance(json_str, str):
data = json.loads(json_str)
else:
data = json_str
def process_value(obj):
"""递归处理对象中的所有值"""
if isinstance(obj, dict):
return {k: process_value(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [process_value(item) for item in obj]
else:
return format_nested_json(obj)
# 处理所有嵌套的JSON字符串
processed_data = process_value(data)
# 新格式化JSON,确保缩进正确
formatted_json = json.dumps(processed_data, ensure_ascii=False, indent=2)
# 逐行处理并添加高亮
for line in formatted_json.split('\n'):
# 处理键值对
if ':' in line:
# 分割时保留冒号后的所有内容处理值中可能包含冒号的情况)
parts = line.split(':', 1)
if len(parts) == 2:
key, value = parts
# 插入键
self.json_preview.insert(tk.END, key + ':', 'key')
# 处理值
value = value.strip()
if value.startswith('"'):
self.json_preview.insert(tk.END, value + '\n', 'string')
elif value.lower() in ('true', 'false', 'null'):
self.json_preview.insert(tk.END, value + '\n', 'boolean')
elif value.replace('.','',1).isdigit():
self.json_preview.insert(tk.END, value + '\n', 'number')
else:
self.json_preview.insert(tk.END, value + '\n')
else:
# 处理其他行(如括号、数组素等)
line_stripped = line.strip()
if line_stripped.startswith('"'):
self.json_preview.insert(tk.END, line + '\n', 'string')
elif line_stripped.replace('.','',1).isdigit():
self.json_preview.insert(tk.END, line + '\n', 'number')
else:
self.json_preview.insert(tk.END, line + '\n')
except Exception as e:
self.json_preview.insert(tk.END, f"JSON格式化错误: {str(e)}")
def show_json_preview(self, event):
# 获取点击位置的行
index = self.result_text.index(f"@{event.x},{event.y}")
line = self.result_text.get(f"{index} linestart", f"{index} lineend")
# 提取实例ID
if "实例ID:" in line:
instance_id = line.split("实例ID:")[1].strip()
try:
# 读取对应的JSON文件
folder_path = self.current_save_folder
file_path = os.path.join(folder_path, f"approval_detail_{instance_id}.json")
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# 使用新的格式化方法显示JSON
self.format_json_with_highlighting(json_data)
except Exception as e:
self.json_preview.delete(1.0, tk.END)
self.json_preview.insert(tk.END, f"读取JSON文件失败: {str(e)}")
def log_message(self, message):
"""添加日志消息到结果文本"""
self.result_text.insert(tk.END, f"{message}\n")
self.result_text.see(tk.END)
self.result_text.update()
def start_resize(self, event):
"""开始调整大小"""
self.x = event.x_root
self.initial_width = self.right_frame.winfo_width()
def do_resize(self, event):
"""执行调整大小"""
try:
diff = event.x_root - self.x
new_width = self.initial_width + diff
# 获取主窗口宽度
window_width = self.root.winfo_width()
# 计算最大允许宽度(窗口宽度的60%)
max_width = int(window_width * 0.6)
# 调整限制范围
if new_width < 300: # 增加最小宽度
new_width = 300
elif new_width > max_width:
new_width = max_width
# 更新框架宽度
self.right_frame.configure(width=new_width)
# 强制更新显示
self.right_frame.update_idletasks()
except Exception as e:
print(f"调整大小时出错: {str(e)}")
def end_resize(self, event):
"""结束调整大小"""
# 置鼠标样式
self.sizer.configure(style='TFrame')
# 更新 JSON 预览区域的滚动条
self.json_preview.update_idletasks()
def show_help(self):
"""显示帮助信息"""
help_window = tk.Toplevel(self.root)
help_window.title("钉钉审批数据查询工具 - 使用说明")
help_window.geometry("800x600")
# 设置模态窗口
help_window.transient(self.root)
help_window.grab_set()
# 创建文本区域
help_text = scrolledtext.ScrolledText(
help_window,
wrap=tk.WORD,
padx=10,
pady=10,
font=('Microsoft YaHei UI', 10),
background='#f5f5f5'
)
help_text.pack(fill=tk.BOTH, expand=True)
# 设置标签样式
help_text.tag_configure('title', font=('Microsoft YaHei UI', 14, 'bold'))
help_text.tag_configure('heading', font=('Microsoft YaHei UI', 12, 'bold'))
help_text.tag_configure('normal', font=('Microsoft YaHei UI', 10))
# 帮助内容
help_content = """
# 钉钉审批数据查询工具使用说明
## 功能介绍
本工具用于批量获取钉钉审批数据,支持多线程并发查询,可导出详细的审批信息。
## 使用前准备
1. 需要在钉钉开放平台创建应用并获取以下信息:
- AppKey:应用的唯一标识
- AppSecret:应用密钥
- AgentID:应用ID
- UserID:钉钉用户ID
2. 确保应用有足够的权限:
- 审批数据读取权限
- 通讯录管理权限
## 使用步骤
### 1. 配置连接
- 首次使用时需要填写完整的连接信息
- 配置信息会自动保存,下次使用时自动加载
- 如需修改配置,直接在输入框中更改后点击"连接并获取表单"
### 2. 选择审批表单
- 连接成功后会显示可查询的审批表单列表
- 点击表单可在右侧预览表单详细信息
### 3. 设置查询参数
- 选择查询的时间范围(默认为当前日期前119天到当前日期)
- 设置并发线程数(默认为2,建议不超过10)
### 4. 执行查询
- 点击"查询审批数据"开始获取数据
- 查询结果会实时显示在中间区域
- 点击实例ID可在右侧查看详细信息
### 5. 数据保存
- 所有数据会自动保存在"审批数据_日期"文件夹中
- 每个审批实例的详细信息保存为单独的JSON文件
- 表单信息会保存在approval_forms_xxx.json文件中
## 注意事项
1. 查询时间范围建议不要太大,以免数据量过大
2. 并发线程数不要设置过高,以免触发接口限流
3. 如遇到连接失败,请检查:
- 网络连接是否正常
- 配置信息是否正确
- 应用权限是否足够
## 常见问题
Q: 为什么连接失败?
A: 请检查AppKey、AppSecret等配置是否正确,以及网络连接是否正常。
Q: 为什么没有显示审批表单?
A: 请确保当前用户有权限访问这些审批表单。
Q: 数据保存在哪里?
A: 数据保存在程序运行目录下的"审批数据_日期"文件夹中。
## 技术支持
如有问题请自我解决啊~~
52破解_qzwsa
"""
# 插入文本并应用样式
help_text.insert(tk.END, help_content)
help_text.configure(state='disabled') # 设置为只读
# 添加底部按钮框架
button_frame = ttk.Frame(help_window)
button_frame.pack(fill=tk.X, pady=10)
# 添加关闭按钮
close_button = ttk.Button(
button_frame,
text="关闭",
command=help_window.destroy,
width=20
)
close_button.pack(pady=5)
# 居中显示窗口
help_window.update_idletasks()
width = help_window.winfo_width()
height = help_window.winfo_height()
x = (help_window.winfo_screenwidth() // 2) - (width // 2)
y = (help_window.winfo_screenheight() // 2) - (height // 2)
help_window.geometry(f'{width}x{height}+{x}+{y}')
def main():
root = tk.Tk()
app = DingTalkApprovalGUI(root)
root.mainloop()
if __name__ == "__main__":
main() |
免费评分
-
查看全部评分
|