python读取交换机配置_python连接远程服务器

Python (1) 2024-06-30 14:12

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
python读取交换机配置_python连接远程服务器,希望能够帮助你!!!。

目录

  • 本文更新日志
  • 前言
  • github地址(敲黑板)
  • Python依赖的工具库
  • 核心思路(必看)
  • 兼容性 (必看)
  • 登录,建立一个连接
    • 命令
    • telnet + 交换机ip
    • 关键代码
    • 注意事项
  • 获取主机名等基础信息
    • 命令
    • 注意事项
  • 获取内存信息
    • 命令
    • 注意事项
  • 获取flash信息
    • 命令
    • 注意事项
  • 获取物理端口信息
    • 命令
    • 注意事项
  • 获取逻辑端口信息
    • 命令
    • 注意事项
  • 获取vlan信息
    • 命令
    • 注意事项
  • 互连端mac和ip信息
    • 命令
    • 注意事项
  • 设备序列号
    • 命令
    • 注意事项
  • netmiko 中 device_type枚举值
  • H3C交换机查询python3的脚本

本文更新日志

2023-06-16: 上传了H3C交换机信息查询的脚本

2023-04-13: 最新的写法(使用python3+netmiko+telnet,读取数据的方式对交换机更友好)
https://github.com/Sherlock-L/automation-py/blob/master/h3c_query_python3_telnet.py(其他品牌的可以参照这个写法)

from netmiko import ConnectHandler device = { 'device_type': 'huawei_comware_telnet',#这个值,可以查看本博客的”netmiko 中 device_type枚举值“的章节内容 'ip': self.ip, 'username': self.username, 'password': self.password, 'port': self.port, # Telnet 端口 } client = ConnectHandler(**device) cmd='display device manuinfo' retText = client.send_command(cmd) 

2023-03-22: 兄弟们,现在是2023年了,如果你现在有条件使用chatgpt,建议直接让它给你解答和编码,下面的内容是前几年写的,AI的编写水平和内容完整性可能会超越下文很多。*

前言

2023-03-22:兄弟们,现在是2023年了,如果你现在有条件使用chatgpt,建议直接让它给你解答和编码,下面的内容是前几年写的,AI的编写水平和内容完整性可能会超越下文很多。

要想真正的弄清楚,就得自己登录到交换机查看,分析文本,总结多个例子。这是必经之路。
文中涉及到的交换机,包括思科(Cisco)、博科(Brocade)、华为(Huawei)、华三(H3C)四个品牌。获取的信息包括:主机、内存、flash、固件版本(os_version、设备序列号)、物理端口、逻辑端口(聚合口)、vlan、以及互连端mac和ip信息。
如果你对交换机不是很熟悉,我建议你首先得先去了解几个关键的知识:

  • VLAN
  • 管理端口
  • 逻辑端口
  • 物理端口命名规则、
  • 速率
  • 连接方式:trunk、access、hybrid

github地址(敲黑板)

思科、华为、H3C脚本程序设计为常规思路,由于项目需要一次性获取多项数据,所以导致执行多条命令花费的时间很长,可能获取失败。执行消耗快的3、4分钟,慢的有可能10分钟。
重点: 博科交换机获取脚本是我经过不断的尝试,用了新的设计思路,其他老的脚本来不及优化了。执行时间从减少到1分钟之内。当然,如果你的网络不好那就另当别论。
git地址

Python依赖的工具库

  • 正则匹配函数库,用于提取交换机返回的关键信息
  • telnet模块函数库,用于与交换机进行通信
  • 睡眠,用于等待交换机的响应
import re import telnetlib import time 

核心思路(必看)

2023中心思想:之前代码里获取内容时出现类似more,导致命令返回的内容不完整的情况,使得之前的脚本不得不写个函数“模拟人工敲空格继续获取内容”的动作。其实可以通过取消分屏的方式去解决,取消分屏只对本次登录有效,所以不用担心。这样就可以一次性获取命令返回的全部内容了(使用netmiko包,会自动帮你敲取消分屏的口令,但是要注意如果账号没有权限的话,需要去赋予账号开通取消分屏的权限)

Cisco: terminal length 0 Huawei: screen-length 0 temporary H3C: screen-length disable HP: page-length 0 

然后只需简单的引入包,执行命令就能拿到结果了

from netmiko import ConnectHandler device = { 'device_type': 'huawei_comware_telnet',#这个值,可以查看本博客的”netmiko 中 device_type枚举值“的章节内容 'ip': self.ip, 'username': self.username, 'password': self.password, 'port': self.port, # Telnet 端口 } client = ConnectHandler(**device) cmd='display device manuinfo' retText = client.send_command(cmd) 

2019中心思想(下面的表格的思想可以放弃):减少睡眠等待的时间,若要最短响应时间需要多线程

常规思路(执行时间长) 优化思路(执行时间短) 备注
1. 登录交换机,建立一个连接 1. 登录交换机,建立一个连接
2.一次发送一条”下一页”指令=》睡眠=》读取;
结果:执行N条需要睡眠N次
2. 一次发送N条“下一页指令”(N可以自定义)
=》睡眠(我一般设置1.5秒)
=》读取
结果:执行N条需要睡眠1次
查询内容过多,交换机一般只输出一部分内容,然后尾部为“----more-------”字符串,这时候需要不断发送敲空格(下一页)指令,获取更多,直到内容获取完毕。
另外,如果需要获取所有端口的信息,尽量不要单个循环获取,如show interfaces GigabatEthernet1/0/1,show interfaces GigabatEthernet1/0/2… 。这样就需要多条命令执行;直接 show interfaces,一次读取所有的端口,然后在代码中自己分隔文本去提取。
4 等待交换机返回结果(睡眠等方法) 4 等待交换机返回结果(睡眠等方法)
5. 正则提取结果 5.正则提取结果 优化思路的做法因为一次获取了所有文本数据,所以需要根据关键字切割文本,然后遍历提取
6. 重复2、3、4操作 6.重复2、3、4操作
7. 存入变量或者数据库存入变量或者数据库 7. 存入变量或者数据库存入变量或者数据库

兼容性 (必看)

同一个品牌,不同型号两个地方不同,需要代码做一些兼容

  • 睡眠时间:考虑到保证所有交换机都能获取成功,睡眠时间不宜太短,也不宜太长
  • 同一内容,展示文本格式不同,同时命令可能有一丝丝的差别,可能就多个单词
  • 同一命令,部分型号include 后面有些需要 双引号包含关键字 (目前只发现H3C有这种情况)。

    如 display interfaces | include “GigabatEthernet” 或者 display interfaces | include GigabatEthernet (不带双引号)

  • 博科交换机 回车命令 是 \r\n,而其他是\n

登录,建立一个连接

命令

(如果是配有ssh的方式,可以参考:ssh访问交换机代码 )

  1. telnet + ip
  2. 输入账号
  3. 输入一级密码
  4. enable
  5. 输入二级密码(思科,博科交换机有,华为,h3c没有)

telnet + 交换机ip

关键代码

#以思科为例 self.tnIp = "192.168.1.1" #你的ip self.username = "admin" self.password1 = "pwd1" #你的ip self.password2 = "pwd2" #你的ip tnconn = telnetlib.Telnet() try: //telnet ip tnconn.open(self.tnIp) except: print "Cannot open host" return #读取交换机返回的文本提示,读到‘username:’, 说明需要输入账户名,不同品牌设备不一样的提示 tnconn.read_until('Username:') #表示程序一直等待,直到 收到交换机返回'Username:'字符串提示。 tnconn.write(self.username + '\n') #模拟输入账户名命令。并加一个回车按钮。表示输入完毕,模拟人工敲键盘 #读取交换机返回的文本提示,直到读到Password,说明需要输入密码 tnconn.read_until('Password:') tnconn.write(self.password1 + '\n') #程序睡眠一秒,视情况而调整,考虑到所有交换机都能成功,所以需要照顾反应慢的机器。 time.sleep(1) tnconn.write('enable\n') tnconn.read_until('Password:') tnconn.write(self.password2 + '\n') time.sleep(1) self.__tnConn = tnconn #返回建立成功的连接, return tnconn 

注意事项

  • 不同交换机登录的时候,输入命令响应时间有快有慢,有可能read_until()方法会超时,所以可以适当在登录的步骤加上程序睡眠逻辑,给交换机一定的处理命令返回的等待时间。避免死在开头。

获取主机名等基础信息

命令

思科 博科(Brocade) 华为 H3C
show running-config | include hostname show running-config | include hostname display current-configuration | include sysname display current-configuration

注意事项

考虑不同H3C设备的命令 include 后面的字符串是否需要加上双引号。具体可以参考H3C获取脚本

获取内存信息

命令

思科 博科(Brocade) 华为 H3C
show memory summary show version display memory-usage dis version

注意事项

  • 单位,不同类型有些是kb 有些是 b等。同时数字 可能有逗号 如 9,991KB。
  • 如果是多台交换机使用堆叠的连接方式,共用一个管理ip。那么会按照slot(插槽顺序)分块显示,比如有四台,那么会有四份数据。可以通过设备sn(序列号)区分哪个插槽属于哪一台交换机。一般它们的设备型号、内存配置是一模一样的。所以输入命令,会显示一样的总内存,只不过使用率会有不同。

获取flash信息

命令

思科 博科(Brocade) 华为 H3C
dir dir dir dir

注意事项

  • 单位:有些品牌类型有些是kb 有些是 b等。同时数字 可能有逗号 如 9,991KB。
  • 出现多个重复数据:如果是多台交换机堆叠在一起,共用一个管理ip。比如有四台,那么会有四份数据,具体依据返回文本进行分析。可以通过设备sn(序列号)区分哪个插槽属于哪一台交换机。一般它们的设备型号、内存配置是一模一样的。所以输入命令,会显示一样的总内存,只不过使用率会有不同。

获取物理端口信息

命令

思科 博科(Brocade) 华为 H3C
show interfaces
或者 show interfaces | include xxxx
show interfaces
或者 show interfaces | include xxxx
display current-configuration
或者 display current-configuration | include xxxx
display current-configuration
或者 display current-configuration | include xxxx

注意事项

  • 当前的速率 :如果端口关掉可能不一定有值,
  • 最大速率: 最大值可以根据端口名判断。如GigabatEthernet代表千兆口ten-GigabatEthernet,代表万兆口。fastEthernet代表百兆口。
  • duplex:不一定能读到,需要另一个指令,具体看获取端口信息的代码。
  • 交换机端口命名:如 GigabatEthernet1/0/1 解释: https://zhidao.baidu.com/question/539909844.html
  • 连接方式:trunk、还是access的名号不一定能读到,需要另一个指令,具体看获取端口信息的代码
  • 光口电口:文本内容可能显示双绞线,或者光纤等单词,总之就是不同英文展示,甚至是缩写,我整理了一波:
  • 大小写兼容:如up或者UP。
//PHP代码 //电口 if (stripos($value['port_type'], 'twisted') !== false ||$value['port_type'] == 'twistedpair' || $value['port_type'] == 'TX' || $value['port_type'] == 'COMMON COPPER' || $value['port_type'] == 'CX' || $value['port_type'] == 'T') { $value['port_type'] = AtDevNetPort::PORT_TYPE_T; $allContent['ihmDevNet']['port_num']++; } //光口 else if (stripos($value['port_type'], 'fiber') !== false || $value['port_type'] == 'opticalfiber' || $value['port_type'] == 'FX' || $value['port_type'] == 'COMMON FIBER' || $value['port_type'] == 'SX' || $value['port_type'] == 'LX') { 

获取逻辑端口信息

命令

这一栏目内容就具体看代码了

思科 博科(Brocade) 华为 H3C
show interfaces | include Port-channel show trunk display interfaces Eth-Trunk | include Eth-Trunk display interface Bridge-Aggregation | include Bridge-Aggregation

注意事项

  • 大小写兼容,如up或者UP
  • Brocade 在比较老的型号中,没有逻辑口的说法,只是在查看物理端口的时候,能看到此物理口和哪几个口是一个组的,可以把这种组当成是逻辑口,姑且这么认为。
  • 获取逻辑口包含哪些物理口,可能需要额外指令,具体看获取逻辑口信息的代码。
  • 逻辑口命名:可以通过指令发现,逻辑口的叫法命名不同品牌是不一样的。具体看代码或者百度。

获取vlan信息

命令

思科 博科(Brocade) 华为 H3C
show vlan show vlan display vlan include VLAN

注意事项

  • 关键还是要理解端口和vlan的关系

  • 默认vlan:名称可能就不是vlan+id,可能交换机直接叫做default,一般代表vlan 1(有些叫vlan0001)。

  • 物理端口名为缩写:需要自己重新转化名称,如可能会有这样的文本:


vlan 2:
Gi0/5,Fa0/12,XGE2
vlan 25:
Gi0/1,Fa0/1,XGE1(这个是万兆口的缩写).脚本中获取vlan的方法中有提到。


互连端mac和ip信息

命令

思科 博科(Brocade) 华为 H3C
show cdp neighbors detail
https://jingyan.baidu.com/article/37bce2be4bcc671002f3a226.html
show lldp display lldp nei dis lldp neighbor-information
(另一个型号:dis lldp neighbor-information verbose)https://jingyan.baidu.com/article/86fae3461bb6cc3c49121ad1.html

注意事项

  • 可以依赖 lldp 协议,可以知道端口连的对端设备信息
  • 思科有自己独立的协议,可以知道端口连的对端设备信息。但是对端应该也是思科设备才行
  • 要获取连接端的设备信息,首先端口要开启,同时协议也要开启。否则是无法知道的

设备序列号

命令

思科 博科(Brocade) 华为 H3C
show version 可能在version里,找你的网络工程师 可能在version里,找你的网络工程师 display device manuinfo

注意事项

  • 如果是堆叠(多台交换机组合到一起),可能读到多个序列号。但是可以根据插槽区分(目前最新代码已经对这种方式进行处理了https://github.com/Sherlock-L/automation-py/blob/master/h3c_query_python3_telnet.py)。

netmiko 中 device_type枚举值

在Netmiko库中,支持的设备类型(device_type)包括但不限于以下枚举值: (华三的没有,可以试着用惠普的:hp_comware,虽然品牌不同,但是有些交换机的命令是相似) accedian: Accedian device (routers and switches) alcatel_aos: Alcatel-Lucent Enterprise (AOS) (formerly known as OmniSwitch) alcatel_sros: Alcatel-Lucent Service Routers (SR OS) apresia_aeos: Hitachi Aprésia AEOS family of network switches. arista_eos: Arista EOS aruba_os: Aruba Operating System (formerly known as ProCurve) avaya_ers: Avaya Ethernet Routing Switch (ERS) (formerly known as Nortel) avaya_vsp: Avaya Virtual Services Platform (VSP) brocade_fastiron: Brocade FastIron brocade_netiron: Brocade NetIron brocade_nos: Brocade Network Operating System (NOS) brocade_vdx: Brocade VDX brocade_vyos: Brocade Vyatta / VyOS checkpoint_gaia: Check Point GAiA (Gen V, Next Generation Firewall) calix_b6: Calix B6 device ciena_saos: Ciena SAOS cisco_asa: Cisco ASA Firewall cisco_ios: Cisco IOS cisco_nxos: Cisco Nexus operating system (NX-OS) cisco_s300: Cisco Small Business 300 cisco_tp: Cisco Telepresence (TC or TE series) cisco_wlc: Cisco Wireless LAN Controller (WLC) cisco_xe: Cisco IOS-XE cisco_xr: Cisco IOS-XR cloudgenix_ion: CloudGenix ION coriant: Coriant proprietary operating system dell_force10: Dell Force10 OS dell_os10: Dell Networking OS10 dell_powerconnect: Dell PowerConnect OS eltex: Eltex switches enterasys: Enterasys devices extreme: Extreme Networks (EXOS or ExtremeXOS) extreme_wing: Extreme Networks Wing f5_ltm: F5 LTM fortinet: Fortinet FortiOS generic_termserver: Terminal servers (e.g. Digi) hp_comware: HP Comware (formerly H3C) (HP purchased H3C and 3Com) hp_procurve: HP ProCurve huawei: Huawei devices huawei_vrpv8: Huawei VRPv8 huawei_comware: Huawei Comware (1, 3, 5 & 7) ipinfusion_ocnos: IP Infusion OcNOS juniper: Juniper Junos juniper_junos: Juniper Junos (netmiko_junos driver) keymile: Keymile devices linux: Native Linux command execution (non-network) mellanox: Mellanox mrv_lx: MRV Communications LX Series switches netapp_cdot: NetApp Clustered Data ONTAP netapp_eseries: NetApp E-Series netscaler: Citrix Netscaler 

H3C交换机查询python3的脚本

#!/usr/local/python3/bin/python3 # coding=utf-8 from netmiko import ConnectHandler import time import datetime import json import sys import re import logging import os import subprocess class H3C: data={ 
    'hostname':None, 'crc_list':{ 
   }, 'os_version':None, 'port_list':{ 
   }, 'logic_port_list':{ 
   }, 'composed_logic_port':{ 
   }, 'port_vlan_list':{ 
   }, 'device_list':{ 
   }, 'port_connect_mac_list':[], 'arp_list':[], 'lldp_neighbor_info_list':{ 
   }, } init_log_done=False client=None transport=None username='' ip='' funcName='' password = '' port = '' prompt = '' def recordLog(self,txt): if self.init_log_done: logging.info(txt) # cmd = "echo \'{0}\' >>{1} ".format(txt,self.exeLog) # handle = os.popen(cmd) # retTxt = handle.read() # handle.close() def run_command(self,cmd,logCmdFlag =False ): if logCmdFlag: self.recordLog(cmd) p = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell = True) (stdout, stderr) = p.communicate() if stdout: self.recordLog(stdout) if stderr: self.recordLog(stderr) return (stdout,stderr) def buildReponse(self,success = True,info = ''): res = { 
    "success":success, "info": info } return res def outJson(self,str): totalLen = len(str) readLen = 0 while readLen < totalLen: if readLen + 4096 < totalLen: sys.stdout.write(str[readLen:readLen + 4096]) readLen += 4096 else: sys.stdout.write(str[readLen:]) readLen = totalLen sys.stdout.flush() time.sleep(0.1) def initClient(self): device = { 
    'device_type': 'hp_comware_telnet', 'ip': self.ip, 'username': self.username, 'password': self.password, 'port': self.port, } self.client = ConnectHandler(**device) self.prompt = self.client.find_prompt() def closeClient(self): if self.client: self.client.disconnect() def queryDeviceManuinfo(self): cmd='display device manuinfo' retText = self.client.send_command(cmd) ret={ 
   } if 'Chassis' in retText: pattern = r'Chassis (\d+):\s+Chassis self:\s+DEVICE_NAME\s+:\s+(\S+)\s+DEVICE_SERIAL_NUMBER\s+:\s+(\S+)\s+MAC_ADDRESS\s+:\s+(\S+)\s+MANUFACTURING_DATE\s+:\s+(\S+)\s+VENDOR_NAME\s+:\s+(\S+)' matches = re.findall(pattern, retText, re.MULTILINE) for match in matches: chassis_number = match[0] device_name = match[1] device_serial_number = match[2] vendor_name = match[5] ret[device_serial_number]={ 
    "slot_num":-1, "device_sn":device_serial_number, "brand":vendor_name, "model":device_name, "chassis_num":chassis_number, } elif 'PRODUCT' in retText: regex = r'Slot\s+(\d+)\s+CPU\s+0:[\s\S]+?DEVICE_SERIAL_NUMBER\s+:\s+([\w\d]+)[\s\S]+?VENDOR_NAME\s+:\s+([\w\d-]+)[\s\S]+?PRODUCT ID\s+:\s+([\w\d-]+)' matches = re.findall(regex, retText) for match in matches: ret[match[1]]={ 
    "slot_num":match[0], "device_sn":match[1], "brand":match[2], "model":match[3], "chassis_num":-1, } else: regex = r'Slot\s+(\d+)\s+CPU\s+0:[\s\S]+?DEVICE_NAME\s+:\s+([\w\d-]+)[\s\S]+?DEVICE_SERIAL_NUMBER\s+:\s+([\w\d]+)[\s\S]+?VENDOR_NAME\s+:\s+([\w\d-]+)[\s\S]+?' matches = re.findall(regex, retText) for match in matches: ret[match[2]]={ 
    "slot_num":match[0], "device_sn":match[2], "brand":match[3], "model":match[1], "chassis_num":-1, } self.data['device_list']=ret return ret def queryAllPortVlan(self): self.data['logic_port_list']=logicPortList= self.queryLogicPortList() self.data['port_list']=portList= self.queryPortList() self.data['port_vlan_list']=self.queryPortVlan(portList) self.data['port_vlan_list'].update(self.queryPortVlan(logicPortList)) return self.data['port_vlan_list'] def queryPortVlan(self,portNameList): ret={ 
   } for port_name in portNameList: portVlan=[] time.sleep(0.1) portVlanTxt=self.client.send_command(f'display interface { 
     port_name} | include VLAN',expect_string=self.prompt, read_timeout=10) portVlanLines=portVlanTxt.split("\n") for portVlanline in portVlanLines: # portVlanline='VLAN permitted: 1(default vlan), 2-12,8888,9000' if 'VLAN permitted:' in portVlanline : text = re.sub(r'\([^)]*\)', '', portVlanline) match = re.search(r"(?<=permitted: ).*$", text) vlan_list = match.group(0).split(",") for vlanNumStr in vlan_list: if '-' in vlanNumStr: portVlan.append(vlanNumStr.strip()) # start, end = map(int, v.split('-')) # portVlan[vlanKey].extend(range(start, end+1)) else: vlanNum=re.search(r'\d+', vlanNumStr).group() portVlan.append(vlanNum) ret[port_name] = portVlan return ret def queryPortCRCCount(self,port): # disp interface Ten-GigabitEthernet1/3/0/27 | inc CRC, try: cmdPort='disp interface {0} | inc CRC'.format(port) time.sleep(0.05) resultTxt= self.client.send_command(cmdPort) regCrc= r"([0-9]{1}) CRC" pattern = re.compile(regCrc, re.M ) serObj = re.search(pattern, resultTxt.decode()) # print(serObj) # exit(0) if serObj: return int(serObj.group(1)) else: return -1 except Exception as e: pass return -2 def queryAllPortCRCCount(self): ret = { 
    'phyPortList':[], 'logicPortList':[] } phyPortList=self.queryPortList() logicPortList=self.queryLogicPortList() for port in phyPortList: tmpCount = self.quertPortCRCCount(port) ret['phyPortList'].append( { 
    'port_name':port, 'crc_count':tmpCount, } ) for port in logicPortList: tmpCount = self.quertPortCRCCount(port) ret['logicPortList'].append( { 
    'port_name':port, 'crc_count':tmpCount, } ) self.data['crc_list']=ret return ret def queryPortList(self): ret={ 
   } cmdPort='display interface brief ' resultTxt=self.client.send_command(cmdPort) lines = resultTxt.split("\n") regPort = r"(GE|XGE|FE|E0|E2|E1|E3|E4|40GE)([0-9\/]{2,})" patternPort = re.compile(regPort, re.M ) for line in lines: if 'BAGG' in line or 'Trunk' in line or 'MGE' in line: continue serObjPort= re.search(patternPort, line) if serObjPort and serObjPort.group(1) and serObjPort.group(2): parts = line.split() port_name = parts[0] port_link_status = parts[1] port_speed = parts[2] #Type: A - access; T - trunk; H - hybrid port_link_type = parts[4] pvid = parts[5] if len(parts)>6: port_desc=parts[6] else: port_desc='' ret[port_name] = { 
    'port_link_status': port_link_status, 'speed': port_speed, 'link_type': port_link_type, 'pvid': pvid, 'port_desc': port_desc, # 'mac_address': mac_address } self.data['port_list']=ret return ret def queryOsVersion(self): ret ='' cmd='disp version | include Software' retText = self.client.send_command(cmd) retText = retText.replace('"', '') reg = r"Comware Software, ([a-zA-Z0-9., ]*)" pattern = re.compile(reg, re.M | re.I) serObj = re.search(pattern, retText) if serObj: self.data['os_version']=ret= serObj.group(1) return ret def queryHostName(self): cmd='display current-configuration | include sysname' hostName = self.client.send_command(cmd) hostName=hostName.strip().split(' ')[1] #lines = resultTxt.decode().split("\n") self.data['hostname']=hostName return hostName def queryLogicPortList(self): ret={ 
   } cmdPort='display interface brief ' resultTxt=self.client.send_command(cmdPort) lines = resultTxt.split("\n") for line in lines: if 'BAGG' not in line and 'Trunk' not in line : continue parts = line.split() port_name = parts[0] port_link_status = parts[1] port_speed = parts[2] #Type: A - access; T - trunk; H - hybrid port_link_type = parts[4] pvid = parts[5] if len(parts)>6: port_desc=parts[6] else: port_desc='' ret[port_name] = { 
    'port_link_status': port_link_status, 'speed': port_speed, 'link_type': port_link_type, 'pvid': pvid, 'port_desc': port_desc, # 'mac_address': mac_address } self.data['logic_port_list']=ret return ret def queryComposedLogicPort(self): ret = { 
   } cmd='disp link-aggregation verbose' resultTxt = self.client.send_command(cmd) lines = resultTxt.split("\n") regPort = r"(GE|XGE|FE|E0|E2|E1|E3|E4|40GE)([0-9\/]{2,})" patternPort = re.compile(regPort, re.M ) currentAgg = '' find='Agg' for line in lines: if 'Remote' in line: find='' continue match_aggr = re.search(r"Aggregate Interface:\s+(\S+)", line) if match_aggr: find='port' aggr_name = match_aggr.group(1) ret[aggr_name]=[] currentAgg=aggr_name continue if find=='port': find='port' serObjPort= re.search(patternPort, line) if serObjPort and serObjPort.group(1) and serObjPort.group(2): port=serObjPort.group(1)+serObjPort.group(2) ret[currentAgg].append(port) continue self.data['composed_logic_port']=ret return ret def queryArpList(self): cmd='disp arp' resultTxt = self.client.send_command(cmd) lines = resultTxt.split("\n") for line in lines: if 'Type:' in line or 'IP address' in line: continue parts = line.split() if len(parts)>5: self.data['arp_list'].append({ 
    'ip': parts[0], 'mac': parts[1], 'port_name': parts[3], }) return self.data['arp_list'] def queryMacAddressList(self): cmd='disp mac-address' resultTxt = self.client.send_command(cmd) lines = resultTxt.split("\n") for line in lines: if 'MAC Address' in line or 'BAGG' in line or 'Trunk' in line or 'MGE' in line: continue parts = line.split() if len(parts)>3: self.data['port_connect_mac_list'].append({ 
    'con_mac_address': parts[0], 'local_port': parts[3], }) return self.data['port_connect_mac_list'] def queryLLdpNeighborInfoList(self): cmd='disp lldp neighbor list' resultTxt = self.client.send_command(cmd) lines = resultTxt.split("\n") valFlag=False pattern = r'(\S+)\s+(\S+)\s+(\S+)\s+(.*)' for line in lines: if 'Port ID' in line and 'Local Interface' in line : valFlag=True continue if valFlag==False: continue match = re.match(pattern, line) if match: self.data['lldp_neighbor_info_list'][match.group(1)]={ 
    'chassis_id': match.group(2), 'port_id': match.group(3), 'system_name': match.group(4).strip(), } return self.data['lldp_neighbor_info_list'] if __name__ == '__main__': try: obj = H3C() obj.run_command('mkdir -p /var/log/switchAuto/') t = time.time() timenow = (int(t)) obj.exeLog = '/var/log/ihm/swithc-query-{0}-{1}.log'.format(datetime.date.today(),timenow) logging.basicConfig(level=logging.DEBUG, filename=obj.exeLog, filemode='a', format= '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s' ) obj.init_log_done=True # xxx.sh 192.168.1.1 test testpwd 23 queryLogicPort obj.ip = sys.argv[1] obj.username = sys.argv[2] obj.password = sys.argv[3] obj.port = int(sys.argv[4]) obj.funcName = sys.argv[5] obj.initClient() ret = '' if obj.funcName == 'queryComposedLogicPort': obj.queryComposedLogicPort() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryAllPortCRCCount': obj.queryAllPortCRCCount() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryDeviceManuinfo': obj.queryDeviceManuinfo() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryPortList': obj.queryPortList() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryHostName': obj.queryHostName() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryOsVersion': obj.queryOsVersion() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryLogicPortList': obj.queryLogicPortList() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryAllPortVlan': obj.queryAllPortVlan() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryArpList': obj.queryArpList() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryMacAddressList': obj.queryMacAddressList() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryLLdpNeighborInfoList': obj.queryLLdpNeighborInfoList() ret = obj.buildReponse(True,obj.data) elif obj.funcName =='queryBaseInfo': obj.queryHostName() obj.queryDeviceManuinfo() obj.queryOsVersion() obj.queryPortList() obj.queryLogicPortList() obj.queryComposedLogicPort() ret = obj.buildReponse(True,obj.data) else: ret = obj.buildReponse(False,'unknown function '+sys.argv[5]) tmpJson = json.dumps(ret) obj.outJson(tmpJson) except Exception as e: msg = str(e) exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] info = (exc_type, fname, exc_tb.tb_lineno) msg += str(info) ret = obj.buildReponse(False,msg) obj.outJson(json.dumps(ret)) obj.closeClient() os._exit(0)``` 

今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

下一篇

已是最新文章

发表回复