# [Python] View as plain text / Copy code  (forum code-viewer header; not part of the program)
# -*- coding:utf-8 -*-
import os
import openpyxl
import PySimpleGUI as sg
import xlrd
from docx import Document
import time
import threading
"""
Dependencies to install: xlrd, PySimpleGUI, python-docx, openpyxl
"""
def get_headers_from_xlsx(worksheet):
    """Return the header labels found in the first row of a worksheet.

    Args:
        worksheet: Object exposing ``max_column`` and ``iter_cols(min_col,
            max_col)`` whose columns are sequences of cells with a ``value``
            attribute (the calls used here match openpyxl worksheets).

    Returns:
        One string per column, in column order: the stripped text of the
        first cell, or '' when that cell's value is falsy or blank.
    """
    def _label(first_cell):
        raw = first_cell.value
        # A falsy value (None, '', 0) and a whitespace-only value both
        # collapse to the empty label.
        return str(raw).strip() if raw else ''

    columns = worksheet.iter_cols(1, worksheet.max_column)
    return [_label(column[0]) for column in columns]
def get_headers_from_xls(worksheet):
    """Return the header labels found in row 0 of a worksheet.

    Args:
        worksheet: Object exposing ``ncols`` and ``cell_value(row, col)``
            (the calls used here match xlrd sheets).

    Returns:
        One string per column, in column order: the stripped text of the
        row-0 cell, or '' when that value is falsy or blank.
    """
    first_row = (worksheet.cell_value(0, col_idx) for col_idx in range(worksheet.ncols))
    # Falsy values and whitespace-only values both end up as ''.
    return [str(raw).strip() if raw else '' for raw in first_row]
def read_data_form_xlsx_file(excelFile):
    """Read the first worksheet of an .xlsx workbook as headers plus row dicts.

    The first row is treated as the header row; every following row becomes
    one dict keyed by the non-blank header labels.

    Args:
        excelFile: Path handed to ``openpyxl.load_workbook``.

    Returns:
        A ``(headers, rows)`` tuple. ``headers`` has one stripped label per
        column ('' for a blank header cell). ``rows`` is a list of dicts
        mapping each non-blank header to the stripped text of the cell, or ''
        when the cell holds no value. ``([], [])`` is returned when the file
        cannot be opened or the workbook has no worksheet, rows or columns.
    """
    try:
        workbook = openpyxl.load_workbook(excelFile)
    except FileNotFoundError:
        print(f"当前文件不存在【{excelFile}】,程序自动退出!\n")
        return ([], [])
    except Exception as e:
        # Any other failure to open the file is reported, not raised, so the
        # caller only ever has to deal with the empty result.
        print(e)
        print(f"打开当前文件报错【{excelFile}】,程序自动退出!\n")
        return ([], [])

    sheetnames = workbook.sheetnames
    worksheets = workbook.worksheets
    print(f"读取表名为:{sheetnames}")
    if not worksheets:
        return ([], [])

    # Only the first worksheet is read.
    worksheet = worksheets[0]
    num_rows = worksheet.max_row
    num_cols = worksheet.max_column
    print(f"当前excel共 {num_rows}行 {num_cols}列")
    if num_rows <= 0 or num_cols <= 0:
        return ([], [])

    headers = get_headers_from_xlsx(worksheet)
    print("开始读取表头:")
    if not headers:
        return ([], [])
    print(f"{headers}\n")

    # Columns without a header label are skipped; compute that list once
    # instead of re-testing every header for every row.
    named_columns = [(idx, name) for idx, name in enumerate(headers) if name]

    orderExpDatas = []
    # Row 1 is the header row, so data starts at row 2.
    for row in worksheet.iter_rows(2, num_rows):
        orderExpData = {}
        for idx, name in named_columns:
            value = row[idx].value
            # Only a missing value counts as empty: a numeric 0 or a boolean
            # False is real data and must show up in the generated document.
            orderExpData[name] = "" if value is None else str(value).strip()
        orderExpDatas.append(orderExpData)
    return (headers, orderExpDatas)
def read_data_form_xls_file(excelFile):
    """Read the first worksheet of an .xls workbook as headers plus row dicts.

    Row 0 is treated as the header row; every following row becomes one dict
    keyed by the non-blank header labels.

    Args:
        excelFile: Path handed to ``xlrd.open_workbook``.

    Returns:
        A ``(headers, rows)`` tuple. ``headers`` has one stripped label per
        column ('' for a blank header cell). ``rows`` is a list of dicts
        mapping each non-blank header to the stripped text of the cell, or ''
        when the cell is empty. ``([], [])`` is returned when the file cannot
        be opened or the workbook has no worksheet, rows or columns.
    """
    try:
        workbook = xlrd.open_workbook(excelFile)
    except FileNotFoundError:
        print(f"当前文件不存在【{excelFile}】,程序自动退出!\n")
        return ([], [])
    except Exception as e:
        # Any other failure to open the file is reported, not raised, so the
        # caller only ever has to deal with the empty result.
        print(e)
        print(f"打开当前文件报错【{excelFile}】,程序自动退出!\n")
        return ([], [])

    sheetnames = workbook.sheet_names()
    worksheets = workbook.sheets()
    print(f"读取表名为:{sheetnames}")
    if not worksheets:
        return ([], [])

    # Only the first worksheet is read.
    worksheet = worksheets[0]
    num_rows = worksheet.nrows
    num_cols = worksheet.ncols
    print(f"当前excel共 {num_rows}行 {num_cols}列")
    if num_rows <= 0 or num_cols <= 0:
        return ([], [])

    headers = get_headers_from_xls(worksheet)
    print("开始读取表头:")
    if not headers:
        return ([], [])
    print(f"{headers}\n")

    # Columns without a header label are skipped; compute that list once
    # instead of re-testing every header for every row.
    named_columns = [(idx, name) for idx, name in enumerate(headers) if name]

    orderExpDatas = []
    # Row 0 is the header row, so data starts at row 1.
    for rown in range(1, num_rows):
        orderExpData = {}
        for coln, name in named_columns:
            value = worksheet.cell_value(rown, coln)
            # xlrd hands back every numeric cell as a float; render whole
            # numbers without the trailing ".0" so ids, phone numbers and
            # counts read the same as they do from an .xlsx workbook.
            if isinstance(value, float) and value.is_integer():
                value = int(value)
            # Only a missing value counts as empty: a numeric 0 is real data.
            orderExpData[name] = "" if value is None else str(value).strip()
        orderExpDatas.append(orderExpData)
    return (headers, orderExpDatas)
def read_data_form_file(excelFile):
    """Read an Excel file with the reader that matches its extension.

    Args:
        excelFile: Path of the workbook. A name ending in 'xlsx' (ignoring
            case and surrounding whitespace) goes to the .xlsx reader; every
            other name goes to the .xls reader.

    Returns:
        The ``(headers, rows)`` tuple produced by the selected reader.
    """
    normalized_name = excelFile.strip().lower()
    if normalized_name.endswith('xlsx'):
        return read_data_form_xlsx_file(excelFile)
    return read_data_form_xls_file(excelFile)
def replace_txt_by_data(text, restr, header_keys, header_datas):
    """Substitute every placeholder in ``text`` with its value.

    Args:
        text: Text to process; a falsy value (None or '') is returned as is.
        restr: ``%``-format pattern that turns a key into its placeholder,
            e.g. '{%s}' turns 'name' into '{name}'.
        header_keys: Keys to substitute, applied in order; empty keys are
            skipped.
        header_datas: Mapping from key to replacement string. It must hold an
            entry for every non-empty key.

    Returns:
        The text with all placeholders replaced.
    """
    if not text:
        return text
    for name in header_keys:
        if name:
            placeholder = restr % (name)
            text = text.replace(placeholder, header_datas[name])
    return text
def replace_all_tables(tables, restr, header_keys, header_datas):
    """Substitute placeholders in every cell of the given tables, in place.

    Args:
        tables: Iterable of table objects exposing ``rows`` -> ``cells`` ->
            ``text`` (python-docx tables); None or an empty list is a no-op.
        restr: ``%``-format pattern that turns a key into its placeholder.
        header_keys: Keys whose placeholders are replaced.
        header_datas: Mapping from key to replacement string.

    Returns:
        None. Cells are modified in place.
    """
    if not tables:
        return
    for table in tables:
        for row in table.rows:
            for cell in row.cells:
                original = cell.text
                if not original:
                    continue
                replaced = replace_txt_by_data(original, restr, header_keys, header_datas)
                # Assigning cell.text rebuilds the cell as a single plain
                # run, so only write when a placeholder was substituted;
                # cells without placeholders keep their content untouched.
                if replaced != original:
                    cell.text = replaced
def write_word_file_from_datas(word_file, header_keys, header_datas, output_path):
    """Fill the Word template with one data row and save it as a new .docx.

    Placeholders have the form ``{header}`` and are replaced in table cells
    and in body paragraphs. The output file is named after the value of the
    '文件名' column when that column exists and is non-blank, otherwise after
    the value of the first header column.

    Args:
        word_file: Path of the .docx template.
        header_keys: Header labels in column order ('' for unnamed columns).
        header_datas: Mapping from header label to the row's cell text.
        output_path: Directory to save into; created when missing.

    Returns:
        The path of the saved document, or None when the row yields no file
        name and is therefore skipped.
    """
    default_file_name_key = '文件名'
    first_key = header_keys[0] if header_keys else ''
    file_name = header_datas.get(first_key) if first_key else None
    explicit_name = header_datas.get(default_file_name_key, '').strip()
    if explicit_name:
        file_name = explicit_name

    if not file_name:
        print('文件名为空,跳过当前行')
        return None

    # The name comes straight from spreadsheet data. Path separators (and, on
    # Windows, the other reserved characters) would make the save fail or
    # land outside output_path, so they are replaced with '_'.
    forbidden = '\\/:*?"<>|' if os.name == 'nt' else '/'
    file_name = file_name.translate(str.maketrans({ch: '_' for ch in forbidden}))

    doc = Document(word_file)
    restr = '{%s}'
    replace_all_tables(doc.tables, restr, header_keys, header_datas)
    for p in doc.paragraphs:
        original = p.text
        if not original:
            continue
        replaced = replace_txt_by_data(original, restr, header_keys, header_datas)
        # Assigning p.text collapses the paragraph into one unformatted run,
        # so paragraphs without placeholders are left exactly as they are.
        if replaced != original:
            p.text = replaced

    os.makedirs(output_path, exist_ok=True)
    output_path_file = os.path.join(output_path, '%s.docx' % file_name)
    doc.save(output_path_file)
    print('生成文件:%s' % output_path_file)
    return output_path_file
def start_to_convert(word_file, excel_file, output_path):
    """Generate one Word document per Excel data row.

    Documents are written below ``<output_path>/docx/<day-hour-min-sec>``,
    where the timestamp is taken once when the run starts.

    Args:
        word_file: Path of the .docx template.
        excel_file: Path of the .xls/.xlsx data file.
        output_path: Base output directory.

    Returns:
        None. Progress and the number of distinct files written are printed.
    """
    excel_headers, excel_datas = read_data_form_file(excel_file)
    if not excel_headers:
        print("excel表头信息为空,程序自动退出\n")
        return
    if not excel_datas:
        print("excel数据为空,程序自动退出\n")
        return

    run_dir = os.path.join(output_path, 'docx', time.strftime("%d-%H-%M-%S"))
    # Rows are processed lazily, in sheet order, as the set is built. A set is
    # used so that rows resolving to the same path are counted once.
    saved_paths = (
        write_word_file_from_datas(word_file, excel_headers, row_values, run_dir)
        for row_values in excel_datas
    )
    written = {path for path in saved_paths if path}
    print("共生成 %s 个文件" % (len(written)))
    print("\n")
class MyThread(threading.Thread):
    """Daemon thread that runs a single ``start_to_convert`` job.

    ``running`` is True while the job executes; ``complete`` becomes True
    once the job has finished, whether it succeeded or raised.
    """

    def __init__(self, word_file, excel_file, output_path):
        """Store the job arguments and initialise the state flags."""
        threading.Thread.__init__(self)  # required when overriding run()
        self.daemon = True
        self.word_file, self.excel_file, self.output_path = word_file, excel_file, output_path
        self.running = self.complete = False
        self.__lock = threading.Lock()

    def run(self):
        """Execute the conversion and record its outcome in the flags."""
        try:
            with self.__lock:
                if self.running:
                    # Already marked as running: do nothing.
                    return
                self.running = True
                start_to_convert(self.word_file, self.excel_file, self.output_path)
                self.running, self.complete = False, True
        except Exception as err:
            # A failed job still counts as finished; the error is only reported.
            self.running, self.complete = False, True
            print(err)
            print(f"执行失败:\n")

    def get_running(self):
        """Return True while the job is executing."""
        return self.running

    def get_complete(self):
        """Return True once the job has finished or failed."""
        return self.complete
def main_launcher():
    """Build the GUI and run its event loop until the user quits.

    The window lets the user pick a .docx template, an .xls/.xlsx data file
    and an output folder. Pressing the start button validates the three
    inputs and runs the conversion on a ``MyThread`` worker; the loop polls
    every 100 ms and re-enables the button once the worker reports completion.
    """
    start_btn = '_start_'

    def check_word(word_file):
        """Return True for a .docx path; otherwise clear the field and warn."""
        if str(word_file or '').strip().lower().endswith(".docx"):
            return True
        window['_wordfile_'].update(value='')
        sg.PopupAutoClose("请选择以【.docx】后缀的文件", title="", no_titlebar=False, modal=False, font='Courier 12')
        return False

    def check_excel(excel_file):
        """Return True for an .xls/.xlsx path; otherwise clear the field and warn."""
        if str(excel_file or '').strip().lower().endswith((".xls", ".xlsx")):
            return True
        window['_excelfile_'].update(value='')
        sg.PopupAutoClose("请选择以【.xls|.xlsx】后缀的文件", title="", no_titlebar=False, modal=False, font='Courier 12')
        return False

    def check_out_path(output_path):
        """Return True when an output folder has been chosen; otherwise warn."""
        if output_path:
            return True
        sg.PopupAutoClose("请选择输出目录", title="", no_titlebar=False, modal=False, font='Courier 12')
        return False

    sg.ChangeLookAndFeel('LightGreen')
    layout = [
        [sg.In(key='_wordfile_', size=(75, 1), enable_events=True, readonly=True), sg.FileBrowse("选择Word模板", font='Courier 12', size=(13, 1), file_types=(("Word File", "*.docx"),))],
        [sg.In(key='_excelfile_', size=(75, 1), enable_events=True, readonly=True), sg.FileBrowse("选择Excel", font='Courier 12', size=(13, 1), file_types=(("Excel File", "*.xls"), ("Excel File", "*.xlsx"),))],
        [sg.In(key='_outputfile_', size=(75, 1), enable_events=True, readonly=True), sg.FolderBrowse("输出Word目录", font='Courier 12', size=(13, 1))],
        [sg.Frame('日 志', font='Any 15', layout=[[sg.Output(size=(85, 35), font='Courier 12')]])],
        [sg.ReadFormButton('执 行', font='Courier 12', key='_start_', bind_return_key=True),
         sg.SimpleButton('退 出', font='Courier 12', key='_quit_', button_color=('white', 'firebrick3')),
         sg.T('excel不能合并单元格,以及表头必须是第一行【.docx/.xls/.xlsx】', size=(60, 1), font='Courier 12', text_color='red', justification='left')]
    ]
    window = sg.Window('小小程序',
                       auto_size_text=False,
                       auto_size_buttons=False,
                       default_element_size=(20, 1,),
                       text_justification='right'
                       )
    window.Layout(layout)

    print_one = False
    mythread = None
    # NOTE(review): the worker prints from a background thread while stdout is
    # shown in the sg.Output element; confirm this is safe with the installed
    # PySimpleGUI/tkinter versions.
    while True:
        (button, values) = window.Read(timeout=100)
        if not print_one:
            # Printed after the first Read, presumably so that the text lands
            # in the log pane of the now-visible window - TODO confirm.
            print_one = True
            print('****************** 说明 ******************')
            print('1.excel不能合并单元格')
            print('2.excel表头必须是第一行,并且不能为空')
            print('3.excel中第一列的数据作为导出word文件名,或者在表头定义【文件名】列')
            print('4.word只支持.docx,excel只支持.xls/.xlsx 格式')
            print('5.word模板占位符格式:{xxx},xxx:是excel中表头列的名字')
            print('\n\n\n\n')
        if button in ('_quit_', None):
            break  # quit button clicked or window closed

        word_file = values['_wordfile_']
        excel_file = values['_excelfile_']
        output_path = values['_outputfile_']

        # Validate a file as soon as it has been picked.
        if button == '_wordfile_' and not check_word(word_file):
            continue
        if button == '_excelfile_' and not check_excel(excel_file):
            continue

        if button == start_btn:
            window[start_btn].update(disabled=True)
            if not (check_word(word_file) and check_excel(excel_file) and check_out_path(output_path)):
                window[start_btn].update(disabled=False)
                continue
            if mythread is not None and not mythread.get_complete():
                # A job is still in flight. A thread object can only be
                # started once, so never call start() on it again.
                print("程序运行中")
            else:
                print("******** 开始执行 ********")
                try:
                    mythread = MyThread(word_file, excel_file, output_path)
                    mythread.start()
                except Exception as e:
                    # Drop the unstarted worker so the next click builds a
                    # fresh one from the current inputs.
                    mythread = None
                    print("执行失败:")
                    print(e)
                    window[start_btn].update(disabled=False)

        # Poll for completion on every pass, including timeout passes.
        try:
            if mythread is not None and mythread.get_complete():
                mythread = None
                print("******** 执行完成 ********\n\n\n\n\n")
                window[start_btn].update(disabled=False)
        except Exception as e:
            print("执行失败:")
            print(e)
            window[start_btn].update(disabled=False)

    # Release the window once the event loop has ended.
    window.close()
# Launch the GUI only when the file is run as a script, not when imported.
if __name__ == "__main__":
    main_launcher()