[Python] 纯文本查看 复制代码
# -*- coding:utf8 -*-
import time
import re
import sqlite3
# History file path
his_path = 'filepath\\history.log' # 这里填入你的log路径
# connect DB
conn = sqlite3.connect('HisMon.db')
c = conn.cursor()
c.execute("select count(*) from sqlite_master WHERE type='table' AND name='hisSQL'")
r = c.fetchall()
if str(r[0][0]) == '0':
csql="""create table hisSQL(id integer PRIMARY KEY autoincrement,rundate datetime
,connname varchar(50),dbname varchar(50),sql text)"""
c.execute(csql)
conn.commit()
# ignoreSQL list 忽略列表,Navicat的历史日志把它自己的那些自动完成之类的SQL也记录在内了,把其中一次过滤掉了,还看到其他的话,自己添加吧,只要开头的一次字符做特征就可以了。
ignoreSQL=[
# MYSQL
'SHOW TABLE STATUS','SHOW COLUMNS','SELECT DISTINCT ROUTINE_SCHEMA','SELECT TABLE_SCHEMA','SHOW FULL TABLES'
,'SELECT SCHEMA_NAME','SHOW VARIABLES LIKE'
# SQLITE
,'PRAGMA foreign_keys = ON','SELECT m.name AS name','SELECT name AS ObjectName','PRAGMA database_list'
,'SELECT name, seq AS auto_inc','SELECT *,rowid "NAVICAT_ROWID'
# PGSQL
,'SELECT rolname','SELECT t.oid','SELECT t.relname','SELECT n.nspname','SELECT c.oid','SELECT attname AS name'
,'SELECT d.oid','SHOW datestyle','SELECT DISTINCT datlastsysoid','SELECT o.name','SELECT cl.column_name'
,'SELECT col.table_schema','SELECT opc.oid','SELECT opr.oid','SELECT i.indrelid AS oid','SELECT r.routine_schema'
,'SELECT COUNT(*) FROM pg_class','SELECT oid, nspname'
# MSSQL
,'SELECT d.name db_name','SELECT TOP 0 *','SELECT av.name','SELECT ao.object_id AS oid','SELECT @@SPID'
,'SELECT COUNT(*) FROM information_schema','SELECT SERVERPROPERTY','SET DATEFORMAT','SELECT s.name AS name'
,'SELECT TABLE_SCHEMA','SELECT DISTINCT ROUTINE_SCHEMA','SELECT t.name','SELECT o.name','SELECT c.name'
,'SELECT OBJECT_NAME','SELECT s.name']
def ig(sqlstr):
for i in ignoreSQL:
if re.search(r'USE +\[.*?\]', sqlstr) is not None:
return False
if i in s:
return False
return True
tlist = []
slist = []
# Monitor
with open(his_path, 'r', encoding='utf8') as file:
while True:
line = file.readline()
if line:
if re.search(
r'\[(\d+-\d+-\d+ \d+:\d+:\d+\.\d+)\]\[(\w+)\]\[\w+\]\[(\w+)\]', line) is not None:
if len(slist) > 0:
s = '\n'.join(slist)
sql = """insert into hisSQL(rundate,connname,dbname,sql)values(?,?,?,?)"""
if ig(s):
c.execute(sql, (tlist[0], tlist[1], tlist[2], s))
tmp = re.search(
r'\[(\d+-\d+-\d+ \d+:\d+:\d+\.\d+)\]\[(\w+)\]\[\w+\]\[(\w+)\]', line)
tlist = [tmp.group(1), tmp.group(2), tmp.group(3)]
slist = []
else:
if len(line) > 0 and line != '\n':
slist.append(line.replace('\n', ' '))
else:
conn.commit()
time.sleep(1)
file.seek(0, 1)
conn.close()