实现思路
使用excel管理用例用例信息,requests模块发送http请求,实现了记录日志,邮件发送测试报告的功能
目录结构如下:
下面直接上代码:
统筹脚本
# -*- coding:utf-8 -*-
import os
from interface import Interface
from testcase_get import Get_testcase
from result_save import Save_test_result
from result_send import Send_report
from config.config import Config
from logging_save import logger
if __name__ == "__main__":
cur_path = os.path.split(os.path.realpath(__file__))[0] # 获取当前文件绝对路径
case_path = os.path.join(cur_path, "test_case", "20170602.xls")
test_case = Get_testcase(case_path).readExcel() # 获取用例
if not isinstance(test_case, list): # 判断用例是否获取成功
logger.info("Test_case get failed... \n Done!")
else:
logger.info("获取用例成功")
# 调用接口
test_result = Interface().interfaceTest(test_case)
# 获取执行结果,用于发邮件
count_success = test_result[3]
count_failure = test_result[4]
failed_case_detail = test_result[5]
# 保存测试结果
Save_test_result().save_result(case_path, test_result[0], test_result[1], test_result[2])
logger.info("保存测试结果成功")
# 获取邮件配置信息
mail_config = Config(os.path.join(cur_path, "config", "mail.conf")).get_mail_config()
logger.info("获取邮箱配置成功")
login_user = mail_config[0]
login_pwd = mail_config[1]
from_addr = mail_config[2]
to_addrs = mail_config[3]
smtp_server = mail_config[4]
mail_send = Send_report(count_success, count_failure, failed_case_detail)
# 获取最新测试报告
last_report = mail_send.newest_report()
logger.info("邮件发送结果")
mail_send.send_result(login_user, login_pwd,from_addr, to_addrs,smtp_server,last_report)
logger.info("DONE!")
请求封装
# coding:utf-8
import json
import requests
from logging_save import logger
from result_check import Result_check
from url_transform import urltransform
class Interface:
def __init__(self, ):
pass
def interfaceTest(self, case_list):
"""
接口调用主函数
"""
# 用于存结果
res_flags = []
# 用于存请求报文
request_urls = []
# 用于存返回报文
responses = []
# 用户存失败的用例
failed_case = []
# 统计成功失败的用例数
count_success = 0
count_failure = 0
for case in case_list:
try:
# 模块
product = case[0]
# 用例id
case_id = case[1]
# 用例标题
interface_name = case[2].strip(\'\n\')
# 用例描述
case_detail = case[3]
# 请求方式
method = case[4]
# 请求url
url = case[5]
# 入参
param = case[6]
# 预期结果
res_check = case[7]
except Exception as e:
return \'测试用例格式不正确!%s\' % e
# 定义消息头信息
headers = {\'content-type\': \'application/json\',
\'User-Agent\': \'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0\'}
# 对url进行封装
new_url = urltransform().urltransform(url, method, param)
if method.upper() == \'GET\':
results = requests.get(new_url).text
logger.info(u\'正在调用接口: %s\' % interface_name)
# print results
responses.append(results)
# 用于存储预期结果与实际结果的比较结果
res = Result_check().interface_result_check(results, res_check)
request_urls.append(new_url)
else:
request_urls.append(new_url)
if param == \'\':
pass
else:
data = json.loads(param) # 将参数转化为json格式
results = requests.post(new_url, data=json.dumps(data), headers=headers).text
responses.append(results)
res = Result_check().interface_result_check(results, res_check)
if \'pass\' in res:
res_flags.append(\'pass\')
count_success += 1
else:
logger.warning(u\'接口返回结果与预期结果不一致!失败URL: %s METHOD :%s\' % (url, method))
res_flags.append(\'fail\')
count_failure += 1
failed_case.append((interface_name, method, url))
logger.info(u\'共执行 %s 条用例,PASS: %s,FAILED: %s\' % (len(case_list), count_success, count_failure))
return res_flags, request_urls, responses, count_success, count_failure, failed_case
日志封装
# coding=utf-8
import logging
import sys
import traceback
import time
class LoggingUtils:
\'\'\'
===========封装日志工具类的基本操作=============
\'\'\'
def __init__(self,logfile):
\'\'\'
:param logfile:
\'\'\'
self.logger = logging.getLogger(logfile)
self.hdlr = logging.FileHandler(logfile)
formatter = logging.Formatter(\'%(asctime)s %(levelname)s - %(message)s\')
self.ch = logging.StreamHandler()
self.ch.setLevel(logging.INFO)
self.ch.setFormatter(formatter)
self.hdlr.setFormatter(formatter)
self.logger.addHandler(self.hdlr)
self.logger.addHandler(self.ch)
self.logger.setLevel(logging.DEBUG)
def debug(self, msg):
\'\'\'
:param msg:
:return:
\'\'\'
self.logger.debug(msg)
self.hdlr.flush()
def info(self, msg):
\'\'\'
:param msg:
:return:
\'\'\'
self.logger.info(msg)
self.hdlr.flush()
def warning(self,msg):
self.logger.warning(msg)
self.hdlr.flush()
def error(self, msg):
\'\'\'
:param msg:
:return:
\'\'\'
self.logger.error(msg)
# self.logger.removeHandler(logging.StreamHandler())
self.logger.removeHandler(self.ch)
self.hdlr.flush()
def error_sys(self, limit=None):
\'\'\'
:param limit:
:return:
\'\'\'
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
if limit is None:
if hasattr(sys, \'tracebacklimit\'):
limit = sys.tracebacklimit
n = 0
eline = \'\n\'
while exceptionTraceback is not None and (limit is None or n < limit):
f = exceptionTraceback.tb_frame
lineno = exceptionTraceback.tb_lineno
co = f.f_code
filename = co.co_filename
name = co.co_name
eline += \' File "%s", line %d, in %s \n \' % (filename, lineno, name)
exceptionTraceback = exceptionTraceback.tb_next
n = n + 1
eline += "\n".join(traceback.format_exception_only(exceptionType, exceptionValue))
self.logger.error(eline)
self.hdlr.flush()
timer = time.strftime(\'%Y-%m-%d\',time.localtime())
logger = LoggingUtils(\'%s.log\'%timer)
结果比对
#coding:utf-8
class result_check():
def __init__(self):
pass
def result_check(self,results,res_check):
\'\'\'
结果对比函数
\'\'\'
#返回结果,将结果中的json数据转化为可以和预期结果比较的数据
res = results.replace(\'":"\',\'=\').replace(\'" : "\',\'=\')
#预期结果,是xx=11;xx=22
res_check = res_check.split(\';\')
for s in res_check:
if s in res:
pass
else:
return \'结果不匹配 \'+ str(s)
return \'pass\'
result_save.py 保存测试结果的模块,复制原有的用例,保存为新的excel
#coding:utf-8
from xlutils import copy
import xlrd
import time
import os
class Save_test_result():
def __init__(self):
pass
def save_result(self,file_path,res_flags,request_urls,responses):
\'\'\'
:return:
\'\'\'
book = xlrd.open_workbook(file_path)
new_book = copy.copy(book)
sheet = new_book.get_sheet(0)
i = 1
for request_url, response, flag in zip(request_urls, responses, res_flags):
sheet.write(i, 8, u\'%s\' % request_url)
sheet.write(i, 9, u\'%s\' % response)
sheet.write(i, 10, u\'%s\' % flag)
i += 1
report_path = os.path.abspath(os.path.join(\'report\'))
if not os.path.exists(report_path):
os.makedirs(report_path)
new_book.save(os.path.abspath(os.path.join(report_path, \'Report@%s.xls\' % time.strftime(\'%Y.%m.%d@%H%M%S\'))))
结果邮件
#coding:utf-8
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
import os
from logging_save import logger
class Send_report(object):
def __init__(self,count_success,count_failure,failed_case):
\'\'\'
:param count_success:
:param count_failure:
:param failed_case:
\'\'\'
self.count_success = count_success
self.count_failure = count_failure
self.failed_case = failed_case
def newest_report(self,testreport=\'report\'):
\'\'\'
获取最新的测试报告
:param testreport:
:return:
\'\'\'
lists = os.listdir(testreport)
lists.sort(key=lambda fn: os.path.getmtime(os.path.join(testreport,fn)))
file_new = os.path.join(testreport, lists[-1])
logger.info(\'获取最新附件报告成功\')
return file_new
def send_result(self,username,passwd,from_addr,to_addrs,smtpserver,*args):
\'\'\'
:param username:
:param passwd:
:param from_addr:
:param to_addrs:
:param smtpserver:
:param args:
:return:
\'\'\'
sender = from_addr
subject = \'财富港接口测试结果\'
username = username
passwd = passwd
\'\'\'邮件内容\'\'\'
tille = (u\'用例名称\', u\'请求方式\', u\'url\')
details = (u\'成功: \' + str(self.count_success) + u\'失败: \' + str(self.count_failure)) + \'\n\' + u\'失败的用例如下 :\' + \
\'\n\' + \'\n\'.join(str(zip(tille, i)) for i in self.failed_case).decode(\'unicode-escape\')
logger.info(\'邮件附件为: %s\' %(args[0].split(\'\\\')[1]))
if args != None: #判断是否添加附件
msg = MIMEMultipart()
msg.attach(MIMEText(details, \'plain\', \'utf-8\'))
i = 0
while i < len(args): #可以添加多个附件
part = MIMEText(open(args[i], \'rb\').read(), \'base64\', \'utf-8\')
part["Content-Type"] = \'application/octet-stream\'
part["Content-Disposition"] = \'attachment; filename="%s"\'%args[i]
msg.attach(part) #添加附件
i += 1
msg[\'subject\'] = Header(subject, \'utf-8\')
msg[\'From\'] = from_addr
msg[\'To\'] = \',\'.join(eval(to_addrs)) #兼容多个收件人
smtp = smtplib.SMTP()
try:
smtp.connect(smtpserver)
smtp.login(username, passwd)
smtp.sendmail(sender, eval(to_addrs), msg.as_string())
smtp.close()
logger.info(\'带附件测试报告发送成功!\')
except smtplib.SMTPAuthenticationError,e:
logger.error(\'邮箱账户或密码错误: \'+ str(e))
else:
msg = MIMEText(details, \'plain\', \'utf-8\')
msg[\'subject\'] = Header(subject, \'utf-8\')
msg[\'From\'] = from_addr
msg[\'To\'] = \',\'.join(eval(to_addrs))
smtp = smtplib.SMTP()
try:
smtp.connect(smtpserver)
smtp.login(username, passwd)
smtp.sendmail(sender, eval(to_addrs), msg.as_string())
logger.info(\'测试报告发送成功!\')
smtp.close()
except smtplib.SMTPAuthenticationError,e:
logger.error(\'邮箱账户或密码错误 : \'+str(e))
用例获取及数据格式化
#coding:utf-8
import xlrd
from logging_save import logger
class Get_testcase(object):
def __init__(self, file_path):
\'\'\'
:param file_path: 用例文件路径
\'\'\'
self.file_path = file_path
def readExcel(self):
\'\'\'
读取用例函数
:return: 测试用例列表
\'\'\'
try:
book = xlrd.open_workbook(self.file_path) # 打开excel
except Exception, error:
logger.error(\'路径不在或者excel不正确 : \' + str(error))
return error
else:
sheet = book.sheet_by_index(0) # 取第一个sheet页
rows = sheet.nrows # 取这个sheet页的所有行数
case_list = [] # 用于保存用例信息
for i in range(rows):
if i != 0:
case_list.append(sheet.row_values(i)) # 把每一条测试用例添加到case_list中
return case_list
请求url转换
#coding:utf-8
class urltransform(object):
def __init__(self):
pass
def urltransform(self, url, method, param):
\'\'\'
:return:
\'\'\'
if param == \'\':
new_url = url
else:
if method.upper() == \'GET\':
new_url = url + \'?\' + param.replace(\';\', \'&\') #如果有参数,且为GET方法则组装url
else:
new_url = url
return new_url
测试用例excel结构
config目录下,config.py 获取配置文件信息的模块
#conding:utf-8
import ConfigParser
class Config(object):
def __init__(self,file_path):
self.config = ConfigParser.ConfigParser()
self.config.read(file_path)
def get_mail_config(self):
login_user = self.config.get(\'SMTP\', \'login_user\')
login_pwd = self.config.get(\'SMTP\', \'login_pwd\')
from_addr = self.config.get(\'SMTP\', \'from_addr\')
to_addrs = self.config.get(\'SMTP\', \'to_addrs\')
smtp_server = self.config.get(\'SMTP\', \'smtp_server\')
port = self.config.get(\'SMTP\', \'port\')
return login_user, login_pwd , from_addr, to_addrs,smtp_server, port
def report_save_config(self):
pass
mail.conf
[SMTP] login_user = 18******@163.com login_pwd = ****** from_addr = BI<18******@163.com> to_addrs = [\'18******@163.com\'] #to_addrs = [\'1******@qq.com\',\'******.com\'] smtp_server = smtp.163.com port = 25
测试报告
邮件接收结果