【网络】H3C 交换机telnet查看端口流量Python小工具
这两天实验室网络不给力,后来发现是有人占用了实验室太多的带宽,而登陆到实验室老的h3c s5500交换机上看端口流量情况很不方便,于是萌生写个小工具来统计端口流量情况,已求找到谁占用了大量带宽。
于是查了下,发现python 有个telnetlib的库,登陆交换机以及进行简单的操作相当简单,于是就写了这么个小工具:
*************************************工作原理********************************************
1、本程序采用telnet的方式登陆到交换机并通过不停的发送display interface [接口] 的方式请求交换机接口
信息并从中提取total input 和 total output的方式计算当前端口速率。
2、本程序仅统计状态为UP的端口的信息。使用display brief interface获取到状态为UP的接口。
3、本程序仅统计gigabytes端口。
*************************************其他**********************************************
1、第一次统计信息,由于没有时间,信息不准确
2、速度的单位都是MB
3、统计结果只能作为参考,并不能代表速率。
4、由于交换机本身更新接口信息速度并不快,而且telnet发送命令回传耗时也较大,本程序设置最小刷新时间,
当前为5s,若由于请求交换机接口所耗时超过5s,则刷新时间为请求耗时时间。
python3(未调试通过)
#!/usr/bin/python
import re
import telnetlib
import time
import platform
import os
Host = \'192.168.2.65\'
username = \'\'
password = \'\'
finish = \'<....>\'
MIN_INTERVAL = 5.0 # use float
PORT_COUNT = 52 # h3c s5500 has 52 gigabyte ports
# return system type as a string
def get_system_info():
sys_platform = platform.system()
if sys_platform == \'Linux\' or sys_platform == \'Darwin\':
return \'UNIX\'
elif sys_platform == \'Windows\':
return \'WINDOWS\'
else:
return \'UNIX\'
def clear_screen():
sys_type = get_system_info()
if sys_type == \'UNIX\':
os.system("clear")
elif sys_type == \'WINDOWS\':
os.system(\'cls\')
else:
os.system(\'clear\')
# login to the device and return the Telnet object
def login():
# telnet to the device
print("connect...")
tn = telnetlib.Telnet(Host,timeout = 5)
tn.read_until(\'Username:\',timeout=5)
tn.write(username + \'\n\')
tn.read_until(\'Password:\')
tn.write(password + \'\n\')
tn.read_until(finish)
print("telnet success")
return tn
#\'\'\'using Telnet object to get port status and return a tuple filled with \'UP\' ports\'\'\'
def get_up_ports(tn):
example = \'\'\'
The brief information of interface(s) under bridge mode:
Interface Link Speed Duplex Link-type PVID
GE1/0/1 UP 100M(a) full(a) access 409
GE1/0/2 UP 100M(a) full(a) access 409
GE1/0/3 DOWN auto auto access 409\'\'\'
tn.write(\'display brief interface\n\')
tn.write(\' \')
tn.write(\' \')
ports_brief = tn.read_until(finish)
up_ports = []
port_info = re.findall(r"GE1/0/(\d+)(\s+)(\w+)",ports_brief)
for i in port_info:
if i[-1] == \'UP\':
up_ports.append(i[0])
print(up_ports)
return tuple(up_ports)
def extract_data(port_str):
\'\'\' get the data from the result of command \'display interface GigabitEthernet 1/0/i\'
\'\'\'
# (VLAN_ID, total_input, total_output, max_input, max_output)
if re.search(\'GigabitEthernet\', port_str) == None:
return None
VLAN_ID_list = re.findall(r"PVID: (\d+)",port_str)
input_total_list = re.findall(r"Input \(total\): (\d+) packets, (\d+) bytes", port_str)
output_total_list = re.findall(r"Output \(total\): (\d+) packets, (\d+) bytes", port_str)
peak_input_list = re.findall(r"Peak value of input: (\d+) bytes/sec,", port_str);
peak_output_list = re.findall(r"Peak value of output: (\d+) bytes/sec,", port_str);
state_list = re.findall(r"current state: (.+)",port_str)
VLAN_ID = VLAN_ID_list[0] # string
input_total = long((list(input_total_list[0]))[1]) # long
output_total = long((list(output_total_list[0]))[1]) # long
peak_input = long(peak_input_list[0]) # long
peak_output = long(peak_output_list[0]) # long
state = str(state_list[0]) # string
return (VLAN_ID, input_total, output_total, peak_input, peak_output, state)
def do_statistic():
last_input = [0] * PORT_COUNT
last_output = [0] * PORT_COUNT
last_update = time.time()
tn = login()
up_ports = get_up_ports(tn)
clear_screen()
print("connected, waiting...")
while(True):
ports_str = []
# h3c s5500 g1/0/1 - g1/0/52
# input command to get output
for i in up_ports:
tn.write(\'display interface GigabitEthernet 1/0/\' + str(i) +\'\n\')
tn.write(\' \')
port_info = tn.read_until(finish)
ports_str.append(port_info)
# get interval
interval = (time.time() - last_update)
if interval < MIN_INTERVAL:
time.sleep(MIN_INTERVAL - interval)
interval = MIN_INTERVAL
# get data and print
clear_screen()
print("the input/output is from the port view of the switch.")
print("From the user\'s view: input <-> download; output <-> upload.")
print("the VLAN 1000 is connected to the firewall. So, it\'s opposite\n\n")
print(\'PORT_NO\tVLAN_ID\tINPUT\tOUTPUT\tMAX_IN\tMAX_OUT\tSTATE (MB)\')
port_index = 0
for _port_str in ports_str:
# (VLAN_ID, total_input, total_output, max_input, max_output)
data = extract_data(_port_str)
if data == None:
continue
port_no = up_ports[port_index]
vlan_id = data[0]
speed_input = (data[1] - last_input[port_index]) / (interval * 1024 * 1024)
speed_output = (data[2] - last_output[port_index]) / (interval * 1024 * 1024)
max_input = data[3] / (1024 * 1024 )
max_output = data[4] / (1024 * 1024 )
state = data[5]
last_input[port_index] = data[1]
last_output[port_index] = data[2]
port_index += 1
# show
print(port_no, \'\t\', vlan_id, \'\t\',float(\'%.2f\' %speed_input), \'\t\', float(\'%.2f\' %speed_output), \'\t\')
print(float(\'%.2f\' %max_input), \'\t\' ,float(\'%.2f\' %max_output), \'\t\', state)
last_update = time.time()
if __name__ == "__main__":
username = input("please input username:")
password = input("please input password:")
print(username)
print(password)
do_statistic()
tn.close()