Now the enterprise for software testing personnel more and more high requirements, automation has been one of the basic professional skills of intermediate software testing engineers, so today we come to build an interface automation framework to play
Since it is a framework, then there must be a structural design concept, we use this is the way of layered subcontracting, to achieve high cohesion and low coupling object-oriented programming.
Here I use unittest+requests+htmltestrunner+ DDT framework
Start by creating folders with different functions:
1.common package put public scripts, such as database connection, write logs, send requests, and other things to read data ah, take token ah, read configuration file to throw it into the end
Conf package mainly contains configuration files, such as test environment, SSH information, database information, etc., which is convenient for maintenance and parameterization
3. Data package is used to store test data. I used Excel here and I used YML before
The testCase package is used to store test cases
5. TestLogs is used to store log files. Log files are generated each time the script is run and stored in this folder
6. TestReport package to put htmlTestrunner generated test reports
7. Leave a run method as the entry point for the framework to run
After that, it’s time to start writing scripts.
I write is connect to the database scripts, because use SSH to connect mysql before, this time want to give it a try, and then a operation, without success, on the Internet to find a script, you change yourself, become, my IP and port number is used in the transmission way, so if you have bosses want to use, will be changed, don’t when the time comes to scold me:
import pymysql
from sshtunnel import SSHTunnelForwarder
from API.common.readConfig import ReadConfig
class DataBase:
def __init__(self):
read = ReadConfig()
self.server = SSHTunnelForwarder(
read.get_config_str("ssh"."sshHost"), read.get_config_int("ssh"."sshPort"), # SSH IP and port
ssh_password = read.get_config_str("ssh"."sshPassword"), # SSH password
ssh_username = read.get_config_str("ssh"."sshUser"), # SSH account
remote_bind_address = (read.get_config_str("mysql"."dbHost"), read.get_config_int("mysql"."dbPort")))The IP address and port where the database resides
# start service
self.server.start()
Check whether the local port is configured correctly
print(self.server.local_bind_host)
def Get_db(self,sql):
read = ReadConfig()
self.goDB = pymysql.connect(host = read.get_config_str("mysql"."dbHost"), # fixed
port = self.server.local_bind_port,
user = read.get_config_str("mysql"."dbUser"), # database account
passwd = read.get_config_str("mysql"."dbPassword"), # database password
db = read.get_config_str("mysql"."dbName")) Mysql > select * from 'mysql'; select * from 'mysql'; select * from 'mysql'
cur = self.goDB.cursor()
try:
SQL statement to check whether the connection is successful
cur.execute("select version()")
result = cur.fetchone()
print("Database version: %s" % result)
cur.execute(sql)
data = cur.fetchall()
return data
except:
print("Error")
# close the connection
def __del__(self):
self.goDB.close()
self.server.close()
Copy the code
Then there is the file that encapsulates the logging class:
import logging
import time
class GetLog:
def __init__(self):
curTime = time.strftime('%Y-%m-%d')
self.logname = 'testLogs/AutoTest' + '_' + curTime + '.txt'
def get_log(self,level,msg):
Create a log collector
logger = logging.getLogger()
logger.setLevel('DEBUG')
# to create a handler
fh = logging.FileHandler(self.logname,'a',encoding='gbk')
fh.setLevel('INFO')
ch = logging.StreamHandler()
ch.setLevel('INFO')
Define the output format of this handler
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(name)s - %(LevelName)s -)
ch.setFormatter(formatter)
fh.setFormatter(formatter)
Add handler to Logger
logger.addHandler(fh)
logger.addHandler(ch)
if level == 'DEBUG':
logger.debug(msg)
elif level == 'INFO':
logger.info(msg)
elif level == 'WARNING':
logger.warning(msg)
elif level == 'ERROR':
logger.error(msg)
elif level == 'CRITICAL':
logger.critical(msg)
logger.removeHandler(fh)
logger.removeHandler(ch)
fh.close()
def log_debug(self,msg):
self.get_log('DEBUG',msg)
def log_info(self,msg):
self.get_log('INFO',msg)
def log_warning(self,msg):
self.get_log('WARNING',msg)
def log_error(self,msg):
self.get_log('ERROR',msg)
def log_critical(self,msg):
self.get_log('CRITICAL',msg)
Copy the code
Then encapsulate the requesting class:
import requests
class HttpRequest:
def __init__(self,method,url,data=None,token=None,headers=None):
try:
if method == 'GET':
self.response = requests.get(url=url,params=data,headers=headers)
elif method == 'POST':
self.response = requests.post(url=url,data=data,headers=headers)
elif method == 'detele':
self.response = requests.delete(url=url,data=data,headers=headers)
except Exception as e:
raise e
def get_status_code(self):
return self.response.status_code
def get_text(self):
return self.response.text
def get_json(self):
return self.response.json()
Copy the code
Read and write Excel classes:
import openpyxl
class Case:
def __init__(self):
self.case_id = None
self.url = None
self.data = None
self.title = None
self.method = None
self.expected = None
class ReadExcel:
def __init__(self,file_name):
try:
self.filename = file_name
self.wb = openpyxl.load_workbook(self.filename)
except FileNotFoundError as e:
print('{0} not found , please check file path'.format(self.filename))
raise e
def get_cases(self,sheet_name):
sheet = self.wb[sheet_name]
max_row = sheet.max_row
test_cases = []
for r in range(2,max_row+1):
case = Case() Instantiate a data object to store the read test data
case.case_id = sheet.cell(row=r,column=1).value
case.title = sheet.cell(row=r,column=2).value
case.method = sheet.cell(row=r,column=3).value
case.url = sheet.cell(row=r,column=4).value
case.data = sheet.cell(row=r,column=5).value
case.expected = sheet.cell(row=r,column=6).value
case.header = sheet.cell(row=r,column=7).value
test_cases.append(case)
return test_cases
Get a list of all the sheets in the workbook
def get_sheet_name(self):
return self.wb.sheetnames
Locate the sheet based on sheet_name, locate the row based on case_id, and write result
def write_result(self,sheet_name,case_id,actual,result):
sheet = self.wb[sheet_name]
max_row = sheet.max_row
for r in range(2,max_row+1) :Select case_id from row r, column 1
case_id_r = sheet.cell(row=r,column=1).value
Check whether the case_id_r of the current line read by Excel is the same as the case_id passed in
if case_id_r == case_id:
sheet.cell(row=r,column=7).value = actual
sheet.cell(row=r,column=8).value = result
self.wb.save(filename=self.filename)
break
Copy the code
Classes that read configuration files:
import configparser
class ReadConfig:
Instantiate the conf object and read the incoming configuration file
def __init__(self):
self.conf = configparser.ConfigParser()
self.conf.read("conf/config.ini")
def get_config_str(self,section,option):
return self.conf.get(section,option)
def get_config_boolean(self,section,option):
return self.conf.getboolean(section,option)
def get_config_int(self,section,option):
return self.conf.getint(section,option)
def get_config_float(self,section,option):
return self.conf.getfloat(section,option)
def get_items(self,section):
return self.conf.items(section)
Copy the code
At this point, the methods in the common package are almost written. I added a get_token class because of the design of the system under test, so I won’t write it here.
Then there is the run method:
import unittest
from HTMLTestRunner import HTMLTestRunner
import time
from API.testCase.test_1 import TestRecharge
from API.common.getlog import GetLog
get_log = GetLog()
def RunTest(a):
suite = unittest.TestSuite()
loader = unittest.TestLoader()
suite.addTest(loader.loadTestsFromTestCase(TestRecharge))
cur_time = time.strftime('%Y-%m-%d_%H_%M_%S')
report_name = 'testReport/test_results' + cur_time + '.html'
with open(report_name,'wb+') as file:
runner = HTMLTestRunner.HTMLTestRunner(stream=file,
verbosity=2,
title='Interface Test Report',
description='Automated testing of data-driven interfaces based on Python + UnitTest',
)
runner.run(suite)
if __name__ == '__main__':
get_log.log_info('" ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ Api Request this Start ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ "')
RunTest()
get_log.log_info('" ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ Api Request this End ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ ˉ "')
Copy the code
Test_case = testCase; testCase = testCase; testCase = testCase;
from API.common.readConfig import ReadConfig
from API.common.dataBase import DataBase
from API.common.http_request import HttpRequest
from API.common.oper_token import Get_token
from API.common.read_excel import ReadExcel
from API.common.getlog import GetLog
from ddt import ddt,data
import json
import unittest
read_config = ReadConfig()
url_pre = read_config.get_config_str('api'.'hostName')
# Read excel, get login test data
read_excel = ReadExcel("data/123.xlsx")
recharge_cases = read_excel.get_cases("Sheet1")
get_log = GetLog()
@ddt
class TestRecharge(unittest.TestCase):
def setUp(self):
print('Test Start')
def tearDown(self):
print('Test End')
@data(*recharge_cases)
def test_recharge(self,case):
url = url_pre + case.url
token=Get_token().get_token()
try:
payload= json.loads(case.data)
payload["token"]=token
except:
payload=case.data
Record the current test case information
get_log.log_info("Test Case Info: Case_id: {0} title: {1} method: {2} URL: {3} Data: {4} Expected: {5} Header: {6}".format(case.case_id,case.title,case.method,url,payload,case.expected,case.header))
response = HttpRequest(method=case.method,url=url,data=payload,headers=eval(case.header))
actual = response.get_text()
Record the current test case interface response information
get_log.log_info(Test Case Request Response Result: Response: {0} actual: {1}".format(response.get_json(),actual))
# assert and write the result to the last line of Excel
try:
self.assertEquals(case.expected,actual)
read_excel.write_result('recharge',case.case_id,actual,'Pass')
get_log.log_info('Test Result is Passed ! Case_id is {0}, title is {1} '.format(case.case_id,case.title))
except Exception as e:
read_excel.write_result('recharge',case.case_id,actual,'Fail')
get_log.log_info('Test Result is Failed ! Case_id is {0}, title is {1} '.format(case.case_id,case.title))
get_log.log_error('the Error MSG: {0}'.format(e))
raise e
Copy the code
The libraries you need to install for this framework are:
Configparser ==5.0.0 OpenPyXL ==3.0.3 requests==2.23.0 pymysql==0.9.3 sshtunnel==0.1.5 DDT ==1.4.1 HTMLTestRunner==0.8.0Copy the code
A bamboo hat and a boat
One foot silk, one inch hook
A song and a bottle of wine
One man catches a fish all by himself