吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 5579|回复: 44
上一主题 下一主题
收起左侧

[Python 原创] Python批量备份交换机配置+自动巡检

  [复制链接]
跳转到指定楼层
楼主
winty 发表于 2023-3-24 15:29 回帖奖励
继上次发帖批量备份的功能后,有坛友要求增加功能,目前版本支持SSH、Telnet、自定义端口,增加了部分异常处理。
设备厂商支持思科、华为、华三。
自动巡检功能考虑到不同设备回显有所不同,需要大量正则匹配,暂时没时间搞这些,所以索性将命令回显全部显示,没做进一步的回显提取。
以下是程序运行示例:
#自动备份配置:

备份完成后,将配置保存于程序目录下的conf_bak文件夹下,如图所示:


#自动巡检交换机设备:


使用前需创建excel文档,写入设备登录信息,格式如下:

本人编码能力一般,没学过多少,大佬们勿喷,源码奉上:


[Python] 纯文本查看 复制代码
# coding=utf-8
from netmiko import ConnectHandler
from openpyxl import load_workbook
import os
from sys import exit
from netmiko import exceptions
import re
# 读取excel内设备列表信息
def check_and_get_dev_list(filename, sheet_name):
    excel_information = []
    sheet_header = []
    wb = load_workbook(filename)
    sh = wb[sheet_name]
    # 获取最大行数
    row = sh.max_row
    # 获取最大列数
    column = sh.max_column
    data = []
    # 获取表头写入列表中方便调用
    for data_1 in range(1, column+1):
        get_sheet_header = sh.cell(row=1, column=data_1).value
        sheet_header.append(get_sheet_header)
    # 第一行为表头, 此处 row +1 是pyton循环时不读取最后一个数
    for row_1 in range(2, row + 1):
        # 存储一行信息
        sheet_data_1 = dict()
        # 逐行读取表中的数据
        for b in range(1, column + 1):
            cell = sh.cell(row=row_1, column=b).value
            # 将数据已字典形式写入 sheet_data_1 中
            # if cell != None:
            sheet_data_1[sheet_header[b-1]] = cell
        excel_information.append(sheet_data_1)
    for i in excel_information:
        if i['ip'] != None:
            data.append(i)
    return data

#获取excel数据并整合成dev字典
def get_dev():
    res = check_and_get_dev_list('./resource.xlsx', 'Sheet1')
    devices = []
    for i in res:
        if i['protocol'] == 'telnet':
            i['type'] = i['type']+'_telnet'
        dev = {'device_type':i['type'],
               'host': i['ip'],
               'username': i['username'],
               'password': i['password'],
               'secret': i['enpassword'],
               'port': i['port'],}
        devices.append(dev)
    return devices

# 配置批量备份导出
def devices_confbak(devices=''):
    # 创建备份文件夹
    try:
        path = './conf_bak'
        os.makedirs(path)
    except FileExistsError:
        pass
    # 存储连接失败的IP
    failed_ips = []
    # 循环登录设备获取配置
    for dev in devices:
        try:
            with ConnectHandler(**dev) as conn:
                print('\n----------成功登录到:' + dev['host'] + '----------')
                conn.enable()
                if 'cisco_ios' in dev['device_type']:
                    output = conn.send_command(command_string='show run')
                elif 'huawei' or 'hp_comware' in dev['device_type']:
                    output = conn.send_command(command_string='dis current-configuration')
                else:
                    print('error')
                with open('./conf_bak/'+ dev['host'] +'_conf_bak.txt', mode='w', encoding='utf8') as f:
                    print('正在备份:'+dev['host'])
                    # 文件读写异常处理
                    try:
                        f.write(output)
                    except PermissionError:
                        print('*****-无写入权限,请将文件夹赋予读写权限-*****')
                        continue
                    else:
                        print('备份成功!')
        # 连接异常处理
        except exceptions.NetmikoAuthenticationException:
            print('\n**********'+dev['host']+':登录验证失败!**********')
            failed_ips.append(dev['host'])
            continue
        except exceptions.NetmikoTimeoutException:
            print('\n**********'+dev['host']+':目标不可达!**********')
            failed_ips.append(dev['host'])
            continue
        except exceptions.ReadTimeout:
            print('\n**********'+dev['host']+':读取超时,请检查enable密码是否正确!**********')
            failed_ips.append(dev['host'])
            continue
    if len(failed_ips) > 0:
        print('\n以下设备连接失败,请检查:')
        for x in failed_ips:
            print(x)
    return 1

# 配置巡检
def devices_autocheck(devices='', cmd=''):
    # 存储命令执行回显
    results = []
    try:
        for x in range(len(devices)):
            # 循环登录设备
            with ConnectHandler(**devices[x]) as conn:
                conn.enable()
                print('正在巡检:'+devices[x]['host']+' ...')
                result = [devices[x]['host'],devices[x]['device_type']]
                for i in range(len(cmd)):
                    # 循环执行命令,根据不同设备执行不同命令
                    if 'cisco_ios' in devices[x]['device_type']:
                        output = conn.send_command(command_string=str(cmd[i]['cisco']))
                    elif 'huawei' or 'hp_comware' in devices[x]['device_type']:
                        conn.send_command(command_string='sys',expect_string=']')
                        output = conn.send_command(command_string=str(cmd[i]['huawei']))
                    result.append(output)
                results.append(result)

    except exceptions.NetmikoAuthenticationException:
        print('\n**********'+devices[x]['host']+':登录验证失败!**********')
    except exceptions.NetmikoTimeoutException:
        print('\n**********' + devices[x]['host'] + ':目标不可达!**********')
    except exceptions.ReadTimeout:
        print('\n**********' + devices[x]['host'] + ':读取超时,请检查enable密码是否正确!**********')

    return results
# 计算内存使用率
def get_mem(memstr,devtype=''):
    if 'cisco' in devtype:
        total_match = re.search(r'Processor Pool Total:\s+(\d+)', memstr)
        used_match = re.search(r'Used:\s+(\d+)', memstr)
        # 提取总数和已用数,并将其转换为整数
        total = int(total_match.group(1))
        used = int(used_match.group(1))
        # 计算使用百分比
        percentage = used / total * 100
        return f"{percentage:.0f}%"
    elif 'huawei' in devtype:
        match = re.search(r"Memory Using Percentage Is:\s*(\d+)%", memstr)
        if match:
            memory_percentage = match.group(1)
            return memory_percentage+'%'
        else:
            return "No match found."
# 获取CPU利用率
def get_cpu(cpustr,devtype=''):
    if 'cisco' in devtype:
        pattern = r"CPU utilization for five seconds: (\d+)%"
        match = re.search(pattern, cpustr)
        if match:
            cpu_utilization = match.group(1)
            return cpu_utilization+'%'
        else:
            return "No match found."
    elif 'huawei' in devtype:
        match = re.search(r"\b(\d+(\.\d+)?)%.*?\bMax", cpustr)
        if match:
            cpu_utilization = match.group(1)
            return cpu_utilization+'%'
        else:
            return "No match found."
# 运行主程序
if __name__ == '__main__':
    while True:
        print("\n##############################################\n")
        print("1:批量备份交换机配置")
        print("2:批量巡检交换机设备")
        print("0:退出")
        option = str(input("请输入需要的操作编号:"))
        if option == '1':
            dev = get_dev()
            devices_confbak(devices=dev)
            continue
        elif option == '2':
            # 定义巡检命令
            # cmds[x]['cisco']
            # cmds[x]['huawei']
            cmds = [
                {'cisco':'show clock','huawei':'display clock'},                        #检查时钟
                {'cisco':'show env power','huawei':'display power'},                    #检查电源
                {'cisco':'show env fan','huawei':'display fan'},                        #检查风扇
                {'cisco':'show env temperature status', 'huawei': 'display environment'},#检查温度
                {'cisco':'show processes cpu', 'huawei': 'display cpu-usage'},          #检查CPU利用率
                {'cisco':'show processes memory', 'huawei': 'display memory-usage'},    #检查内存利用率
            ]
            dev = get_dev()
            checkres = devices_autocheck(dev,cmds)
            for res in checkres:
                # print(res)
                print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
                print(res[0]+'-巡检结果:')
                print('\n时钟:\n'+res[2])
                print('电源:\n'+res[3])
                print('风扇:\n'+res[4])
                if 'Unrecognized command' in res[5]:
                    print('温度:\n该设备不支持获取此数据!')
                else:print('温度:\n'+res[5])
                print('CPU利用率:\n' + get_cpu(res[6],res[1]))
                print('内存利用率:\n' + get_mem(res[7],res[1]))
                print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
            continue
        elif option == '0':
            break
        else:
            print("请输入正确的编号!")


免费评分

参与人数 7吾爱币 +12 热心值 +6 收起 理由
lk99309494 + 1 谢谢@Thanks!
tonekey2016 + 1 谢谢@Thanks!
xiaotang123 + 1 + 1 谢谢@Thanks!
imqiuge + 1 + 1 谢谢@Thanks!
Bygx1 + 1 + 1 我很赞同!
ookk + 1 + 1 热心回复!
侃遍天下无二人 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

推荐
splzhk 发表于 2023-12-9 16:22
感谢分享,学习学习,感觉挺有用
推荐
xiaotang123 发表于 2023-9-5 10:26
大佬为什么我运行报错呀,(本人不会py)
Traceback (most recent call last):
  File "D:\PYdemo\新建 文本文档.py", line 195, in <module>
    checkres = devices_autocheck(dev,cmds)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\PYdemo\新建 文本文档.py", line 113, in devices_autocheck
    with ConnectHandler(**devices[x]) as conn:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\PY\Lib\site-packages\netmiko\ssh_dispatcher.py", line 383, in ConnectHandler
    raise ValueError(
ValueError: Unsupported 'device_type' currently supported platforms are:
a10
accedian
adtran_os
adva_fsp150f2
adva_fsp150f3
alcatel_aos
alcatel_sros
allied_telesis_awplus
apresia_aeos
arista_eos
arris_cer
aruba_os
aruba_osswitch
aruba_procurve
audiocode_66
audiocode_72
audiocode_shell
avaya_ers
avaya_vsp
broadcom_icos
brocade_fastiron
brocade_fos
brocade_netiron
brocade_nos
brocade_vdx
brocade_vyos
calix_b6
casa_cmts
cdot_cros
centec_os
checkpoint_gaia
ciena_saos
cisco_asa
cisco_ftd
cisco_ios
cisco_nxos
cisco_s200
cisco_s300
cisco_tp
cisco_viptela
cisco_wlc
cisco_xe
cisco_xr
cloudgenix_ion
coriant
dell_dnos9
dell_force10
dell_isilon
dell_os10
dell_os6
dell_os9
dell_powerconnect
dell_sonic
dlink_ds
eltex
eltex_esr
endace
enterasys
ericsson_ipos
ericsson_mltn63
ericsson_mltn66
extreme
extreme_ers
extreme_exos
extreme_netiron
extreme_nos
extreme_slx
extreme_tierra
extreme_vdx
extreme_vsp
extreme_wing
f5_linux
f5_ltm
f5_tmsh
flexvnf
fortinet
generic
generic_termserver
hillstone_stoneos
hp_comware
hp_procurve
huawei
huawei_olt
huawei_smartax
huawei_vrp
huawei_vrpv8
ipinfusion_ocnos
juniper
juniper_junos
juniper_screenos
keymile
keymile_nos
linux
mellanox
mellanox_mlnxos
mikrotik_routeros
mikrotik_switchos
mrv_lx
mrv_optiswitch
netapp_cdot
netgear_prosafe
netscaler
nokia_srl
nokia_sros
oneaccess_oneos
ovs_linux
paloalto_panos
pluribus
quanta_mesh
rad_etx
raisecom_roap
ruckus_fastiron
ruijie_os
sixwind_os
sophos_sfos
supermicro_smis
teldat_cit
tplink_jetstream
ubiquiti_edge
ubiquiti_edgerouter
ubiquiti_edgeswitch
ubiquiti_unifiswitch
vyatta_vyos
vyos
watchguard_fireware
yamaha
zte_zxros
zyxel_os

进程已结束,退出代码为 1
沙发
lcg888 发表于 2023-3-24 21:23
3#
hzxszxd 发表于 2023-3-25 13:06
谢谢分享
4#
dtclaiqi 发表于 2023-3-26 11:39
谢谢分享
5#
baigei2333 发表于 2023-3-27 08:43
菜鸡学习学习,想要把交换机巡检的输出打印在cmd显示怎么弄的?
6#
jffwoo 发表于 2023-3-27 08:52
需要这样的东西
7#
 楼主| winty 发表于 2023-3-27 09:12 |楼主
baigei2333 发表于 2023-3-27 08:43
菜鸡学习学习,想要把交换机巡检的输出打印在cmd显示怎么弄的?

在cmd里运行就好了呀,python ./xxx.py
8#
baigei2333 发表于 2023-3-27 09:22
winty 发表于 2023-3-27 09:12
在cmd里运行就好了呀,python ./xxx.py

这样吗,谢谢
9#
jianjian91 发表于 2023-3-27 16:07
谢谢,之前那个也不错,不过就不能用于其他交换机厂家
10#
Hacking2heart 发表于 2023-3-28 08:18
谢谢分享,学习中
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-7 16:42

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表