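# NOTE: the first line of the pasted listing was a forum code-viewer header ("[Asm] plain-text view / copy code"); it is not part of the program.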
import os
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from tkinter import filedialog, messagebox, END, Text, StringVar, IntVar, Listbox, CENTER, Toplevel
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
import threading
import psutil
import re
import sys
import webbrowser
from PIL import Image, ImageTk
def resource_path(relative_path):
    """Return the absolute path of a bundled resource.

    Works both when running from source and from a PyInstaller bundle.

    Args:
        relative_path: Path of the resource relative to the application root.

    Returns:
        ``relative_path`` joined onto ``sys._MEIPASS`` when that attribute
        exists, otherwise joined onto the absolute current working directory.
    """
    # Only the attribute lookup can fail here, so use a defaulted getattr
    # instead of a blanket ``except Exception``.
    base_path = getattr(sys, "_MEIPASS", None)
    if base_path is None:
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)
class VideoProcessor:
    """GUI that overlays videos onto same-named mask images using ffmpeg.

    Videos and masks are imported from folders and paired by file stem
    (``clip.mp4`` pairs with ``clip.png``). Processing runs in a background
    thread that fans jobs out to a thread pool; each job launches one ffmpeg
    process whose stderr is parsed for progress.
    """

    # File extensions accepted by the two import buttons (matched case-insensitively).
    VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')
    MASK_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    # ffmpeg reports progress on stderr as "time=HH:MM:SS.xx".
    _PROGRESS_PATTERN = re.compile(r'time=(\d+:\d+:\d+\.\d+)')

    def __init__(self, master):
        """Initialise state, build the widgets and hook the window-close handler.

        Args:
            master: The root window that hosts the application.
        """
        self.master = master
        self.master.title("视频处理器 v1.2《作者:是貔貅呀》")
        self.master.resizable(False, False)  # fixed-size main window
        self.input_files = []  # list of {'path': str, 'status': str}
        self.mask_files = []  # list of mask image paths
        self.output_folder = ""
        self.process_thread = None
        self.pause_event = threading.Event()
        self.pause_event.set()  # set == running, cleared == paused
        self.ffmpeg_processes = []  # live ffmpeg Popen objects
        self.is_closing = False
        self.thread_count = IntVar(value=1)  # default: one worker
        self.create_widgets()
        self.master.protocol("WM_DELETE_WINDOW", self.on_closing)

    def create_widgets(self):
        """Build the file table, log box, button rows and preview pane."""
        main_frame = ttk.Frame(self.master, padding=10)
        main_frame.pack(fill=BOTH, expand=YES)
        left_frame = ttk.Frame(main_frame)
        left_frame.pack(side=LEFT, fill=BOTH, expand=YES)
        right_frame = ttk.Frame(main_frame, width=190, height=474, padding=10)
        right_frame.pack(side=RIGHT, fill=Y)

        # File table: one row per imported video.
        self.file_list_frame = ttk.Frame(left_frame)
        self.file_list_frame.pack(fill=BOTH, expand=YES, pady=5)
        columns = ('序号', '视频文件名', '蒙版文件名', '进度')
        column_layout = {
            '序号': (50, 'center'),
            '视频文件名': (200, 'w'),
            '蒙版文件名': (200, 'w'),
            '进度': (100, 'center'),
        }
        self.file_tree = ttk.Treeview(self.file_list_frame, columns=columns, show='headings')
        for column_name in columns:
            width, anchor = column_layout[column_name]
            self.file_tree.heading(column_name, text=column_name)
            self.file_tree.column(column_name, width=width, anchor=anchor)
        self.file_tree.pack(side=LEFT, fill=BOTH, expand=YES)
        scrollbar = ttk.Scrollbar(self.file_list_frame, orient="vertical", command=self.file_tree.yview)
        self.file_tree.configure(yscrollcommand=scrollbar.set)
        scrollbar.pack(side=RIGHT, fill=Y)

        # Read-only log area.
        self.log_text = Text(left_frame, height=5, state='disabled')
        self.log_text.pack(fill=BOTH, expand=YES, pady=5)

        # Import / export / start buttons.
        button_frame = ttk.Frame(left_frame)
        button_frame.pack(fill=BOTH, expand=YES, pady=5)
        self.add_files_button = ttk.Button(button_frame, text="导入视频文件夹", command=self.add_files, bootstyle=PRIMARY)
        self.add_files_button.pack(side=LEFT, padx=10, pady=10)
        self.add_masks_button = ttk.Button(button_frame, text="导入蒙版文件夹", command=self.add_masks, bootstyle=PRIMARY)
        self.add_masks_button.pack(side=LEFT, padx=10, pady=10)
        self.set_output_folder_button = ttk.Button(button_frame, text="导出文件", command=self.set_output_folder, bootstyle=SUCCESS)
        self.set_output_folder_button.pack(side=LEFT, padx=10, pady=10)
        self.update_button = ttk.Button(button_frame, text="不想出错最好点这", command=self.show_update_window, bootstyle=INFO)
        self.update_button.pack(side=LEFT, padx=10, pady=10)
        self.process_button = ttk.Button(button_frame, text="开始处理文件", command=self.start_processing, bootstyle=INFO)
        self.process_button.pack(side=RIGHT, padx=10, pady=10)

        # Configuration row: worker count plus secondary actions.
        config_frame = ttk.LabelFrame(left_frame, text="配置")
        config_frame.pack(fill=BOTH, expand=YES, pady=5)
        ttk.Label(config_frame, text="线程数:").pack(side=LEFT, padx=5, pady=5)
        ttk.Entry(config_frame, textvariable=self.thread_count).pack(side=LEFT, padx=5, pady=5)
        self.preview_button = ttk.Button(config_frame, text="预览合并效果", command=self.preview_merge, bootstyle=INFO)
        self.preview_button.pack(side=LEFT, padx=10, pady=10)
        self.remove_video_button = ttk.Button(config_frame, text="删除视频文件", command=self.remove_videos, bootstyle=DANGER)
        self.remove_video_button.pack(side=LEFT, padx=10, pady=10)
        self.remove_mask_button = ttk.Button(config_frame, text="删除蒙版文件", command=self.remove_masks, bootstyle=DANGER)
        self.remove_mask_button.pack(side=LEFT, padx=10, pady=10)
        self.pause_button = ttk.Button(config_frame, text="暂停/继续", command=self.toggle_pause, bootstyle=WARNING)
        self.pause_button.pack(side=LEFT, padx=10, pady=10)
        self.open_output_folder_button = ttk.Button(config_frame, text="打开文件", command=self.open_output_folder, bootstyle=SUCCESS)
        self.open_output_folder_button.pack(side=LEFT, padx=10, pady=10)

        # Preview pane.
        self.preview_frame = ttk.LabelFrame(right_frame, text="预览", width=190, height=474)
        self.preview_frame.pack_propagate(False)  # keep the frame at its fixed size
        self.preview_frame.pack(fill=BOTH, expand=YES, pady=5)
        self.preview_label = ttk.Label(self.preview_frame, anchor=CENTER)
        self.preview_label.pack(expand=YES)

        # Try an automatic select-and-preview one second after start-up.
        self.master.after(1000, self.auto_select_and_preview)

    def show_update_window(self):
        """Open a fixed-size window listing the mask/video size requirements."""
        update_window = Toplevel(self.master)
        update_window.title("更新")
        update_window.geometry("400x300")
        update_window.resizable(False, False)
        frame = ttk.Frame(update_window)
        frame.pack(fill=BOTH, expand=True)
        help_label = ttk.Label(frame, text=" 1.规定蒙版大小为高2206=宽1242\n \n 2.视频大小为高1080*宽1920或者高720*宽1280\n \n 3.做图上半部分的大小为高755*宽1242\n 下半部分的大小为高756*宽1242\n 最好中间保留695\n 均为像素而不是厘米\n \n 4.必须是偶数的宽度0,2,4,6,8这样的\n 前提是自己修改蒙版哈\n \n 5.程序处理视频中别删除已添加的视频\n 因为就算删除了也会被视为有视频所以看好来哈", justify=LEFT)
        help_label.pack(pady=10, padx=10)
        author_label = ttk.Label(frame, text="是貔貅呀")
        author_label.pack()

    def _has_video(self, video_path):
        """Return True when ``video_path`` is already in ``self.input_files``."""
        return any(info['path'] == video_path for info in self.input_files)

    def add_files(self):
        """Ask for a folder and import every video file found directly in it.

        Files already imported are skipped and reported in a single warning.
        """
        folder = filedialog.askdirectory(title="选择视频文件夹")
        if not folder:
            return
        duplicates = []
        for entry in os.listdir(folder):
            if not entry.lower().endswith(self.VIDEO_EXTENSIONS):
                continue
            video_path = os.path.join(folder, entry)
            # input_files holds dicts, so membership must compare the 'path' field.
            if self._has_video(video_path):
                duplicates.append(os.path.basename(video_path))
                continue
            self.input_files.append({'path': video_path, 'status': '未处理'})
            mask = self.get_matching_mask(video_path)
            mask_name = os.path.basename(mask) if mask else ""
            self.file_tree.insert('', END, values=(len(self.input_files), os.path.basename(video_path), mask_name, "未处理"))
            self.log(f"导入视频文件: {video_path}")
        if duplicates:
            # One dialog for the whole import instead of one per file.
            messagebox.showwarning("警告", f"文件已存在: {', '.join(duplicates)}")

    def add_masks(self):
        """Ask for a folder and import every mask image found directly in it.

        Each newly imported mask is shown in the table row of any video that
        shares its file stem.
        """
        folder = filedialog.askdirectory(title="选择蒙版文件夹")
        if not folder:
            return
        for entry in os.listdir(folder):
            if not entry.lower().endswith(self.MASK_EXTENSIONS):
                continue
            mask_path = os.path.join(folder, entry)
            if mask_path in self.mask_files:
                continue
            self.mask_files.append(mask_path)
            self.log(f"导入蒙版文件: {mask_path}")
            mask_stem = os.path.splitext(entry)[0]
            # Table rows are kept in the same order as input_files.
            for item, info in zip(self.file_tree.get_children(), self.input_files):
                video_stem = os.path.splitext(os.path.basename(info['path']))[0]
                if video_stem == mask_stem:
                    self.file_tree.set(item, column='蒙版文件名', value=entry)

    def remove_videos(self):
        """Remove the selected, not-yet-processed videos from the list.

        Refused while a processing run is active. Only entries whose status
        is still '未处理' are removed.
        """
        if self.process_thread and self.process_thread.is_alive():
            messagebox.showwarning("警告", "处理进行中,无法删除视频文件。")
            return
        # Rows are identified by their sequence number (column 0) rather than
        # by basename, so same-named files from different folders are not
        # confused. Deleting from the highest index down keeps the remaining
        # indices valid.
        indices = {int(self.file_tree.item(item, 'values')[0]) - 1 for item in self.file_tree.selection()}
        for index in sorted(indices, reverse=True):
            if 0 <= index < len(self.input_files) and self.input_files[index]['status'] == '未处理':
                del self.input_files[index]
        self.log("删除选中的视频文件")
        self.refresh_file_list()

    def remove_masks(self):
        """Remove the masks shown in the selected rows.

        Refused while a processing run is active. The videos themselves stay
        in the list; the table is rebuilt afterwards.
        """
        if self.process_thread and self.process_thread.is_alive():
            messagebox.showwarning("警告", "处理进行中,无法删除蒙版文件。")
            return
        for item in self.file_tree.selection():
            mask_name = self.file_tree.item(item, 'values')[2]
            if not mask_name:
                continue
            mask_path = next((m for m in self.mask_files if os.path.basename(m) == mask_name), None)
            if mask_path:
                self.mask_files.remove(mask_path)
        self.log("删除选中的蒙版文件")
        self.refresh_file_list()

    def refresh_file_list(self):
        """Rebuild the table from ``self.input_files`` and ``self.mask_files``."""
        self.file_tree.delete(*self.file_tree.get_children())
        for number, file_info in enumerate(self.input_files, start=1):
            mask = self.get_matching_mask(file_info['path'])
            mask_name = os.path.basename(mask) if mask else ""
            self.file_tree.insert('', END, values=(number, os.path.basename(file_info['path']), mask_name, file_info['status']))

    def set_output_folder(self):
        """Ask for the output folder; a cancelled dialog keeps the previous one."""
        folder = filedialog.askdirectory(title="选择输出文件夹")
        if not folder:
            return
        self.output_folder = folder
        self.log(f"设置输出文件夹: {self.output_folder}")

    def start_processing(self):
        """Validate the inputs and start the background processing thread."""
        if not self.input_files or not self.output_folder:
            messagebox.showerror("错误", "请添加文件并设置输出文件夹。")
            return
        if self.process_thread and self.process_thread.is_alive():
            # A second run would process every file twice in parallel.
            messagebox.showwarning("警告", "处理进行中,请等待当前任务完成。")
            return
        # Read and validate the worker count here, on the GUI thread, so a bad
        # value is reported instead of silently killing the worker thread.
        from tkinter import TclError
        try:
            max_workers = int(self.thread_count.get())
        except (TclError, ValueError):
            max_workers = 0
        if max_workers < 1:
            messagebox.showerror("错误", "线程数必须是大于0的整数。")
            return
        for file_info in self.input_files:
            file_info['status'] = '等待处理'
        self.refresh_file_list()
        self.process_thread = threading.Thread(target=self.process_videos_concurrently, args=(max_workers,))
        self.process_thread.start()

    def _signal_ffmpeg_processes(self, action):
        """Call psutil ``action`` ('suspend' or 'resume') on every live ffmpeg process."""
        # Iterate over a copy: worker threads add and remove entries concurrently.
        for process in list(self.ffmpeg_processes):
            if process.poll() is not None:
                continue  # already exited; its PID may have been reused
            try:
                getattr(psutil.Process(process.pid), action)()
            except psutil.NoSuchProcess:
                continue

    def toggle_pause(self):
        """Toggle between paused and running.

        Pausing suspends the running ffmpeg processes and holds back queued
        jobs; resuming does the reverse.
        """
        if self.pause_event.is_set():
            self.pause_event.clear()
            self.log("处理暂停")
            self._signal_ffmpeg_processes("suspend")
        else:
            self.pause_event.set()
            self.log("处理继续")
            self._signal_ffmpeg_processes("resume")

    def open_output_folder(self):
        """Open the output folder in the platform's file manager."""
        if not self.output_folder:
            messagebox.showerror("错误", "请先设置输出文件夹。")
            return
        if hasattr(os, "startfile"):
            os.startfile(self.output_folder)
        else:
            # os.startfile exists only on Windows; use the usual openers elsewhere.
            opener = "open" if sys.platform == "darwin" else "xdg-open"
            subprocess.run([opener, self.output_folder], check=False)
        self.log(f"打开输出文件夹: {self.output_folder}")

    def log(self, message):
        """Schedule ``message`` to be appended to the log box via ``master.after``."""
        if not self.is_closing:
            self.master.after(0, self._log, message)

    def _log(self, message):
        """Append ``message`` to the read-only log box and scroll to the end."""
        if self.is_closing:
            return
        self.log_text.configure(state='normal')
        self.log_text.insert(END, message + '\n')
        self.log_text.configure(state='disabled')
        self.log_text.yview(END)

    def update_tree_status(self, index, status):
        """Schedule a status update for row ``index`` via ``master.after``."""
        if not self.is_closing:
            self.master.after(0, self._update_tree_status, index, status)

    def _update_tree_status(self, index, status):
        """Store ``status`` for file ``index`` and show it in the table."""
        if self.is_closing:
            return
        rows = self.file_tree.get_children()
        if index >= len(self.input_files) or index >= len(rows):
            return  # the row no longer exists
        row = rows[index]
        self.input_files[index]['status'] = status
        self.file_tree.item(row, values=(index + 1, os.path.basename(self.input_files[index]['path']), self.file_tree.set(row, '蒙版文件名'), status))

    def process_videos_concurrently(self, max_workers=None):
        """Process every imported video on a thread pool.

        Args:
            max_workers: Pool size. When omitted, the value of the thread
                count entry is read.
        """
        if max_workers is None:
            max_workers = self.thread_count.get()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.process_video, index, input_file['path']) for index, input_file in enumerate(self.input_files)]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # Keep collecting the remaining jobs; report instead of
                    # letting this thread die with an invisible traceback.
                    self.log(f"处理文件时出错: {str(e)}")

    @staticmethod
    def _hidden_window_startupinfo():
        """Return a STARTUPINFO that hides the child's console window, or None off Windows."""
        if not hasattr(subprocess, "STARTUPINFO"):
            return None
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        return startupinfo

    @staticmethod
    def _build_overlay_filter(width, height):
        """Return the ffmpeg filter graph.

        The video is scaled to fit ``width`` x ``height`` and centred on the
        mask (input 1).
        """
        return f"[0:v]scale={width}:{height}:force_original_aspect_ratio=decrease[scaled];[1:v][scaled]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2"

    def _overlay_filter_for_mask(self, mask_file):
        """Read the mask's size and return the matching overlay filter graph.

        An odd mask width is logged and rounded up by one for the scale target.
        """
        with Image.open(mask_file) as border_img:  # close the file handle promptly
            border_width, border_height = border_img.size
        if border_width % 2 != 0:
            self.log(f"蒙版文件的宽度不是偶数: {border_width} (文件: {mask_file})")
            border_width += 1
        return self._build_overlay_filter(border_width, border_height)

    def process_video(self, index, input_file):
        """Overlay one video onto its mask with ffmpeg, reporting progress.

        Args:
            index: Position of the video in ``self.input_files`` / the table.
            input_file: Path of the video to process.
        """
        # Queued jobs must not start while paused, nor after the window closed.
        # on_closing sets the event, so a paused job cannot block shutdown.
        self.pause_event.wait()
        if self.is_closing:
            return

        video_name = os.path.basename(input_file)
        stem = os.path.splitext(video_name)[0]
        output_file = os.path.join(self.output_folder, f"processed_{video_name}.mp4")

        if os.path.exists(output_file):
            # NOTE(review): this modal dialog is opened from a worker thread;
            # confirm that is acceptable for the Tk build in use.
            overwrite = messagebox.askyesno("文件已存在", f"{output_file} 已存在,是否覆盖?")
            if not overwrite:
                self.update_tree_status(index, "跳过")
                return

        mask_file = self.get_matching_mask(input_file)
        if not mask_file:
            self.log(f"未找到匹配的蒙版文件: {stem}")
            self.update_tree_status(index, "未找到蒙版")
            return

        process = None
        try:
            cmd = [
                resource_path(os.path.join("ffmpeg_folder", "ffmpeg")),
                "-y",  # overwrite the output without prompting
                "-i", input_file,
                "-i", mask_file,
                "-filter_complex",
                self._overlay_filter_for_mask(mask_file),
                "-c:v", "libx264",
                "-c:a", "aac",
                output_file
            ]
            self.log(f"开始处理: {video_name}")
            self.update_tree_status(index, "处理中")
            process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,  # ffmpeg must never wait for keyboard input
                stdout=subprocess.DEVNULL,  # nothing reads stdout; an unread pipe could fill up
                stderr=subprocess.PIPE,
                universal_newlines=True,
                encoding='utf-8',
                errors='replace',  # non-UTF-8 bytes in ffmpeg output must not abort the job
                startupinfo=self._hidden_window_startupinfo(),
            )
            self.ffmpeg_processes.append(process)

            stderr_lines = []
            for line in process.stderr:
                stderr_lines.append(line)
                if self.is_closing:
                    break
                progress = self.parse_progress(line)
                if progress:
                    self.update_tree_status(index, progress)
            if self.is_closing and process.poll() is None:
                # Covers a process started after on_closing walked the list.
                process.terminate()
            process.communicate()  # drain what is left and wait for exit

            if process.returncode != 0:
                if "width not divisible by 2" in ''.join(stderr_lines):
                    self.log(f"处理文件时出错: {video_name} - 蒙版文件的宽度不是偶数")
                else:
                    self.log(f"处理文件时出错: {video_name} - FFmpeg处理失败,返回码非零。")
                self.update_tree_status(index, "处理失败")
                return
            # A zero exit code is not enough: make sure a real file was written.
            if not os.path.exists(output_file) or os.path.getsize(output_file) == 0:
                self.log(f"处理文件时出错: {video_name} - 输出文件无效或大小为0")
                self.update_tree_status(index, "处理失败")
                return
        except Exception as e:
            self.log(f"处理文件时出错: {video_name} - {str(e)}")
            self.update_tree_status(index, "处理失败")
            return
        finally:
            # Always forget the process, on failure too, so pause/resume and
            # shutdown never act on a stale entry.
            if process is not None and process in self.ffmpeg_processes:
                self.ffmpeg_processes.remove(process)

        if self.is_closing:
            self.update_tree_status(index, "未完成")
        else:
            self.log(f"完成处理: {video_name}")
            self.update_tree_status(index, "已完成")

    def parse_progress(self, line):
        """Return a progress label for an ffmpeg stderr line, or None.

        Args:
            line: One line of ffmpeg stderr output.

        Returns:
            "进度: <timestamp>" when the line contains ``time=HH:MM:SS.xx``,
            otherwise None.
        """
        match = self._PROGRESS_PATTERN.search(line)
        if match:
            return f"进度: {match.group(1)}"
        return None

    def on_closing(self):
        """Stop all work and destroy the window."""
        self.is_closing = True
        self.pause_event.set()  # release jobs blocked on a pause so they can exit
        for process in list(self.ffmpeg_processes):
            try:
                if process.poll() is None:
                    process.terminate()
            except OSError:
                continue  # the process ended between the check and the call
        self.master.destroy()

    def preview_merge(self):
        """Render one merged frame of the selected video and show it in the preview pane."""
        selected_items = self.file_tree.selection()
        if not selected_items:
            messagebox.showerror("错误", "请先选择一个视频文件进行预览。")
            return
        item = selected_items[0]
        index = int(self.file_tree.item(item, 'values')[0]) - 1
        input_file = self.input_files[index]['path']
        mask_file = self.get_matching_mask(input_file)
        if not mask_file:
            messagebox.showerror("错误", "未找到匹配的蒙版文件。")
            return
        # Use a unique file in the temp directory: the working directory may
        # be read-only and a leftover preview must not get in the way.
        import tempfile
        descriptor, preview_file = tempfile.mkstemp(suffix=".jpg")
        os.close(descriptor)
        self.log("正在预览...")
        self.generate_preview_frame(input_file, mask_file, preview_file)
        self.show_preview_image(preview_file)
        self.log("预览完成,请查看。")

    def get_matching_mask(self, input_file):
        """Return the first imported mask with the same stem as ``input_file``, or None."""
        stem = os.path.splitext(os.path.basename(input_file))[0]
        return next((mask for mask in self.mask_files if os.path.splitext(os.path.basename(mask))[0] == stem), None)

    def generate_preview_frame(self, input_file, mask_file, output_file):
        """Write a single merged frame to ``output_file`` using ffmpeg.

        Args:
            input_file: Path of the video.
            mask_file: Path of the mask image.
            output_file: Path of the image to write (overwritten if present).
        """
        try:
            cmd = [
                resource_path(os.path.join("ffmpeg_folder", "ffmpeg")),
                "-y",  # never stop at an interactive "overwrite?" prompt
                "-i", input_file,
                "-i", mask_file,
                "-filter_complex",
                self._overlay_filter_for_mask(mask_file),
                "-frames:v", "1",
                output_file
            ]
            subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                startupinfo=self._hidden_window_startupinfo(),
            )
        except Exception as e:
            self.log(f"生成预览帧时出错: {str(e)}")

    def show_preview_image(self, image_file):
        """Show ``image_file`` in the preview pane, then delete the file."""
        try:
            with Image.open(image_file) as img:
                img.thumbnail((190, 474))
                # Build the Tk image while the source image is still open.
                photo = ImageTk.PhotoImage(img)
            self.preview_label.configure(image=photo)
            self.preview_label.image = photo  # keep a reference so it is not garbage-collected
        except Exception as e:
            self.log(f"显示预览图片时出错: {str(e)}")
        finally:
            # Best-effort cleanup of the temporary preview file.
            try:
                os.remove(image_file)
            except OSError:
                pass

    def auto_select_and_preview(self):
        """Select the first video that has a matching mask and preview it."""
        for item, file_info in zip(self.file_tree.get_children(), self.input_files):
            if self.get_matching_mask(file_info['path']):
                self.file_tree.selection_set(item)
                self.preview_merge()
                return
if __name__ == "__main__":
    # Script entry point: create the themed root window, attach the
    # application to it and run the Tk event loop until the window closes.
    try:
        root = ttk.Window(themename="superhero")
        app = VideoProcessor(master=root)
        root.mainloop()
    except Exception as e:
        # Top-level boundary: report the failure on stderr rather than
        # ending with an unhandled traceback.
        print(f"Error: {e}", file=sys.stderr)