好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 aa7088414 于 2024-5-4 04:32 编辑
近期发现了@winty 大佬的Python批量备份交换机配置与巡检帖子,发现我用的上,很心动,就研究了一下。
但是因为我这边老设备比较多,有些设备还是没法用,干脆重构了部分代码与功能(主要是备份部分,巡检部分未动),顺便修了几个小bug
主要修改以下几点:
首先是把备份功能需要执行的代码放到了外置的dis_cmds.conf(JSON格式)文件内,可自定义执行的命令,方便使用exe时修改和变更执行的命令。
修改设备类型识别逻辑,不再限定为华为思科华三。但没设置其他品牌的命令,也没测试其他品牌。命令可在dis_cmds.conf文件内设置
修改enable登录逻辑,enpassword设置为None时不再enabla登录,留空时按空密码enabla。
修改读取设备时,变量类型识别的bug。由于excel单元格有数据类型区别,但netmiko库只接受字符串而导致登录失败
更新说明:
1.1:根据网友要求,记录访问错误的设备,写出到错误日志.log,并新增了一键重试失败的设备。还修了几个小bug
2.0:更新一个多线程版本,重构绝大部分代码;以表格显示多个设备;ping检测加速爬取;日志记录;暂时放弃巡检功能。未完全测试;暂未打包。
剩余的问题:
enable登录时,如果密码错误,可能会出现重新输入密码的提示,导致netmiko获取不到新的数据导致命令超时。所以不知道密码时,最好还是填None
华为/华三设备登录时,如果没有设置用户名,可能会卡住导致登录失败。把密码放resource.xlsx的username列即可。
使用说明:
set_main:主程序或源码。源码有点挑环境如果报错请仔细检查库版本和python版本(本人python3.9)
dis_cmds.conf:字典列表格式的JSON文件,name为代码中使用的项目昵称,剩余key为型号,键值为命令。必须用英文双引号,否则读取报错
resource.xlsx:设备表格,type目前支持cisco_ios(思科),huawei(华为),hp_comware(华三)。其余的理论上都支持,具体自测。
protocol支持telnet和ssh;端口一般为默认22 或者23,账号密码没设置的话直接留空即可
附上代码(1.1版本):
[Python] 纯文本查看 复制代码
# coding=utf-8
import json
import logging
import time
from datetime import datetime
from netmiko import ConnectHandler
from openpyxl import load_workbook
import os
from sys import exit
from netmiko import exceptions
import re
# 取消以下注释开启调试模式
# logging.basicConfig(level=logging.DEBUG)
# 读取excel内设备列表信息
def check_and_get_dev_list(filename, sheet_name):
excel_information = []
sheet_header = []
wb = load_workbook(filename)
sh = wb[sheet_name]
# 获取最大行数
row = sh.max_row
# 获取最大列数
column = sh.max_column
data = []
# 获取表头写入列表中方便调用
for data_1 in range(1, column + 1):
get_sheet_header = sh.cell(row=1, column=data_1).value
sheet_header.append(get_sheet_header)
# 第一行为表头, 此处 row +1 是pyton循环时不读取最后一个数
for row_1 in range(2, row + 1):
# 存储一行信息
sheet_data_1 = dict()
# 逐行读取表中的数据
for b in range(1, column + 1):
cell = sh.cell(row=row_1, column=b).value
# 将数据已字典形式写入 sheet_data_1 中
# if cell != None:
sheet_data_1[sheet_header[b - 1]] = cell
excel_information.append(sheet_data_1)
for i in excel_information:
if i['ip'] is not None:
data.append(i)
return data
# 获取excel数据并整合成dev字典
def get_dev():
res = check_and_get_dev_list('./resource.xlsx', 'Sheet1')
devices = []
for i in res:
if i['protocol'] == 'telnet':
i['type'] = i['type'] + '_telnet'
dev = {'device_type': i['type'],
'host': i['ip'],
'username': i['username'],
'password': i['password'],
'secret': i['enpassword'],
'port': i['port'], 'session_log': 'session.log'}
for key in dev: # 排除变量类型导致的错误
if dev[key] is not None:
dev[key] = str(dev[key])
devices.append(dev)
return devices
def get_cmds(): # dis_cmds.conf为json文件,引号必须英文双引号
with open("./dis_cmds.conf", 'r', encoding='utf-8') as f:
js = json.loads(f.read())
f.close()
return js
# 配置批量备份信息与配置导出
def devices_confbak(devices=None):
if devices is None:
devices = []
# 创建备份文件夹
current_date = datetime.now().strftime("%Y-%m-%d") # 这里定义日期文件夹的格式 %Y年、%m月、%d日、%H时、%M分、%S秒
path = './conf_bak' # 定义保存目录 绝对路径相对路径均可 不要结尾的/
path = os.path.join(path, current_date)
try:
os.makedirs(path)
except FileExistsError:
pass
# 存储连接失败的所有信息
failed_ips = []
# 循环登录设备获取配置
for dev in devices:
try:
logging.debug(str(dev))
with (ConnectHandler(**dev) as conn):
print('\n----------成功登录到:' + dev['host'] + '----------')
try:
if dev['secret'] != "None": # enable密码不= "None"时,enable登录
conn.enable()
except:
print("enable登录失败")
for i in range(6): # 防止卡在输入密码
conn.send_command(command_string=" ")
time.sleep(0.2)
time.sleep(3) # 某些型号密码错误后会卡几秒
pass
pwd = path + '/' + dev['host']
try:
os.makedirs(pwd) # 创建以IP地址命名的文件夹
except FileExistsError:
pass
# 获取命令列表,遍历所有key获取所有型号
dis_cmds = get_cmds()
keys = []
for item in dis_cmds:
for key in item.keys():
if key != "name" and key not in keys:
keys.append(key)
# 执行命令列表 并写出到文件
for cmd_string_dict in dis_cmds:
print("正在获取:{}".format(cmd_string_dict["name"]))
time.sleep(0.1) # 适当延迟,防止上一条命令没运行完导致卡住
output = None
for dev_type in keys: # 匹配设备所预设的命令,并执行获取返回信息
if dev['device_type'].startswith(dev_type):
output = conn.send_command(command_string=cmd_string_dict[dev_type])
if output is None: # 运行完以上for以后,如果output依然为None视为找不到命令
print('error:未找到设备类型为{}的{}预设命令,请检查并修改dis_cmds.conf文件'.format(
dev['device_type'], cmd_string_dict["name"]))
continue
# 写出文件
fname = f"{dev['host']}_{cmd_string_dict['name']}.txt" # 定义写出的文件格式
with open(pwd + '/' + fname, mode='w', encoding='utf8') as f:
print('正在备份:' + dev['host'] + "配置到" + pwd + '/' + fname)
# 文件读写异常处理
try:
f.write(output)
except PermissionError:
print('*****-无写入权限,请将文件夹赋予读写权限-*****')
continue
else:
print(f'备份{fname}成功!')
# 连接异常处理
except Exception as e:
logging.error(f"在登录{dev['host']}时发生了的错误:{e}")
failed_ips.append({"错误设备": dev['host'], "错误内容": e})
continue
# 处理错误
if len(failed_ips) > 0:
print('\n以下设备连接失败,请检查:')
for x in failed_ips:
print(x)
try: # 写出连接失败记录文件
with open("错误日志.log", 'w', encoding='utf-8') as f:
f.write(str(failed_ips))
except Exception as e:
print("写出{}失败,{}".format("错误日志.log", e))
# 尝试重试失败的设备
if len(failed_ips) > 0:
if input("是否重试失败的设备?(y 或 n)").lower() == "y":
device_names = [d["错误设备"] for d in failed_ips]
new_devices = [d for d in devices if d["host"] in device_names]
devices_confbak(new_devices) # 递归 重新登录失败的设备
return 0
# 配置巡检
def devices_autocheck(devices='', cmd=''):
# 存储命令执行回显
results = []
try:
for x in range(len(devices)):
# 循环登录设备
with ConnectHandler(**devices[x]) as conn:
try:
if devices[x]['secret'] != "None": # enable密码不= "None"时,enable登录
conn.enable()
except:
pass
print('正在巡检:' + devices[x]['host'] + ' ...')
result = [devices[x]['host'], devices[x]['device_type']]
for i in range(len(cmd)):
# 循环执行命令,根据不同设备执行不同命令
if 'cisco_ios' in devices[x]['device_type']:
output = conn.send_command(command_string=str(cmd[i]['cisco']))
elif 'huawei' or 'hp_comware' in devices[x]['device_type']:
conn.send_command(command_string='sys', expect_string=']')
output = conn.send_command(command_string=str(cmd[i]['huawei']))
result.append(output)
results.append(result)
except exceptions.NetmikoAuthenticationException:
print('\n**********' + devices[x]['host'] + ':登录验证失败!**********')
except exceptions.NetmikoTimeoutException:
print('\n**********' + devices[x]['host'] + ':目标不可达!**********')
except exceptions.ReadTimeout:
print('\n**********' + devices[x]['host'] + ':读取超时,请检查enable密码是否正确!**********')
return results
# 计算内存使用率
def get_mem(memstr, devtype=''):
if 'cisco' in devtype:
total_match = re.search(r'Processor Pool Total:\s+(\d+)', memstr)
used_match = re.search(r'Used:\s+(\d+)', memstr)
# 提取总数和已用数,并将其转换为整数
total = int(total_match.group(1))
used = int(used_match.group(1))
# 计算使用百分比
percentage = used / total * 100
return f"{percentage:.0f}%"
elif 'huawei' in devtype:
match = re.search(r"Memory Using Percentage Is:\s*(\d+)%", memstr)
if match:
memory_percentage = match.group(1)
return memory_percentage + '%'
else:
return "No match found."
# 获取CPU利用率
def get_cpu(cpustr, devtype=''):
if 'cisco' in devtype:
pattern = r"CPU utilization for five seconds: (\d+)%"
match = re.search(pattern, cpustr)
if match:
cpu_utilization = match.group(1)
return cpu_utilization + '%'
else:
return "No match found."
elif 'huawei' in devtype:
match = re.search(r"\b(\d+(\.\d+)?)%.*?\bMax", cpustr)
if match:
cpu_utilization = match.group(1)
return cpu_utilization + '%'
else:
return "No match found."
# 运行主程序
if __name__ == '__main__':
while True:
print("\n##############################################\n")
print("1:批量备份交换机信息与配置")
print("2:批量巡检交换机设备")
print("0:退出")
option = str(input("请输入需要的操作编号:"))
if option == '1':
dev = get_dev()
devices_confbak(devices=dev)
continue
elif option == '2':
# 定义巡检命令
cmds = [
{'cisco': 'show clock', 'huawei': 'display clock'}, # 检查时钟
{'cisco': 'show env power', 'huawei': 'display power'}, # 检查电源
{'cisco': 'show env fan', 'huawei': 'display fan'}, # 检查风扇
{'cisco': 'show env temperature status', 'huawei': 'display environment'}, # 检查温度
{'cisco': 'show processes cpu', 'huawei': 'display cpu-usage'}, # 检查CPU利用率
{'cisco': 'show processes memory', 'huawei': 'display memory-usage'}, # 检查内存利用率
]
dev = get_dev()
checkres = devices_autocheck(dev, cmds)
for res in checkres:
# print(res)
print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
print(res[0] + '-巡检结果:')
print('\n时钟:\n' + res[2])
print('电源:\n' + res[3])
print('风扇:\n' + res[4])
if 'Unrecognized command' in res[5]:
print('温度:\n该设备不支持获取此数据!')
else:
print('温度:\n' + res[5])
print('CPU利用率:\n' + get_cpu(res[6], res[1]))
print('内存利用率:\n' + get_mem(res[7], res[1]))
print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
continue
elif option == '0':
break
else:
print("请输入正确的编号!")
主程序代码(2.0版本;多线程)
[Python] 纯文本查看 复制代码 # coding=utf-8
import concurrent.futures
import json
import logging
import os
import threading
import time
from datetime import datetime
from subprocess import run, PIPE
from netmiko import ConnectHandler
from openpyxl import load_workbook
from prettytable import PrettyTable
# 获取excel数据并整合成dev字典
def check_and_get_dev_list(filename, sheet_name):
excel_information = []
sheet_header = []
wb = load_workbook(filename)
sh = wb[sheet_name]
# 获取最大行数
row = sh.max_row
# 获取最大列数
column = sh.max_column
data = []
# 获取表头写入列表中方便调用
for data_1 in range(1, column + 1):
get_sheet_header = sh.cell(row=1, column=data_1).value
sheet_header.append(get_sheet_header)
# 第一行为表头, 此处 row +1 是pyton循环时不读取最后一个数
for row_1 in range(2, row + 1):
# 存储一行信息
sheet_data_1 = dict()
# 逐行读取表中的数据
for b in range(1, column + 1):
cell = sh.cell(row=row_1, column=b).value
# 将数据已字典形式写入 sheet_data_1 中
# if cell != None:
sheet_data_1[sheet_header[b - 1]] = cell
excel_information.append(sheet_data_1)
for i in excel_information:
if i['ip'] is not None:
data.append(i)
return data
def ping(url='114.114.114.114'):
r = run('ping ' + url + ' -n 1 -w 500', stdout=PIPE, stderr=PIPE, stdin=PIPE, shell=True)
if r.returncode:
return False
else:
return True
# 读取excel内设备列表信息
def get_dev(xls_pwd: str):
res = check_and_get_dev_list(xls_pwd, 'Sheet1')
devices = []
for i in res:
if i['protocol'] == 'telnet':
i['type'] = i['type'] + '_telnet'
dev = {'device_type': i['type'],
'host': i['ip'],
'username': i['username'],
'password': i['password'],
'secret': i['enpassword'],
'port': i['port'], 'session_log': 'session.log'}
for key in dev: # 排除变量类型导致的错误
if dev[key] is not None:
dev[key] = str(dev[key])
devices.append(dev)
return devices
# 登陆设备获取配置
def get_cmds(): # dis_cmds.conf为json文件,引号必须英文双引号
with open("./dis_cmds.conf", 'r', encoding='utf-8') as f:
js = json.loads(f.read())
f.close()
return js
# 清屏
def clear_screen():
os.system('cls')
def show_table(columns: list, data: dict):
clear_screen()
# 创建表格
x = PrettyTable()
x.field_names = ['host'] + columns
for i in data:
x.add_row([i] + data[i])
print(x)
class NetworkDeviceManager:
log = logging.getLogger(__name__)
devices = [] # 设备列表
dis_cmds = [] # 命令字典列表
table_index = {} # 首页展示表格数据
devbak_columns = [] # 用来保存配置备份功能的表头
table_devbak = {} # 用来保存配置备份功能的表格数据
table_devbak_lock = threading.Lock() # table_devbak的线程锁
# 配置设置项
path = './conf_bak' # 定义保存目录 绝对路径相对路径均可 不要结尾的/
max_workers = 5 # 最大线程数
max_err_show = 5 # 错误信息展示数量
is_ping = True # 是否ping ping失败时放弃登录,加速爬取
def __init__(self, initial_devices=None):
self.devices = initial_devices if initial_devices else []
self.dis_cmds = get_cmds()
# 初始化日志配置
logging.basicConfig(level=logging.INFO)
# 取消以下注释开启调试模式
# logging.basicConfig(level=logging.DEBUG)
# 创建文件夹路径
if not os.path.exists(self.path):
os.makedirs(self.path)
self.init_index_table()
self.run()
# 初始化index表格索引
def init_index_table(self):
for dev in self.devices:
self.table_index[dev['host']] = [
dev['device_type']
]
columns = ['type']
show_table(columns, self.table_index)
def init_devbak_table(self):
self.table_devbak.clear()
if len(self.dis_cmds) > 0:
self.devbak_columns = [i['name'] for i in self.dis_cmds]
# 配置备份功能-主线程
def devices_confbak(self, devices=None):
if devices is None:
devices = []
# 创建备份文件夹
current_date = datetime.now().strftime("%Y-%m-%d") # 这里定义日期文件夹的格式 %Y年、%m月、%d日、%H时、%M分、%S秒
path = os.path.join(self.path, current_date)
try:
os.makedirs(path)
except FileExistsError:
pass
clear_screen()
self.init_devbak_table()
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交任务到线程池
futures = [
executor.submit(self.get_cofbak, dev, self.dis_cmds, path)
for dev in devices
]
# 每隔一秒检查并打印进度
err_list = []
while futures:
# clear_screen()
done, futures = concurrent.futures.wait(futures, timeout=1) # 等待1秒或直到有任务完成
# 显示运行状态表格
show_table(self.devbak_columns, self.table_devbak)
# 显示进度
print(f"进度: {len(devices) - len(futures)}/{len(devices)} ")
# 显示错误信息
if len(err_list) > 0:
print("错误信息:")
for future in done:
result = future.result() # 获取任务的返回值
err_list += result
if len(err_list) > self.max_err_show: # 隐藏过多的错误信息,保障刷新的流畅度
show_err_list = err_list[-self.max_err_show:]
print(f"^^^隐藏{len(err_list) - self.max_err_show}条错误信息^^^")
else:
show_err_list = err_list
for err in show_err_list:
print(err)
# 在所有任务执行完毕后写入执行日志文件
try:
with open(os.path.join(path, "log.txt"), "w") as log_file:
x = PrettyTable()
x.field_names = ['host'] + self.devbak_columns
for i in self.table_devbak:
x.add_row([i] + self.table_devbak[i])
log_file.write(str(x) + "\n")
for err in err_list:
log_file.write(err + "\n")
except Exception as e:
logging.error(e)
# 配置备份功能-子线程
def get_cofbak(self, dev: dict, dis_cmds: list, path: str):
err_list = []
logging.debug(str(dev))
self.table_devbak[dev['host']] = [
"等待" for _ in range(len(self.devbak_columns))
]
try:
if self.is_ping:
if not ping(dev['host']):
# 主动触发错误
raise ConnectionRefusedError(f"{dev['host']} ping不通,跳过")
with (ConnectHandler(**dev) as conn):
logging.debug('\n----------成功登录到:' + dev['host'] + '----------')
try:
if dev['secret'] != "None": # enable密码不= "None"时,enable登录
conn.enable()
except:
# print("enable登录失败")
time.sleep(6) # 某些型号密码错误后会卡几秒
pass
pwd = path + '/' + dev['host']
try:
os.makedirs(pwd) # 创建以IP地址命名的文件夹
except FileExistsError:
pass
# 获取命令列表,遍历所有key获取所有型号
keys = []
for item in dis_cmds:
for key in item.keys():
if key != "name" and key not in keys:
keys.append(key)
# 执行命令列表 并写出到文件
cmdid = -1
for cmd_string_dict in dis_cmds:
cmdid += 1
# print("正在获取:{}".format(cmd_string_dict["name"]))
time.sleep(0.1) # 适当延迟,防止上一条命令没运行完导致卡住
with self.table_devbak_lock: # 获取锁
self.table_devbak[dev['host']][cmdid] = "正在执行" # 更新状态
output = None
for dev_type in keys: # 匹配设备所预设的命令,并执行获取返回信息
if dev['device_type'].startswith(dev_type):
output = conn.send_command(command_string=cmd_string_dict[dev_type])
if output is None: # 运行完以上for以后,如果output依然为None视为找不到命令
logging.debug('error:未找到设备类型为{}的{}预设命令,请检查并修改dis_cmds.conf文件'.format(
dev['device_type'], cmd_string_dict["name"]))
err_list.append(
'{}: error:未找到设备类型为{}的{}预设命令,请检查并修改dis_cmds.conf文件'.format(
dev['host'],
dev['device_type'],
cmd_string_dict["name"]))
continue
# 写出文件
fname = f"{dev['host']}_{cmd_string_dict['name']}.txt" # 定义写出的文件格式
with open(pwd + '/' + fname, mode='w', encoding='utf8') as f:
logging.debug('正在备份:' + dev['host'] + "配置到" + pwd + '/' + fname)
# 文件读写异常处理
try:
f.write(output)
except PermissionError:
logging.debug('*****-无写入权限,请将文件夹赋予读写权限-*****')
err_list.append(
'{}: error:无写入权限,请将文件夹赋予读写权限'.format(
dev['host']))
with self.table_devbak_lock: # 获取锁
self.table_devbak[dev['host']][cmdid] = "*失败" # 更新状态
continue
else:
logging.debug(f'备份{fname}成功!')
with self.table_devbak_lock: # 获取锁
self.table_devbak[dev['host']][cmdid] = "成功" # 更新状态
except Exception as e:
logging.debug(e)
err_list.append('{}: error:{}'.format(dev['host'], e))
with self.table_devbak_lock: # 获取锁
for i in range(len(self.table_devbak[dev['host']])):
self.table_devbak[dev['host']][i] = "*失败"
return err_list
# 类运行入口
def run(self):
while True:
print("1:批量备份交换机信息与配置")
print("0:退出")
option = str(input("请输入需要的操作编号:"))
if option == '1':
self.devices_confbak(devices=self.devices)
continue
elif option == '0':
break
else:
print("请输入正确的编号!")
# 运行主程序
if __name__ == '__main__':
ndm = NetworkDeviceManager(get_dev('./resource.xlsx'))
附上环境:
[Python] 纯文本查看 复制代码 PyNaCl==1.5.0
PyYAML==6.0.1
bcrypt==4.0.1
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
cryptography==41.0.5
docopt==0.6.2
et-xmlfile==1.1.0
future==0.18.3
idna==3.6
netmiko==4.2.0
ntc-templates==4.0.1
openpyxl==3.1.2
paramiko==3.3.1
pip==23.2.1
pipreqs==0.4.13
pycparser==2.21
pyserial==3.5
requests==2.31.0
scp==0.14.5
setuptools==68.2.0
six==1.16.0
textfsm==1.1.3
urllib3==2.2.0
wheel==0.41.2
yarg==0.1.9
prettytable~=3.10.0
以下是配套的文件
dis_cmds.conf(修改时引号必须为英文双引号):
[Python] 纯文本查看 复制代码 [
{"name": "查看版本", "hp_comware": "display version", "huawei": "display version", "cisco_ios": "show version"},
{"name": "查看时间", "hp_comware": "display clock", "huawei": "display clock", "cisco_ios": "show clock"},
{"name": "查看序列号", "hp_comware": "display device ma", "huawei": "display device manufacture-info", "cisco_ios": "show inventory"},
{"name": "查看风扇", "hp_comware": "display fan", "huawei": "display fan", "cisco_ios": "show environment"},
{"name": "查看电源状态", "hp_comware": "display power", "huawei": "display power", "cisco_ios": "show environment power"},
{"name": "查看CPU利用率", "hp_comware": "display cpu-usage", "huawei": "display cpu-usage", "cisco_ios": "show processes cpu"},
{"name": "查看内存利用率", "hp_comware": "display memory", "huawei": "display memory-usage", "cisco_ios": "show processes memory"},
{"name": "查看VLAN配置", "hp_comware": "display vlan", "huawei": "display vlan", "cisco_ios": "show vlan"},
{"name": "查看二层接口状态", "hp_comware": "display interface brief", "huawei": "display interface brief", "cisco_ios": "show interfaces switchport"},
{"name": "查看三层接口IP", "hp_comware": "display ip interface brief", "huawei": "display ip interface brief", "cisco_ios": "show ip interface brief"},
{"name": "查看ACL控制列表", "hp_comware": "display acl", "huawei": "display acl all", "cisco_ios": "show access-lists"},
{"name": "查看路由表", "hp_comware": "display ip routing-table", "huawei": "display ip routing-table", "cisco_ios": "show ip route"},
{"name": "查看ARP表", "hp_comware": "display arp", "huawei": "display arp", "cisco_ios": "show arp"},
{"name": "查看MAC表", "hp_comware": "display mac-address", "huawei": "display mac-address", "cisco_ios": "show mac address-table"},
{"name": "查看日志", "hp_comware": "display logbuffer", "huawei": "display logbuffer", "cisco_ios": "show logging"},
{"name": "查看所有配置", "hp_comware": "display current-configuration", "huawei": "display current-configuration", "cisco_ios": "show running-config"}
]
resource.xlsx:
源码:
交换机批量备份_源码1.1.zip
(12.81 KB, 下载次数: 86)
解压密码:52pojie
打包后成品:https://wwf.lanzouq.com/b0143rdkj密码:8na1 |
免费评分
-
查看全部评分
本帖被以下淘专辑推荐:
- · 1|主题: 25, 订阅: 0
- · 2|主题: 4, 订阅: 0
|