# [Python] plain-text view / copy code  (forum paste artifact; not part of the program)
# 52demo1118.py
import tkinter as tk
from tkinter import StringVar, ttk, messagebox
import time
import asyncio
import logging
import os
import cv2
import numpy as np
from pynput.mouse import Controller, Listener, Button
from pynput.keyboard import Controller as KeyboardController, Key
from pyautogui import screenshot
import threading
import queue
# Logging setup: INFO and above is written to PubgAuto.log as UTF-8 text,
# one "<timestamp> - <level> - <message>" record per line.
logging.basicConfig(
    level=logging.INFO,
    filename='PubgAuto.log',
    encoding='utf-8',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
class AutoTaskApp:
    """Tkinter control panel for a template-matching auto-clicker.

    Three threads cooperate:

    * the Tk main thread owns every widget and ``StringVar``;
    * a pynput ``Listener`` thread reports mouse moves through
      ``mouse_queue``;
    * a worker thread runs an asyncio event loop that periodically takes a
      screenshot, matches it against the PNG templates in ``modes/`` and
      double-clicks the first hit of each template.

    Tkinter is not thread-safe, so the worker never touches a ``StringVar``
    directly; it posts ``(variable, text)`` pairs to ``ui_queue`` and the Tk
    thread applies them in ``check_mouse_queue``.
    """

    SCREENSHOT_PATH = './Snipaste.png'  # where each screenshot is saved
    TEMPLATE_DIR = 'modes'              # directory holding the *.png templates
    MATCH_THRESHOLD = 0.8               # minimum TM_CCOEFF_NORMED score for a hit
    TASK_TIMEOUT = 30                   # seconds allowed for one scan pass

    def __init__(self, root):
        """Build the window, start the mouse listener and the worker loop.

        Args:
            root: The ``tk.Tk`` root window that hosts the UI.
        """
        self.root = root
        self.root.title("PubgAutotask by:PTL")
        self.root.geometry("360x180")  # window size
        self.root.attributes('-topmost', True)
        # Drop "always on top" while the window is unmapped, restore it when
        # the window is mapped again.
        self.root.bind("<Unmap>", lambda event: self.toggle_topmost(False))
        self.root.bind("<Map>", lambda event: self.toggle_topmost(True))
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)  # window close hook

        # State
        self.running = False
        self.task = None  # concurrent future of the running scan loop, if any
        self.match_count = 0
        self.click_count = 0
        self.mouse = Controller()
        self.keyboard = KeyboardController()
        self.mouse_queue = queue.Queue()  # mouse positions from the listener thread
        self.ui_queue = queue.Queue()     # (StringVar, text) updates from the worker
        self.last_match_duration = 0      # duration of the previous match, seconds
        self.start_time = time.time()     # program start time
        self.global_delay = 5             # pause between scan passes, seconds
        self.match_start_time = None      # start time of the current match

        # UI variables
        self.status_var = StringVar(value="当前鼠标坐标: (0, 0) | 对局次数: 0 | 鼠标点击次数: 0")
        self.current_time_var = StringVar(value="当前系统时间: 0000-00-00 00:00")
        self.runtime_var = StringVar(value="程序运行时长: 0d 0h 0m 0s")
        self.last_match_duration_var = StringVar(value="上次对局时长: 0m 0s")

        # Left-aligned status labels
        for variable in (self.status_var, self.current_time_var,
                         self.runtime_var, self.last_match_duration_var):
            tk.Label(root, textvariable=variable, anchor="w").pack(fill=tk.X)

        # Centered start/stop button and activity indicator
        self.start_button = tk.Button(root, text="开始挂机", command=self.toggle_task)
        self.start_button.pack(pady=10)
        self.progress = ttk.Progressbar(root, orient="horizontal", length=300, mode="indeterminate")
        self.progress.pack(pady=10)

        # Start listening for mouse movement
        self.mouse_listener = Listener(on_move=self.on_mouse_move)
        self.mouse_listener.start()

        # Start the periodic clock / runtime label updates
        self.update_current_time()
        self.update_runtime()

        # Run the asyncio loop on its own thread. It is a daemon thread so a
        # window that dies without going through cleanup() cannot leave the
        # process hanging on run_forever().
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.run_event_loop, args=(self.loop,), daemon=True)
        self.thread.start()

        # Poll the cross-thread queues from the Tk thread
        self.root.after(100, self.check_mouse_queue)

    def run_event_loop(self, loop):
        """Thread target: install ``loop`` as current and run it until stopped."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def toggle_topmost(self, topmost):
        """Enable or disable the window's always-on-top attribute."""
        self.root.attributes('-topmost', topmost)

    def on_mouse_move(self, x, y):
        """pynput callback (listener thread): queue the new mouse position."""
        self.mouse_queue.put((x, y))

    def check_mouse_queue(self):
        """Apply queued mouse positions and worker UI updates on the Tk thread.

        Only the most recent mouse position is shown; older queued positions
        would be overwritten before Tk redraws anyway. Reschedules itself
        every 100 ms.
        """
        latest = None
        while True:
            try:
                latest = self.mouse_queue.get_nowait()
            except queue.Empty:
                break
        if latest is not None:
            x, y = latest
            self.status_var.set(
                f"当前鼠标坐标: ({x}, {y}) | 对局次数: {self.match_count} | 鼠标点击次数: {self.click_count}")
        self._drain_ui_queue()
        self.root.after(100, self.check_mouse_queue)

    def _drain_ui_queue(self):
        """Apply every pending ``(StringVar, text)`` update posted by the worker."""
        while True:
            try:
                variable, text = self.ui_queue.get_nowait()
            except queue.Empty:
                return
            variable.set(text)

    def _post_ui(self, variable, text):
        """Ask the Tk thread to set ``variable`` to ``text`` (safe from any thread)."""
        self.ui_queue.put((variable, text))

    def update_current_time(self):
        """Refresh the wall-clock label and reschedule in one second."""
        current_time = time.strftime('%Y-%m-%d %H:%M')
        self.current_time_var.set(f"当前系统时间: {current_time}")
        self.root.after(1000, self.update_current_time)

    @staticmethod
    def _format_elapsed(elapsed_seconds):
        """Return ``elapsed_seconds`` formatted as ``'<d>d <h>h <m>m <s>s'``."""
        days, remainder = divmod(int(elapsed_seconds), 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{days}d {hours}h {minutes}m {seconds}s"

    def update_runtime(self):
        """Refresh the program-uptime label and reschedule in one second."""
        elapsed = self._format_elapsed(time.time() - self.start_time)
        self.runtime_var.set(f"程序运行时长: {elapsed}")
        self.root.after(1000, self.update_runtime)

    def toggle_task(self):
        """Button handler: start the scan loop if idle, stop it if running."""
        if self.running:
            self.stop_task()
        else:
            self.start_task()

    def start_task(self):
        """Switch the UI to "running" and schedule the scan loop on the worker."""
        self.running = True
        self.start_button.config(text="停止挂机")
        self.progress.start()
        # Keep the future so stop_task()/cleanup() can actually cancel the loop.
        self.task = asyncio.run_coroutine_threadsafe(self.run_task_async(), self.loop)

    def stop_task(self):
        """Switch the UI to "idle" and cancel the scan loop if it is still active."""
        self.running = False
        self.start_button.config(text="开始挂机")
        self.progress.stop()
        self._cancel_task()

    def _cancel_task(self):
        """Cancel the scheduled scan loop, if there is one and it has not finished."""
        task = getattr(self, 'task', None)
        if task is not None and not task.done():
            task.cancel()

    async def run_task_async(self):
        """Scan loop: run one pass, wait ``global_delay`` seconds, repeat.

        A pass that times out or raises is logged and skipped so that a single
        bad screenshot or template does not silently end the loop while the UI
        still shows "running". Cancellation is not swallowed.
        """
        while self.running:
            try:
                await asyncio.wait_for(self.screenshot_and_match_async(), timeout=self.TASK_TIMEOUT)
            except asyncio.TimeoutError:
                logger.warning("任务超时,跳过本次任务")
            except Exception:
                logger.exception("任务执行出错,跳过本次任务")
            await asyncio.sleep(self.global_delay)  # global delay between passes

    def _load_templates(self):
        """Load the ``*.png`` templates from ``TEMPLATE_DIR`` as grayscale images.

        Returns:
            A list of ``(image, filename)`` pairs in filename order. Files
            that cannot be decoded are skipped; a missing or unreadable
            directory yields an empty list.
        """
        try:
            filenames = sorted(os.listdir(self.TEMPLATE_DIR))
        except OSError:
            logger.warning(f"无法读取模板目录: {self.TEMPLATE_DIR}")
            return []
        templates = []
        for filename in filenames:
            if not filename.endswith('.png'):
                continue
            image = cv2.imread(f'{self.TEMPLATE_DIR}/{filename}', 0)
            if image is None:  # cv2.imread returns None instead of raising
                logger.warning(f"无法读取模板图片,已跳过: {filename}")
                continue
            templates.append((image, filename))
        return templates

    @staticmethod
    def _first_match_center(res, template_shape, threshold):
        """Return the center of the first match in ``res``, or ``None``.

        Args:
            res: 2-D score map produced by ``cv2.matchTemplate``.
            template_shape: ``(height, width, ...)`` of the template.
            threshold: Minimum score that counts as a match.

        Returns:
            ``(x, y)`` as plain ``int`` screen coordinates of the template
            center at the first position (row-major order) whose score is at
            least ``threshold``; ``None`` when nothing reaches it.
        """
        rows, cols = np.where(res >= threshold)
        if rows.size == 0:
            return None
        # Plain ints keep log output readable and avoid handing NumPy scalars
        # to the mouse controller.
        x = int(cols[0]) + int(template_shape[1]) // 2
        y = int(rows[0]) + int(template_shape[0]) // 2
        return (x, y)

    async def screenshot_and_match_async(self):
        """Run one scan pass: screenshot, match every template, click first hits.

        The screenshot is saved to ``SCREENSHOT_PATH`` and read back in
        grayscale. For each template, the first location scoring at least
        ``MATCH_THRESHOLD`` is double-clicked via ``move_and_click_async``.
        """
        screenshot_path = self.SCREENSHOT_PATH
        screenshot().save(screenshot_path)
        logger.info(f"截图保存至: {screenshot_path}")
        screen = cv2.imread(screenshot_path, 0)
        if screen is None:
            logger.warning(f"无法读取截图: {screenshot_path}")
            return
        for template, template_file in self._load_templates():
            # matchTemplate requires the template to fit inside the image.
            if template.shape[0] > screen.shape[0] or template.shape[1] > screen.shape[1]:
                logger.warning(f"模板尺寸大于截图,已跳过: {template_file}")
                continue
            res = cv2.matchTemplate(screen, template, cv2.TM_CCOEFF_NORMED)
            target = self._first_match_center(res, template.shape, self.MATCH_THRESHOLD)
            if target is None:
                continue
            template_name = os.path.splitext(template_file)[0]
            logger.info(f"发现目标: {template_name} 位置: {target}")
            await self.move_and_click_async(target, template_name)

    async def move_and_click_async(self, target, template_name):
        """Move the mouse to ``target``, double-click, and update the counters.

        Args:
            target: ``(x, y)`` screen coordinates to click.
            template_name: Template file name without extension. A click on
                the template named ``"start"`` counts as the beginning of a
                new match and closes the timing of the previous one.
        """
        start_pos = self.mouse.position
        logger.info(f"鼠标从 {start_pos} 移动到 {target}")
        # Jump to the target, then give the UI under the cursor one second.
        self.mouse.position = target
        await asyncio.sleep(1)
        # Double-click with the left button at the current position.
        self.mouse.click(Button.left, 2)
        self.click_count += 1
        logger.info(f"双击鼠标左键于 {target} 完成")
        # Match counting: every click on "start" begins a new match.
        if template_name == "start":
            now = time.time()
            if self.match_start_time is not None:
                self.last_match_duration = int(now - self.match_start_time)
                minutes, seconds = divmod(self.last_match_duration, 60)
                self._post_ui(self.last_match_duration_var, f"上一次对局时长: {minutes}m {seconds}s")
                logger.info(f"上一次对局时长: {minutes}m {seconds}s")
            self.match_start_time = now
            self.match_count += 1
            logger.info(f"发现目标01-Fastart.png,记为一局,总对局次数: {self.match_count}")
            logger.info(f"-------------↑↑↑↑↑↑↑↑↑↑-------------分割线 ------------- ")
        # This runs on the worker thread, so hand the label update to Tk.
        self._post_ui(
            self.status_var,
            f"鼠标坐标: {start_pos} | 对局次数: {self.match_count} | 鼠标点击次数: {self.click_count}")

    def on_closing(self):
        """Window-close handler: confirm, clean up, then destroy the window."""
        if messagebox.askokcancel("退出", "确定要退出程序吗?"):
            self.cleanup()
            self.root.destroy()

    def cleanup(self):
        """Stop the background threads, then delete leftover screenshot files.

        The worker is stopped first so it cannot be writing a screenshot
        while the files are being removed.
        """
        # Stop the scan loop and the helper threads.
        self.running = False
        self._cancel_task()
        self.mouse_listener.stop()
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        # Remove screenshot files ("Snipaste*") from the working directory.
        for filename in os.listdir('.'):
            if filename.startswith('Snipaste') and os.path.isfile(filename):
                try:
                    os.remove(filename)
                except OSError:
                    logger.warning(f"无法删除文件: {filename}")
        logger.info("截图文件已清理")
        logger.info("程序已结束")
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it, and hand control to the Tk event loop until the window closes.
    root = tk.Tk()
    app = AutoTaskApp(root)
    root.mainloop()