import pymysql
from sshtunnel import SSHTunnelForwarder
class DB_SSH:
def init(self):
self.ssh_host = XXXX
self.ssh_port = XXX
self.ssh_username = XXXX
self.ssh_password = XXXX
self.db_host = "localhost"
self.db_port = XXXX
self.db_username = XXXX
self.db_password = XXXX
self.db_name = XXXX
def query(self, sql: str, value: tuple):
"""
数据库查询的方法
:param sql: sql语句
:param value: sql语句中的变量
:return: res
"""
with SSHTunnelForwarder(ssh_address_or_host=(self.ssh_host, self.ssh_port),
ssh_username=self.ssh_username,
ssh_password=self.ssh_password,
remote_bind_address=(self.db_host, self.db_port)) as server:
conn = pymysql.connect(host='127.0.0.1', # ip必须为127.0.0.1,代表本地机器
port=server.local_bind_port, # ssh目标服务器用于连接mysql服务器的端口
user=self.db_username,
password=self.db_password,
database=self.db_name,
charset='utf8')
cursor = conn.cursor() # cursor游标,查询返回默认元组的值
try:
cursor.execute(sql, value) # 3、在游标中执行SQL语句,select
res = cursor.fetchall() # 4、获取返回值
conn.close() # 5、关闭数据库连接
cursor.close() # 关闭数据库的连接
return res
except Exception as e:
return "SQL语句错误,请检查后再输入!,{}".format(e)
def commit(self, sql: str, value: tuple):
"""
数据库新增、修改、删除的方法
:param sql: sql语句
:param value: sql语句中的变量
:return: rows
"""
with SSHTunnelForwarder(ssh_address_or_host=(self.ssh_host, self.ssh_port),
ssh_username=self.ssh_username,
ssh_password=self.ssh_password,
remote_bind_address=(self.db_host, self.db_port)) as server:
conn = pymysql.connect(host='127.0.0.1', # ip必须为127.0.0.1,代表本地机器
port=server.local_bind_port, # ssh目标服务器用于连接mysql服务器的端口
user=self.db_username,
password=self.db_password,
database=self.db_name,
charset='utf8')
cursor = conn.cursor() # cursor游标,查询返回默认元组的值
try:
rows = cursor.execute(sql, value)
db.commit()
conn.close() # 5、关闭数据库连接
cursor.close()
return rows
except Exception as e:
return "SQL语句错误,请检查后再输入!,{}".format(e)
def query_all(self, sql: str, value: tuple):
"""
数据库查询的方法(返回表结构)
:param sql: sql语句
:param value: sql语句中的变量
:return: res
"""
with SSHTunnelForwarder(ssh_address_or_host=(self.ssh_host, self.ssh_port),
ssh_username=self.ssh_username,
ssh_password=self.ssh_password,
remote_bind_address=(self.db_host, self.db_port)) as server:
# 连接mysql数据库
conn = pymysql.connect(host='127.0.0.1', # ip必须为127.0.0.1,代表本地机器
port=server.local_bind_port, # ssh目标服务器用于连接mysql服务器的端口
user=self.db_username,
password=self.db_password,
database=self.db_name,
charset='utf8')
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 查询返回字典,占用空间多
cursor = conn.cursor() # cursor游标,查询返回默认元组的值
try:
cursor.execute(sql, value) # 3、在游标中执行SQL语句,select
res = cursor.description # 4、获取返回值
conn.close() # 5、关闭数据库连接
cursor.close()
return res
except Exception as e:
return "SQL语句错误,请检查后再输入!,{}".format(e)