Detector

实现思路

使用excel管理用例用例信息,requests模块发送http请求,实现了记录日志,邮件发送测试报告的功能

目录结构如下:

下面直接上代码:

统筹脚本

# -*- coding:utf-8 -*-
import os
from interface import Interface
from testcase_get import Get_testcase
from result_save import Save_test_result
from result_send import Send_report
from config.config import Config
from logging_save import logger

if __name__ == "__main__":
    cur_path = os.path.split(os.path.realpath(__file__))[0]  # 获取当前文件绝对路径
    case_path = os.path.join(cur_path, "test_case", "20170602.xls")
    test_case = Get_testcase(case_path).readExcel()  # 获取用例
    if not isinstance(test_case, list):  # 判断用例是否获取成功
        logger.info("Test_case get failed... \n Done!")
    else:
        logger.info("获取用例成功")
        # 调用接口
        test_result = Interface().interfaceTest(test_case)
        # 获取执行结果,用于发邮件
        count_success = test_result[3]
        count_failure = test_result[4]
        failed_case_detail = test_result[5]
        # 保存测试结果
        Save_test_result().save_result(case_path, test_result[0], test_result[1], test_result[2])
        logger.info("保存测试结果成功")
        # 获取邮件配置信息
        mail_config = Config(os.path.join(cur_path, "config", "mail.conf")).get_mail_config()
        logger.info("获取邮箱配置成功")
        login_user = mail_config[0]
        login_pwd = mail_config[1]
        from_addr = mail_config[2]
        to_addrs = mail_config[3]
        smtp_server = mail_config[4]
        mail_send = Send_report(count_success, count_failure, failed_case_detail)
        # 获取最新测试报告
        last_report = mail_send.newest_report()
        logger.info("邮件发送结果")
        mail_send.send_result(login_user, login_pwd,from_addr, to_addrs,smtp_server,last_report)
        logger.info("DONE!")

请求封装

# coding:utf-8
import json
import requests

from logging_save import logger
from result_check import Result_check
from url_transform import urltransform


class Interface:
    def __init__(self, ):
        pass

    def interfaceTest(self, case_list):
        """
        接口调用主函数
        """
        # 用于存结果
        res_flags = []
        # 用于存请求报文
        request_urls = []
        # 用于存返回报文
        responses = []
        # 用户存失败的用例
        failed_case = []
        # 统计成功失败的用例数
        count_success = 0
        count_failure = 0
        for case in case_list:
            try:
                # 模块
                product = case[0]
                # 用例id
                case_id = case[1]
                # 用例标题
                interface_name = case[2].strip(\'\n\')
                # 用例描述
                case_detail = case[3]
                # 请求方式
                method = case[4]
                # 请求url
                url = case[5]
                # 入参
                param = case[6]
                # 预期结果
                res_check = case[7]
            except Exception as e:
                return \'测试用例格式不正确!%s\' % e
            # 定义消息头信息
            headers = {\'content-type\': \'application/json\',
                       \'User-Agent\': \'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0\'}
            # 对url进行封装
            new_url = urltransform().urltransform(url, method, param)
            if method.upper() == \'GET\':
                results = requests.get(new_url).text
                logger.info(u\'正在调用接口: %s\' % interface_name)
                # print results
                responses.append(results)
                # 用于存储预期结果与实际结果的比较结果
                res = Result_check().interface_result_check(results, res_check)
                request_urls.append(new_url)
            else:
                request_urls.append(new_url)
                if param == \'\':
                    pass
                else:
                    data = json.loads(param)  # 将参数转化为json格式
                results = requests.post(new_url, data=json.dumps(data), headers=headers).text
                responses.append(results)
                res = Result_check().interface_result_check(results, res_check)
            if \'pass\' in res:
                res_flags.append(\'pass\')
                count_success += 1
            else:
                logger.warning(u\'接口返回结果与预期结果不一致!失败URL: %s METHOD :%s\' % (url, method))
                res_flags.append(\'fail\')
                count_failure += 1
                failed_case.append((interface_name, method, url))
        logger.info(u\'共执行 %s 条用例,PASS: %s,FAILED: %s\' % (len(case_list), count_success, count_failure))
        return res_flags, request_urls, responses, count_success, count_failure, failed_case

日志封装

# coding=utf-8
import logging
import sys
import traceback
import time

class LoggingUtils:
    \'\'\'
    ===========封装日志工具类的基本操作=============
    \'\'\'
    def __init__(self,logfile):
        \'\'\'
        :param logfile: 
        \'\'\'
        self.logger = logging.getLogger(logfile)
        self.hdlr = logging.FileHandler(logfile)
        formatter = logging.Formatter(\'%(asctime)s %(levelname)s - %(message)s\')
        self.ch = logging.StreamHandler()
        self.ch.setLevel(logging.INFO)
        self.ch.setFormatter(formatter)
        self.hdlr.setFormatter(formatter)
        self.logger.addHandler(self.hdlr)
        self.logger.addHandler(self.ch)
        self.logger.setLevel(logging.DEBUG)

    def debug(self, msg):
        \'\'\'
        :param msg: 
        :return: 
        \'\'\'
        self.logger.debug(msg)
        self.hdlr.flush()

    def info(self, msg):
        \'\'\'
        
        :param msg: 
        :return: 
        \'\'\'
        self.logger.info(msg)
        self.hdlr.flush()

    def warning(self,msg):
        self.logger.warning(msg)
        self.hdlr.flush()

    def error(self, msg):
        \'\'\'

        :param msg: 
        :return: 
        \'\'\'
        self.logger.error(msg)
        # self.logger.removeHandler(logging.StreamHandler())
        self.logger.removeHandler(self.ch)
        self.hdlr.flush()

    def error_sys(self, limit=None):
        \'\'\'
        :param limit: 
        :return: 
        \'\'\'
        exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
        if limit is None:
            if hasattr(sys, \'tracebacklimit\'):
                limit = sys.tracebacklimit
        n = 0
        eline = \'\n\'
        while exceptionTraceback is not None and (limit is None or n < limit):
            f = exceptionTraceback.tb_frame
            lineno = exceptionTraceback.tb_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            eline += \' File "%s", line %d, in %s \n \' % (filename, lineno, name)
            exceptionTraceback = exceptionTraceback.tb_next
            n = n + 1

        eline += "\n".join(traceback.format_exception_only(exceptionType, exceptionValue))
        self.logger.error(eline)
        self.hdlr.flush()
timer = time.strftime(\'%Y-%m-%d\',time.localtime())
logger = LoggingUtils(\'%s.log\'%timer)

结果比对

#coding:utf-8
class result_check():

    def __init__(self):
        pass

    def result_check(self,results,res_check):
        \'\'\'
        结果对比函数
        \'\'\'
        #返回结果,将结果中的json数据转化为可以和预期结果比较的数据
        res = results.replace(\'":"\',\'=\').replace(\'" : "\',\'=\')
        #预期结果,是xx=11;xx=22
        res_check = res_check.split(\';\')
        for s in res_check:
            if s in res:
                pass
            else:
                return \'结果不匹配 \'+ str(s)
        return \'pass\'
 result_save.py   保存测试结果的模块,复制原有的用例,保存为新的excel
#coding:utf-8
from xlutils import copy
import xlrd
import time
import os

class Save_test_result():

    def __init__(self):
        pass

    def save_result(self,file_path,res_flags,request_urls,responses):
        \'\'\'
        :return: 
        \'\'\'
        book = xlrd.open_workbook(file_path)
        new_book = copy.copy(book)
        sheet = new_book.get_sheet(0)
        i = 1
        for request_url, response, flag in zip(request_urls, responses, res_flags):
            sheet.write(i, 8, u\'%s\' % request_url)
            sheet.write(i, 9, u\'%s\' % response)
            sheet.write(i, 10, u\'%s\' % flag)
            i += 1
        report_path = os.path.abspath(os.path.join(\'report\'))
        if not os.path.exists(report_path):
            os.makedirs(report_path)
        new_book.save(os.path.abspath(os.path.join(report_path, \'Report@%s.xls\' % time.strftime(\'%Y.%m.%d@%H%M%S\'))))

结果邮件

#coding:utf-8
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
import os
from logging_save import  logger


class Send_report(object):
    def __init__(self,count_success,count_failure,failed_case):
        \'\'\'
        :param count_success: 
        :param count_failure: 
        :param failed_case: 
        \'\'\'
        self.count_success = count_success
        self.count_failure = count_failure
        self.failed_case = failed_case

    def newest_report(self,testreport=\'report\'):
        \'\'\'
        获取最新的测试报告
        :param testreport: 
        :return: 
        \'\'\'
        lists = os.listdir(testreport)
        lists.sort(key=lambda fn: os.path.getmtime(os.path.join(testreport,fn)))
        file_new = os.path.join(testreport, lists[-1])
        logger.info(\'获取最新附件报告成功\')
        return file_new

    def send_result(self,username,passwd,from_addr,to_addrs,smtpserver,*args):
        \'\'\'
        :param username: 
        :param passwd: 
        :param from_addr: 
        :param to_addrs: 
        :param smtpserver: 
        :param args: 
        :return: 
        \'\'\'
        sender = from_addr
        subject = \'财富港接口测试结果\'
        username = username
        passwd = passwd

        \'\'\'邮件内容\'\'\'
        tille = (u\'用例名称\', u\'请求方式\', u\'url\')
        details = (u\'成功: \' + str(self.count_success) + u\'失败: \' + str(self.count_failure)) + \'\n\' + u\'失败的用例如下 :\' + \
                  \'\n\' + \'\n\'.join(str(zip(tille, i)) for i in self.failed_case).decode(\'unicode-escape\')
        logger.info(\'邮件附件为: %s\' %(args[0].split(\'\\\')[1]))

        if args != None: #判断是否添加附件
            msg = MIMEMultipart()
            msg.attach(MIMEText(details, \'plain\', \'utf-8\'))
            i = 0
            while i < len(args): #可以添加多个附件
                part = MIMEText(open(args[i], \'rb\').read(), \'base64\', \'utf-8\')
                part["Content-Type"] = \'application/octet-stream\'
                part["Content-Disposition"] = \'attachment; filename="%s"\'%args[i]
                msg.attach(part) #添加附件
                i += 1

            msg[\'subject\'] = Header(subject, \'utf-8\')
            msg[\'From\'] = from_addr
            msg[\'To\'] = \',\'.join(eval(to_addrs)) #兼容多个收件人
            smtp = smtplib.SMTP()
            try:
                smtp.connect(smtpserver)
                smtp.login(username, passwd)
                smtp.sendmail(sender, eval(to_addrs), msg.as_string())
                smtp.close()
                logger.info(\'带附件测试报告发送成功!\')
            except smtplib.SMTPAuthenticationError,e:
                logger.error(\'邮箱账户或密码错误: \'+ str(e))

        else:
            msg = MIMEText(details, \'plain\', \'utf-8\')
            msg[\'subject\'] = Header(subject, \'utf-8\')
            msg[\'From\'] = from_addr
            msg[\'To\'] =  \',\'.join(eval(to_addrs))
            smtp = smtplib.SMTP()
            try:
                smtp.connect(smtpserver)
                smtp.login(username, passwd)
                smtp.sendmail(sender, eval(to_addrs), msg.as_string())
                logger.info(\'测试报告发送成功!\')
                smtp.close()
            except smtplib.SMTPAuthenticationError,e:
                logger.error(\'邮箱账户或密码错误 : \'+str(e))

用例获取及数据格式化

#coding:utf-8
import xlrd

from logging_save import logger

class Get_testcase(object):

    def __init__(self, file_path):
        \'\'\'
        :param file_path: 用例文件路径
        \'\'\'
        self.file_path = file_path

    def readExcel(self):
        \'\'\'
        读取用例函数
        :return: 测试用例列表
        \'\'\'
        try:
            book = xlrd.open_workbook(self.file_path)  # 打开excel
        except Exception, error:
            logger.error(\'路径不在或者excel不正确 : \' + str(error))
            return error
        else:
            sheet = book.sheet_by_index(0)  # 取第一个sheet页
            rows = sheet.nrows  # 取这个sheet页的所有行数
            case_list = []  # 用于保存用例信息
            for i in range(rows):
                if i != 0:
                    case_list.append(sheet.row_values(i)) # 把每一条测试用例添加到case_list中
            return case_list

请求url转换

#coding:utf-8
class urltransform(object):
    def __init__(self):
        pass

    def urltransform(self, url, method, param):
        \'\'\'
        :return: 
        \'\'\'
        if param == \'\':
            new_url = url
        else:
            if method.upper() == \'GET\':
                new_url = url + \'?\' + param.replace(\';\', \'&\')  #如果有参数,且为GET方法则组装url
            else:
                new_url = url
        return new_url

测试用例excel结构

config目录下,config.py   获取配置文件信息的模块

#conding:utf-8
import ConfigParser

class Config(object):

    def __init__(self,file_path):
        self.config = ConfigParser.ConfigParser()
        self.config.read(file_path)

    def get_mail_config(self):
        login_user = self.config.get(\'SMTP\', \'login_user\')
        login_pwd = self.config.get(\'SMTP\', \'login_pwd\')
        from_addr = self.config.get(\'SMTP\', \'from_addr\')
        to_addrs = self.config.get(\'SMTP\', \'to_addrs\')
        smtp_server = self.config.get(\'SMTP\', \'smtp_server\')
        port = self.config.get(\'SMTP\', \'port\')
        return login_user, login_pwd , from_addr, to_addrs,smtp_server, port

    def report_save_config(self):
        pass

mail.conf

[SMTP]
login_user = 18******@163.com
login_pwd = ******
from_addr =  BI<18******@163.com>
to_addrs = [\'18******@163.com\']
#to_addrs = [\'1******@qq.com\',\'******.com\']
smtp_server = smtp.163.com
port = 25

测试报告

邮件接收结果

 

分类:

技术点:

相关文章: