import
sqlparse
from
ast
import
literal_eval
import re

# Captures everything that follows VALUES, from the first "(" to the last ")",
# so single-row and multi-row INSERT statements are handled the same way.
# Optional whitespace and a ";" may follow the closing parenthesis.
_VALUES_RE = re.compile(r'VALUES\s*(\(.*\))\s*;?$', re.DOTALL | re.IGNORECASE)

# Matches either a complete quoted SQL string literal or a bare NULL keyword.
# Because string literals are consumed as whole tokens, the text "NULL" that
# happens to occur inside a string value is never mistaken for the keyword.
_SQL_TOKEN_RE = re.compile(
    r"""
      '(?:[^'\\]|\\.|'')*'
    | "(?:[^"\\]|\\.|"")*"
    | \bNULL\b
    """,
    re.VERBOSE | re.DOTALL | re.IGNORECASE,
)


def _sql_token_to_python(match):
    """Translate one SQL token (string literal or NULL) into Python syntax.

    NULL becomes ``None``. String literals are kept as they are, except that
    the SQL-style doubled quote (``''`` or ``""``) is rewritten to a
    backslash-escaped quote; otherwise Python would read it as two adjacent
    strings and silently drop the quote character.
    """
    token = match.group(0)
    quote = token[0]
    if quote not in ('"', "'"):
        return 'None'
    doubled = quote * 2

    def _fix_piece(piece):
        text = piece.group(0)
        return '\\' + quote if text == doubled else text

    # Backslash escapes are matched first so that an already-escaped quote
    # followed by a doubled quote is not mis-paired.
    body = re.sub(r'\\.|' + doubled, _fix_piece, token[1:-1], flags=re.DOTALL)
    return quote + body + quote


# Load the whole SQL dump into memory.
with open('data.sql', encoding='utf-8') as f:
    sql = f.read()

parsed = sqlparse.parse(sql)
rows = []
for stmt in parsed:
    if stmt.get_type() != 'INSERT':
        continue
    # Strip surrounding whitespace so trailing blanks/newlines after the
    # statement do not prevent the end-of-string anchor from matching.
    values_part = _VALUES_RE.search(str(stmt).strip())
    if not values_part:
        continue
    clean_values = _SQL_TOKEN_RE.sub(_sql_token_to_python, values_part.group(1))
    # NOTE(review): kept from the original script - a double quote directly
    # before "[" or directly after "]" is removed, presumably so that quoted
    # JSON-style arrays evaluate as lists. This is applied to the whole text,
    # so it also affects such sequences inside ordinary strings; confirm it
    # matches the data set.
    clean_values = clean_values.replace('"[', '[').replace(']"', ']')
    try:
        # Wrapping in a list makes every top-level "(...)" group one element,
        # so "(1,'a'),(2,'b')" yields two rows instead of one nested tuple.
        # literal_eval only accepts Python literals, never arbitrary code.
        parsed_data = literal_eval(f"[{clean_values}]")
    except (SyntaxError, ValueError) as e:
        print(f"[解析失败] 错误类型: {type(e).__name__}")
        print(f"问题数据片段: {clean_values[:100]}...")
        continue
    # "(1)" evaluates to a bare scalar rather than a tuple; normalise it to a
    # one-field row so single-column inserts are not lost.
    rows.extend(
        row if isinstance(row, tuple) else (row,) for row in parsed_data
    )

print(f"成功加载 {len(rows)} 条数据")
import csv


def _escape_newlines(value):
    """Return string values with real line breaks spelled as backslash-n.

    Values that are not strings are returned unchanged.
    """
    if isinstance(value, str):
        return str(value).replace('\n', '\\n')
    return value


# Write every collected row to output.csv: all fields quoted, backslash set
# as the escape character, preceded by a fixed three-column header.
with open('output.csv', 'w', newline='', encoding='utf-8') as out_file:
    writer = csv.writer(out_file, quoting=csv.QUOTE_ALL, escapechar='\\')
    writer.writerow(['ID', 'Category', 'Remark'])
    # The full list is built before anything is written, so a malformed row
    # raises before any data row reaches the file.
    writer.writerows(
        [[_escape_newlines(value) for value in record] for record in rows]
    )

print("数据已导出到 output.csv")