Rdies+python编写可视化数据储存工具
本帖最后由 Test_dada 于 2021-4-25 14:33 编辑运行环境需要下载:
Redis3.2.1(直接解压即可,无需安装),解压后运行redis_sever.exe启动服务
安装必要的python第三方包(redis、tkinter、tkcalendar),在命令窗口使用pip命令安装
实现功能:
查询:已存在用户数据查询,在用户名输入‘全部’,查看所有用户
保存:保存新用户,或更新数据;默认记录当前时间,过期时间为30天
用户过期时间:查询用户剩余过期时间
删除用户数据:删除单个用户数据
清屏:清除文本内容
1.10更新内容:
1.添加多用户保存功能,以中文或英文逗号隔开/
2.优化接收区排版
2.10更新内容:
新增时间栏:可指定时间储存和查询,但不可查询某段时间的全部数据
在用户名输入‘全部’,查看所有用户
觉得有用的小伙伴,收藏评分点赞三连哟{:17_1067:}
代码如下
import redis
import time
from tkinter import *
from tkinter import scrolledtext,ttk
import tkinter.messagebox# 这个是消息框,对话框的关键
from tkcalendar import Calendar
from babel import numbers # pyintsller打包后Calendar需要此库,所以要导入不然Calendar报错
class radis_sever():
def __init__(self):
'''
host,port,db:ip,端口,数据库名
decode_responses = true 设置为True返回的数据格式就是时str类型
'''
# os.system('''cd E:\Redis &\
# e: &\
# redis-server.exe''') #启动redis服务,每行最后加上&,多行执行
self.pool= redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
self.rs = redis.StrictRedis(connection_pool=self.pool)
def now_time(self):
'''当前时间格式化'''
return str(time.strftime("%Y-%m-%d", time.localtime()))
def Dbsize(self):
'''读取总键数'''
return '数据总数:',self.rs.dbsize()
def Keys(self,model):
'''
返回指定模式的key
pattern:
* ,代表任意个字符
? ,代表一个字符
[],字符集合
'''
return self.rs.keys(pattern=model)
def Expire(self, name, time=30):
'''设置过期时间,默认30天'''
# rsp = self.rs.expire(name, time*24*60*60)
# return rsp
pass
def Ttl(self, name):
'''查看过期时间'''
if name == '':
self.log_insert('请输入要查询的用户名!')
else:
name = self.Strip(name)
self.log_dl()
seconds = self.rs.ttl(name) # 过期时间s
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
if h == -1:
self.log_insert("【%s】该用户未设置过期时间"%name)
else:
d, h = divmod(h, 24)
self.log_insert("\n%d天/%02d时/%02d分/%02d秒 后过期删除" % (d, h, m, s))
return self.rs.ttl(name)
def log_insert(self, str):
'''在文本框打印log'''
self.Receive_Window.insert(END,str+'\n')
def log_dl(self,values=''):
'''清除文本框log'''
self.default_value1.set('可不选日历,默认为当前时间')
self.Receive_Window.delete(0.0, END)
self.E_name.delete(0, END) # 输入框清除代码,执行后自动清除
self.data.delete(0, END)# 输入框清除代码,执行后自动清除
def Delete(self,name):
'''删除存在的key,并返回成功删除个数'''
if name == '':
self.log_insert('请输入要删除的用户名!')
else:
name = self.Strip(name)
num = self.rs.delete(name)
self.log_dl()
self.log_insert('成功删除【%s】条数据:%s'%(num, name))
def Strip(self,name):
'''保存前去除前后空格'''
return name.strip()
def Shutdown(self):
'''关闭服务'''
return self.rs.shutdown(save=False, nosave=False)
def Hsets(self, name, values, Key):
'''批量存入'''
self.log_dl() # 清除文本
if name == '':
self.log_insert('用户名不能为空!')
elif values == '':
self.log_insert('数据栏不能为空!')
elif ',' in name:
self.log_insert('检测到【{0}】条数据需要保存'.format(len(name.split(','))))
# 循环保存
for n in name.split(','):
self.Hset(n,values, Key)
elif ',' in name:
self.log_insert('检测到【{0}】条数据需要保存'.format(len(name.split(','))))
for n in name.split(','):
self.Hset(n, values, Key)
else:
self.log_insert('检测到【1】条数据需要保存')
self.Hset(name, values, Key)
def Hset(self, name, values, Key):
'''单个存入字典
name:
Key:要存入的键名,可不填,默认为当前时间
values:存入的数据
'''
name = self.Strip(name)# 保存前去除前后空格
try:
if Key == '可不选日历,默认为当前时间':
Key = self.now_time() # 设置默认时间
if self.rs.exists(name): # 判断name是否存在,True存在
if Key not in self.rs.hkeys(name) or self.rs.hgetall(name) == '':
# 判断键名是否存在hkeys(name)键名列表中,或键值为空
self.rs.hset(name, Key, values) # 保存新值
self.Expire(name)# 设置name过期时间,默认30天
# 显示在scrolledtext.ScrolledText滚动文本框上
self.log_insert('【%s】保存成功!\n[%s]:' % (name, Key)+self.rs.hgetall(name))
else:
# print(self.rs.hkeys(name))
values_list = self.rs.hgetall(name).split(',') # 获取键值,字符串转列表
values_list.append(values)# 添加新值
self.rs.hset(name, Key, ','.join(values_list)) #拼接存入新值
self.Expire(name)# 设置name过期时间,默认30天
self.log_insert('【%s】更新成功!\n[%s]:'%(name, Key)+self.rs.hgetall(name))
else:
# name不存在直接保存
self.rs.hset(name, Key, values)
self.Expire(name)# 设置name过期时间,默认30天
self.log_insert('【%s】保存成功!\n[%s]:' % (name, Key) + self.rs.hgetall(name))
except Exception as e:
self.log_insert(str(e))
return str(e)
def Hgetall(self, name, Key=None):
'''读取字典'''
if name == '':
self.log_insert('请输入要查询的用户名!')
elif name == '全部':
self.log_dl()
key_list = # 返回所有name
self.log_insert('当前共有[%s]名用户\n%s'%(len(key_list),'\n'.join(key_list)))
else:
name = self.Strip(name)
self.log_dl()
try:
if name in self.Keys('*'): # 返回所有name
if Key == '可不选日历,默认为当前时间':
dc = self.rs.hgetall(name)
list = ['【%s】:%s'%(k,v) for k,v in dc.items()] # 字典键和值转换为列表
str = '\n'.join(list) # 列表换行拼接
# tkinter.messagebox.showinfo('提示', str) #提示框
self.log_insert('【%s】查询成功\n%s'%(name,str)) # 显示在scrolledtext.ScrolledText滚动文本框上
return self.rs.hgetall(name) # 返回name的所有数据
else:
str = self.rs.hgetall(name) # 字符串类型:大哥,二哥,三哥
# tkinter.messagebox.showinfo('提示', str.split(',')) #提示框
self.log_insert('%s【%s】查询成功\n%s'%(name,Key,str.split(',')))# 显示在scrolledtext.ScrolledText滚动文本框上
return str.split(',') # 切割返回列表类型
else:
self.log_insert('没有找到[%s]【%s】该用户数据'%(name,Key))
except Exception as e:
# tkinter.messagebox.showinfo('提示', '没有找到[%s]该用户数据\n%s'%(name,e))
self.log_insert('没有找到[%s]该用户数据\n【错误信息】%s'%(name,e))
return '没有找到[%s]该用户数据'%name
def example1(self):
'''日历'''
def print_sel():
# cal.selection_get()获取用户确定的时间2018-02-08类型timedata
self.default_value1.set(str(cal.selection_get()))
top.destroy()# 关闭时间窗口,不关闭主体,关闭主体用quit()
top = tkinter.Toplevel(self.root)# 创建弹出式窗口
# font=字体,selectmode=是否用户可选择日期,locale=设置日期语言
try:
cal = Calendar(top, font="Arial 14", selectmode='day', locale='zh_CN')
cal.pack(fill="both", expand=True)
ttk.Button(top, text="ok", command=print_sel).pack()
return str(cal.selection_get())
except Exception as e:
self.log_insert(str(e))
def dp(self):
'''可视化'''
try:
# 测试连接正常返回true
if self.rs.ping():
self.root = Tk()
self.root.title('数据存储工具')# 窗口标题
self.root.geometry('350x450')# 设置窗口大小
Label(self.root, text="日期:").place(height=25, width=48, x=14, y=10)# 标签
self.default_value1 = StringVar()
self.default_value1.set('可不选日历,默认为当前时间')
self.E_time = Entry(self.root, state='readonly',fg='#DB7093', textvariable = self.default_value1)# 时间输入框
self.E_time.place(height = 25,width = 160,x = 61,y = 10)
b1 = Button(self.root, text="日历", command=self.example1)
b1.place(height=25, width=63, x=240, y=10)# 选择按钮
Label(self.root, text="用户名:").place(height = 25,width = 48,x = 14,y = 50)# 标签
self.E_name = Entry(self.root)# 用户名输入框
self.E_name.place(height = 25,width = 240,x = 61,y = 50)
Label(self.root, text="数据:").place(height = 25,width = 48,x = 14,y = 90)# 标签
self.data = Entry(self.root)# 数据输入框
self.data.place(height = 25,width = 240,x = 61,y = 90)
b = Button(self.root, text="查询", command=lambda :self.Hgetall(self.E_name.get(),self.E_time.get()),bg="SkyBlue")
b.place(height = 25,width = 63,x = 60,y = 120) # 选择按钮
c = Button(self.root, text="保存", command=lambda :self.Hsets(self.E_name.get(),self.data.get(),self.E_time.get()),bg='Pink')
c.place(height = 25,width = 63,x = 240,y = 120)
# 信息接收区
Receive = LabelFrame(self.root, text="接收区", padx=10, pady=10)# 水平,垂直方向上的边距均为 10
Receive.place(x=20, y=150)
self.Receive_Window = scrolledtext.ScrolledText(Receive, width=35, height=12, padx=10, pady=10, font=('微软雅黑', 9), wrap=WORD)
self.Receive_Window.grid()
c1 = Button(self.root, text="用户过期时间", command=lambda :self.Ttl(self.E_name.get()),bg='#FFD700',relief=GROOVE)
c1.place(height=25, width=80, x=20, y=420)
dl = Button(self.root, text="删除用户数据", command=lambda :self.Delete(self.E_name.get()),bg='#FFD700',relief=GROOVE)
dl.place(height = 25,width = 80,x = 120,y = 420)
dl1 = Button(self.root, text="清屏", command=lambda :self.log_dl(), bg='#90EE90', relief=GROOVE)
dl1.place(height = 25,width = 63,x = 265,y = 420)
self.log_insert('数据库连接正常...')
self.root.mainloop()
except Exception as e:
tkinter.messagebox.showinfo('提示', str(e))
if __name__ == '__main__':
rd = radis_sever()# 实例化
rd.dp() # 执行可视化 学习了! 收藏学习下 学习一下,收藏参考 谢谢分享,python好语言 谢谢分享
页:
[1]