好友
阅读权限10
听众
最后登录1970-1-1
|
- 平时很多项目中相同格式的数据需要导入MySQL,内勤不会使用脚本,闲来无事就用Python+PyQT+sqlalchemy搞了个数据导入工具给他们用谁还能跟我这么好
[Python] 纯文本查看 复制代码 from PyQt5.QtCore import QThread, pyqtSignal
class TaskThread(QThread):
# 定义信号
progress_signal = pyqtSignal(int) # 进度条信号,传递整数值
log_signal = pyqtSignal(str) # 日志信号,传递字符串
error_signal = pyqtSignal(str) # 错误信号,传递错误信息
success_signal = pyqtSignal(str) # 成功信号,传递成功信息
def __init__(self, task_function, task_args=None, task_kwargs=None, parent=None):
super(TaskThread, self).__init__(parent)
self.task_function = task_function # 任务函数
self.task_args = task_args if task_args is not None else [] # 任务函数的参数列表
self.task_kwargs = task_kwargs if task_kwargs is not None else {} # 任务函数的关键字参数
self.is_stopped = False # 添加停止标志位
def stop(self):
"""停止任务"""
self.is_stopped = True
def run(self):
"""执行任务"""
try:
# 执行任务函数,并传递参数
self.task_function(
*self.task_args,
**self.task_kwargs,
progress_callback=self.check_progress, # 进度条更新
log_callback=self.log_signal.emit # 日志更新
)
# if not self.is_stopped: # 如果没有被停止,任务完成后发出成功信号
# self.success_signal.emit("任务成功完成!")
except Exception as e:
self.error_signal.emit(f"任务执行失败:{str(e)}") # 错误信号
def check_progress(self, value):
"""检查进度并判断是否需要停止任务"""
if self.is_stopped:
raise Exception("任务已被手动停止")
self.progress_signal.emit(value) # 正常更新进度
|
|
发帖前要善用【论坛搜索】功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。 |
|
|
|
|