[C] 纯文本查看 复制代码 import os
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
def is_image_file(file):
try:
with Image.open(file) as img:
return img.format in ['JPEG', 'PNG', 'BMP', 'GIF', 'TIFF']
except IOError:
return False
def compress_image(input_file, output_file):
# 打开图像文件
with Image.open(input_file) as img:
# 获取图像的格式
file_format = img.format
try:
# 保存压缩后的图像
img.save(output_file, format=file_format, optimize=True)
except OSError:
# 如果遇到缺少 EXIF 信息的情况,重新保存图像
img.save(output_file, format=file_format, optimize=True, exif=b'')
print(f"图像已成功压缩: {output_file}")
def compress_images_in_folders(thread_count):
# 获取用户输入的文件夹路径
folder_path = input("请输入要压缩图像文件所在的文件夹路径:")
# 遍历文件夹,找到所有图像文件
image_files = []
for root, _, files in os.walk(folder_path):
for file in files:
input_file = os.path.join(root, file)
# 检查文件扩展名
_, ext = os.path.splitext(input_file)
if ext.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff']:
if is_image_file(input_file):
image_files.append(input_file)
# 使用线程池进行图像压缩
with ThreadPoolExecutor(max_workers=thread_count) as executor:
for input_file in image_files:
# 生成压缩后的文件名
output_file = os.path.splitext(input_file)[0] + "_compressed" + os.path.splitext(input_file)[1]
# 检查是否已存在同名文件
if os.path.exists(output_file):
i = 1
while os.path.exists(output_file):
output_file = os.path.splitext(input_file)[0] + f"_compressed_{i}" + os.path.splitext(input_file)[1]
i += 1
executor.submit(compress_image, input_file, output_file)
if __name__ == "__main__":
thread_count = multiprocessing.cpu_count()
print(f"使用 {thread_count} 个线程进行图像压缩")
compress_images_in_folders(thread_count)
更新一版手动选择路径并不替换源文件 |