三.接口自动化项目1

少壮不努力123 / 2024-03-10 / 原文

一.接口自动化需求分析:
接口自动化测试用例:
1.用例写在excel表格里面,定义函数获取excel表格中数据并加入到用例列表
中进行返回
a.Excel表格中的数据只有url/请求方式、请求参数、headers、是否json、
预期结果才是有效数据
b.请求参数定义格式是"xxx=123,sss>456,phone=<phone>"这种的,需要
转换成字典的形式,并且<phone>需要进行参数化
c.headers请求头需要转换成字典
 
2.用例读取完成后,需要进行发送请求,需要构造一个请求的函数,请求成功后返
回请求结果json格式和字符串格式的结果
 
 
3.拿到请求结果后,需要对预期结果和实际结果进行比对,只要有一条数据不通过
就失败
 
4.最后进行串联整个流程
a.先读取测试用例文件大,获取所有测试用例excel文件
b.运行测试用例,
先根据excel文件获取到所有测试用例;
再循环测试用例并进行接口请求,获取到接口返回的结果;
然后根据结果返回的实际结果与预期结果进行比较,并把运行总条数、成功数、状态、失败原因放在列表中保存在response_list中;
最后把运行结果写入到excel表格中
c.最后发送接口测试用例运行结果报告邮件
 
二.进行接口自动化项目程序定义分类:
1.创建一个AutoTestProject文件夹后并在该文件夹下分别创建bin、case、config、lib、logs、report文件夹,bin目录做完程序的入口目录并在下面创建start.py文件;case目录用来存放结果自动化测试用例;config用来存放配置文件及自动化测试用例模板文件;lib文件夹下存放read_case.py、request.py、parse_response.py、utils.py等文件夹;logs文件夹用来存放日志的;report文件夹用来存放测试报告的

 2.在case文件目录下存放测试用例文件,测试用例以test开头和xls或xlsx结尾

 

3.config文件夹下定义配置相关内容及模板文件
setting.py文件
import os
import random
import string

import faker


# 发送邮件
EMAIL_INFO={
    "user":"13424210282@163.com",
    "password":"HLXWWGTUWGNAAAEA",
    "host":"smtp.163.com",
}
# 邮件接收人
T0=["974219141@qq.com"]
# 邮件抄送人
CC=['751462075@qq.com']

# 接口请求url地址配置
host_map={
    "fat":"http://127.0.0.1:8787",
    "uat":"",
    "pro":""
}
default="fat"

HOST=host_map.get(default)

# 自动化项目父目录
BASE_PATH=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 测试用例目录
CASE_PATH=os.path.join(BASE_PATH,'case')
# 日志文件目录
LOG_PATH=os.path.join(BASE_PATH,"logs",'atp.log')
# 测试报告目录
REPORT_PATH=os.path.join(BASE_PATH,'report')

# faker.Faker生成数据进行参数化
f=faker.Faker(locale="zh-CN")

"""
进行参数化的字典,后续有字段数值需要进行参数化的时候只需要在这里进行维护
"""
def getPassword():
    """
    生成随机密码的
    :return:
    """
    a=random.sample(string.digits,2)
    b=random.sample(string.ascii_uppercase,2)
    c=random.sample(string.ascii_lowercase,2)
    d=random.sample("@#$%^&*",2)
    s=a+b+c+d
    # 数据清洗或打乱
    random.shuffle(s)
    return "".join(s)

func_map={
    "<phone>":f.phone_number,
    "<id_card>":f.ssn,
    "<email>":f.email,
    "<name>":f.name,
    "<addr>":f.address,
    "<password>":getPassword
}

import nnlog
LOG=nnlog.Logger(LOG_PATH)
View Code
4.lib文件下定义utils.py进行发邮件、获取字典中的值、写报告功能;read_case.py定义获取用例列表;request.py定义发送请求;parse_response.py定义结果比对
A.utils.py
from xlutils import copy
import os
import random
import string
import time
import traceback

import yagmail

from config.setting import REPORT_PATH,LOG,EMAIL_INFO,T0,CC
import jsonpath
import xlrd


def get_value(d,k):
    # 通过jsonpath.jsonpath来取字典中key所对应的值
    result=jsonpath.jsonpath(d,'$..%s'%k)
    # 如果取到了返回第一个值,否则返回空字符串
    if result:
        return result[0]
    return ''

"""
result_list=[
    [{"code":0,"msg":"成功"},"通过"],
    [{"code":0,"msg":"成功"},"失败"],
]
"""

def write_excel(file_name,result_list):

    """
    result_list存放用例执行结果["参数","执行结果","原因","状态"]
    :param file_name:
    :param result_list:
    :return:
    """
    LOG.debug("现在开始写报告了:文件名是%s,内容是%s"%(file_name,result_list))
    book=xlrd.open_workbook(file_name)
    newbook=copy.copy(book)
    try:
        sheet=newbook.get_sheet(0)
    except Exception as e:
        print("无法打开excel=============",file_name)
    for row,result in enumerate(result_list,1):
        for col ,value in enumerate(result[1:],7):
            sheet.write(row,col,value)
        sheet.write(row,3,result[0])
    file_name=os.path.split(file_name)[-1].replace("xlsx","xls")
    new_file_name=time.strftime("%Y%m%d%H%M%S")+"_"+file_name
    case_file=os.path.join(REPORT_PATH,new_file_name)
    newbook.save(case_file)
    LOG.debug("报告生成完成,文件名是%s"%(case_file))
    return case_file


def send_email(all_count,pass_count,file_name):
    LOG.debug("开始发送报告了,文件名是%s"%file_name)
    content='''
各位好:
    本次接口自动化测试结果如下,总共运行%s条用例,通过%s条,失败【%s】条。
    详情请查看附件。
    '''%(all_count,pass_count,(all_count-pass_count))

    subject='%s-接口测试报告'%time.strftime("%Y-%m-%d %H%M%S")

    email=yagmail.SMTP(**EMAIL_INFO,encoding="GBK")

    try:
        email.send(to=T0,cc=CC,subject=subject,contents=content,attachments=file_name)
    except Exception as e:
        LOG.error("发送报告失败,具体失败原因是%s"%traceback.format_exc())
    else:
        LOG.debug("报告发送成功")
View Code
B.read_case.py
import nnlog
import traceback
import xlrd
from config.setting import LOG_PATH,func_map
import jsonpath

log=nnlog.Logger(LOG_PATH)

# "$..key"是进行模糊查询
# jsonpath.jsonpath(d,"$..key")
# faker模板的使用

class ParamDeal():

    def read_excel(self,file_path):
        # case_list存放测试用例
        case_list=[]
        try:
            # 打开excel
            book=xlrd.open_workbook(file_path)
            sheet=book.sheet_by_index(0)
            #
            for row in range(1,sheet.nrows):
                # 只取有用的那几行
                line=sheet.row_values(row)[1:7]

                # 把参数先进行替换
                line[2]=self.replace_param(line[2])

                # 将header转换成字典
                line[3]=self.replace_param(line[3])
                # 再把参数进行转换成字典
                line[2]=self.str_to_dict(line[2])

                line[3]=self.str_to_dict(line[3])
                case_list.append(line)
        except Exception as e:
            log.error("读取文件出错,文件名是:%s"%file_path)
            log.error("具体的错误信息是%s"%traceback.format_exc())
        return case_list

    def replace_param(self,s):
        """
        进行参数替换
        :param s:
        :return:
        """
        for func_name,func in func_map.items():
            if func_name in s:
                result=func()
                s=s.replace(func_name,result)
        return s

    def str_to_dict(self,s):
        """
        字符串转换成字典,如果是空字符串直接返回空字典
        :param s:
        :return:
        """
        d={}
        # 判断参数是否为空
        if s.strip():
            # 字符串通过逗号进行分割后通过等号进行分割后再转换成字典
            for i in s.split(","):
                k,v=i.split("=")
                d[k]=v
        return d
View Code

C.request.py

import traceback

import requests
from config import setting
from urllib.parse import  urljoin
import nnlog


log=nnlog.Logger(setting.LOG_PATH)
class MyRequest:
    def __init__(self,url,method,data=None,headers=None,is_json=''):
        # 拼接url,通过urljoin进行拼装url
        self.url=urljoin(setting.HOST,url)
        self.data=data
        self.headers=headers
        self.is_json=is_json
        self.method=method.lower()
        self.req()

    def req(self):
        try:
            if self.is_json=='':
                response= requests.request(self.method,self.url,params=self.data,json=self.data,headers=self.headers).json()
            else:
                response= requests.request(self.method,self.url,params=self.data,data=self.data,headers=self.headers).json()
        except Exception as e:
            log.error("请求%s的时候出错了,请求参数是%s,错误信息是%s"%(self.url,self.data,traceback.format_exc()))
            self.result={"msg":"请求接口出错了","error_msg":traceback.format_exc()}
            self.text='{"msg":"请求接口出错了","error_msg":%s}'%(traceback.format_exc())
        else:
            self.result=response
            self.text=str(response)
View Code

D.parse_response.py

from lib.utils import get_value
from config.setting import LOG_PATH
import nnlog

log=nnlog.Logger(LOG_PATH)

class ParseResponse:

    fuhao=['!=','>=','<=','>','<','=']

    def __init__(self,check_str,response):
        self.check_str=check_str
        self.response=response
        self.reason="都通过了"
        self.status='通过'
        self.check_response()

    def check_response(self):
        """
        预期结果格式是:yuqijieguo="status=1,score>60,age!=15"
        :return:
        """
        if self.check_str.strip():
            for s in self.check_str.split(","):
                for f in self.fuhao:
                    if f in s:
                        # 分割取出预期结果中的key
                        key, value = s.split(f)
                        # 取到实际结果
                        shijijieguo = get_value(self.response, key)
                        # 如果是实际结果类型是字符串的话,实际结果和预期结果都转换成字符串进行比较
                        if type(shijijieguo) == str:
                            shijijieguo = "'%s'" % (shijijieguo)
                            value = "'%s'" % (value)

                        # 转换成字符串进行eval运行
                        f = "==" if f == "=" else f
                        code = "%s %s %s" % (shijijieguo, f, value)
                        tag=eval(code)
                        # 判断tag不是true时表示有预期结果与实际结果对比是失败的
                        if tag!=True:
                            self.reason='key是%s,运算的代码是%s'%(key,code)
                            log.debug(self.reason)
                            self.status='失败'
                            return False
                        break
        return True
View Code
5.bin文件下定义start.py文件是程序主入口;duoxiancheng.py是文件程序多线程运行入口
A. start.py
#! /usr/bin python
#-*- encoding=utf-8 -*-

from lib.read_case import ParamDeal
from lib.request import MyRequest
from lib.parse_response import get_value,ParseResponse
from lib.utils import send_email,write_excel
import os
from config.setting import CASE_PATH


class CaseRun:
    all_count=0
    pass_count=0
    def get_case(self):
        # 获取excel测试用例文件
        excel_list=[]
        for file in os.listdir(CASE_PATH):
            if file.startswith("test") and (file.endswith('.xls') or file.endswith(".xlsx")):
                abs_path = os.path.join(CASE_PATH, file)
                excel_list.append(abs_path)
        return excel_list

    def run_case(self,file_name):
        read_case_obj=ParamDeal()
        # 根据excel文件获取其中的测试用例
        case_list=read_case_obj.read_excel(file_name)

        # response_list用于存放所有的结果
        response_list=[]

        # 循环用例列表进行发送请求
        for case in case_list:
            # 统计用例的总个数,循环一次,用例加一条
            self.all_count+=1
            # 发送接口请求
            req_obj=MyRequest(*case[:5])

            # 校验结果

            response_obj=ParseResponse(case[-1],req_obj.result)

            if response_obj.status=='通过':
                # 统计用例执行个数
                self.pass_count+=1
            response_list.append([str(req_obj.data),req_obj.text,response_obj.status,response_obj.reason])
        return  write_excel(file_name,response_list)



    def main(self):
        report_list=[]
        excel_list=self.get_case()
        for excel in excel_list:
            report_name=self.run_case(excel)
            report_list.append(report_name)

        send_email(self.all_count,self.pass_count,report_list)

if __name__=="__main__":
    c=CaseRun()
    c.main()
View Code

B. duoxiancheng.py

import os,sys
sys.path.insert(0,os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from lib.read_case import ParamDeal
from lib.request import MyRequest
from lib.parse_response import get_value,ParseResponse
from lib.utils import send_email,write_excel

from config.setting import CASE_PATH
import threading

class CaseRun:
    all_count=0
    pass_count=0
    report_list = []
    def get_case(self):
        # 获取excel测试用例文件
        excel_list=[]
        for file in os.listdir(CASE_PATH):
            if file.startswith("test") and (file.endswith('.xls') or file.endswith(".xlsx")):
                abs_path = os.path.join(CASE_PATH, file)
                excel_list.append(abs_path)
        return excel_list

    def run_case(self,file_name):
        read_case_obj=ParamDeal()
        # 根据excel文件获取其中的测试用例
        case_list=read_case_obj.read_excel(file_name)
        # response_list用于存放所有的结果
        response_list=[]

        # 循环用例列表进行发送请求
        for case in case_list:
            # 统计用例的总个数,循环一次,用例加一条
            self.all_count+=1
            # 发送接口请求
            req_obj=MyRequest(*case[:5])
            # 校验结果
            response_obj=ParseResponse(case[-1],req_obj.result)

            if response_obj.status=='通过':
                # 统计用例执行个数
                self.pass_count+=1
            response_list.append([str(req_obj.data),req_obj.text,response_obj.status,response_obj.reason])
        report_name=  write_excel(file_name,response_list)
        self.report_list.append(report_name)



    def main(self):
        """
        启动多线程运行
        :return:
        """
        excel_list=self.get_case()
        for excel in excel_list:
            t=threading.Thread(target=self.run_case,args=(excel,))
            t.start()
        while threading.active_count()!=1:
            pass

        send_email(self.all_count,self.pass_count,self.report_list)

if __name__ == '__main__':
    c=CaseRun()
    c.main()
View Code