吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1022|回复: 12
上一主题 下一主题
收起左侧

[Windows] txt转批量pdf(用到了阿里巴巴普惠体3.0)

[复制链接]
跳转到指定楼层
楼主
MXDZRB 发表于 2025-4-1 14:21 回帖奖励
[Python] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
import tkinter
import tkinter.filedialog
import tkinter.messagebox
import customtkinter
import os
import multiprocessing # Changed from threading
from multiprocessing import Manager, Pool # Use Manager for shared Queue, Pool directly
import queue # Added for queue.Empty exception
# Removed fpdf import
# --- ReportLab Imports ---
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.pagesizes import A4
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# --- End ReportLab Imports ---
from pathlib import Path
import time
import locale
import math
# Removed datetime and timedelta imports
 
# Path of the Alibaba PuHuiTi 3.0 regular TrueType face. It is a relative path,
# so it is resolved against the current working directory.
FONT_PATH = 'AlibabaPuHuiTi-3/AlibabaPuHuiTi-3-55-Regular/AlibabaPuHuiTi-3-55-Regular.ttf'
# Name under which the face is registered with ReportLab; it does not have to
# match the file name.
FONT_NAME = 'AlibabaPuHuiTiR'
# Worker-pool size: one process per reported CPU, or 4 when the count is unknown.
MAX_CONCURRENT = os.cpu_count() or 4
# 16 MiB buffer size.
BUFFER_SIZE = 16 * 1024 * 1024

# Follow the operating-system light/dark setting and use the stock blue theme.
customtkinter.set_appearance_mode("System")
customtkinter.set_default_color_theme("blue")

# --- Font registration at import time ---
# Registration is attempted once here; a missing file or a registration failure
# is only reported on stdout so that importing the module never raises.
try:
    if not os.path.exists(FONT_PATH):
        print(f"[ERROR] Font file not found during initial registration: {FONT_PATH}")
    else:
        pdfmetrics.registerFont(TTFont(FONT_NAME, FONT_PATH))
except Exception as e:
    print(f"[ERROR] Failed to register font globally: {e}")
 
 
# --- Worker Functions (Top Level for Multiprocessing) ---
def convert_file_worker(txt_path, output_queue):
    """Convert one text file to a PDF beside it and report the outcome.

    The PDF is written next to the input, with the suffix replaced by ``.pdf``.
    The result is reported by putting a
    ``("status", txt_path, success, message)`` tuple on *output_queue*; any
    exception raised while reading, decoding or rendering is caught here and
    reported as a failed status instead of propagating.

    Args:
        txt_path: Path of the text file to convert.
        output_queue: Queue-like object with a ``put`` method that receives
            the status tuple.
    """
    # Local import keeps this fix self-contained (no new file-level import).
    from xml.sax.saxutils import escape

    pdf_path = Path(txt_path).with_suffix('.pdf')
    file_basename = os.path.basename(txt_path)

    try:
        # Read the bytes once; only the decoding step is retried per encoding.
        try:
            raw_content = Path(txt_path).read_bytes()
        except OSError as read_err:
            raise Exception(f"读取时出错 ({txt_path}): {read_err}") from read_err

        # 'utf-8-sig' decodes plain UTF-8 too, but also drops a leading BOM so
        # it neither ends up in the PDF nor defeats the emptiness check below.
        encodings_to_try = ['utf-8-sig', 'gbk', locale.getpreferredencoding(False)]
        # A UTF-16 byte-order mark is unambiguous, so honour it first.
        if raw_content.startswith((b'\xff\xfe', b'\xfe\xff')):
            encodings_to_try.insert(0, 'utf-16')

        content = None
        for enc in encodings_to_try:
            try:
                content = raw_content.decode(enc)
                break
            except UnicodeDecodeError:
                continue

        if content is None:
            raise Exception(f"无法使用 {encodings_to_try} 解码文件: {file_basename}")

        if not content.strip():
            output_queue.put(("status", txt_path, True, "文件为空,已跳过"))
            return

        # --- ReportLab PDF generation ---
        try:
            # Register the font in this process if it is not registered yet.
            if FONT_NAME not in pdfmetrics.getRegisteredFontNames():
                if os.path.exists(FONT_PATH):
                    pdfmetrics.registerFont(TTFont(FONT_NAME, FONT_PATH))
                else:
                    raise Exception(f"错误:字体文件 '{FONT_PATH}' 在工作进程中未找到!")

            doc = SimpleDocTemplate(str(pdf_path), pagesize=A4)
            styles = getSampleStyleSheet()

            # Paragraph style based on 'Normal' but using the registered font.
            custom_style = ParagraphStyle(
                name='CustomStyle',
                parent=styles['Normal'],
                fontName=FONT_NAME,
                fontSize=10,
                leading=14,  # Line spacing
                encoding='utf8'  # Kept unchanged from the original style definition
            )

            # Normalise CRLF and lone CR line endings, then treat every source
            # line as its own paragraph.
            lines = content.replace('\r\n', '\n').replace('\r', '\n').split('\n')
            # Drop trailing blank lines so they cannot spill onto an extra page.
            # At least one non-blank line remains because content.strip() is
            # non-empty at this point.
            while lines and not lines[-1].strip():
                lines.pop()

            story = []
            for line in lines:
                if line.strip():
                    # Paragraph parses XML-like markup, so '&', '<' and '>'
                    # from plain text must be escaped to be rendered literally.
                    story.append(Paragraph(escape(line), custom_style))
                else:
                    # Use a spacer of one line height so that blank source
                    # lines stay visible in the PDF.
                    story.append(Spacer(1, custom_style.leading))

            # Build the PDF
            doc.build(story)

        except Exception as pdf_err:
            raise Exception(
                f"使用 ReportLab 生成 PDF 时出错 ({file_basename}): {pdf_err}"
            ) from pdf_err
        # --- End ReportLab PDF generation ---

        output_queue.put(("status", txt_path, True, f"成功转换为 {os.path.basename(pdf_path)}"))

    except Exception as e:
        # Top-level boundary of the worker: report every failure on the queue.
        output_queue.put(("status", txt_path, False, f"处理 {file_basename} 时出错: {e}"))
 
# The target function of the conversion process is kept outside the class.
def start_conversion_process_target(file_list, queue_ref):
    """Run every file through a worker pool and signal the end on the queue.

    Each entry of *file_list* is submitted to ``convert_file_worker`` together
    with *queue_ref*. After the pool has been closed and joined, ``("done",)``
    is put on the queue. If anything in this function raises, an
    ``("error", message)`` tuple is put on the queue instead.

    Args:
        file_list: Iterable of text-file paths to convert.
        queue_ref: Queue shared with the workers and with the GUI.
    """
    try:
        with Pool(processes=MAX_CONCURRENT) as worker_pool:
            for source_path in file_list:
                worker_pool.apply_async(
                    convert_file_worker,
                    args=(source_path, queue_ref),
                )
            # No more submissions; wait until all queued tasks have finished.
            worker_pool.close()
            worker_pool.join()
        queue_ref.put(("done",))
    except Exception as exc:
        failure_text = f"转换管理进程出错: {exc}"
        queue_ref.put(("error", failure_text))
 
# --- Main Application Class ---
class App(customtkinter.CTk):
    """Main window of the batch TXT-to-PDF converter.

    The window lets the user pick ``.txt`` files, starts a separate process
    running ``start_conversion_process_target`` to convert them, and polls a
    manager-backed queue every 100 ms to update the log box and progress bar.
    """

    def __init__(self):
        """Build all widgets and initialise the conversion bookkeeping."""
        super().__init__()

        self.title("TXT to PDF 批量转换器 (ReportLab)")
        self.geometry("700x550")
        self.configure(fg_color=("#f5f5f5", "#2b2b2b"))

        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure(3, weight=1)

        main_frame = customtkinter.CTkFrame(self, corner_radius=15, fg_color=("white", "#333333"))
        main_frame.grid(row=0, column=0, padx=30, pady=30, sticky="nsew")
        main_frame.grid_columnconfigure(0, weight=1)
        main_frame.grid_columnconfigure(1, weight=0)
        main_frame.grid_rowconfigure(3, weight=1)

        title_label = customtkinter.CTkLabel(
            main_frame,
            text="TXT to PDF 批量转换器",
            font=customtkinter.CTkFont(size=24, weight="bold"),
            text_color=("#1a1a1a", "#ffffff")
        )
        title_label.grid(row=0, column=0, columnspan=2, padx=20, pady=(20, 30))

        # Opens a multi-file picker (the attribute name is historical).
        self.select_folder_button = customtkinter.CTkButton(
            main_frame,
            text="选择 TXT 文件",
            command=self.select_folder,
            height=40,
            corner_radius=8,
            font=customtkinter.CTkFont(size=14),
            fg_color=("#2986cc", "#1f6aa5"),
            hover_color=("#246ea6", "#195785")
        )
        self.select_folder_button.grid(row=1, column=0, padx=(30, 5), pady=(0, 15), sticky="ew")

        # Disabled until at least one file has been selected.
        self.start_button = customtkinter.CTkButton(
            main_frame,
            text="开始转换",
            command=self.start_conversion,
            state="disabled",
            height=40,
            corner_radius=8,
            font=customtkinter.CTkFont(size=14),
            fg_color=("#27ae60", "#219653"),
            hover_color=("#219653", "#1e8449")
        )
        self.start_button.grid(row=1, column=1, padx=(5, 30), pady=(0, 15), sticky="ew")

        self.folder_path_label = customtkinter.CTkLabel(
            main_frame,
            text="未选择文件",
            font=customtkinter.CTkFont(size=12),
            text_color=("#666666", "#999999"),
            anchor="w"
        )
        self.folder_path_label.grid(row=2, column=0, columnspan=2, padx=30, pady=(0, 15), sticky="ew")

        # Read-only log; log_status() enables it briefly for each insert.
        self.status_textbox = customtkinter.CTkTextbox(
            main_frame,
            state="disabled",
            wrap="word",
            height=200,
            corner_radius=8,
            border_width=1,
            border_color=("#e0e0e0", "#404040"),
            fg_color=("#ffffff", "#2b2b2b")
        )
        self.status_textbox.grid(row=3, column=0, columnspan=2, padx=30, pady=(0, 20), sticky="nsew")

        self.progressbar = customtkinter.CTkProgressBar(
            main_frame,
            height=15,
            corner_radius=5,
            fg_color=("#f0f0f0", "#333333"),
            progress_color=("#2986cc", "#1f6aa5")
        )
        self.progressbar.grid(row=4, column=0, columnspan=2, padx=30, pady=(0, 10), sticky="ew")
        self.progressbar.set(0)

        self.progress_percent_label = customtkinter.CTkLabel(
            main_frame,
            text="0%",
            font=customtkinter.CTkFont(size=12),
            text_color=("#666666", "#999999")
        )
        self.progress_percent_label.grid(row=5, column=0, columnspan=2, padx=30, pady=(0, 20), sticky="w")

        # Selection state.
        self.selected_folder = ""
        self.txt_files = []
        # Multiprocessing state of the current (or last) run.
        self.manager = None
        self.conversion_queue = None
        self.conversion_process = None
        self.monitor_queue_id = None  # id of the pending after() poll, if any

        # Progress bookkeeping.
        self.start_time = None
        self.processed_files = 0
        self.total_files = 0
        self.failed_files = []

    def select_folder(self):
        """Ask the user for TXT files and update the selection state.

        Cancelling the dialog clears a previous selection. The call is ignored
        while a conversion is running.
        """
        if self.conversion_process and self.conversion_process.is_alive():
            self.log_status("请等待当前转换完成。")
            return

        files = tkinter.filedialog.askopenfilenames(filetypes=[("TXT files", "*.txt")])
        if files:
            self.txt_files = list(files)
            self.selected_folder = os.path.dirname(self.txt_files[0])
            self.folder_path_label.configure(text=f"已选择 {len(self.txt_files)} 个文件")
            self.log_status(f"已选择 {len(self.txt_files)} 个 TXT 文件")
            self.start_button.configure(state="normal")
            self.update_progress(value=0)
            self.processed_files = 0
            self.total_files = len(self.txt_files)
            self.failed_files = []
        elif self.txt_files:
            self.selected_folder = ""
            self.txt_files = []
            self.folder_path_label.configure(text="未选择文件")
            self.start_button.configure(state="disabled")
            self.log_status("未选择文件。")
            self.update_progress(value=0)

    def log_status(self, message):
        """Append *message* as a new line to the log box and scroll to it."""
        def _update():
            self.status_textbox.configure(state="normal")
            self.status_textbox.insert("end", f"{message}\n")
            self.status_textbox.configure(state="disabled")
            self.status_textbox.see("end")
        # Deferred through the Tk event loop.
        self.after(0, _update)

    def update_progress(self, value=None):
        """Set the progress bar and percent label; ``None`` changes nothing.

        Args:
            value: Fraction passed to the progress bar; the label shows
                ``int(value * 100)`` percent.
        """
        def _update():
            if value is not None:
                self.progressbar.set(value)
                percent = int(value * 100)
                self.progress_percent_label.configure(text=f"{percent}%")
        self.after(0, _update)

    def _shutdown_manager(self, label):
        """Shut down the multiprocessing manager, if any, and forget it.

        Shutdown errors are only printed (prefixed with *label*) because this
        runs during cleanup and must not interrupt it.
        """
        if self.manager:
            try:
                self.manager.shutdown()
            except Exception as e:
                print(f"Error shutting down {label}: {e}")
            self.manager = None

    def _finish_conversion(self, label):
        """Return the UI to the idle state and release the run's resources."""
        self.start_button.configure(state="normal")
        self.select_folder_button.configure(state="normal")
        self.conversion_process = None
        self.monitor_queue_id = None
        self._shutdown_manager(label)

    def process_queue(self):
        """Drain pending messages from the conversion queue and keep polling.

        Messages read from the queue are ``("status", txt_path, success, text)``,
        ``("done",)`` and ``("error", text)``. Polling is rescheduled every
        100 ms while the conversion process is alive or messages remain.
        """
        try:
            while True:
                message = self.conversion_queue.get_nowait()
                msg_type = message[0]

                if msg_type == "status":
                    _, txt_path, success, status_msg = message
                    self.processed_files += 1
                    progress_value = self.processed_files / self.total_files if self.total_files > 0 else 0
                    self.update_progress(value=progress_value)

                    log_prefix = "[成功]" if success else "[失败]"
                    self.log_status(f"{log_prefix} {os.path.basename(txt_path)}: {status_msg}")
                    if not success:
                        self.failed_files.append(os.path.basename(txt_path))

                elif msg_type == "done":
                    end_time = time.time()
                    duration = end_time - self.start_time if self.start_time else 0
                    self.log_status("-" * 20)
                    self.log_status(f"转换完成!总共处理 {self.processed_files} 个文件,耗时: {duration:.2f} 秒。")
                    if self.failed_files:
                        self.log_status(f"失败 {len(self.failed_files)} 个文件: {', '.join(self.failed_files)}")
                    else:
                        self.log_status("所有文件转换成功!")
                    self._finish_conversion("manager after completion")
                    self.update_progress(value=1.0)
                    return

                elif msg_type == "error":
                    self.log_status(f"[严重错误] {message[1]}")
                    self._finish_conversion("manager on error")
                    return

        except queue.Empty:
            pass

        process = self.conversion_process
        if process is None:
            # Nothing is running any more; stop polling.
            self.monitor_queue_id = None
            return

        if process.is_alive():
            self.monitor_queue_id = self.after(100, self.process_queue)
            return

        # The process has exited. Messages may have arrived between the drain
        # above and the liveness check, so consume them on the next tick before
        # drawing any conclusion.
        if not self.conversion_queue.empty():
            self.monitor_queue_id = self.after(100, self.process_queue)
            return

        # Dead process, empty queue, and neither "done" nor "error" was seen.
        self.log_status("[错误] 转换进程意外终止。")
        self._finish_conversion("manager on unexpected process exit")

    def start_conversion(self):
        """Start converting the selected files in a separate process.

        Creates a fresh manager and queue, resets the progress state, launches
        the process and schedules the first queue poll.
        """
        if not self.txt_files:
            self.log_status("请先选择 TXT 文件。")
            return

        if self.conversion_process and self.conversion_process.is_alive():
            self.log_status("转换已经在进行中...")
            return

        if self.monitor_queue_id:
            self.after_cancel(self.monitor_queue_id)
            self.monitor_queue_id = None

        # Any process object still referenced here has already exited (a live
        # one returned above), so it only needs to be dropped.
        self.conversion_process = None
        self._shutdown_manager("old manager")

        self.manager = Manager()
        self.conversion_queue = self.manager.Queue()

        self.start_button.configure(state="disabled")
        self.select_folder_button.configure(state="disabled")
        self.status_textbox.configure(state="normal")
        self.status_textbox.delete("1.0", "end")
        self.status_textbox.configure(state="disabled")

        self.update_progress(value=0)
        self.processed_files = 0
        self.total_files = len(self.txt_files)
        self.failed_files = []

        self.log_status(f"开始使用 {MAX_CONCURRENT} 个进程转换 {self.total_files} 个文件 (使用 ReportLab)...")
        self.start_time = time.time()

        self.conversion_process = multiprocessing.Process(
            target=start_conversion_process_target,
            args=(self.txt_files, self.conversion_queue),
            daemon=False
        )
        self.conversion_process.start()

        self.monitor_queue_id = self.after(100, self.process_queue)

    def on_closing(self):
        """Handle the window-close request.

        While a conversion is running the user is asked to confirm; on
        confirmation the conversion process is terminated. The manager is shut
        down and the pending poll is cancelled before the window is destroyed.
        """
        if self.conversion_process and self.conversion_process.is_alive():
            if not tkinter.messagebox.askyesno("退出", "转换仍在进行中,确定要退出吗?"):
                return
            print("Terminating conversion process...")
            self.conversion_process.terminate()
            self.conversion_process.join(timeout=0.5)
            self._shutdown_manager("manager during termination")
        else:
            self._shutdown_manager("manager on normal close")

        # Do not leave a queue poll scheduled on a window about to be destroyed.
        if self.monitor_queue_id:
            self.after_cancel(self.monitor_queue_id)
            self.monitor_queue_id = None
        self.destroy()
 
if __name__ == "__main__":
    # Kept as the first statement of the entry point, before any process is started.
    multiprocessing.freeze_support()

    # Verify up front that the font can be registered; without it no PDF can
    # be rendered, so the GUI is not started at all.
    font_ok = False
    if os.path.exists(FONT_PATH):
        try:
            pdfmetrics.registerFont(TTFont(FONT_NAME, FONT_PATH))
            font_ok = True
            print(f"Font '{FONT_NAME}' registered successfully from {FONT_PATH}")
        except Exception as e:
            print(f"[ERROR] Failed to register font '{FONT_NAME}' from {FONT_PATH}: {e}")
    else:
        print(f"[ERROR] Font file not found: {FONT_PATH}")

    if not font_ok:
        # A hidden root window is needed only to host the error dialog.
        root = tkinter.Tk()
        root.withdraw()
        tkinter.messagebox.showerror("字体错误", f"错误:无法加载或注册字体文件 '{FONT_PATH}'! 请确保文件存在且有效。")
        root.destroy()
        # The bare exit() helper is injected by the site module and may be
        # missing in frozen or -S interpreters; raising SystemExit always
        # works and lets the failure be reported with a non-zero status.
        raise SystemExit(1)

    try:
        app = App()
        app.protocol("WM_DELETE_WINDOW", app.on_closing)
        app.mainloop()
    except Exception as e:
        print(f"Application failed to start: {e}")
        root = tkinter.Tk()
        root.withdraw()
        tkinter.messagebox.showerror("启动错误", f"无法启动应用程序: {e}")
        root.destroy()

image.png (179.16 KB, 下载次数: 2)

image.png

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
c199188177c + 1 + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

推荐
夸克逃逸 发表于 2025-4-1 14:54
感谢分享。建议增加一些核心代码的分析,增加一些实际效果结果,说一下实际的应用场景,而不是一段代码。
3#
shroer 发表于 2025-4-1 14:44
4#
sktao 发表于 2025-4-1 17:18
5#
hs99 发表于 2025-4-1 19:21
支持分享
6#
hierme 发表于 2025-4-1 19:44
感谢楼主的分享
7#
touristxxp 发表于 2025-4-1 20:05

感谢分享!
8#
hnzx 发表于 2025-4-1 21:06


感谢分享!
9#
1230tjh 发表于 2025-4-2 07:37
很实用的工具
10#
yycc1818 发表于 2025-4-2 08:17
很实用,感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-4-7 02:55

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表