1、STA 反复断开连接
# _*_coding:utf-8 _*_
__author__ = \'juzi_juzi\'
"""
环境准备,windows 平台
1、安装python3 环境,对具体版本无要求,具体安装这里不介绍,https://www.python.org/ 下载即可;
2、安装pywifi 模块;如果不是用第三方IDE工具,直接cmd 安装:pip3 install pywifi;
3、安装comtypes;如果不是用第三方IDE工具,直接cmd 安装:pip3 install comtypes;
4、不使用第三方IDE 工具,直接cmd(管理员权限)运行:python3 wifi.py
"""
import pywifi
import time,subprocess
import logging, os
from datetime import date
GCOUNT = 0
GSUCCESSCOUNT = 0
GFAILEDCOUNT = 0
PINGFAILCOUNT = 0
def log():
"""
定义日志相关内容;
:return:None;
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO) # Log等级总开关
timestruct =date.today().strftime(\'%Y-%m-%d\')
lastname = \'{}-{}\'.format(timestruct,\'wifi.log\')
filename = os.path.join(os.getcwd(), lastname)
fh = logging.FileHandler(filename)
fh.setLevel(logging.DEBUG) # 输出到file的log等级的开关
formatter = logging.Formatter("%(asctime)s==%(levelname)s :%(message)s")
fh.setFormatter(formatter)
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG) # 输出到file的log等级的开关
ch.setFormatter(logging.Formatter("%(asctime)s==%(levelname)s :%(message)s"))
logger.addHandler(ch)
def loginAP(ssid, password, waittimes=10):
"""
STA connect AP
:param ssid: SSID;
:param password: login password;
:return: connect result;
"""
status = False
pywifi.set_loglevel(logging.INFO) # 输出日志
wifi = pywifi.PyWiFi() # 抓取WiFi接口
ifaceList = wifi.interfaces() # 抓取无线网卡列表
global iface
iface = ifaceList[0] # 如果有无线网卡第一个一般就是你要的
logging.info(\'get wireless adapter name:{}\'.format(iface.name()))
# iface.disconnect()
profile = pywifi.Profile() # 创建wifi链接文件
profile.ssid = ssid # wifi名称,不加会报错
profile.auth = pywifi.const.AUTH_ALG_OPEN # 网卡的开放
profile.akm.append(pywifi.const.AKM_TYPE_WPA2PSK) # wifi加密算法
profile.cipher = pywifi.const.CIPHER_TYPE_CCMP # 加密单元
profile.key = password # 密码
time.sleep(2)
tmp_profile = iface.add_network_profile(profile) # 设定新的链接文件
iface.connect(tmp_profile) # 连接AP\'
logging.info(\'connectinig AP......\')
time.sleep(5)
logging.info(\'wait connection 5s.....\')
for i in range(waittimes):
if iface.status() != pywifi.const.IFACE_CONNECTED:
logging.info(\'connection status is :{}\'.format(iface.status()))
logging.info(\'retry check wirless status {}... wait 1s\'.format(i))
time.sleep(1)
if iface.status() == pywifi.const.IFACE_CONNECTED:
logging.info(\'connection status is :{}\'.format(iface.status()))
status = True
break
else:
logging.info(\'connection AP succeed!\')
status = True
break
return status
def ping(host,count=5):
"""
执行ping ,并且将结果进行按行保存;
:param host: ping 的目标机器;
:param count: ping 次数;
:return: 结果返回并按行读取放到list 进行返回;
"""
cmd = \'ping {} -n {}\'.format(host, count)
res = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
logging.info(\'execute ping {}:,count:{}\'.format(host,count))
out = res.stdout.readlines()
rl =[]
for o in out:
print(o.decode(encoding=\'gb18030\'))
logging.info(o.decode(encoding=\'gb18030\'))
rl.append(o.decode(encoding=\'gb18030\'))
return rl
def check_ping(data, errorrange=25):
"""
检查Ping是否通过,如果在允许的范围内,那么认为通过,否则失败;
:param data: ping 获取到的执行结果的数组;
:param errorrange:
:return: list:True:ping 结果检查通过,False:结果检查失败;同时返回失败的次数;
"""
result = False
s = (data[-4:])
logging.info(\'recevied ping result from ping:{}\'.format(s))
for sl in s:
begin = \'\'.join(sl).find(\'(\')
end = \'\'.join(sl).find(\'%\')
if begin != -1 and end != -1:
res =sl[begin + 1 : end ]
logging.info(res)
if int(res.strip()) <= errorrange:
result = True
break
else:
global PINGFAILCOUNT
PINGFAILCOUNT += 1
logging.info(\'ping failed .... pingfailecount:\'.format(PINGFAILCOUNT))
break
logging.info(\'check ping result:{} \'.format(result))
return [result,PINGFAILCOUNT]
def apper_ping(ssid, passd, host, waittimes):
"""
逻辑调用方法,登录AP并且执行ping 操作;
:return:None;
"""
pingr = False
failedcount = PINGFAILCOUNT #这里可能有点问题
res = loginAP(ssid, passd, waittimes=waittimes)
time.sleep(3)
if res:
pingres = ping(host, count=10)
chkp = check_ping(pingres,errorrange=20)
if chkp[0]:
logging.info(\'check ping succeed!\')
pingr = True
else:
failedcount = chkp[1]
logging.error(\'check ping FAIL!.... and failed count: \'.format(failedcount))
else:
logging.info(\'connect AP FAIL\')
logging.info(res)
return [pingr, failedcount]
if __name__ == \'__main__\':
log()
while True:
GCOUNT += 1
logging.info(\'\r\n\' * 2)
logging.info(\'*\' * 80)
logging.info(\'test {} times.........\'.format(GCOUNT))
logging.info(\'*\' * 80)
if apper_ping(\'testpsk\',\'12345678\', host=\'192.168.204.254\', waittimes=10)[0]:
# iface.disconnect()
GSUCCESSCOUNT += 1
else:
GFAILEDCOUNT += 1
logging.info("=" * 40)
logging.info(\'login all times:{}\'.format(GCOUNT))
logging.info(\'login all succeed times:{}\'.format(GSUCCESSCOUNT))
logging.info(\'login all failed times:{}\'.format(GFAILEDCOUNT))
logging.info(\'login ping failed times:{}\'.format(PINGFAILCOUNT))
logging.info(\'login ping failed rate:{}%\'.format(PINGFAILCOUNT / GCOUNT * 100))
logging.info(\'logdin succeed rate:{}%\'.format(GSUCCESSCOUNT / GCOUNT * 100))
logging.info("=" * 40)
try:
iface.remove_all_network_profiles() # 删除所有的wifi文件
logging.info(\'delete all connection......\')
time.sleep(60)
except Exception as e:
logging.info(\'delete connection error,msg:{}\'.format(e))