import
win32serviceutil
import
win32service
import
win32con
import
time
import
logging
import
threading
import
ttkbootstrap as ttk
from
ttkbootstrap.constants
import
*
from
ttkbootstrap.dialogs
import
Messagebox
from
ttkbootstrap.scrolled
import
ScrolledText
from
logging.handlers
import
RotatingFileHandler
import
pystray
from
PIL
import
Image, ImageDraw
import
sys
class TextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget.

    The widget update is scheduled through the widget's ``after`` method so
    that the actual insert runs as a Tk callback rather than in the thread
    that emitted the log record.
    """

    def __init__(self, text_widget):
        """Create the handler.

        Args:
            text_widget: Either a wrapper object exposing the real text
                widget as ``.text`` (this is how the file passes its
                ``ScrolledText``), or the text widget itself.
        """
        super().__init__()
        # Unwrap ``.text`` when present; otherwise use the widget directly.
        self.text_widget = getattr(text_widget, "text", text_widget)

    def emit(self, record):
        """Format ``record`` and schedule it to be appended to the widget.

        Any failure (formatting error, widget already destroyed, Tk not
        reachable) is routed to ``handleError`` instead of propagating into
        the code that called the logger.
        """
        try:
            msg = self.format(record)

            def _append():
                try:
                    # The widget is kept read-only between writes.
                    self.text_widget.configure(state='normal')
                    self.text_widget.insert(END, msg + '\n')
                    self.text_widget.see(END)
                    self.text_widget.configure(state='disabled')
                except Exception as e:
                    print(f"日志写入失败: {str(e)}")

            self.text_widget.after(0, _append)
        except Exception:
            self.handleError(record)
# Module-wide logger: the root logger, emitting INFO and above. Handlers are
# attached later, once the GUI log panel exists.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class ServiceMonitorApp:
    """GUI that lists Windows services, monitors selected ones and restarts them.

    Threads used by this class:
      * the Tk main thread, which owns every widget;
      * a tray thread running the pystray icon;
      * a status thread that periodically refreshes the service list states;
      * a monitor thread that polls the monitored services and starts any
        that are found stopped.

    Background threads never touch widgets directly; they hand work to the
    Tk thread with ``root.after(0, ...)``.
    """

    def __init__(self, root):
        """Configure the window, start the tray icon and build the widgets.

        Args:
            root: The ttkbootstrap window that hosts the application.
        """
        self.root = root
        self.root.title("Windows服务监控程序 By:Thebzk")
        self.root.geometry("650x900")
        self.root.style.theme_use('litera')

        self.monitor_thread = None
        self.status_update_thread = None
        # Stop signal of the *current* monitor thread; replaced on restart.
        self.stop_event = threading.Event()
        # Set once when the application is exiting; ends the status thread.
        self._shutdown_event = threading.Event()
        self.monitored_services = []

        self.tray_icon = None
        self.init_tray_icon()

        self.root.protocol('WM_DELETE_WINDOW', self.on_close)
        self.root.bind("<Unmap>", self.on_minimize)

        self.CHECKED = "✓"
        self.UNCHECKED = "☐"
        self.FONT = ('Microsoft YaHei', 11)

        self.create_widgets()
        self.root.eval('tk::PlaceWindow . center')

    def init_tray_icon(self):
        """Create the tray icon (blue square on white) and run it in a daemon thread."""
        image = Image.new('RGB', (64, 64), (255, 255, 255))
        dc = ImageDraw.Draw(image)
        dc.rectangle([16, 16, 48, 48], fill='#007bff')

        menu = (
            pystray.MenuItem('显示主界面', self.show_window),
            pystray.MenuItem('退出程序', self.exit_app),
        )
        self.tray_icon = pystray.Icon(
            "service_monitor",
            image,
            "服务监控程序",
            menu,
        )
        self.tray_thread = threading.Thread(
            target=self.tray_icon.run,
            daemon=True,
        )
        self.tray_thread.start()

    def create_widgets(self):
        """Build the whole UI, attach the log handler and start the status thread."""
        main_paned = ttk.PanedWindow(self.root, bootstyle=PRIMARY, orient=VERTICAL)
        main_paned.pack(fill=BOTH, expand=True, padx=10, pady=10)

        top_frame = ttk.Frame(main_paned)
        main_paned.add(top_frame, weight=1)
        self._build_service_list(top_frame)
        self._build_buttons(top_frame)

        bottom_frame = ttk.Frame(main_paned)
        main_paned.add(bottom_frame, weight=1)
        self._build_monitored_list(bottom_frame)

        self._build_log_panel()

        self.status_update_thread = threading.Thread(
            target=self.update_all_service_statuses,
            daemon=True,
        )
        self.status_update_thread.start()
        self.load_services()

    def _build_service_list(self, parent):
        """Create the selectable list of all services inside ``parent``."""
        list_frame = ttk.Labelframe(
            parent, text="选择要监控的服务(点击复选框选择)", bootstyle=INFO
        )
        list_frame.pack(padx=5, pady=5, fill=BOTH, expand=True)

        self.service_tree = ttk.Treeview(
            list_frame,
            columns=("selected", "service_name", "status"),
            show="headings",
            bootstyle=INFO,
            height=10,
            selectmode='none',
        )

        style = ttk.Style()
        style.configure(
            'Treeview',
            rowheight=40,
            font=self.FONT,
            padding=(0, 5, 0, 5),
        )
        style.map('Treeview', background=[('selected', '#e0e0e0')])

        self.service_tree.heading("selected", text=" 选择 ", anchor=CENTER)
        self.service_tree.heading("service_name", text="服务名称")
        self.service_tree.heading("status", text="状态")
        self.service_tree.column("selected", width=100, anchor=CENTER)
        self.service_tree.column("service_name", width=350)
        self.service_tree.column("status", width=150, anchor=CENTER)

        # Configure the row tags once, so rows are coloured from the moment
        # they are inserted rather than only after the first click.
        self.service_tree.tag_configure('checked', foreground='#28a745')
        self.service_tree.tag_configure('unchecked', foreground='#6c757d')

        scrollbar = ttk.Scrollbar(
            list_frame, bootstyle=ROUND, command=self.service_tree.yview
        )
        self.service_tree.configure(yscrollcommand=scrollbar.set)
        self.service_tree.pack(side=LEFT, fill=BOTH, expand=True)
        scrollbar.pack(side=RIGHT, fill=Y)

        self.service_tree.bind("<Button-1>", self.on_treeview_click)

    def _build_buttons(self, parent):
        """Create the add / refresh / stop button row inside ``parent``."""
        btn_frame = ttk.Frame(parent)
        btn_frame.pack(pady=15, fill=X)

        self.start_btn = ttk.Button(
            btn_frame,
            text="添加监控",
            command=self.add_monitoring,
            bootstyle=(SUCCESS, OUTLINE),
            width=12,
        )
        self.start_btn.pack(side=LEFT, padx=10)

        ttk.Button(
            btn_frame,
            text="刷新列表",
            command=self.load_services,
            bootstyle=(INFO, OUTLINE),
            width=12,
        ).pack(side=LEFT, padx=10)

        self.stop_btn = ttk.Button(
            btn_frame,
            text="停止监控",
            command=self.stop_monitoring,
            bootstyle=(DANGER, OUTLINE),
            state=DISABLED,
            width=12,
        )
        self.stop_btn.pack(side=LEFT, padx=10)

    def _build_monitored_list(self, parent):
        """Create the list of currently monitored services inside ``parent``."""
        monitored_frame = ttk.Labelframe(
            parent, text="已监控的服务列表", bootstyle=SUCCESS
        )
        monitored_frame.pack(padx=5, pady=5, fill=BOTH, expand=True)

        self.monitored_tree = ttk.Treeview(
            monitored_frame,
            columns=("service_name", "status"),
            show="headings",
            bootstyle=SUCCESS,
            height=10,
            selectmode='none',
        )
        self.monitored_tree.heading("service_name", text="服务名称")
        self.monitored_tree.heading("status", text="状态")
        self.monitored_tree.column("service_name", width=350)
        self.monitored_tree.column("status", width=180, anchor=CENTER)

        scrollbar_monitored = ttk.Scrollbar(
            monitored_frame, bootstyle=ROUND, command=self.monitored_tree.yview
        )
        self.monitored_tree.configure(yscrollcommand=scrollbar_monitored.set)
        self.monitored_tree.pack(side=LEFT, fill=BOTH, expand=True)
        scrollbar_monitored.pack(side=RIGHT, fill=Y)

    def _build_log_panel(self):
        """Create the log text area and route the module logger into it."""
        log_frame = ttk.Labelframe(self.root, text="监控日志", bootstyle=WARNING)
        log_frame.pack(padx=10, pady=10, fill=BOTH, expand=True)

        self.log_text = ScrolledText(
            log_frame,
            wrap=WORD,
            font=("Consolas", 10),
            height=8,
            autohide=True,
            bootstyle=ROUND,
        )
        self.log_text.pack(fill=BOTH, expand=True)

        text_handler = TextHandler(self.log_text)
        text_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(text_handler)

    def on_treeview_click(self, event):
        """Toggle the check mark when the first column of a row is clicked."""
        if self.service_tree.identify("region", event.x, event.y) != "cell":
            return
        if self.service_tree.identify_column(event.x) != "#1":
            return
        item = self.service_tree.identify_row(event.y)
        if not item:
            return

        values = list(self.service_tree.item(item, "values"))
        if not values:
            return
        now_checked = values[0] == self.UNCHECKED
        values[0] = self.CHECKED if now_checked else self.UNCHECKED
        self.service_tree.item(
            item,
            values=values,
            tags=('checked' if now_checked else 'unchecked',),
        )

    def load_services(self):
        """Reload the service list; every row starts unchecked with a placeholder status."""
        self.service_tree.delete(*self.service_tree.get_children())
        try:
            for service in self.list_services():
                self.service_tree.insert(
                    "",
                    END,
                    values=(self.UNCHECKED, service, "加载中"),
                    tags=('unchecked',),
                )
        except Exception as e:
            Messagebox.show_error(f"加载服务列表失败:{e}", "错误", alert=True)

    def list_services(self):
        """Return the names of all Win32 services, in any state.

        Returns:
            list: The first element (service name) of every entry reported
            by ``EnumServicesStatus``.
        """
        SC_MANAGER_CONNECT = 0x0001
        SC_MANAGER_ENUMERATE_SERVICE = 0x0004
        scm = win32service.OpenSCManager(
            None, None, SC_MANAGER_CONNECT | SC_MANAGER_ENUMERATE_SERVICE
        )
        try:
            services = win32service.EnumServicesStatus(
                scm, win32service.SERVICE_WIN32, win32service.SERVICE_STATE_ALL
            )
        finally:
            # This runs every few seconds from the status thread, so release
            # the handle deterministically instead of relying on collection.
            win32service.CloseServiceHandle(scm)
        return [entry[0] for entry in services]

    def add_monitoring(self):
        """Move every checked, not-yet-monitored service into the monitored list.

        Checked rows are unchecked again. If nothing new was selected a
        warning dialog is shown. A monitor thread is started when none is
        running or when the previous one has been told to stop.
        """
        selected_services = []
        for item in self.service_tree.get_children():
            values = self.service_tree.item(item, "values")
            if values[0] != self.CHECKED:
                continue
            service_name = values[1]
            if service_name not in self.monitored_services:
                selected_services.append(service_name)
            self.service_tree.item(
                item,
                values=(self.UNCHECKED, values[1], values[2]),
                tags=('unchecked',),
            )

        if not selected_services:
            Messagebox.show_warning("请先勾选要添加的服务", "提示")
            return

        for service in selected_services:
            self.monitored_services.append(service)
            self.monitored_tree.insert("", END, values=(service, "监控中"))
            logger.info(f"新增监控服务: {service}")

        worker_running = (
            self.monitor_thread is not None and self.monitor_thread.is_alive()
        )
        # A worker that is still alive but already signalled to stop (it may
        # be mid-wait) must not be reused: give the new worker its own event
        # so the old one finishes on its own and the new one keeps running.
        if not worker_running or self.stop_event.is_set():
            self.stop_event = threading.Event()
            self.monitor_thread = threading.Thread(
                target=self.monitor_services,
                args=(self.stop_event,),
                daemon=True,
            )
            self.monitor_thread.start()

        self.stop_btn.configure(state=NORMAL)

    def monitor_services(self, stop_event=None):
        """Poll the monitored services until stopped, restarting stopped ones.

        Args:
            stop_event: Event that ends this loop when set. Defaults to
                ``self.stop_event`` as it is at the time of the call.
        """
        if stop_event is None:
            stop_event = self.stop_event

        while not stop_event.is_set():
            current_services = list(self.monitored_services)
            if not current_services:
                stop_event.wait(1)
                continue

            for service_name in current_services:
                if stop_event.is_set():
                    break
                try:
                    status = win32serviceutil.QueryServiceStatus(service_name)[1]
                    # Widget updates must run on the Tk thread.
                    self.root.after(
                        0, self.update_monitored_status, service_name, status
                    )
                    if status == win32service.SERVICE_STOPPED:
                        logger.warning(f"检测到服务停止: {service_name}")
                        win32serviceutil.StartService(service_name)
                        logger.info(f"已成功重启服务: {service_name}")
                except Exception as e:
                    logger.error(f"监控服务 {service_name} 时出错: {e}")

            # wait() instead of sleep() so a stop request ends the pause at once.
            stop_event.wait(5)

    def update_monitored_status(self, service_name, status_code):
        """Show the text for ``status_code`` on the monitored row of ``service_name``."""
        status_text = self.get_status_text(status_code)
        for item in self.monitored_tree.get_children():
            if self.monitored_tree.item(item, "values")[0] == service_name:
                self.monitored_tree.item(item, values=(service_name, status_text))
                break

    def update_all_service_statuses(self):
        """Background loop: refresh the status column of the service list.

        Service states are queried in this thread; the resulting mapping is
        applied to the Treeview on the Tk thread. The loop ends once the
        application shutdown event is set.
        """
        while not self._shutdown_event.is_set():
            try:
                status_by_name = {}
                for service in self.list_services():
                    try:
                        code = win32serviceutil.QueryServiceStatus(service)[1]
                    except Exception as e:
                        # One unreadable service must not block the others.
                        # Logged at DEBUG so a recurring failure does not
                        # flood the on-screen log every cycle.
                        logger.debug("query failed for %s: %s", service, e)
                        continue
                    status_by_name[service] = self.get_status_text(code)
                try:
                    self.root.after(0, self._apply_service_statuses, status_by_name)
                except RuntimeError:
                    # The Tk main loop is not running yet; try again next cycle.
                    pass
            except Exception as e:
                logger.error(f"更新服务状态出错: {e}")
            self._shutdown_event.wait(5)

    def _apply_service_statuses(self, status_by_name):
        """Write the given name -> status text mapping into the service list rows."""
        for item in self.service_tree.get_children():
            values = self.service_tree.item(item, "values")
            status_text = status_by_name.get(values[1])
            if status_text is not None and values[2] != status_text:
                self.service_tree.item(
                    item, values=(values[0], values[1], status_text)
                )

    def get_status_text(self, status_code):
        """Map a service state code to its display text ("未知" if unrecognised)."""
        status_map = {
            win32service.SERVICE_STOPPED: "已停止",
            win32service.SERVICE_START_PENDING: "启动中",
            win32service.SERVICE_STOP_PENDING: "停止中",
            win32service.SERVICE_RUNNING: "运行中",
            win32service.SERVICE_CONTINUE_PENDING: "继续中",
            win32service.SERVICE_PAUSE_PENDING: "暂停中",
            win32service.SERVICE_PAUSED: "已暂停",
        }
        return status_map.get(status_code, "未知")

    def stop_monitoring(self):
        """Signal the monitor thread to stop and clear the monitored list."""
        self.stop_event.set()
        self.monitored_services.clear()
        self.monitored_tree.delete(*self.monitored_tree.get_children())
        self.stop_btn.configure(state=DISABLED)
        logger.info("已停止所有服务监控")

    def on_minimize(self, event):
        """Hide the window to the tray when the root window is minimised."""
        # The binding on the root also receives events from child widgets.
        if event.widget is not self.root:
            return None
        if self.root.state() == 'iconic':
            self.hide_to_tray()
            return "break"
        return None

    def on_close(self):
        """Ask for confirmation and exit only when the OK button was pressed."""
        # okcancel returns the label of the pressed button (or None when the
        # dialog is dismissed), so its truthiness cannot be used directly.
        accepted = {"OK"}
        try:
            from ttkbootstrap.localization import MessageCatalog
            accepted.add(MessageCatalog.translate("OK"))
        except ImportError:
            # Localisation module unavailable: the plain label is used.
            pass

        answer = Messagebox.okcancel("确定要退出程序吗?", "确认退出")
        if answer in accepted:
            self.exit_app()

    def hide_to_tray(self):
        """Withdraw the main window and show the tray icon."""
        self.root.withdraw()
        self.tray_icon.visible = True
        logger.info("程序已最小化到系统托盘")

    def show_window(self):
        """Restore the main window and hide the tray icon.

        May be called from the tray thread; in that case the work is
        re-scheduled onto the Tk thread.
        """
        if threading.current_thread() is not threading.main_thread():
            self.root.after(0, self.show_window)
            return
        self.tray_icon.visible = False
        self.root.deiconify()
        self.root.lift()
        self.root.focus_force()
        logger.info("恢复程序主界面")

    def exit_app(self):
        """Stop monitoring, remove the tray icon, destroy the window and exit.

        May be called from the tray thread; in that case the work is
        re-scheduled onto the Tk thread so that widget teardown and
        ``sys.exit`` happen there.
        """
        if threading.current_thread() is not threading.main_thread():
            self.root.after(0, self.exit_app)
            return
        self._shutdown_event.set()
        self.stop_monitoring()
        # Log while the log widget still exists; after destroy() the text
        # handler could no longer schedule its update.
        logger.info("程序已正常退出")
        self.tray_icon.stop()
        self.root.destroy()
        sys.exit(0)
if __name__ == "__main__":
    # Script entry point: create the themed root window, attach the
    # application to it and hand control to the Tk event loop.
    app = ttk.Window()
    monitor_app = ServiceMonitorApp(app)
    app.mainloop()