import
os
import
sys
import
json
import
re
import
logging
from
time
import
sleep
import
ssl
import
warnings
# Hide the warning emitted for HTTPS requests made without verification.
warnings.filterwarnings("ignore", message="Unverified HTTPS request")

# When the interpreter exposes the private factory for unverified contexts,
# install it as the default HTTPS context factory for the whole process.
# NOTE(review): this turns off certificate verification for every stdlib
# HTTPS client in this process.
if hasattr(ssl, "_create_unverified_context"):
    _create_unverified_https_context = ssl._create_unverified_context
    ssl._create_default_https_context = _create_unverified_https_context

print("正在初始化程序...", flush=True)
# Bind the name up front so the truthiness check below is always valid.
requests = None

try:
    print("正在加载requests模块...", flush=True)
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
except ImportError:
    # The package is not installed: print installation hints and stop.
    for install_hint in (
        "错误: 未能导入'requests'模块。",
        "请使用以下命令安装它: pip install requests",
        "或者运行批处理文件'run_ocr_rename.bat',它会自动安装必要的依赖。",
    ):
        print(install_hint)
    sys.exit(1)
except Exception as load_error:
    # The package exists but failed while loading: set the
    # PYTHONHTTPSVERIFY environment variable to '0' and import once more.
    print(f"警告: 加载requests模块时出现问题: {load_error}")
    print("尝试使用替代方法...", flush=True)
    try:
        import os
        os.environ['PYTHONHTTPSVERIFY'] = '0'
        import requests
        requests.packages.urllib3.disable_warnings()
    except Exception as retry_error:
        print(f"错误: 无法加载requests模块: {retry_error}")
        print("请尝试重新安装requests模块: pip install --upgrade requests")
        sys.exit(1)

if requests:
    # Shared HTTP session used for the OCR requests; certificate
    # verification is switched off on it.
    requests.packages.urllib3.disable_warnings()
    session = requests.Session()
    session.verify = False
# imghdr is optional: record whether it could be imported so that image
# detection can fall back to other checks when it is unavailable.
try:
    import imghdr
except ImportError:
    HAS_IMGHDR = False
    print("注意: 未能导入'imghdr'模块。将使用基于文件扩展名的图片检测方法。")
else:
    HAS_IMGHDR = True
# Configure logging to both "ocr_rename.log" (UTF-8) and the console.  If
# that fails for any reason, fall back to a minimal print-based logger that
# offers the same info/error/warning methods.
try:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("ocr_rename.log", encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    logger = logging.getLogger(__name__)
except Exception as log_setup_error:
    print(f"警告: 无法配置日志系统: {log_setup_error}")

    class SimpleLogger:
        """Print-based stand-in used when the logging module cannot be set up."""

        def info(self, msg):
            """Print ``msg`` with the informational prefix."""
            print(f"[信息] {msg}")

        def error(self, msg):
            """Print ``msg`` with the error prefix."""
            print(f"[错误] {msg}")

        def warning(self, msg):
            """Print ``msg`` with the warning prefix."""
            print(f"[警告] {msg}")

    logger = SimpleLogger()
# Address of the Umi-OCR HTTP service; the base URL is "<host>:<port>".
UMI_OCR_HOST = "http://127.0.0.1"
UMI_OCR_PORT = 1224
UMI_OCR_BASE_URL = ":".join((UMI_OCR_HOST, str(UMI_OCR_PORT)))
def check_umi_ocr_service():
    """Report whether the Umi-OCR HTTP service answers on its options endpoint.

    Returns:
        True when the GET request completes with status 200; False when the
        request raises a ``requests`` exception or any other status is
        returned.  Both failure cases are logged.
    """
    options_url = f"{UMI_OCR_BASE_URL}/api/ocr/get_options"
    try:
        response = requests.get(options_url, timeout=5)
    except requests.exceptions.RequestException as e:
        logger.error(f"无法连接到Umi-OCR服务: {e}")
        return False

    if response.status_code != 200:
        logger.error(f"Umi-OCR服务返回状态码: {response.status_code}")
        return False

    logger.info("Umi-OCR服务运行正常")
    return True
def get_image_type(file_path):
    """Return a short image-type name for ``file_path``, or ``None``.

    Detection order:
      1. A known image extension is trusted as-is and returned lower-cased
         without the dot (the file is not opened).
      2. ``imghdr.what`` is consulted when the module was importable.
      3. The first bytes of the file are compared with the JPEG, PNG, GIF
         and BMP signatures.

    Args:
        file_path: Path of the file to classify.

    Returns:
        The detected type (for example ``'jpg'``, ``'png'``, ``'jpeg'``), or
        ``None`` when nothing matched or the file could not be read.
    """
    file_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
    if file_ext in ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'tif'):
        return file_ext

    if HAS_IMGHDR:
        # imghdr opens the file itself; an unreadable or vanished file must
        # not abort the caller's directory scan, so treat it as "unknown".
        try:
            img_type = imghdr.what(file_path)
        except (OSError, ValueError):
            img_type = None
        if img_type:
            return img_type

    try:
        with open(file_path, 'rb') as f:
            header = f.read(12)
    except (OSError, ValueError):
        # Unreadable file or invalid path: not something we can classify.
        return None

    if header.startswith(b'\xff\xd8\xff'):
        return 'jpeg'
    if header.startswith(b'\x89PNG\r\n\x1a\n'):
        return 'png'
    if header.startswith((b'GIF89a', b'GIF87a')):
        return 'gif'
    if header.startswith(b'BM'):
        return 'bmp'
    return None
import
base64
import
time
import
random
# Upper bound on attempts per OCR request method when the connection fails.
MAX_RETRIES = 3
# Base pause in seconds between attempts; a random 0-1 s jitter is added.
RETRY_DELAY = 1
def encode_image_to_base64(image_path):
    """Read ``image_path`` and return its contents as a base64 text string.

    Returns:
        The base64 encoding of the file bytes, or ``None`` (after logging
        the error) when reading or encoding fails.
    """
    try:
        with open(image_path, 'rb') as image_file:
            raw_bytes = image_file.read()
        return base64.b64encode(raw_bytes).decode('utf-8')
    except Exception as e:
        logger.error(f"编码图片为base64失败: {e}")
        return None
def extract_text_from_image(image_path):
    """Run OCR on ``image_path`` through the Umi-OCR HTTP API.

    Request methods are tried in this order:
      1. JSON body carrying the image as base64.
      2. Multipart file upload.
      3. JSON body carrying the absolute file path -- only attempted when
         the upload method exhausted its connection retries.

    Connection errors and timeouts are retried up to ``MAX_RETRIES`` times
    per method with a jittered delay; any other error moves on to the next
    method.

    Args:
        image_path: Path of the image to recognise.

    Returns:
        The value produced by ``parse_ocr_result`` for the first usable
        response, or ``None`` when the file is missing, is not an image,
        cannot be read, or every method failed.  Exceptions are logged and
        never propagate to the caller.
    """
    try:
        if not os.path.exists(image_path):
            logger.error(f"文件不存在: {image_path}")
            return None

        img_type = get_image_type(image_path)
        if not img_type:
            logger.error(f"文件不是有效的图片: {image_path}")
            return None

        base64_data = encode_image_to_base64(image_path)
        if not base64_data:
            logger.error(f"无法读取图片数据: {image_path}")
            return None

        ocr_url = f"{UMI_OCR_BASE_URL}/api/ocr"
        # ConnectionAbortedError is a Python builtin; requests.exceptions has
        # no attribute of that name, and looking it up there raised
        # AttributeError as soon as any request error had to be matched,
        # which defeated the retry logic entirely.
        retryable_errors = (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            ConnectionAbortedError,
        )

        # --- Method 1: base64 payload -------------------------------------
        retries = 0
        while retries < MAX_RETRIES:
            try:
                logger.info(f"尝试使用base64方法发送OCR请求 (尝试 {retries+1}/{MAX_RETRIES}): {image_path}")
                data = {
                    "task_id": "image_rename_task",
                    "base64": base64_data,
                }
                response = session.post(ocr_url, json=data, timeout=30)
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"base64方法OCR结果: {result}")
                    if result.get('code') == 100:
                        return parse_ocr_result(result, image_path)
                    elif result.get('code') != 802:
                        error_code = result.get('code')
                        error_msg = result.get('msg', '未知错误')
                        logger.warning(f"OCR请求返回错误码 {error_code}: {error_msg}")
                # A response arrived but was not usable: do not retry this
                # method, go straight to the upload method.
                logger.info("base64方法失败,尝试文件上传方法")
                break
            except retryable_errors as e:
                retries += 1
                if retries < MAX_RETRIES:
                    delay = RETRY_DELAY + random.uniform(0, 1)
                    logger.warning(f"连接错误,{delay:.1f}秒后重试 ({retries}/{MAX_RETRIES}): {str(e)}")
                    time.sleep(delay)
                else:
                    logger.error(f"base64方法连接失败,达到最大重试次数: {e}")
            except Exception as e:
                logger.warning(f"base64方法失败: {e}")
                break

        # --- Method 2: multipart file upload ------------------------------
        mime_types = {
            'jpeg': 'image/jpeg',
            'jpg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'bmp': 'image/bmp',
            'tiff': 'image/tiff',
            'tif': 'image/tiff',
        }
        mime_type = mime_types.get(img_type, f'image/{img_type}')

        retries = 0
        while retries < MAX_RETRIES:
            try:
                logger.info(f"尝试使用文件上传方法发送OCR请求 (尝试 {retries+1}/{MAX_RETRIES}): {image_path}")
                with open(image_path, 'rb') as img_file:
                    files = {'file': (os.path.basename(image_path), img_file, mime_type)}
                    response = session.post(ocr_url, files=files, timeout=30)
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"文件上传方法OCR结果: {result}")
                    return parse_ocr_result(result, image_path)
                else:
                    logger.error(f"OCR请求失败,状态码 {response.status_code}: {response.text}")
                    break
            except retryable_errors as e:
                retries += 1
                if retries < MAX_RETRIES:
                    delay = RETRY_DELAY + random.uniform(0, 1)
                    logger.warning(f"连接错误,{delay:.1f}秒后重试 ({retries}/{MAX_RETRIES}): {str(e)}")
                    time.sleep(delay)
                else:
                    logger.error(f"文件上传方法连接失败,达到最大重试次数: {e}")
            except Exception as e:
                logger.error(f"文件上传方法失败: {e}")
                break

        # --- Method 3: pass the absolute path -----------------------------
        # Only reached as a last resort when the upload method ran out of
        # connection retries.
        if retries == MAX_RETRIES:
            try:
                logger.info(f"尝试使用系统路径方法发送OCR请求: {image_path}")
                abs_path = os.path.abspath(image_path)
                data = {
                    "task_id": "image_rename_task",
                    "path": abs_path,
                }
                response = session.post(ocr_url, json=data, timeout=30)
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"系统路径方法OCR结果: {result}")
                    if result.get('code') == 100:
                        return parse_ocr_result(result, image_path)
            except Exception as e:
                logger.error(f"系统路径方法失败: {e}")

        logger.error(f"所有OCR方法都失败,无法识别图片: {image_path}")
        return None
    except Exception as e:
        logger.error(f"处理图片时出错 {image_path}: {e}")
        return None
def is_date_format(text):
    """Return True when ``text`` contains a dotted date such as ``2024.1.15``.

    The pattern is four digits, a dot, one or two digits, a dot, and one or
    two digits, found anywhere in the string.
    """
    return re.search(r'\d{4}\.\d{1,2}\.\d{1,2}', text) is not None
def normalize_confused_characters(text):
    """Correct characters that OCR commonly confuses (``l``/``1``, ``O``/``0``).

    Args:
        text: Recognised text.  Falsy or non-string values are returned
            unchanged.

    Returns:
        The corrected string, or ``None`` when the text looks like OCR noise
        (longer than 30 characters with more than ten ``u`` and more than
        ten ``l`` characters).
    """
    if not text or not isinstance(text, str):
        return text

    original_text = text

    # Substrings that must survive the rewrites below are swapped for
    # placeholders first and restored afterwards.  The pattern list is an
    # optional module-level global: when it is not defined, nothing is
    # protected instead of failing with NameError on every call.
    protected_patterns = globals().get("chinese_number_patterns") or ()
    protected_parts = {}
    placeholder_template = "PROTECTED_{}"
    placeholder_count = 0
    for pattern in protected_patterns:
        if pattern in text:
            placeholder = placeholder_template.format(placeholder_count)
            text = text.replace(pattern, placeholder)
            protected_parts[placeholder] = pattern
            placeholder_count += 1
            logger.info(f"保护特定模式: {pattern}")

    # 'l' touching digits (between, before at the start, or after at the
    # end) is taken to be a misread '1'.
    if (re.search(r'\d+l+\d+', text)
            or re.search(r'^l+\d+', text)
            or re.search(r'\d+l+$', text)):
        text = text.replace('l', '1')

    # Inside [bracketed] tokens every 'l' becomes '1'.
    if re.search(r'\[\w*l\w*\]', text):
        parts = re.split(r'(\[\w*\])', text)
        for i, part in enumerate(parts):
            if part.startswith('[') and part.endswith(']'):
                parts[i] = part.replace('l', '1')
        text = ''.join(parts)

    # Strings made only of digits / upper-case code characters plus 'l'.
    if (re.search(r'^\d*l+\d*$', text)
            or re.search(r'^[A-Z\d\-\[\]]*l+[A-Z\d\-\[\]]*$', text)):
        text = text.replace('l', '1')

    # Upper-case letters, then 'O', then digits: the 'O' is rewritten to '0'.
    if text.isalnum() and re.search(r'[A-Z]+O\d+', text):
        text = re.sub(r'([A-Z]+)O(\d+)', r'\1 0\2', text)

    for placeholder, original in protected_parts.items():
        text = text.replace(placeholder, original)

    # Sanity pass: where the input had a '1' followed by lower-case letters,
    # make sure no step above left it as an 'l' variant.
    if '1' in original_text and 'l' in text:
        for match in re.finditer(r'1[a-z]+', original_text):
            original_part = match.group(0)
            potential_error = original_part.replace('1', 'l')
            if potential_error in text:
                text = text.replace(potential_error, original_part)
                logger.info(f"修正错误转换: {potential_error} → {original_part}")

    # Long strings dominated by 'u' and 'l' are treated as OCR noise.
    if (len(text) > 30 and 'u' in text and 'l' in text
            and text.count('u') > 10 and text.count('l') > 10):
        logger.info(f"检测到OCR错误文本: {text}")
        return None

    if text != original_text:
        logger.info(f"字符归一化: {original_text} → {text}")
    return text
def is_part_number(text):
    """Decide whether ``text`` looks like a product code / part number.

    The text is first passed through ``normalize_confused_characters``.
    It is accepted when it contains a dotted date, contains the marker
    ``shaixuanhouheidian-``, has a ``[digits]`` group together with ``-`` or
    ``%``, matches ``CODE-CODE`` in upper-case letters/digits, is a plain
    number greater than 720, or combines a special character with at least
    two consecutive upper-case letters/digits.  Any other text containing a
    dot is rejected.

    Returns:
        ``True`` or ``False`` (always a real bool).
    """
    text = normalize_confused_characters(text)
    if not text:
        return False

    if is_date_format(text):
        return True
    if 'shaixuanhouheidian-' in text:
        return True
    # Dates were accepted above, so any remaining dot disqualifies the text.
    if '.' in text:
        return False

    if re.search(r'\[\d+\]', text) and ('-' in text or '%' in text):
        return True
    if re.search(r'[A-Z0-9]+-[A-Z0-9]+', text):
        return True

    # isdecimal() instead of isdigit(): digit-like characters such as '²'
    # satisfy isdigit() but make int() raise ValueError.
    if text.isdecimal() and int(text) > 720:
        return True

    special_chars = ('-', '%', '#', '/', '\\', '_')
    has_special = any(char in text for char in special_chars)
    has_alphanumeric = re.search(r'[A-Z0-9]{2,}', text) is not None
    return has_special and has_alphanumeric
def _fallback_name_from_path(image_path):
    """Build a fallback name from the image's own file name (no extension)."""
    stem = os.path.splitext(os.path.basename(image_path))[0]
    return stem.split('(')[0].strip() or "OCR_image"


def _passes_dot_filter(text):
    """Keep dotted dates, marker strings, and any text without a dot."""
    return (is_date_format(text)
            or 'shaixuanhouheidian-' in text
            or '.' not in text)


def _collect_ocr_texts(data):
    """Gather text candidates and part numbers from an OCR ``data`` payload.

    Returns a ``(candidates, part_numbers)`` pair of lists; every stored
    string is stripped of surrounding whitespace.
    """
    candidates = []
    part_numbers = []

    def record(text):
        candidates.append(text)
        if is_part_number(text):
            part_numbers.append(text)

    if isinstance(data, list):
        # List of dicts with a 'text' entry: normalise each entry first.
        for item in data:
            if isinstance(item, dict) and isinstance(item.get('text'), str):
                text = item['text'].strip()
                if text:
                    normalized = normalize_confused_characters(text)
                    if normalized:
                        record(normalized)
    elif isinstance(data, dict):
        if 'text' in data:
            payload = data['text']
            if isinstance(payload, str):
                payload = [payload]
            if isinstance(payload, list):
                for entry in payload:
                    if isinstance(entry, str) and entry.strip():
                        record(entry.strip())
        elif isinstance(data.get('results'), list):
            for item in data['results']:
                if isinstance(item, dict) and isinstance(item.get('text'), str):
                    text = item['text'].strip()
                    if text:
                        record(text)

    if not candidates:
        # Unknown layout: walk the whole structure for string 'text' values.
        def walk(obj):
            if isinstance(obj, dict):
                for key, value in obj.items():
                    if key == 'text' and isinstance(value, str) and value.strip():
                        record(value.strip())
                    else:
                        walk(value)
            elif isinstance(obj, list):
                for item in obj:
                    walk(item)

        walk(data)

    return candidates, part_numbers


def parse_ocr_result(result, image_path):
    """Pick the text from an OCR response that should become the file name.

    Preference order: the longest recognised part number, then the largest
    plain number above 720, then the longest text block over three
    characters (or the first block).  If nothing is usable, a name derived
    from the existing file name is used.

    Args:
        result: Decoded JSON response; ``code == 100`` is treated as success.
        image_path: Path of the image, used for logging and fallback names.

    Returns:
        A non-empty string suitable as a file name stem.  On OCR failure the
        fallback derived from the current file name (without its extension)
        is returned.
    """
    if result.get('code') != 100:
        logger.error(f"OCR失败: {image_path}: {result.get('msg', '未知错误')}")
        filename_prefix = _fallback_name_from_path(image_path)
        logger.info(f"OCR失败,使用文件名前缀作为替代文本: {filename_prefix}")
        return filename_prefix

    all_text_candidates, part_numbers = _collect_ocr_texts(result.get('data', {}))

    logger.info(f"从图片提取的文本候选: {all_text_candidates}")
    if part_numbers:
        logger.info(f"找到产品代码/零件号: {part_numbers}")

    # Drop dotted text unless it is a date or marker string, but only when
    # the filter leaves something to choose from.
    filtered_candidates = [t for t in all_text_candidates if _passes_dot_filter(t)]
    filtered_part_numbers = [p for p in part_numbers if _passes_dot_filter(p)]
    if filtered_candidates:
        all_text_candidates = filtered_candidates
    if filtered_part_numbers:
        part_numbers = filtered_part_numbers

    logger.info(f"过滤后的文本候选: {all_text_candidates}")
    logger.info(f"过滤后的产品代码: {part_numbers}")

    numeric_candidates = []
    for text in all_text_candidates:
        # isdecimal() rather than isdigit(): characters such as '²' pass
        # isdigit() but make int() raise ValueError.
        if text.isdecimal() and int(text) > 720:
            numeric_candidates.append(text)
            logger.info(f"找到大于720的数字: {text}")

    selected_text = None
    if part_numbers:
        selected_text = max(part_numbers, key=len)
        logger.info(f"找到产品代码/零件号: {selected_text}")
    elif numeric_candidates:
        selected_text = max(numeric_candidates, key=int)
        logger.info(f"使用大于720的数字: {selected_text}")
    elif all_text_candidates:
        candidates = [t for t in all_text_candidates if len(t) > 3]
        if candidates:
            selected_text = max(candidates, key=len)
            logger.info(f"使用最长的文本块: {selected_text}")
        else:
            selected_text = all_text_candidates[0]
            logger.info(f"使用首个文本块: {selected_text}")

    if not selected_text:
        # Nothing recognised: reuse the non-numeric words of the file name.
        base_stem = os.path.splitext(os.path.basename(image_path))[0]
        non_numeric_parts = [part for part in base_stem.split() if not part.isdigit()]
        if non_numeric_parts:
            selected_text = '-'.join(non_numeric_parts)
            logger.info(f"使用文件名部分作为替代文本: {selected_text}")
        else:
            selected_text = base_stem
            logger.info(f"使用原始文件名作为替代文本: {selected_text}")

    cleaned = clean_text_for_filename(selected_text)
    if cleaned:
        return cleaned

    filename_prefix = _fallback_name_from_path(image_path)
    logger.info(f"使用文件名前缀作为最终替代文本: {filename_prefix}")
    return filename_prefix
def clean_text_for_filename(text):
    """Turn ``text`` into a string that is safe to use as a file name stem.

    Whitespace runs are collapsed to single spaces, the text is cut to at
    most 100 characters, and the characters ``\\ / * ? : " < > |`` are
    removed.

    Returns:
        The cleaned text, or ``None`` when the input is empty/blank or
        nothing remains after cleaning.
    """
    if not text or not text.strip():
        return None

    max_length = 100
    collapsed = re.sub(r'\s+', ' ', text).strip()
    truncated = collapsed[:max_length]
    sanitized = re.sub(r'[\\/*?:"<>|]', '', truncated).strip()
    return sanitized or None
def is_image_file(filename):
    """Return True when ``get_image_type`` recognises ``filename`` as an image."""
    detected_type = get_image_type(filename)
    return detected_type is not None
def _same_path(first, second):
    """Compare two paths after absolutising and platform case-normalising."""
    return (os.path.normcase(os.path.abspath(first))
            == os.path.normcase(os.path.abspath(second)))


def process_images_in_directory(directory="."):
    """OCR every image under ``directory`` and rename it after its text.

    The directory is walked recursively.  Each image is renamed in place to
    ``<recognised text><original extension>``; when that name is taken by
    another file, ``-1``, ``-2``, ... is appended.  A file that already
    carries the recognised name is left untouched.

    Args:
        directory: Root directory to scan (defaults to the current one).

    Returns:
        None.  Progress is printed and logged; the function returns early
        when the OCR service is unreachable or no image is found.
    """
    if not check_umi_ocr_service():
        logger.error("Umi-OCR服务未运行,请先启动服务。")
        print("请确保已安装并运行Umi-OCR,且启用了HTTP服务功能。")
        print("您可以从这里下载Umi-OCR: https://github.com/hiroi-sora/Umi-OCR")
        return

    # Collect everything first so that renames do not disturb the walk.
    all_image_files = []
    for root, _, files in os.walk(directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            if is_image_file(file_path):
                all_image_files.append(os.path.relpath(file_path, directory))

    if not all_image_files:
        logger.warning("未找到任何图片文件。")
        print("未找到任何图片文件。")
        return

    logger.info(f"在目录及子目录中找到 {len(all_image_files)} 个图片文件等待处理")
    print(f"找到 {len(all_image_files)} 个图片文件,开始处理...")

    success_count = 0
    for i, rel_path in enumerate(all_image_files):
        file_path = os.path.join(directory, rel_path)
        file_dir = os.path.dirname(file_path)
        filename = os.path.basename(file_path)

        logger.info(f"正在处理图片 {i+1}/{len(all_image_files)}: {rel_path}")
        print(f"正在处理图片 ({i+1}/{len(all_image_files)}): {rel_path}")

        extracted_text = extract_text_from_image(file_path)
        if not extracted_text:
            logger.warning(f"无法从 {rel_path} 提取文本或文本不适合用作文件名")
            print(f"× 无法提取文本: {rel_path}")
            continue

        file_ext = os.path.splitext(filename)[1]
        new_file_path = os.path.join(file_dir, f"{extracted_text}{file_ext}")

        # The file already has the recognised name: without this check it
        # would collide with itself and gain a "-1" suffix on every run.
        if _same_path(file_path, new_file_path):
            logger.info(f"文件名已与识别结果一致,跳过: {rel_path}")
            print(f"- 无需重命名: {rel_path}")
            continue

        counter = 1
        while os.path.exists(new_file_path):
            new_file_path = os.path.join(
                file_dir, f"{extracted_text}-{counter}{file_ext}"
            )
            counter += 1

        try:
            os.rename(file_path, new_file_path)
            new_rel_path = os.path.relpath(new_file_path, directory)
            logger.info(f"已重命名: {rel_path} -> {new_rel_path}")
            print(f"✓ 已成功重命名: {rel_path} -> {new_rel_path}")
            success_count += 1
            # Short pause after each successful rename, as in the original.
            sleep(0.1)
        except Exception as e:
            logger.error(f"重命名 {rel_path} 失败: {e}")
            print(f"× 重命名失败: {rel_path}")

    logger.info(f"处理完成。成功重命名 {success_count} 个文件,共 {len(all_image_files)} 个文件。")
    print(f"\n处理完成!成功重命名 {success_count} 个文件,共 {len(all_image_files)} 个文件。")
    print(f"详细日志请查看 ocr_rename.log 文件。")
if __name__ == "__main__":
    # Script entry point: show the banner, process the current directory,
    # and report any unexpected failure instead of showing a traceback.
    for banner_line in (
        "=== 图片文字提取并重命名工具 ===",
        "本程序将识别当前目录及其所有子目录中的图片文字,并用识别的文字重命名图片文件。",
        "正在初始化...",
    ):
        print(banner_line)

    try:
        process_images_in_directory()
        print("\n程序执行完毕。")
    except Exception as e:
        logger.error(f"发生意外错误: {e}")
        print(f"发生错误: {e}")
        print("请查看 ocr_rename.log 获取详细信息。")