吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1460|回复: 21
收起左侧

[Python 原创] 钉钉审批数据查询工具

[复制链接]
qzwsa 发表于 2024-12-9 15:51
本帖最后由 qzwsa 于 2024-12-9 16:52 编辑

工作需要查阅钉钉的审批数据开发的一个小工具,本工具只是保存审批的数据为json,如有其他的用途需要按需开发,小众工具用得到的可能比较少。并且代码核心内容不多,主要是在处理可视化部分,纯分享。
1、使用前提必须配置有钉钉的应用
2、获取到应用的相关参数:AppKey、AppSecret、AgentID、UserID
3、mac中开发后打包后的程序在windows10运行正常
4、下载地址:https://wwzm.lanzoub.com/ir5GW2hnfe1i
5、会同步发布在52和csdn
6、运行界面:

WX20241209-165109@2x.png

7、dingtalk_approval.py代码
[Python] 纯文本查看 复制代码
import requests
import time
import json
from typing import Dict, Any, List
from datetime import datetime, timedelta

class DingTalkApproval:
    def __init__(self):
        self.app_key = ""
        self.app_secret = ""
        self.agent_id = ""
        self.user_id = ""
        self.access_token = None
        self.token_expires = 0
        self.base_url = "https://oapi.dingtalk.com"

    def get_access_token(self) -> str:
        """获取访问令牌"""
        if self.access_token and time.time() < self.token_expires:
            return self.access_token

        url = f"{self.base_url}/gettoken"
        params = {
            "appkey": self.app_key,
            "appsecret": self.app_secret
        }
        
        response = requests.get(url, params=params)
        result = response.json()
        
        if result.get("errcode") == 0:
            self.access_token = result["access_token"]
            self.token_expires = time.time() + 7000
            return self.access_token
        else:
            raise Exception(f"获取access_token失败: {result}")

    def get_visible_process_forms(self, userid: str) -> List[Dict[str, Any]]:
        """获取企业所有可管理的表单"""
        url = f"{self.base_url}/topapi/process/template/manage/get"
        params = {
            "access_token": self.get_access_token()
        }
        data = {
            "userid": userid,
            "page_size": 100,
            "page_start": 0
        }
        
        all_forms = []
        while True:
            response = requests.post(url, params=params, json=data)
            result = response.json()
            
            if result.get("errcode") == 0:
                result_data = result.get("result", {})
                if isinstance(result_data, list):
                    for form in result_data:
                        if form.get("flow_title"):
                            formatted_form = {
                                "name": form.get("flow_title"),
                                "process_code": form.get("process_code", ""),
                                "create_time": form.get("gmt_modified", "未知"),
                                "status": "ENABLED" if not form.get("is_new_process") else "DISABLED"
                            }
                            all_forms.append(formatted_form)
                    break
                else:
                    forms = result_data.get("process_template_list", [])
                    for form in forms:
                        if form.get("flow_title"):
                            formatted_form = {
                                "name": form.get("flow_title"),
                                "process_code": form.get("process_code", ""),
                                "create_time": form.get("gmt_modified", "未知"),
                                "status": "ENABLED" if not form.get("is_new_process") else "DISABLED"
                            }
                            all_forms.append(formatted_form)
                    
                    if not result_data.get("next_cursor"):
                        break
                    
                    data["page_start"] = data["page_start"] + data["page_size"]
            else:
                raise Exception(f"获取审批表单列表失败: {result}")
        
        # 对表单列表按创建时间排序
        all_forms.sort(key=lambda x: x.get('create_time', '0'))  # 按时间升序排序
        return all_forms

    def get_process_instance_ids(self, process_code: str, start_time: int = None, end_time: int = None, callback=None) -> List[str]:
        """获取审批实例ID列表"""
        url = f"{self.base_url}/topapi/processinstance/listids"
        params = {
            "access_token": self.get_access_token()
        }
        
        # 确保时间戳在合理范围内
        current_time = int(datetime.now().timestamp() * 1000)
        if not start_time:
            start_time = current_time - (7 * 24 * 60 * 60 * 1000)  # 7天前
        if not end_time:
            end_time = current_time
        
        # 限制时间范围
        if end_time > current_time:
            end_time = current_time
        
        all_ids = []
        next_cursor = 0
        page_count = 0
        
        while True:
            page_count += 1
            data = {
                "process_code": process_code,
                "start_time": start_time,
                "end_time": end_time,
                "size": 20,  # 每页大小
                "cursor": next_cursor,
                "userid": ""  # 可选参数,不填表示查询所有用户
            }
            
            # 回调通知进度
            if callback:
                callback(f"正在获取第 {page_count} 页数据(每页20条)...")
            
            response = requests.post(url, params=params, json=data)
            result = response.json()
            
            if result.get("errcode") == 0:
                result_data = result.get("result", {})
                instance_ids = result_data.get("list", [])
                all_ids.extend(instance_ids)
                
                # 回调通知当前获取的数量
                if callback:
                    callback(f"已获取 {len(all_ids)} 条数据")
                
                # 检查是否有更多数据
                if not result_data.get("next_cursor"):
                    break
                
                next_cursor = result_data.get("next_cursor")
            else:
                raise Exception(f"获取审批实例ID列表失败: {result}")
        
        # 回调通知最终结果
        if callback:
            callback(f"获取完成,总共 {len(all_ids)} 条数据,共查询 {page_count} 页")
        
        return all_ids

    def get_process_instance_detail(self, instance_id: str) -> Dict[str, Any]:
        """获取单个审批实例详情"""
        url = f"{self.base_url}/topapi/processinstance/get"
        params = {
            "access_token": self.get_access_token()
        }
        data = {
            "process_instance_id": instance_id
        }
        
        response = requests.post(url, params=params, json=data)
        result = response.json()
        
        if result.get("errcode") == 0:
            return result.get("process_instance", {})
        else:
            raise Exception(f"获取审批实例详情失败: {result}")

def main():
    try:
        dingtalk = DingTalkApproval()
        
        # 确保必须先设置验证参数
        if not all([dingtalk.app_key, dingtalk.app_secret, dingtalk.agent_id, dingtalk.user_id]):
            print("请先设置必要的验证参数!")
            return
            
        print("\n1. 获取企业可管理的表单列表:")
        forms = dingtalk.get_visible_process_forms(dingtalk.user_id)
        print(f"找到 {len(forms)} 个审批表单")
        
        print("\n可用的审批表单列表 (按创建时间排序,最新的在最后):")
        print("-" * 50)
        for index, form in enumerate(forms, 1):
            create_time = form.get('create_time', '未知')
            # 格式化时间显示
            if create_time != '未知':
                try:
                    # 尝试将时间字符串转换为更友好的格式
                    dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
                    create_time = dt.strftime("%Y-%m-%d %H:%M:%S")
                except ValueError:
                    pass
                
            print(f"{index}. 表单名称: {form.get('name')}")
            print(f"   表单编码: {form.get('process_code')}")
            print(f"   创建时间: {create_time}")
            print(f"   是否启用: {'是' if form.get('status') == 'ENABLED' else '否'}")
            print("-" * 50)
        
        while True:
            try:
                choice = int(input("\n请输入要查询的表单序号(1-{0}): ".format(len(forms))))
                if 1 <= choice <= len(forms):
                    selected_form = forms[choice - 1]
                    break
                else:
                    print("无效的选择&#65533;&#65533;请重新输入")
            except ValueError:
                print("请输入有效的数字")
        
        # 让用户输入开始和结束时间
        while True:
            try:
                start_date = input("\n请输入开始时间(格式:2023-01-01): ")
                end_date = input("请输入结束时间(格式:2023-12-31): ")
                
                # 转换日期字符串为datetime对象,使用北京时间
                start_datetime = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
                end_datetime = datetime.strptime(f"{end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
                
                # 确保时间范围有效
                current_datetime = datetime.now()
                if start_datetime > current_datetime:
                    print("开始时间不能晚于当前时间,请重新输入")
                    continue
                    
                if start_datetime > end_datetime:
                    print("开始时间必须早于或等于结束时间,请重新输入")
                    continue
                
                # 转换为毫秒级时间戳
                start_time = int(start_datetime.timestamp() * 1000)
                end_time = int(end_datetime.timestamp() * 1000)
                
                # 确保时间戳在合理范围内
                current_time = int(current_datetime.timestamp() * 1000)
                if end_time > current_time:
                    end_time = current_time
                
                print(f"\n调试信息:")
                print(f"开始时间: {start_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f"结束时间: {end_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f"开始时间戳: {start_time}")
                print(f"结束时间戳: {end_time}")
                print(f"当前时间戳: {current_time}")
                
                break
                
            except ValueError:
                print("请输入正确的日期格式,例如:2023-01-01")
        
        print(f"\n开始查询 '{selected_form.get('name')}' 从 {start_date} 到 {end_date} 的审批记录...")
        
        instance_ids = dingtalk.get_process_instance_ids(
            selected_form.get('process_code'),
            start_time=start_time,
            end_time=end_time
        )
        print(f"找到 {len(instance_ids)} 个审批实例")
        
        for instance_id in instance_ids:
            try:
                instance_detail = dingtalk.get_process_instance_detail(instance_id)
                print("\n审批实例详情:")
                print(f"实例ID: {instance_id}")
                print(f"标题: {instance_detail.get('title')}")
                print(f"状态: {instance_detail.get('status')}")
                print(f"发起人: {instance_detail.get('originator_userid')}")
                print(f"发起时间: {instance_detail.get('create_time')}")
                print(f"完成时间: {instance_detail.get('finish_time')}")
                print("-" * 30)
                
                with open(f"approval_detail_{instance_id}.json", "w", encoding="utf-8") as f:
                    json.dump(instance_detail, f, ensure_ascii=False, indent=2)
                    
            except Exception as detail_error:
                print(f"获取实例 {instance_id} 详情失败: {str(detail_error)}")
        
        with open(f"approval_forms_{dingtalk.user_id}.json", "w", encoding="utf-8") as f:
            json.dump(forms, f, ensure_ascii=False, indent=2)
            print(f"\n所有表单信息已保存到 approval_forms_{dingtalk.user_id}.json")
            
    except Exception as e:
        print(f"发生错误: {str(e)}")

if __name__ == "__main__":
    main() 


8、dingtalk_approval_gui.py代码
[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import json
import os
from datetime import datetime, timedelta
from dingtalk_approval import DingTalkApproval
import threading
from tkcalendar import DateEntry
import concurrent.futures
from queue import Queue
import time
import queue

class DingTalkApprovalGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("钉钉审批数据查询工具_52破解_qzwsa")
        self.root.geometry("1500x800")
        self.root.resizable(True, True)
        
        # 配置文件路径
        self.config_file = "dingtalk_config.json"
        
        # 初始化变量,从配置文件加载
        self.load_config()
        
        self.dingtalk = None
        self.forms = []
        self.selected_form = None
        
        # 添加用于线程间通信的队列
        self.result_queue = Queue()
        
        # 添加用于控制并发数的变量
        self.max_workers = 2
        
        # 创建主框架并配置网格权重
        self.main_frame = ttk.Frame(root)
        self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
        
        # 配置主框架的网格
        self.main_frame.grid_columnconfigure(0, weight=2)  # 左列占比更大
        self.main_frame.grid_columnconfigure(1, weight=3)  # 中列占比最大
        self.main_frame.grid_columnconfigure(2, weight=0)  # 右列不自动扩展
        self.main_frame.grid_rowconfigure(0, weight=1)  # 使行也可伸展
        
        # 创建左中右三列框架
        self.left_frame = ttk.Frame(self.main_frame)
        self.middle_frame = ttk.Frame(self.main_frame)
        self.right_frame = ttk.Frame(self.main_frame)
        
        # 使用grid布局并启用拉伸
        self.left_frame.grid(row=0, column=0, sticky='nsew', padx=5)
        self.middle_frame.grid(row=0, column=1, sticky='nsew', padx=5)
        self.right_frame.grid(row=0, column=2, sticky='nsew', padx=5)
        
        # 配置各个框架的网格权重
        self.left_frame.grid_rowconfigure(4, weight=1)  # 表单列表区域可伸展
        self.middle_frame.grid_rowconfigure(0, weight=1)  # 结果区域可伸展
        self.right_frame.grid_rowconfigure(0, weight=1)  # JSON预览区域可伸展
        
        # 创建各个区域
        self.create_connection_area()
        self.create_form_list_area()
        self.create_date_selection_area()
        self.create_query_area()
        self.create_result_area()
        self.create_json_preview_area()
        
    def load_config(self):
        """从配置文件加载设置"""
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                    self.app_key = config.get('app_key', '')
                    self.app_secret = config.get('app_secret', '')
                    self.agent_id = config.get('agent_id', '')
                    self.user_id = config.get('user_id', '')
            else:
                self.app_key = ""
                self.app_secret = ""
                self.agent_id = ""
                self.user_id = ""
        except Exception as e:
            print(f"加载配置文件失败: {e}")
            self.app_key = ""
            self.app_secret = ""
            self.agent_id = ""
            self.user_id = ""
    
    def save_config(self):
        """保存配置到文件"""
        try:
            config = {
                'app_key': self.app_key,
                'app_secret': self.app_secret,
                'agent_id': self.agent_id,
                'user_id': self.user_id
            }
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            messagebox.showerror("错误", f"保存配置失败:{str(e)}")
            return False
    
    def create_connection_area(self):
        # 创建连接配置区域
        conn_frame = ttk.LabelFrame(self.left_frame, text="连接配置")
        conn_frame.pack(fill=tk.X, pady=5)
        
        # 创建顶部按钮框架
        top_frame = ttk.Frame(conn_frame)
        top_frame.grid(row=0, column=0, columnspan=3, sticky='e', padx=5, pady=2)
        
        # 添加帮助按钮
        help_button = ttk.Button(top_frame, text="使用说明", command=self.show_help)
        help_button.pack(side=tk.RIGHT)
        
        # 创建输入框和标签
        fields = [
            ("AppKey:", self.app_key, "app_key_entry"),
            ("AppSecret:", self.app_secret, "app_secret_entry"),
            ("AgentID:", str(self.agent_id) if self.agent_id else "", "agent_id_entry"),
            ("UserID:", self.user_id, "user_id_entry")
        ]
        
        for i, (label, default, name) in enumerate(fields):
            ttk.Label(conn_frame, text=label).grid(row=i+1, column=0, padx=5, pady=2, sticky='e')
            entry = ttk.Entry(conn_frame, width=50)
            entry.insert(0, default)
            entry.grid(row=i+1, column=1, padx=5, pady=2, sticky='w')
            setattr(self, name, entry)
        
        # 创建连接按钮
        ttk.Button(conn_frame, text="连接并获取表单", command=self.connect_and_get_forms).grid(
            row=len(fields)+1, column=0, columnspan=2, pady=10)
            
    def create_form_list_area(self):
        # 创建表单列表区域
        form_frame = ttk.LabelFrame(self.left_frame, text="审批表单列表")
        form_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        
        # 使form_frame持网格伸展
        form_frame.grid_rowconfigure(0, weight=1)
        form_frame.grid_columnconfigure(0, weight=1)
        
        # 创建表单列表
        columns = ("编号", "表单名称", "表单编码", "创建时间", "状态")
        self.form_tree = ttk.Treeview(form_frame, columns=columns, show="headings")
        self.form_tree.grid(row=0, column=0, sticky='nsew')
        
        # 设置列题和宽度
        for col in columns:
            self.form_tree.heading(col, text=col)
            width = 100 if col != "表单名称" else 200
            self.form_tree.column(col, width=width)
        
        # 添加滚动条
        scrollbar = ttk.Scrollbar(form_frame, orient="vertical", command=self.form_tree.yview)
        scrollbar.grid(row=0, column=1, sticky='ns')
        self.form_tree.configure(yscrollcommand=scrollbar.set)
        
        # 绑定点击事件
        self.form_tree.bind('<<TreeviewSelect>>', self.on_form_select)

    def on_form_select(self, event):
        """处理表单列表选择事件"""
        selection = self.form_tree.selection()
        if selection:
            selected_index = int(self.form_tree.item(selection[0])['values'][0]) - 1
            selected_form = self.forms[selected_index]
            
            try:
                # 尝试找并读取对应的JSON文件
                forms_file = None
                # 查找最新的文件夹
                folders = [d for d in os.listdir() if os.path.isdir(d) and d.startswith("审批数据_")]
                if folders:
                    # 按文件夹名称排序,获取最新的
                    latest_folder = sorted(folders)[-1]
                    forms_file = os.path.join(latest_folder, f"approval_forms_{self.user_id}.json")
                
                if forms_file and os.path.exists(forms_file):
                    with open(forms_file, 'r', encoding='utf-8') as f:
                        forms_data = json.load(f)
                        # 查找对应的表单数据
                        form_data = next(
                            (form for form in forms_data 
                             if form.get('process_code') == selected_form.get('process_code')),
                            selected_form
                        )
                        # 显示JSON预览
                        self.format_json_with_highlighting(form_data)
                else:
                    # 如果找不到文件接显示当前表单数据
                    self.format_json_with_highlighting(selected_form)
                    
            except Exception as e:
                self.json_preview.delete(1.0, tk.END)
                self.json_preview.insert(tk.END, f"读取表单数据失败: {str(e)}")

    def create_date_selection_area(self):
        # 创建日期选择区域
        date_frame = ttk.LabelFrame(self.left_frame, text="查询时间范围")
        date_frame.pack(fill=tk.X, pady=5)
        
        # 开始日期
        ttk.Label(date_frame, text="开始日期:").pack(side=tk.LEFT, padx=5)
        self.start_date_entry = DateEntry(date_frame, width=12, background='darkblue',
                                        foreground='white', borderwidth=2,
                                        date_pattern='yyyy-mm-dd')
        self.start_date_entry.pack(side=tk.LEFT, padx=5)
        
        # 结束日期
        ttk.Label(date_frame, text="结束日期:").pack(side=tk.LEFT, padx=5)
        self.end_date_entry = DateEntry(date_frame, width=12, background='darkblue',
                                      foreground='white', borderwidth=2,
                                      date_pattern='yyyy-mm-dd')
        self.end_date_entry.pack(side=tk.LEFT, padx=5)
        
        # 设置默认日期:始日期为当天期前119天,结束日期为当天
        today = datetime.now()
        start_date = today - timedelta(days=119)
        self.start_date_entry.set_date(start_date)
        self.end_date_entry.set_date(today)
        
        # 绑定开始日期变化事件
        self.start_date_entry.bind("<<DateEntrySelected>>", self.update_end_date)

    def update_end_date(self, event=None):
        """当开始日期变化时,自动更新结束日期为开始日期后的119天"""
        try:
            # 获取开始日期
            start_date = self.start_date_entry.get_date()
            # 计算结束日期(开始日期+119天)
            end_date = start_date + timedelta(days=119)
            
            # 如果计算的结束日期超过今天,则使用今天作为结束日期
            today = datetime.now().date()
            if end_date > today:
                end_date = today
                
            # &#65533;&#65533;&#65533;新结束日期
            self.end_date_entry.set_date(end_date)
            
        except Exception as e:
            messagebox.showerror("错误", f"更新结束日期失败: {str(e)}")
        
    def create_query_area(self):
        # 创建查询按钮区域
        query_frame = ttk.Frame(self.left_frame)
        query_frame.pack(fill=tk.X, pady=5)
        
        # 创一行包含并发线程数和查询按钮
        control_frame = ttk.Frame(query_frame)
        control_frame.pack(fill=tk.X, pady=5)
        
        # 左侧放置并发线程数控制
        thread_control = ttk.Frame(control_frame)
        thread_control.pack(side=tk.LEFT, padx=5)
        
        ttk.Label(thread_control, text="并发线程数:").pack(side=tk.LEFT)
        
        # 创建并初始化变量
        self.thread_var = tk.StringVar(value='2')
        self.thread_spinbox = ttk.Spinbox(
            thread_control,
            from_=1,
            to=20,
            width=5,
            textvariable=self.thread_var
        )
        self.thread_spinbox.pack(side=tk.LEFT, padx=5)
        
        # 右侧放置查询按钮
        self.query_button = ttk.Button(control_frame, text="查询审批数据", command=self.start_query)
        self.query_button.pack(side=tk.RIGHT, padx=5)
        
        # 进度显示
        self.progress_var = tk.StringVar(value="就绪")
        self.progress_label = ttk.Label(query_frame, textvariable=self.progress_var)
        self.progress_label.pack(pady=5)
        
    def create_result_area(self):
        # 创建结果显示区域
        result_frame = ttk.LabelFrame(self.middle_frame, text="查询结果")
        result_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        
        # 使result_frame支持网格伸展
        result_frame.grid_rowconfigure(0, weight=1)
        result_frame.grid_columnconfigure(0, weight=1)
        
        # 创建文本区域
        self.result_text = scrolledtext.ScrolledText(result_frame)
        self.result_text.grid(row=0, column=0, sticky='nsew', padx=5, pady=5)
        
        # 绑定点击事件
        self.result_text.tag_configure("instance_id", foreground="blue", underline=1)
        self.result_text.tag_bind("instance_id", "<Button-1>", self.show_json_preview)
        
    def create_json_preview_area(self):
        # 创建JSON预览区域
        preview_frame = ttk.LabelFrame(self.right_frame, text="JSON预览")
        preview_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        
        # 创建控制区域
        control_frame = ttk.Frame(preview_frame)
        control_frame.pack(fill=tk.X, padx=5, pady=2)
        
        # 字体大小调整
        ttk.Label(control_frame, text="字体大小:").pack(side=tk.LEFT, padx=2)
        self.json_font_size = tk.IntVar(value=10)
        font_spinbox = ttk.Spinbox(
            control_frame,
            from_=8,
            to=72,
            width=5,
            textvariable=self.json_font_size,
            command=self.update_json_font_size
        )
        font_spinbox.pack(side=tk.LEFT, padx=5)
        
        # 宽度调整
        ttk.Label(control_frame, text="宽度:").pack(side=tk.LEFT, padx=2)
        self.json_width = tk.IntVar(value=80)
        width_spinbox = ttk.Spinbox(
            control_frame,
            from_=40,
            to=200,
            width=5,
            textvariable=self.json_width,
            command=self.update_json_width
        )
        width_spinbox.pack(side=tk.LEFT, padx=5)
        
        # 创建文本区域框架
        text_frame = ttk.Frame(preview_frame)
        text_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=2)
        
        # 创建文本区域和滚动条
        self.json_preview = tk.Text(text_frame, wrap=tk.NONE)
        self.json_preview.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        
        # 添加垂直滚动条
        v_scrollbar = ttk.Scrollbar(text_frame, orient="vertical", command=self.json_preview.yview)
        v_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        
        # 添加水平滚动条
        h_scrollbar = ttk.Scrollbar(preview_frame, orient="horizontal", command=self.json_preview.xview)
        h_scrollbar.pack(side=tk.BOTTOM, fill=tk.X)
        
        # 配置文本框的滚动
        self.json_preview.configure(
            yscrollcommand=v_scrollbar.set,
            xscrollcommand=h_scrollbar.set
        )
        
        # 配置JSON预览的初始字体和颜色
        self.update_json_preview_style()
        
        # 创建标签用于语法高亮
        self.json_preview.tag_configure("key", foreground="#9cdcfe")
        self.json_preview.tag_configure("string", foreground="#ce9178")
        self.json_preview.tag_configure("number", foreground="#b5cea8")
        self.json_preview.tag_configure("boolean", foreground="#569cd6")
        
        # 记录初始宽度
        self.initial_width = 400
        self.right_frame.configure(width=self.initial_width)
        self.right_frame.grid_propagate(False)

    def update_json_preview_style(self):
        """更新JSON预览区域的样式"""
        self.json_preview.configure(
            font=('Consolas', self.json_font_size.get()),
            bg='#1e1e1e',  # 深色背景
            fg='#d4d4d4',  # 浅色文本
            insertbackground='white',  # 光标颜色
            width=self.json_width.get()  # 设置宽度
        )

    def update_json_font_size(self):
        """更新JSON预览区域的字体大小"""
        try:
            # 保存当前滚动位置
            current_pos = self.json_preview.yview()
            
            # 更新字体
            self.update_json_preview_style()
            
            # 恢复滚动位置
            self.json_preview.yview_moveto(current_pos[0])
            
        except Exception as e:
            messagebox.showerror("错误", f"更新字体大&#65533;&#65533;失败: {str(e)}")

    def update_json_width(self):
        """更新JSON预览区域的宽度"""
        try:
            # 获取当前宽度
            current_width = self.json_width.get()
            # 计算新的框架宽度(每个字大约8像素)
            new_frame_width = current_width * 8
            # 更新框架宽度
            self.right_frame.configure(width=new_frame_width)
            # 更新文本样式
            self.update_json_preview_style()
        except Exception as e:
            messagebox.showerror("错误", f"更新宽度失败: {str(e)}")

    def connect_and_get_forms(self):
        try:
            # 获取输入的配置
            self.app_key = self.app_key_entry.get().strip()
            self.app_secret = self.app_secret_entry.get().strip()
            self.agent_id = int(self.agent_id_entry.get().strip())
            self.user_id = self.user_id_entry.get().strip()
            
            # 创建DingTalkApproval实例并设置参数
            self.dingtalk = DingTalkApproval()
            self.dingtalk.app_key = self.app_key
            self.dingtalk.app_secret = self.app_secret
            self.dingtalk.agent_id = self.agent_id
            self.dingtalk.user_id = self.user_id
            
            # 获取表单列表
            self.forms = self.dingtalk.get_visible_process_forms(self.user_id)
            
            # 保存配置
            if self.save_config():
                print("配置已保存")
            
            # 清空现有表单列表
            for item in self.form_tree.get_children():
                self.form_tree.delete(item)
            
            # 显示表单列表
            for i, form in enumerate(self.forms, 1):
                create_time = form.get('create_time', '未知')
                if create_time != '未知':
                    try:
                        dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
                        create_time = dt.strftime("%Y-%m-%d %H:%M:%S")
                    except ValueError:
                        pass
                
                status = '是' if form.get('status') == 'ENABLED' else '否'
                self.form_tree.insert("", tk.END, values=(
                    i,
                    form.get('name'),
                    form.get('process_code'),
                    create_time,
                    status
                ))
            
            messagebox.showinfo("成功", f"成功获取 {len(self.forms)} 个审批表单")
            
        except Exception as e:
            messagebox.showerror("错误", f"连接失败: {str(e)}")
            
    def start_query(self):
        # 检查是否已设置验证参数
        if not all([self.dingtalk.app_key, self.dingtalk.app_secret, 
                   self.dingtalk.agent_id, self.dingtalk.user_id]):
            messagebox.showerror("错误", "请先设置并保存验证参数!")
            return
            
        try:
            # 获取选中的表单
            selection = self.form_tree.selection()
            if not selection:
                messagebox.showwarning("警告", "请先选择一个审批表单!")
                return
            
            # 获取日期范围
            start_date = self.start_date_entry.get()
            end_date = self.end_date_entry.get()
            
            # 转换日期
            start_datetime = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
            end_datetime = datetime.strptime(f"{end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
            
            # 检查日期有效
            if start_datetime > datetime.now():
                messagebox.showwarning("警告", "开始时间不能晚于当前时间!")
                return
            if start_datetime > end_datetime:
                messagebox.showwarning("警告", "开始时间必须早于结束时间!")
                return
                
            # &#65533;&#65533;&#65533;取选中表单的信息
            selected_index = int(self.form_tree.item(selection[0])['values'][0]) - 1
            self.selected_form = self.forms[selected_index]
            
            # 禁用查询按钮
            self.query_button.configure(state='disabled')
            
            # 在新线程中执行查询
            thread = threading.Thread(target=self.execute_query, args=(start_datetime, end_datetime))
            thread.daemon = True
            thread.start()
            
        except ValueError:
            messagebox.showerror("错", "请输入正确的日期格式(YYYY-MM-DD)!")
            
    def process_instance(self, instance_data):
        """处理单个实例的详情获取和保存"""
        i, instance_id = instance_data
        try:
            instance_detail = self.dingtalk.get_process_instance_detail(instance_id)
            
            # 保存详情文件
            file_path = os.path.join(self.current_save_folder, f"approval_detail_{instance_id}.json")
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(instance_detail, f, ensure_ascii=False, indent=2)
            
            # 将结果放入列
            self.result_queue.put({
                'success': True,
                'index': i,
                'instance_id': instance_id,
                'detail': instance_detail
            })
            
        except Exception as e:
            # 将错误信息放入队列
            self.result_queue.put({
                'success': False,
                'index': i,
                'instance_id': instance_id,
                'error': str(e)
            })
    
    def update_ui(self):
        """更新UI显示的方法"""
        try:
            while True:
                try:
                    result = self.result_queue.get_nowait()
                    if result['success']:
                        # 显示成功的实例详情
                        instance_detail = result['detail']
                        
                        # 先插入实例编号
                        self.result_text.insert(tk.END, f"\n实例 {result['index']}:\n")
                        
                        # 插入实例ID并添加标签
                        id_text = f"实例ID: {result['instance_id']}"
                        self.result_text.insert(tk.END, id_text + "\n")
                        
                        # 计算标签位置
                        start_pos = self.result_text.index("end-2c linestart")
                        end_pos = f"{start_pos}+{len(id_text)}c"
                        
                        # 添加可点击标签
                        self.result_text.tag_add("instance_id", start_pos, end_pos)
                        
                        # 插入其他信息
                        self.result_text.insert(tk.END,
                            f"标题: {instance_detail.get('title')}\n"
                            f"状态: {instance_detail.get('status')}\n"
                            f"发起人: {instance_detail.get('originator_userid')}\n"
                            f"发起时间: {instance_detail.get('create_time')}\n"
                            f"完成时间: {instance_detail.get('finish_time')}\n"
                            f"{'-' * 50}\n"
                        )
                    else:
                        self.log_message(f"获取实例 {result['instance_id']} 详情失败: {result['error']}")
                    
                    self.result_text.see(tk.END)
                    self.result_queue.task_done()
                    
                except queue.Empty:
                    break
            
            if not self.result_queue.empty() or self.tasks_pending:
                self.root.after(100, self.update_ui)
            else:
                self.progress_var.set("查询完成")
                self.query_button.configure(state='normal')
                
        except Exception as e:
            self.log_message(f"更新界面失败: {str(e)}")
            
    def execute_query(self, start_datetime, end_datetime):
        try:
            # 更新并发线程数
            try:
                self.max_workers = int(self.thread_spinbox.get())
            except ValueError:
                self.max_workers = 2
            
            # 创建保存文件夹
            folder_name = f"审批数据_{start_datetime.strftime('%Y%m%d')}_{end_datetime.strftime('%Y%m%d')}"
            self.current_save_folder = os.path.join(os.getcwd(), folder_name)
            os.makedirs(self.current_save_folder, exist_ok=True)
            
            # 转换为毫秒级时间戳
            start_time = int(start_datetime.timestamp() * 1000)
            end_time = int(end_datetime.timestamp() * 1000)
            
            self.result_text.delete(1.0, tk.END)
            self.log_message("开始查询审批实例...")
            self.log_message(f"使用 {self.max_workers} 个并发线程")
            self.log_message(f"查询时间范围: {start_datetime.strftime('%Y-%m-%d')} 至 {end_datetime.strftime('%Y-%m-%d')}")
            self.log_message(f"保存路径: {self.current_save_folder}")
            self.log_message("正在获取审批实例列表...\n")
            
            # 获取审批实例ID列表,添加回调函数
            instance_ids = self.dingtalk.get_process_instance_ids(
                self.selected_form.get('process_code'),
                start_time=start_time,
                end_time=end_time,
                callback=self.log_message  # 传入日志函数作为回调
            )
            
            self.log_message("\n开始获取详细数据...")
            
            # 使用线程池处理实例详情
            self.tasks_pending = True
            processed_count = 0
            error_count = 0
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # 提交所有任务
                futures = [
                    executor.submit(self.process_instance, (i, instance_id))
                    for i, instance_id in enumerate(instance_ids, 1)
                ]
                
                # 更新进度显示
                total = len(futures)
                for i, future in enumerate(concurrent.futures.as_completed(futures), 1):
                    self.progress_var.set(f"正在处理: {i}/{total}")
                    try:
                        future.result()
                        processed_count += 1
                    except Exception as e:
                        error_count += 1
                        self.log_message(f"处理任务时出错: {e}")
                    
                    # 每处理10个实例更新一次统计信息
                    if i % 10 == 0 or i == total:
                        self.log_message(
                            f"处理进度: {i}/{total} "
                            f"(成功: {processed_count}, 失败: {error_count})"
                        )
            
            self.tasks_pending = False
            
            # 保存表单信息
            forms_file = os.path.join(self.current_save_folder, f"approval_forms_{self.user_id}.json")
            with open(forms_file, "w", encoding="utf-8") as f:
                json.dump(self.forms, f, ensure_ascii=False, indent=2)
            
            self.log_message("\n处理完成统计:")
            self.log_message(f"总计处理: {total} 个实例")
            self.log_message(f"成功处理: {processed_count} 个")
            self.log_message(f"处理失败: {error_count} 个")
            self.log_message(f"数据保存在: {self.current_save_folder}")
            
            # 启动UI更新,显示详细数据
            self.root.after(100, self.update_ui)
            
        except Exception as e:
            self.log_message(f"\n查询过程出错: {str(e)}")
            messagebox.showerror("错误", f"查询失败: {str(e)}")
            self.query_button.configure(state='normal')

    def format_json_with_highlighting(self, json_str):
        """格式化JSON并添加语法高亮"""
        self.json_preview.delete(1.0, tk.END)
        
        def format_nested_json(value):
            """处理嵌套的JSON字符串"""
            try:
                # 尝试解析可能是JSON字符串的值
                if isinstance(value, str) and (value.startswith('[') or value.startswith('{')):
                    # 处理转义字符
                    value = value.replace('\\"', '"').replace('\\\\', '\\')
                    parsed = json.loads(value)
                    return json.dumps(parsed, ensure_ascii=False, indent=2)
            except:
                pass
            return value
        
        try:
            # 如果输入是字符串,先解析成Python对象
            if isinstance(json_str, str):
                data = json.loads(json_str)
            else:
                data = json_str
            
            def process_value(obj):
                """递归处理对象中的所有值"""
                if isinstance(obj, dict):
                    return {k: process_value(v) for k, v in obj.items()}
                elif isinstance(obj, list):
                    return [process_value(item) for item in obj]
                else:
                    return format_nested_json(obj)
            
            # 处理所有嵌套的JSON字符串
            processed_data = process_value(data)
            
            # 新格式化JSON,确保缩进正确
            formatted_json = json.dumps(processed_data, ensure_ascii=False, indent=2)
            
            # 逐行处理并添加高亮
            for line in formatted_json.split('\n'):
                # 处理键值对
                if ':' in line:
                    # 分割时保留冒号后的所有内容处理值中可能包含冒号的情况)
                    parts = line.split(':', 1)
                    if len(parts) == 2:
                        key, value = parts
                        # 插入键
                        self.json_preview.insert(tk.END, key + ':', 'key')
                        # 处理值
                        value = value.strip()
                        if value.startswith('"'):
                            self.json_preview.insert(tk.END, value + '\n', 'string')
                        elif value.lower() in ('true', 'false', 'null'):
                            self.json_preview.insert(tk.END, value + '\n', 'boolean')
                        elif value.replace('.','',1).isdigit():
                            self.json_preview.insert(tk.END, value + '\n', 'number')
                        else:
                            self.json_preview.insert(tk.END, value + '\n')
                else:
                    # 处理其他行(如括号、数组素等)
                    line_stripped = line.strip()
                    if line_stripped.startswith('"'):
                        self.json_preview.insert(tk.END, line + '\n', 'string')
                    elif line_stripped.replace('.','',1).isdigit():
                        self.json_preview.insert(tk.END, line + '\n', 'number')
                    else:
                        self.json_preview.insert(tk.END, line + '\n')
                    
        except Exception as e:
            self.json_preview.insert(tk.END, f"JSON格式化错误: {str(e)}")

    def show_json_preview(self, event):
        # 获取点击位置的行
        index = self.result_text.index(f"@{event.x},{event.y}")
        line = self.result_text.get(f"{index} linestart", f"{index} lineend")
        
        # 提取实例ID
        if "实例ID:" in line:
            instance_id = line.split("实例ID:")[1].strip()
            try:
                # 读取对应的JSON文件
                folder_path = self.current_save_folder
                file_path = os.path.join(folder_path, f"approval_detail_{instance_id}.json")
                
                if os.path.exists(file_path):
                    with open(file_path, 'r', encoding='utf-8') as f:
                        json_data = json.load(f)
                        # 使用新的格式化方法显示JSON
                        self.format_json_with_highlighting(json_data)
            except Exception as e:
                self.json_preview.delete(1.0, tk.END)
                self.json_preview.insert(tk.END, f"读取JSON文件失败: {str(e)}")

    def log_message(self, message):
        """添加日志消息到结果文本"""
        self.result_text.insert(tk.END, f"{message}\n")
        self.result_text.see(tk.END)
        self.result_text.update()

    def start_resize(self, event):
        """开始调整大小"""
        self.x = event.x_root
        self.initial_width = self.right_frame.winfo_width()

    def do_resize(self, event):
        """执行调整大小"""
        try:
            diff = event.x_root - self.x
            new_width = self.initial_width + diff
            
            # 获取主窗口宽度
            window_width = self.root.winfo_width()
            # 计算最大允许宽度(窗口宽度的60%)
            max_width = int(window_width * 0.6)
            
            # 调整限制范围
            if new_width < 300:  # 增加最小宽度
                new_width = 300
            elif new_width > max_width:
                new_width = max_width
            
            # 更新框架宽度
            self.right_frame.configure(width=new_width)
            
            # 强制更新显示
            self.right_frame.update_idletasks()
            
        except Exception as e:
            print(f"调整大小时出错: {str(e)}")

    def end_resize(self, event):
        """结束调整大小"""
        # 置鼠标样式
        self.sizer.configure(style='TFrame')
        # 更新 JSON 预览区域的滚动条
        self.json_preview.update_idletasks()

    def show_help(self):
        """显示帮助信息"""
        help_window = tk.Toplevel(self.root)
        help_window.title("钉钉审批数据查询工具 - 使用说明")
        help_window.geometry("800x600")
        
        # 设置模态窗口
        help_window.transient(self.root)
        help_window.grab_set()
        
        # 创建文本区域
        help_text = scrolledtext.ScrolledText(
            help_window, 
            wrap=tk.WORD, 
            padx=10, 
            pady=10,
            font=('Microsoft YaHei UI', 10),
            background='#f5f5f5'
        )
        help_text.pack(fill=tk.BOTH, expand=True)
        
        # 设置标签样式
        help_text.tag_configure('title', font=('Microsoft YaHei UI', 14, 'bold'))
        help_text.tag_configure('heading', font=('Microsoft YaHei UI', 12, 'bold'))
        help_text.tag_configure('normal', font=('Microsoft YaHei UI', 10))
        
        # 帮助内容
        help_content = """
# 钉钉审批数据查询工具使用说明

## 功能介绍
本工具用于批量获取钉钉审批数据,支持多线程并发查询,可导出详细的审批信息。

## 使用前准备
1. 需要在钉钉开放平台创建应用并获取以下信息:
   - AppKey:应用的唯一标识
   - AppSecret:应用密钥
   - AgentID:应用ID
   - UserID:钉钉用户ID

2. 确保应用有足够的权限:
   - 审批数据读取权限
   - 通讯录管理权限

## 使用步骤

### 1. 配置连接
- 首次使用时需要填写完整的连接信息
- 配置信息会自动保存,下次使用时自动加载
- 如需修改配置,直接在输入框中更改后点击"连接并获取表单"

### 2. 选择审批表单
- 连接成功后会显示可查询的审批表单列表
- 点击表单可在右侧预览表单详细信息

### 3. 设置查询参数
- 选择查询的时间范围(默认为当前日期前119天到当前日期)
- 设置并发线程数(默认为2,建议不超过10)

### 4. 执行查询
- 点击"查询审批数据"开始获取数据
- 查询结果会实时显示在中间区域
- 点击实例ID可在右侧查看详细信息

### 5. 数据保存
- 所有数据会自动保存在"审批数据_日期"文件夹中
- 每个审批实例的详细信息保存为单独的JSON文件
- 表单信息会保存在approval_forms_xxx.json文件中

## 注意事项
1. 查询时间范围建议不要太大,以免数据量过大
2. 并发线程数不要设置过高,以免触发接口限流
3. 如遇到连接失败,请检查:
   - 网络连接是否正常
   - 配置信息是否正确
   - 应用权限是否足够

## 常见问题
Q: 为什么连接失败?
A: 请检查AppKey、AppSecret等配置是否正确,以及网络连接是否正常。

Q: 为什么没有显示审批表单?
A: 请确保当前用户有权限访问这些审批表单。

Q: 数据保存在哪里?
A: 数据保存在程序运行目录下的"审批数据_日期"文件夹中。

## 技术支持
如有问题请自我解决啊~~
52破解_qzwsa
"""
        
        # 插入文本并应用样式
        help_text.insert(tk.END, help_content)
        help_text.configure(state='disabled')  # 设置为只读
        
        # 添加底部按钮框架
        button_frame = ttk.Frame(help_window)
        button_frame.pack(fill=tk.X, pady=10)
        
        # 添加关闭按钮
        close_button = ttk.Button(
            button_frame, 
            text="关闭", 
            command=help_window.destroy,
            width=20
        )
        close_button.pack(pady=5)
        
        # 居中显示窗口
        help_window.update_idletasks()
        width = help_window.winfo_width()
        height = help_window.winfo_height()
        x = (help_window.winfo_screenwidth() // 2) - (width // 2)
        y = (help_window.winfo_screenheight() // 2) - (height // 2)
        help_window.geometry(f'{width}x{height}+{x}+{y}')

def main():
    root = tk.Tk()
    app = DingTalkApprovalGUI(root)
    root.mainloop()

if __name__ == "__main__":
    main() 

免费评分

参与人数 7威望 +1 吾爱币 +13 热心值 +6 收起 理由
苏紫方璇 + 1 + 10 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
weidechan + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
潇洒哥er + 1 我很赞同!
Brokesn + 1 热心回复!
cjcmxc + 1 + 1 我很赞同!
yanglinman + 1 谢谢@Thanks!
twty0401 + 1 + 1 我很赞同!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

gxldd 发表于 2024-12-9 16:58
好东西!支持
wuloveyou 发表于 2024-12-9 17:02
jun269 发表于 2024-12-9 17:13
yigaosoft 发表于 2024-12-9 17:38
这个是个好东西啊
luhui3 发表于 2024-12-9 18:36
太厉害了,谢谢楼主分享。
sifansi 发表于 2024-12-9 18:45
感谢楼主分享
Brokesn 发表于 2024-12-9 18:49
py大佬
hohon1213 发表于 2024-12-9 18:56
谢谢分享
iufoi 发表于 2024-12-9 19:06
学习中。。。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-7 20:03

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表