用 Python 做一个 HTTP 接口测试框架详解

目录结构

project

  

  case#测试用例

    

    suite#测试目录

  

  logs#测试日志

  

  papi#测试类

  

  result#测试结果

  

  setting.py#配置文件

1、日志类,用于测试时日志记录 

pyapilog.py 

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
# __author__ = 'dongjie'
__data__ = '2015-05-20'

import datetime
import logging
import os

from yunjiweidian import setting

# Numeric verbosity (1-6) -> constant of the standard ``logging`` module.
logLevel = {
    1: logging.NOTSET,
    2: logging.DEBUG,
    3: logging.INFO,
    4: logging.WARNING,
    5: logging.ERROR,
    6: logging.CRITICAL,
}
# Location of a 'setting.ini' file directly under the configured root directory.
setFile = os.path.join(setting.root_dir, 'setting.ini')
# Module-level registry that pyapilog() updates with the logger it returns.
loggers = {}


# 定义日志方法,从配置文件读取日志等级,且定义日志输出路径
def pyapilog(**kwargs):
    """Return the root logger, attaching its handlers on first use.

    The level is looked up in ``logLevel`` using ``setting.logLevel`` and the
    log directory is ``setting.logFile``. Records go to a file named after the
    current date (``YYYY-MM-DD.log``) inside that directory; records of level
    ERROR and above are additionally written to the console.

    :param kwargs: accepted for backward compatibility; not used.
    :return: the configured root ``logging.Logger``.
    :raises OSError: if the log directory is missing and cannot be created.
    """
    level = logLevel[setting.logLevel]
    log_path = setting.logFile

    # Create the log directory (including missing parents) on demand.
    if not os.path.isdir(log_path):
        try:
            os.makedirs(log_path)
        except OSError:
            # Something else may have created it in the meantime; only
            # propagate the error when the directory really is unavailable.
            if not os.path.isdir(log_path):
                raise
    log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

    logger = logging.getLogger()
    logger.setLevel(level)
    # Handlers are attached only once, so repeated calls do not duplicate
    # output. The file name is therefore fixed at the first call.
    if not logger.handlers:
        # File handler: log messages contain non-ASCII text, so the encoding
        # is pinned instead of relying on the platform default.
        fh = logging.FileHandler(log_file, encoding='utf-8')
        fh.setLevel(level)
        # Console handler: errors only.
        ch = logging.StreamHandler()
        ch.setLevel(logging.ERROR)
        # Both handlers share one output format.
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        logger.addHandler(fh)
        logger.addHandler(ch)
    # The registry entry is stored under the literal key 'name'.
    loggers['name'] = logger
    return logger

2、http测试类

httprequest.py 

  

# -*-coding:utf-8 -*- 
# !/usr/bin/python 
__author__ = 'dongjie' 
__data__ = '2015-05-20' 
 
from pyapilog import pyapilog 
import requests 
import json 
import urllib 
 
class SendHttpRequest(object):
    """Send HTTP requests to one URL with ``requests`` and log every response."""

    def __init__(self, url):
        """Remember the endpoint used by all requests of this instance.

        :param url: target URL.
        """
        self.url = url

    @staticmethod
    def _log_response(resp, ok_template, error_template):
        """Log *resp* at INFO when its status is 200, otherwise at ERROR."""
        if resp.status_code == 200:
            pyapilog().info(ok_template % (resp.url, resp.status_code))
        else:
            pyapilog().error(error_template % (resp.url, resp.status_code, resp.text))

    def post(self, value=None):
        """POST to the URL with *value* encoded into the query string.

        :param value: mapping of query parameters, or None for none.
        :return: response body text, whatever the status code.
        :raises requests.RequestException: if the request cannot be sent; the
            error is logged before it propagates.
        """
        try:
            # ``params`` lets requests build the query string, which also
            # copes with value=None and with URLs that already contain '?'.
            resp = requests.post(self.url, params=value)
        except requests.RequestException as err:
            pyapilog().error(err)
            raise
        self._log_response(
            resp,
            u"发送post请求: %s  服务器返回:  %s",
            u"发送post请求: %s   服务器返回:  %s\n error info: %s ",
        )
        return resp.text

    def post_json(self, value):
        """POST *value* serialized as a JSON body.

        :param value: JSON-serializable object.
        :return: response body text when the status is 200, otherwise None.
        :raises requests.RequestException: if the request cannot be sent; the
            error is logged before it propagates.
        """
        head = {'content-type': 'application/json'}
        payload = json.dumps(value)
        try:
            resp = requests.post(self.url, data=payload, headers=head)
        except requests.RequestException as err:
            pyapilog().error(err)
            raise
        # Kept from the original: echo the final URL to stdout.
        print(resp.url)
        self._log_response(
            resp,
            u"发送post请求: %s  服务器返回:  %s",
            u"发送post请求: %s   服务器返回:  %s\n error info: %s ",
        )
        # Unlike post()/get(), a non-200 response yields None here.
        if resp.status_code == 200:
            return resp.text
        return None

    def get(self, value=None):
        """GET the URL with *value* as query parameters.

        :param value: mapping of query parameters, or None for none.
        :return: response body text, whatever the status code.
        :raises requests.RequestException: if the request cannot be sent; the
            error is logged before it propagates.
        """
        try:
            resp = requests.get(self.url, params=value)
        except requests.RequestException as err:
            pyapilog().error(err)
            raise
        self._log_response(
            resp,
            u"发送get请求: %s   服务器返回:  %s",
            u"发送get请求: %s   服务器返回:  %s\n error info: %s ",
        )
        return resp.text

3、数据库操作类

databasedriver.py 

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie' 
__data__ = '2015-05-21' 
import pymssql 
import MySQLdb 
import setting 
from pyapilog import pyapilog 
 
class sqldriver(object):
    """Run SQL statements against SQL Server (pymssql) or MySQL (MySQLdb)."""

    def __init__(self, host, port, user, password, database):
        """Store the connection parameters; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    def exec_mssql(self, sql):
        """Execute *sql* on SQL Server and return all fetched rows.

        :param sql: statement to execute.
        :return: the rows from ``fetchall()``; None if no cursor could be
            obtained or any error occurred (the error is logged).
        """
        conn = None
        try:
            conn = pymssql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset="utf8")
            cur = conn.cursor()
            if not cur:
                pyapilog().error(u"数据库连接不成功")
                return None
            pyapilog().info(u"执行SQL语句%s" % sql)
            cur.execute(sql)
            rows = cur.fetchall()
            if not rows:
                pyapilog().warning(u"没有查询到数据")
            return rows
        except Exception as e:
            # Best effort: failures are logged and reported as None.
            pyapilog().error(e)
            return None
        finally:
            # Always release the connection, including on the success path.
            if conn is not None:
                conn.close()

    def exec_mysql(self, sql):
        """Execute *sql* on MySQL and return what ``cursor.execute`` returns.

        NOTE(review): unlike exec_mssql this does not fetch rows; for MySQLdb
        the value is presumably the affected-row count -- confirm what callers
        expect before changing it.

        :param sql: statement to execute.
        :return: result of ``cursor.execute(sql)``; None if no cursor could be
            obtained or any error occurred (the error is logged).
        """
        conn = None
        try:
            conn = MySQLdb.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   passwd=self.password,
                                   db=self.database,
                                   )
            cur = conn.cursor()
            if not cur:
                return None
            pyapilog().info(u"执行SQL语句%s" % sql)
            return cur.execute(sql)
        except Exception as e:
            # Best effort: failures are logged and reported as None.
            pyapilog().error(e)
            return None
        finally:
            # The connection used to be left open here.
            if conn is not None:
                conn.close()
 
# 执行sql语句返回结果 
def execsql(sql):
    """Run *sql* on the database described by ``setting.DATABASE``.

    The ``ENGINE`` entry selects the driver method: ``"MYSQL"`` uses
    ``sqldriver.exec_mysql`` and ``"MSSQL"`` uses ``sqldriver.exec_mssql``.

    :param sql: statement to execute.
    :return: whatever the selected driver method returns; None when the
        engine is not handled or an error occurred (both are logged).
    """
    config = setting.DATABASE
    driver = config.get("ENGINE")

    # Engine name -> name of the sqldriver method that runs the statement.
    runners = {"MYSQL": "exec_mysql", "MSSQL": "exec_mssql"}
    method_name = runners.get(driver)
    if method_name is None:
        # The message lists ORACLE, but only MYSQL and MSSQL are handled here.
        pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
        return None

    try:
        connection = sqldriver(
            host=config.get("HOST"),
            port=config.get("PORT"),
            user=config.get("USER"),
            password=config.get("PWD"),
            database=config.get("DATABASE"),
        )
        return getattr(connection, method_name)(sql)
    except Exception as e:
        # Best effort: failures are logged and reported as None.
        pyapilog().error(e)
        return None

 

4、解析json字符串

dataparse.py 

  

python做一个http接口测试框架详解编程语言

# -*-coding:utf-8 -*- 
# !/usr/bin/python 
__author__ = 'dongjie' 
__data__ = '2015-05-21' 
import json 
import xmltodict 
from pyapilog import pyapilog 
 
# 解析json字符串 
class jsonprase(object):
    """Parse a JSON string and look up nodes with '/'-separated paths."""

    def __init__(self, json_value):
        """Decode *json_value*.

        A decoding error is logged and leaves ``json_value`` set to None, so
        later lookups return None instead of failing on a missing attribute.

        :param json_value: JSON document as a string.
        """
        try:
            self.json_value = json.loads(json_value)
        except ValueError as e:
            self.json_value = None
            pyapilog().error(e)

    def find_json_node_by_xpath(self, xpath):
        """Return the node addressed by *xpath*, e.g. ``"/Point/Lat"``.

        Each path segment is a dictionary key. When the current node is a
        list, the segment is looked up in every dict item and the list of
        results becomes the current node. ``"/"`` addresses the document root.

        :param xpath: '/'-separated key path.
        :return: the addressed value, or None when the path does not exist.
        """
        elem = self.json_value
        for node in xpath.strip("/").split("/"):
            if not node:
                # "/" or a doubled slash: stay on the current node.
                continue
            if isinstance(elem, dict):
                elem = elem.get(node)
            elif isinstance(elem, list):
                elem = [item.get(node) if isinstance(item, dict) else None
                        for item in elem]
            else:
                # Scalar or missing node: nothing left to descend into.
                return None
        return elem

    def datalength(self, xpath="/"):
        """Return ``len()`` of the node at *xpath* (the root by default).

        :raises TypeError: if the node is missing or has no length.
        """
        return len(self.find_json_node_by_xpath(xpath))

    @property
    def json_to_xml(self):
        """The document as pretty-printed XML under a ``<root>`` element.

        :raises Exception: any error from ``xmltodict.unparse`` is logged and
            re-raised.
        """
        try:
            return xmltodict.unparse({"root": self.json_value}, pretty=True)
        except Exception as e:
            pyapilog().error(e)
            raise
 
# 解析xml字符串 
class xmlprase(object):
    """Hold an XML string and expose it converted to JSON."""

    def __init__(self, xml_value):
        """Store the XML text; nothing is parsed until it is needed.

        :param xml_value: XML document as a string.
        """
        self.xml_str = xml_value

    @property
    def xml_to_json(self):
        """The XML document converted to a JSON string via ``xmltodict``.

        :raises Exception: any parsing or serialization error is logged and
            re-raised.
        """
        try:
            xml_dic = xmltodict.parse(self.xml_str,
                                      encoding="utf-8",
                                      process_namespaces=True,
                                      )
            return json.dumps(xml_dic)
        except Exception as e:
            pyapilog().error(e)
            raise

View Code

5、还有配置文件差点忘记说了

setting.py 

  

# -*-coding:utf-8 -*- 
# !/usr/bin/python 
__author__ = 'dongjie' 
__data__ = '2015-05-20' 
 
''' 
    配置系统相关的参数,提供全局的相关配置 
''' 
import os 
import sys 
# Directory that contains this file. os.path.dirname works with any path
# separator, unlike splitting the path on '/'.
root_dir = os.path.dirname(os.path.realpath(__file__))
# Make modules in this directory importable, without adding duplicates.
if root_dir not in sys.path:
    sys.path.append(root_dir)
# Log level: 1 notset, 2 debug, 3 info, 4 warning, 5 error, 6 critical
logLevel = 2
# Directory in which log files are written
logFile = os.path.join(root_dir, 'logs')

# Database connection settings; ENGINE names the database type.
DATABASE = {
    "ENGINE": "MSSQL",
    "HOST": "",
    "PORT": 3433,
    "USER": "",
    "PWD": "",
    "DATABASE": "",
}

  6、最后看看我们的测试用例吧,当然是数据驱动了

python做一个http接口测试框架详解编程语言

# -*-coding:utf-8 -*- 
from ddt import ddt, data, unpack 
import unittest 
from papi.httprequest import SendHttpRequest 
from papi.dataparse import jsonprase, xmlprase 
 
@ddt
class TestSingleRequest(unittest.TestCase):
    """Data-driven GET checks against the endpoint configured in setUp."""

    def setUp(self):
        self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"

    @data(
        (32351, 6),
        (9, 4),
    )
    @unpack
    def test_Single_right(self, sid, count):
        """Valid parameters return coordinates and a download URL."""
        value = {"sid": sid, "count": count}
        # Named ``body`` so the imported ``data`` decorator is not shadowed.
        body = SendHttpRequest(self.url).get(value)
        json_data = jsonprase(body)
        point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
        point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
        down_url = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl")
        is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
        size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
        # unittest assertions are not stripped under ``python -O`` and report
        # the offending values on failure.
        self.assertNotEqual(float(point_lat), 0)
        self.assertNotEqual(float(point_lng), 0)
        self.assertIsNotNone(down_url)
        # The size is only required when the response says a map exists.
        if is_exists_map is True:
            self.assertNotEqual(size, "")

    # Invalid requests against the same endpoint
    @data(
        ("abceeffffg", 6),
        (9, ""),
    )
    @unpack
    def test_Single_error(self, sid, count):
        """Invalid parameters return the fixed error message."""
        value = {"sid": sid, "count": count}
        body = SendHttpRequest(self.url).get(value)
        self.assertEqual(body, u'{"Message":"请求无效。"}')
 
@ddt
class TourMaps(unittest.TestCase):
    """Data-driven checks of the XML returned by the TourMap endpoint."""

    def setUp(self):
        self.url = "http://xxxxxx/api/TourMap"

    @data(32351, 9)
    def test_requests_online_xml(self, tourId):
        """The XML response, converted to JSON, has coordinates and child data."""
        xml_url = "%s/%s" % (self.url, tourId)
        # Named ``body`` so the imported ``data`` decorator is not shadowed.
        body = SendHttpRequest(xml_url).get()
        json_st = xmlprase(body).xml_to_json
        json_data = jsonprase(json_st)
        # NOTE(review): both path literals below are identical and look
        # garbled; restore the intended lng/lat paths before relying on this.
        lng = json_data.find_json_node_by_xpath("[email protected]")
        lat = json_data.find_json_node_by_xpath("[email protected]")
        # unittest assertions are not stripped under ``python -O`` and report
        # the offending values on failure.
        self.assertNotEqual(lng, "")
        self.assertNotEqual(lat, "")
        son_tour = json_data.find_json_node_by_xpath("/root/data/data")
        self.assertGreater(len(son_tour), 0)
 
@ddt
class TourData(unittest.TestCase):
    """Placeholder test cases; none of them has a body yet.

    The class is decorated with ``@ddt`` because ``@data``/``@unpack`` only
    take effect on classes that carry that decorator.
    """

    def setUp(self):
        self.url = "http://xxxxxx/api/xxx"

    @data(
        (),
        (),
        (),
    )
    @unpack
    def test_tourList_Location_in_open(self):
        pass

    def test_tourList_Location_not_open(self):
        pass

    def test_tour_open_city(self):
        pass
 
 
if __name__ == "__main__":
    # loadTestsFromTestCase() accepts exactly one TestCase class, so each
    # class is loaded separately and the results are combined into one suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        loader.loadTestsFromTestCase(case)
        for case in (TourMaps, TestSingleRequest)
    )
    unittest.TextTestRunner(verbosity=2).run(suite)

View Code

关于 python ddt,可以参考 https://ddt.readthedocs.org/en/latest/example.html

测试结果的生成可以查看 python nose 的相关文档,生成 html 报告

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/15559.html

(0)
上一篇 2021年7月19日 17:54
下一篇 2021年7月19日 17:54

相关推荐

发表回复

登录后才能评论