You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

aamt

Package Overview
Dependencies
Maintainers
1
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aamt - pypi Package Compare versions

Comparing version
0.1.9
to
0.2.0
+1
-1
aamt/__init__.py

@@ -1,2 +0,2 @@

__version__ = "0.1.9"
__version__ = "0.2.0"
__description__ = "aamt is a api-automation-testing tool to help you write pytest more easily"

@@ -12,4 +12,4 @@ # -*- coding: UTF-8 -*-

import os
import yaml
from loguru import logger

@@ -19,65 +19,4 @@ class Config():

project_root_dir = ''
# project_root_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
class Operate_config(Config):
def __init__(self):
super().__init__()
self.env_configpath = os.path.join(self.project_root_dir, "resources", "aamt.ini")
# 实例化configParser对象
self.conf = configparser.ConfigParser()
self.conf.read(self.env_configpath, encoding="utf-8")
# 读取配置文件中的key值 (读取)
def read_token(self, section='Token', key=''):
return self.conf.get(section, key)
# 读取配置文件中的key值 (读取)
def read_environ_active(self, section='Environ', key='active'):
return self.conf.get(section, key)
# 将value的值写入配置文件中
def write(self, section='Token', key='', value=''):
self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com)
# 写入文件
with open(self.env_configpath, 'w', encoding="utf-8") as configfile:
self.conf.write(configfile)
class Read_yaml(Operate_config):
def __init__(self):
super().__init__()
def get_env_vars_yaml(self):
env_active = self.read_environ_active()
env_filename = f'env_vars_{env_active}.yaml'
with open(
os.path.join(self.project_root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_test_yaml(self, filepath):
'''
filepath="/brand/brand_controller.yaml"
'''
# 测试用例数据
test_data_path = f"{Config.project_root_dir}/data/{filepath}".replace("\\", "/").replace("//", "/")
with open(test_data_path, encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_file_path(file_name, middle='file'):
'''
file_name: 文件名,比如 xiaoxin.png
'''
filePath = f"{Config.project_root_dir}/{middle}/{file_name}".replace("\\", "/").replace("//", "/")
return filePath
def fixture_paths(root_path=Config.project_root_dir):

@@ -98,5 +37,7 @@ '''

paths.append("fixtures" + import_path)
logger.info(f'1、项目下的fixtures:{paths}')
# aamt下的fixture
paths.append("aamt.fixture")
logger.info(f'2、aamt下的fixtures:{[paths[-1]]}')
return paths

@@ -10,88 +10,37 @@ # -*- coding: UTF-8 -*-

import pytest
from aamt.logger import Logger
from faker import Faker
import json
import os
from aamt.config import *
from filelock import FileLock
from loguru import logger
class Project:
dir = ""
def _project_dir(session):
# 从缓存中获取项目根目录
project_dir = session.config.cache.get("project_dir", None)
if not project_dir:
# 第一次运行没有.pytest_cache
cwd = os.getcwd()
tests = cwd.find("tests")
samples = cwd.find("samples")
if tests > 0:
project_dir = cwd[:cwd.find("tests")]
elif samples > 0:
project_dir = cwd[:cwd.find("samples")]
else:
project_dir = cwd
return project_dir
def pytest_sessionstart(session):
Project.dir = _project_dir(session)
@pytest.fixture(scope="session")
def faker_ch():
"""中文造数据"""
return Faker(locale="zh_CN")
def aamt_context_manager(tmp_path_factory, worker_id):
"""
aamt上下文管理器,在xdist分布式执行时,多个session也只执行一次
参考:https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once
命令不带-n auto也能正常执行,不受影响
"""
def inner(produce_expensive_data, *args, **kwargs):
if worker_id == "master":
# not executing in with multiple workers, just produce the data and let
# pytest's fixture caching do its job
return produce_expensive_data(*args, **kwargs)
@pytest.fixture(scope="session")
def faker_en():
"""英文造数据"""
return Faker()
# get the temp directory shared by all workers
root_tmp_dir = tmp_path_factory.getbasetemp().parent
fn = root_tmp_dir / "data.json"
with FileLock(str(fn) + ".lock"):
if fn.is_file():
data = json.loads(fn.read_text())
else:
data = produce_expensive_data(*args, **kwargs)
fn.write_text(json.dumps(data))
return data
@pytest.fixture(scope="session")
def pd():
"""pandas库"""
try:
import pandas
return pandas
except ModuleNotFoundError:
pass
@pytest.fixture(scope="session")
def file_dir():
"""file目录的路径"""
return os.path.join(Project.dir, "file")
@pytest.fixture(scope="session")
def env_vars():
"""读取激活环境下的yaml文件里的配置信息(账号、密码、数据库等)"""
return Read_yaml().get_env_vars_yaml()
class AamtVars:
# 全局变量池
def __init__(self):
self.vars_ = {'a': '初始值'}
def put(self, key, value):
self.vars_[key] = value
# Logger.info(f'变量池 成功新增:{{{key}:{value}}}')
def get(self, key):
value = ""
try:
value = self.vars_[key]
except KeyError:
Logger.error(f'异常:获取 {key} 失败, 当前变量池:{self.vars_}')
return value
return inner

@@ -7,3 +7,3 @@ # -*- coding: utf-8 -*-

# @E-mail: 120158568@qq.com,
# @Site:
# @Site:
# @Time: 11月 23, 2022

@@ -24,17 +24,25 @@

# allure临时目录
allure_temp = tempfile.mkdtemp()
# allure源文件临时目录,那一堆json文件,生成HTML报告会删除
allure_source_path = ".allure.source.temp"
def _aamt_reports(config):
"""
--aamt-reports命令行参数不能和allure命令行参数同时使用,否则可能出错。判断参数是否生效,防止跟allure自带参数冲突
"""
if config.getoption("--aamt-reports") and not config.getoption("allure_report_dir"):
return True
return False
def aamt_plugins():
# conftest 加载插件时,从调用堆栈列表里获取信息,再组装成所需路径
caller = inspect.stack()[1]
# 保存项目根路径
Config.project_root_dir = os.path.dirname(caller.filename)
# 获取所有fixture路径,1、项目下的fixtures;2、aamt下的fixture;
plugins_path = fixture_paths(root_path=Config.project_root_dir) # +[其他插件]
return plugins_path
def _is_master(config):
"""
pytest-xdist分布式执行时,判断是主节点master还是子节点
主节点没有workerinput属性
"""
return not hasattr(config, 'workerinput')
class Plugin:
reports_path = os.path.join(Config.project_root_dir, "reports")
@staticmethod

@@ -51,12 +59,10 @@ def pytest_addoption(parser):

@staticmethod
def _aamt_reports(config):
# 判断参数是否生效,防止跟allure自带参数冲突
if config.getoption("--aamt-reports") and not config.getoption("allure_report_dir"):
return True
else:
return False
@staticmethod
def pytest_configure(config):
if Plugin._aamt_reports(config):
"""
这段代码源自:https://github.com/allure-framework/allure-python/blob/master/allure-pytest/src/plugin.py
目的是生成allure源文件,用于生成HTML报告
"""
if _aamt_reports(config):
if os.path.exists(allure_source_path):
shutil.rmtree(allure_source_path)
test_listener = AllureListener(config)

@@ -68,3 +74,3 @@ config.pluginmanager.register(test_listener)

clean = config.option.clean_alluredir
file_logger = AllureFileLogger(allure_temp, clean)
file_logger = AllureFileLogger(allure_source_path, clean) # allure_source
allure_commons.plugin_manager.register(file_logger)

@@ -75,16 +81,67 @@ config.add_cleanup(cleanup_factory(file_logger))

def pytest_sessionfinish(session):
# 测试运行结束后生成allure报告
if Plugin._aamt_reports(session.config):
reports_dir = os.path.join(Config.project_root_dir, "reports")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
new_report = os.path.join(reports_dir, "report-" + current_time)
if os.path.exists(reports_dir):
# 复制历史报告,填充allure趋势图数据
his_reports = os.listdir(reports_dir)
if his_reports:
latest_report_history = os.path.join(reports_dir, his_reports[-1], "history")
shutil.copytree(latest_report_history, os.path.join(allure_temp, "history"))
os.system(f"allure generate {allure_temp} -o {new_report} --clean")
shutil.rmtree(allure_temp)
"""
测试运行结束后生成allure报告
"""
reports_path = os.path.join(Config.project_root_dir, "report")
if _aamt_reports(session.config):
if _is_master(session.config): # 只在master节点才生成报告
# 最近一份报告的历史数据,填充allure趋势图
if os.path.exists(reports_path):
his_reports = os.listdir(reports_path)
# 判断 report文件夹下是否有历史报告
if his_reports:
# 如果有历史报告,就复制报告的历史数据(json文件)
latest_report_history = os.path.join(reports_path, "history")
# 将历史报告数据复制到新报告的临时文件里
shutil.copytree(latest_report_history, os.path.join(allure_source_path, "history"))
# 删除历史报告
shutil.rmtree(reports_path)
# html文件 存放路径
html_report_name = reports_path
# allure_source_path : allure源文件临时目录,那一堆json文件,生成HTML报告会删除
os.system(f"allure generate {allure_source_path} -o {html_report_name} --clean")
shutil.rmtree(allure_source_path)
# @staticmethod
# def pytest_sessionfinish(session):
# """
# 测试运行结束后生成allure报告
# """
# reports_path = os.path.join(Config.project_root_dir, "reports")
# if _aamt_reports(session.config):
# if _is_master(session.config): # 只在master节点才生成报告
# # 最近一份报告的历史数据,填充allure趋势图
# if os.path.exists(reports_path):
# his_reports = os.listdir(reports_path)
# if his_reports:
# # print(f'报告目录文件有:{his_reports}')
# # 清除多余的报告,只保留最近的一份报告
# for i in his_reports[:-1]:
# shutil.rmtree(os.path.join(Config.project_root_dir, 'reports', i))
# # print(f'操作只保留最近一份报告目录:{his_reports}')
# latest_report_history = os.path.join(reports_path, his_reports[-1], "history")
# # 复制最近一份报告的记录
# shutil.copytree(latest_report_history, os.path.join(allure_source_path, "history"))
# # 删除最近一份报告的记录,下面会生成新的
# shutil.rmtree(os.path.join(Config.project_root_dir, 'reports', his_reports[-1]))
#
# current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
# # html_report_name = os.path.join(reports_path, "report")
# html_report_name = os.path.join(reports_path, "report-" + current_time)
# os.system(f"allure generate {allure_source_path} -o {html_report_name} --clean")
# shutil.rmtree(allure_source_path)
#
def aamt_plugins():
# conftest 加载插件时,从调用堆栈列表里获取信息,再组装成所需路径
caller = inspect.stack()[1]
# 保存项目根路径
Config.project_root_dir = os.path.dirname(caller.filename)
# 获取所有fixture路径,1、项目下的fixtures;2、aamt下的fixture;
plugins_path = fixture_paths(root_path=Config.project_root_dir) # +[其他插件]
return plugins_path

@@ -18,193 +18,60 @@ # -*- coding: utf-8 -*-

client_content = """
import json
import time
from mimetypes import MimeTypes
from urllib.parse import urlencode
logintoken_content = """
# -*- coding: utf-8 -*-
import allure
import jmespath
import requests
from requests import Response
from requests_toolbelt.multipart.encoder import MultipartEncoder
from aamt.config import *
from aamt.fixture import AamtVars
from aamt.logger import Logger
import hashlib
from until.client import *
from common.project import *
from loguru import logger
class Body_type():
none = '无类型'
# 默认
json = 'json数据'
form_text = '表单数据 :纯json数据'
form_file = '表单数据 :文件二进制流+json数据'
binary = '文件直传:文件二进制流'
graphql = '其他类型'
# *********** 后台登录开始 ***********
class Login_after(HttpClient):
class HttpClient(Body_type):
def __init__(self, user_info, token:str):
# 传入token(coolie 或者 session)字段:目的为了写入配置文件
super().__init__() # 继承上个类的ini
# ----------- 密码进行MD5加密 ------------ #
mima = str(user_info['password'])
# md5加密对象
md5 = hashlib.md5()
# 填入要加密的字符串
md5.update(mima.encode('utf-8'))
# 转化为16进制字符串
new_mima = md5.hexdigest()
# ----------- MD5加密结束 ------------ #
def __init__(self):
self.username = user_info['account']
self.password = new_mima
self.token_key = token
self.session = requests.Session()
# 变量池实例
self.aamt_vars = AamtVars()
# 日志实例
self.logging = Logger
# 获取当前使用的环境
self.environ_active = Operate_config().read_environ_active()
# 获取激活yaml的所有数据
self.env_vars_data = Read_yaml().get_env_vars_yaml()
self.host = self.env_vars_data['after_host']
self.default_header = {
"Content-Type": "application/json",
"language": "zh_CN"
def login_(self):
etc = {
"account": self.username
, "password": self.password
}
url = '/system/userInfo/login'
method = 'get'
url = self.get_full_url(url, etc=etc, h=self.env_vars_data['after_host'])
result = self.send(method=method, url=url)
assert jmespath.search('code', result) < 400, f"系统管理登录失败,接口响应: {result}"
def send(self, url:str='', method='post', body={}, body_type:str=Body_type.json,x_token='', file_key='picFile',file_path='',timeout=30, **kwargs):
token_data = result['result']['token']
logger.info(f'账号:{self.username} 登录成功,新token:{token_data}')
# 把token值写入配置文件中
Operate_token().write_token(section="Token", key=self.token_key, value=token_data)
return result
start_time = time.time()
# *********** 后台登录结束 ***********
if not url.startswith(("http://", "https://")):
raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦")
# 用户传了headers,就用用户的,不传就用默认的
headers = kwargs.get("headers", self.default_header)
# *********** 前端登录开始 ***********
# XXXXXXXX 支持扩展 XXXXXXXXX
# *********** 前端登录结束 *************
if x_token:
headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。
if method == "get":
result = self.session.request(url=url, method=method, params=body, headers=headers,timeout=timeout,**kwargs)
elif method == "post":
self.logging.info(f'body_type类型:{body_type}')
if body_type == Body_type.json:
headers['Content-Type'] = 'application/json; charset=UTF-8'
result = self.session.request(url=url, method=method, json=body, headers=headers,timeout=timeout,**kwargs)
elif body_type == Body_type.form_file:
filename = file_path.split('\\\\')[-1] # xiaoxin.png
# 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143
fileType = MimeTypes().guess_type(file_path)[0]
# 没有识别到就不传 content_type
if fileType is None:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))}
files = {'file_key': (filename, open(file_path, 'rb'))}
# 也可以用已下的方法,不用加 filename
# files = {file_key: open(file_path, 'rb'))}
# files = {'file_key': open(file_path, 'rb')}
else:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')}
files = {file_key: (filename, open(file_path, 'rb'), fileType)}
body.update(files)
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
# :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。
result = self.session.request(url=url, method=method, data=form_data, headers=headers,timeout=timeout,**kwargs)
elif body_type == Body_type.form_text:
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
result = self.session.request(url=url, method=method, data=form_data, headers=headers,timeout=timeout,**kwargs)
elif body_type == Body_type.binary:
files = {file_key: open(file_path, 'rb')}
# 文件流通过files 传给request的请求参数files
result = self.session.request(url=url, method=method, json=body, headers=headers, files=files,timeout=timeout,**kwargs)
else:
raise ValueError(f"=====body_type没有定义,{body_type} 请确认====")
elif method == "patch":
result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers, timeout=timeout,**kwargs)
elif method == "delete":
result = ''
elif method == "put":
result = ''
else:
raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====")
end_time = time.time()
# python 内置函数 保留4位小数
time_ = round((end_time-start_time), 4)
# 处理
result = AamtResponse(result)
try:
self.logging.info(f'\\n请求日志:\\nurl: {url}\\nmethod: {method}\\nbody: \\n{body}\\nbody_type: {body_type}\\nheaders: \\n{headers}\\n**********************************************************************************')
self.logging.debug(f'\\n响应日志:\\n响应码: {result.status_code}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n')
except AttributeError:
self.logging.error(
f'\\n无法获取响应码, 响应日志:\\n{result}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n')
except TypeError:
self.logging.warning(f'警告:{kwargs}')
self.__create_request_log(url, method, body, body_type, headers)
try:
self.__create_response_log(result.status_code, result.json(),time_)
return result.json()
except:
self.__create_response_log(result.status_code, result.text,time_)
self.logging.warning(f'\\n注意 响应内容:不可以序列化,具体响应如下:\\n{result.text}')
return result.text
def get_full_url(self, url, etc={}, replace={}, h=""):
if h:
host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格).
else:
host = self.host.rstrip('/')
url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。
full_url = host + "/" + url
# full_url += "?"
full_url += "?platform={}".format(self.environ_active)
if etc:
s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串
full_url += "&" + s
if replace:
full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
# full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
return full_url
# 目的就是 在allure显示,没什么实际意义
@allure.step("请求日志")
def __create_request_log(self, url, method, body, body_type, headers):
pass
# 目的就是 在allure显示
@allure.step('响应日志')
def __create_response_log(self, status_code, text,time_):
pass
class AamtResponse(Response):
# 包装requests.Response,简化jmespath写法
def __init__(self, response):
super().__init__()
for k, v in response.__dict__.items():
self.__dict__[k] = v
def jmespath(self, expression):
return jmespath.search(expression, self.json())
if __name__ == '__main__':
a = HttpClient()
a.send(url='http://www.baidu.com',method='get', body={}, body_type=Body_type.json)
"""

@@ -219,13 +86,7 @@ public_api_content = """

# @Time: 5月 05, 2022
import sys
import allure
import jmespath
from aamt.client import HttpClient
from aamt.config import *
from aamt.logger import Logger
from until.client import *
from common.project import *
from common.mysqlhelper import MysqlHelper
sys.path.append("..")

@@ -243,4 +104,5 @@

self.token = xin_token
Logger.warning(f'读取到的最新token:{xin_token}')
# logger.warning(f'读取到的最新token:{xin_token}')
@allure.step('新增全新公司:{name}')

@@ -289,18 +151,5 @@ def add_company(self, name='xuefeng', cid=''):

@allure.step('新增公司模板:{name},并给公司模板分配所有权限')
def company_template_assign_permissions(self, name='auto_template', resourceList=[]):
body = {"isDefault": "0", "name": name, "note": "自动化", "resourceList": resourceList}
url = 'system/resourceTemplate/saveTemplate'
method = 'post'
url = self.get_full_url(url, h=self.host)
ret = self.send(url=url, method=method, body=body, body_type=self.json, x_token=self.token)
if jmespath.search('code', ret) == 200:
print(f'公司模板 {name} 新增并分配权限成功')
elif jmespath.search('code', ret) == 500:
assert '存在' in ret['message'], f'异常, 新增公司模板 {name} 功能异常,请检查'
else:
assert False, f'异常,公司模板分配权限异常,接口响应信息;{ret}'
"""
"""
brand_controller_api_content = """

@@ -313,7 +162,5 @@

from api.brand.route import *
from aamt.client import *
from until.client import *
sys.path.append("..")
class Brand_ControllerApi(HttpClient):

@@ -400,8 +247,9 @@ ''' 初始化-传入配置文件里的token值 然后调用 依赖token的其他方法 ,比如加购物车 查看下订单等等 '''

brand_route_content = """
from aamt.config import *
from common.project import *
"后台品牌接口路径(前台)"
brand_controller = Read_yaml().get_test_yaml(filepath="/brand/brand_controller.yaml")
brand_controller = Read_yaml().get_test_yaml(filepath="brand/brand_controller.yaml")
"""
fixture_admin_content = """

@@ -414,3 +262,3 @@ # -*- coding: utf-8 -*-

# @E-mail: 120158568@qq.com,
# @Site: www.51automate.cn
# @Site: 51automate.cn
# @Time: 11月 25, 2022

@@ -420,3 +268,4 @@

import pytest
from aamt.config import *
from common.project import *
from loguru import logger

@@ -426,4 +275,7 @@ # 管理员维护

def env_vars_data():
return Read_yaml().get_env_vars_yaml()
read_yaml = Read_yaml()
logger.info(f'当前激活的环境是:{read_yaml.read_environ_active()}')
return read_yaml.get_env_vars_yaml()
"""

@@ -437,3 +289,3 @@ fixture_xf_content = """

# @E-mail: 120158568@qq.com,
# @Site: www.51automate.cn
# @Site: 51automate.cn
# @Time: 11月 25, 2022

@@ -443,3 +295,3 @@

import pytest
from aamt.logger import Logger
from loguru import logger

@@ -449,6 +301,11 @@ from api.logintoken import Login_after

@pytest.fixture(scope="session")
def systerm_admin_login(env_vars_data):
Login_after(env_vars_data['systerm'], 'systerm_admin_token')
def systerm_admin_login(aamt_context_manager, env_vars_data):
# aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器
# 该login只会在整个运行期间执行一次
def produce_expensive_data(variable):
# 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理
return Login_after(variable['systerm'], 'systerm_admin_token').login_()
return aamt_context_manager(produce_expensive_data, env_vars_data)

@@ -459,3 +316,3 @@

def init_admin(systerm_admin_login):
Logger.info('调用公共方法 准备基础数据,返回一个实例')
logger.info('调用公共方法 准备基础数据,返回一个实例')
pub_d = Public(token='systerm_admin_token')

@@ -466,14 +323,21 @@ yield pub_d

# --------------- xuefeng 医生1 登录及其数据初始化 开始 -----------
@pytest.fixture(scope="session")
def login_doctor1(env_vars_data):
Login_after(env_vars_data['xuefeng_doctor1'], 'xuefeng_doctor1_token')
def login_doctor1(aamt_context_manager, env_vars_data):
# aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器
# 该login只会在整个运行期间执行一次
def produce_expensive_data(variable):
# 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理
return Login_after(variable['xuefeng_doctor1'], 'xuefeng_doctor1_token').login_()
return aamt_context_manager(produce_expensive_data, env_vars_data)
@pytest.fixture(scope="session", autouse=True) # 自动运行
# @pytest.fixture(scope="session") # 调用才运行
def init_doctor1(login_doctor1):
# @pytest.fixture(scope="session", autouse=True) # 自动运行
@pytest.fixture(scope="session") # 调用才运行
def init_admin(systerm_admin_login):
logger.info('调用公共方法 准备基础数据,返回一个实例')
pub_d = Public(token='xuefeng_doctor1_token')
yield pub_d
# --------------- xuefeng 医生1 登录及其数据初始化 结束 -----------

@@ -483,14 +347,3 @@

# --------------- xuefeng 医生2 登录及其数据初始化 开始 -----------
@pytest.fixture(scope="session")
def login_doctor2(env_vars_data):
Login_after(env_vars_data['xuefeng_doctor2'], 'xuefeng_doctor2_token')
# @pytest.fixture(scope="session", autouse=True) # 自动运行
@pytest.fixture(scope="session") # 调用才运行
def init_doctor2(login_doctor2):
pub_d = Public(token='xuefeng_doctor2_token')
yield pub_d
内容同上
# --------------- xuefeng 医生2 登录及其数据初始化 结束 -----------

@@ -500,14 +353,20 @@

# --------------- xuefeng2 护士1 登录及其数据初始化 开始 -----------
@pytest.fixture(scope="session")
def login_nurse1(env_vars_data):
print('xuefeng2_nurse1 登录')
Login_after(env_vars_data['xuefeng2_nurse1'], 'xuefeng2_nurse1_token')
# @pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="session")
def init_nurse1(login_nurse1):
pub_n = Public(token='xuefeng2_nurse1_token')
yield pub_n
def login_nurse1(aamt_context_manager, env_vars_data):
# aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器
# 该login只会在整个运行期间执行一次
def produce_expensive_data(variable):
# 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理
return Login_after(variable['xuefeng2_nurse1'], 'xuefeng2_nurse1_token').login_()
return aamt_context_manager(produce_expensive_data, env_vars_data)
# @pytest.fixture(scope="session", autouse=True) # 自动运行
@pytest.fixture(scope="session") # 调用才运行
def init_admin(systerm_admin_login):
logger.info('调用公共方法 准备基础数据,返回一个实例')
pub_d = Public(token='xuefeng2_nurse1_token')
yield pub_d

@@ -518,120 +377,37 @@ # --------------- xuefeng2 护士1 登录及其数据初始化 结束 -----------

# --------------- xuefeng2 护士2 登录及其数据初始化 开始 -----------
@pytest.fixture(scope="session")
def login_nurse2(env_vars_data):
print('xuefeng2_nurse2 登录')
Login_after(env_vars_data['xuefeng2_nurse2'], 'xuefeng2_nurse2_token')
# @pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="session")
def init_nurse2(login_nurse2):
pub_n = Public(token='xuefeng2_nurse2_token')
yield pub_n
内容同上
# --------------- xuefeng2 护士2 登录及其数据初始化 结束 -----------
"""
conftest_content = """
fixture_zhangsan_content = """
# -*- coding: utf-8 -*-
# @Software: PyCharm
# @File: conftest.py
# @File: fixture_zhangsan.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site: www.51automate.cn
# @Site: 51automate.cn
# @Time: 11月 25, 2022
# import sys
# sys.path.append("..")
from aamt.plugin import aamt_plugins
# 内容参考 fixture_zhangsan.py
# 加载aamt插件
pytest_plugins = aamt_plugins()
"""
test_brand_content = """
# -*- coding: UTF-8 -*-
import sys
conftest_content1 = """
# -*- coding: utf-8 -*-
import allure
import pytest
# @Software: PyCharm
# @File: conftest.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site: 51automate.cn
# @Time: 11月 25, 2022
from api.brand.brand_controller_api import Brand_ControllerApi
from api.brand.route import *
from common.assert_api import assert_api
# 占位
sys.path.append("..")
"""
# 后台身份会话
@pytest.fixture(scope="session")
def xuefeng_brand_controller():
return Brand_ControllerApi("gz_doctor1_token")
# 数据清理 -- 接口
@pytest.fixture() # 函数级别
def del_brand(xuefeng_brand_controller):
# xuefeng_brand_controller.del_brand()
yield
# 清楚自动化品牌数据
xuefeng_brand_controller.del_brand()
data = brand_controller['add_brand']['case_data']
@allure.feature('测试品牌模块')
@allure.story('增加品牌功能')
# @pytest.mark.smoke
# @pytest.mark.skip
class Test_add_brand1():
@pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例
def test_add_brand(self, case_data, xuefeng_brand_controller, del_brand):
allure.dynamic.title(f'title:{case_data["case_name"]}')
case_body = case_data['body']
ret = xuefeng_brand_controller.add_brand(name=case_body['name'],
note=case_body['note'],
id=case_body['id'],
iconFile=case_body['iconFile'],
pictureUrl=case_body['pictureUrl']
)
assert_api(ret, expect_data=case_data["expect"])
data = brand_controller['del_brand']['case_data']
@allure.feature('测试品牌模块')
@allure.story('删除品牌功能')
# @pytest.mark.smoke
# @pytest.mark.skip
class Test_del_brand1():
@pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例
def test_add_brand(self, case_data, xuefeng_brand_controller):
allure.dynamic.title(f'title:{case_data["case_name"]}')
case_body = case_data['body']
ret = xuefeng_brand_controller.del_brand(brand_name=case_body['name'])
assert ret and ret['message'] == 'success', f'删除品牌失败,请检查删除功能是否异常 或者 要删除的品牌:{case_body["name"]} 是否存在'
def test1():
pass
if __name__ == "__main__":
pytest.main(['-m', __file__])
# pytest.main(['-m smoke',__file__])
"""
assert_api_content = """
from common.logger import *
from loguru import logger

@@ -657,276 +433,4 @@ # @decorate_log

"""
config_content = """
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: config.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site:
# @Time: 11月 23, 2022
import configparser
import os
import yaml
class Config():
# 全局项目根目录
project_root_dir = ''
# project_root_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
class Operate_config(Config):
def __init__(self):
super().__init__()
self.env_configpath = os.path.join(self.project_root_dir, "resources", "aamt.ini")
# 实例化configParser对象
self.conf = configparser.ConfigParser()
self.conf.read(self.env_configpath, encoding="utf-8")
# 读取配置文件中的key值 (读取)
def read_token(self, section='Token', key=''):
return self.conf.get(section, key)
# 读取配置文件中的key值 (读取)
def read_environ_active(self, section='Environ', key='active'):
return self.conf.get(section, key)
# 将value的值写入配置文件中
def write(self, section='Token', key='', value=''):
self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com)
# 写入文件
with open(self.env_configpath, 'w', encoding="utf-8") as configfile:
self.conf.write(configfile)
class Read_yaml(Operate_config):
def __init__(self):
super().__init__()
def get_env_vars_yaml(self):
env_active = self.read_environ_active()
env_filename = f'env_vars_{env_active}.yaml'
with open(
os.path.join(self.project_root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_test_yaml(self, filepath):
'''
filepath="/brand/brand_controller.yaml"
'''
# 测试用例数据
test_data_path = f"{Config.project_root_dir}/data/{filepath}".replace("\\\\", "/").replace("//", "/")
with open(test_data_path, encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_file_path(file_name, middle='file'):
'''
file_name: 文件名,比如 xiaoxin.png
'''
filePath = f"{Config.project_root_dir}/{middle}/{file_name}".replace("\\\\", "/").replace("//", "/")
return filePath
def fixture_paths(root_path=Config.project_root_dir):
'''
fixture路径,1、项目下的fixtures;2、aamt下的fixture;
:return:
'''
_fixtures_dir = os.path.join(root_path, "fixtures")
paths = []
# 项目下的fixtures
for root, _, files in os.walk(_fixtures_dir):
for file in files:
if file.startswith("fixture_") and file.endswith(".py"):
full_path = os.path.join(root, file)
import_path = full_path.replace(_fixtures_dir, "").replace("\\\\", ".")
import_path = import_path.replace("/", ".").replace(".py", "")
paths.append("fixtures" + import_path)
# aamt下的fixture
paths.append("aamt.fixture")
return paths
"""
logger_content = """
import logging
import os
import time
from functools import wraps
import colorlog
from aamt.config import *
# logging模块中包含的类
# 用来自定义日志对象的规则(比如:设置日志输出格式、等级等)
# 常用子类:StreamHandler、FileHandler
# StreamHandler 控制台输出日志
# FileHandler 日志输出到文件
# BASE_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# print(BASE_PATH)
# 日志文件路径
LOG_PATH = os.path.join(Config.project_root_dir, "log")
if not os.path.exists(LOG_PATH):
os.mkdir(LOG_PATH)
class Logger():
def __init__(self):
log_colors_config = {
'DEBUG': 'white', # cyan white
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red'}
# 创建一个logger日志对象
self.logger = logging.getLogger("log")
# 创建日志格式(不带颜色)对象
self.file_formater = logging.Formatter(
fmt='[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 创建颜色格式((带颜色))对象
self.console_formatter = colorlog.ColoredFormatter(
fmt='%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
log_colors=log_colors_config)
# 输出到控制台
self.console = logging.StreamHandler()
self.logname = os.path.join(LOG_PATH, "{}.log".format(time.strftime("%Y%m%d"))) # 创建日志路径和时间
# 输出到文件:获取日志路径,并创建FileHandler对象()-- 设定写入方式:追加
self.filelogger = logging.FileHandler(self.logname, mode='a', encoding="UTF-8")
# 设置默认的日志级别 控制台logger 和 handler以最高级别为准,不同handler之间可以不一样,不相互影响
self.logger.setLevel(logging.INFO)
self.console.setLevel(logging.INFO)
self.filelogger.setLevel(logging.INFO)
# self.logger.setLevel(logging.INFO)
# self.logger.setLevel(logging.WARNING)
# 设置控制台日志的颜色
self.console.setFormatter(self.console_formatter)
# 设置文件日志的颜色
self.filelogger.setFormatter(self.file_formater)
# 重复日志问题:
# 1、防止多次addHandler;
# 2、loggername 保证每次添加的时候不一样;
# 3、显示完log之后调用removeHandler
if not self.logger.handlers:
self.logger.addHandler(self.console)
self.logger.addHandler(self.filelogger)
self.console.close()
self.filelogger.close()
Logger = Logger().logger
def decorate_log(func):
@wraps(func)
# Python装饰器中@wraps作用 https://blog.csdn.net/weixin_40576010/article/details/88639686
def log(*args, **kwargs):
Logger.info(f'-- 开始执行 {func.__name__} --')
try:
func(*args, **kwargs)
except Exception as e:
Logger.error(f'-- {func.__name__}执行失败,原因:{e} --')
raise e
else:
Logger.info(f'-- {func.__name__} 执行成功 --')
return log
# 调试用
@decorate_log
def xuefeng():
assert 2 == 2
if __name__ == '__main__':
xuefeng()
Logger.info("---测试开始---")
Logger.warning("---测试开始---")
Logger.error("---测试结束---")
Logger.debug("---测试结束---")
"""
logintoken_content = """
# -*- coding: utf-8 -*-
import hashlib
import jmespath
from aamt.client import HttpClient
from aamt.config import *
from aamt.logger import Logger
# *********** 后台登录开始 ***********
class Login_after(HttpClient):
def __init__(self, user_info, token):
# 传入token(coolie 或者 session)字段:目的为了写入配置文件
super().__init__() # 继承上个类的ini
self.username = user_info['account']
# ----------- 密码进行MD5加密 ------------ #
mima = str(user_info['password'])
# md5加密对象
md5 = hashlib.md5()
# 填入要加密的字符串
md5.update(mima.encode('utf-8'))
# 转化为16进制字符串
new_mima = md5.hexdigest()
# ----------- MD5加密结束 ------------ #
self.password = new_mima
result = self.login_after(username=self.username, password=self.password)
assert jmespath.search('code', result) == 200, f"系统管理登录失败,接口响应 \\n {result}\\n"
self.token = result['result']['token']
Logger.info(f'账号:{self.username} 登录成功,新token:{self.token}')
# 把token值写入配置文件中
Operate_config().write(section="Token", key=token, value=self.token)
def login_after(self, username, password):
etc = {
"account": username
, "password": password
}
url = '/system/userInfo/login'
method = 'get'
url = self.get_full_url(url, etc=etc, h=self.env_vars_data['after_host'])
return self.send(method=method, url=url)
# *********** 后台登录结束 ***********
# *********** 前端登录开始 ***********
# XXXXXXXX 支持扩展 XXXXXXXXX
# *********** 前端登录结束 *************
"""
mysqlhelper_content = """

@@ -1284,32 +788,182 @@ import json

"""
read_token_content = """
# -*- coding: utf-8 -*-
project_content = """
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: read_token.py
# @File: project.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site: www.51automate.cn
# @Time: 11月 25, 2022
# @Site:
# @Time: 11月 23, 2022
import os
import configparser
import yaml
from loguru import logger
from aamt.config import Operate_config
class Config:
# 全局项目根目录
# root_dir = os.path.dirname(os.path.dirname(os.getcwd()))
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# logger.info(f'全局项目根目录:{root_dir}')
def read_token():
# 获取配置文件中的key值 (token值)
class Clazz:
Get = Operate_config()
systerm_admin_token = Get.read_token(key='systerm_admin_token')
xuefeng2_doctor1_token = Get.read_token(key='xuefeng_doctor1_token')
xuefeng2_doctor2_token = Get.read_token(key='xuefeng_doctor2_token')
xuefeng_nurse1_token = Get.read_token(key='xuefeng2_nurse1_token')
xuefeng_nurse2_token = Get.read_token(key='xuefeng2_nurse2_token')
data_dir = os.path.join(root_dir, "data")
file_dir = os.path.join(root_dir, "file")
gz_doctor1_token = Get.read_token(key='gz_doctor1_token')
gz_nurse1_token = Get.read_token(key='gz_nurse1_token')
settings = Config()
return Clazz
class Operate_token():
def __init__(self):
super().__init__()
self.env_configpath = os.path.join(settings.root_dir, "resources", "aamt.ini")
# 实例化configParser对象
self.conf = configparser.ConfigParser()
self.conf.read(self.env_configpath, encoding="utf-8")
# 读取配置文件中的key值 (读取)
def read_token(self, section='Token', key=''):
return self.conf.get(section, key)
# 将value的值写入配置文件中
def write_token(self, section='Token', key='', value=''):
self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com)
# 写入文件
with open(self.env_configpath, 'w', encoding="utf-8") as configfile:
self.conf.write(configfile)
# 读取配置文件中的key值 (读取)
def read_environ_active(self, section='Environ', key='active'):
return self.conf.get(section, key)
class Read_yaml(Operate_token):
def __init__(self):
super().__init__()
def get_env_vars_yaml(self):
# 获取当前激活的环境
env_active = self.read_environ_active()
env_filename = f'env_vars_{env_active}.yaml'
with open(
os.path.join(settings.root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_test_yaml(self, filepath):
'''
filepath="/brand/brand_controller.yaml"
'''
# 测试用例数据路径
test_data_path = os.path.join(settings.data_dir, filepath)
# logger.info(f'测试用例数据路径:{test_data_path}')
with open(test_data_path, encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.FullLoader)
def get_file_path(file_name, middle='file'):
'''
file_name: 文件名,比如 xiaoxin.png
'''
filePath = os.path.join(settings.root_dir, middle,file_name)
return filePath
if __name__ == '__main__':
root_dir = os.path.dirname(os.path.dirname(os.getcwd()))
print(f'项目根路径:{settings.root_dir}')
test_data_path = os.path.join(settings.data_dir, "./brand/brand_controller.yaml")
print(f'数据文件路径:{test_data_path}')
# 在Windows系统下 路径可以是`D:\\12-12\\xx`, 也可以是`D:/12-12/xx`
"""
test_brand_content = """
# -*- coding: UTF-8 -*-
import allure
import pytest
from api.brand.brand_controller_api import Brand_ControllerApi
from api.brand.route import *
from common.assert_api import assert_api
from loguru import logger
# 后台身份会话
@pytest.fixture(scope="session")
def xuefeng_brand_controller():
return Brand_ControllerApi("xuefeng_doctor1_token")
# 数据清理 -- 接口
@pytest.fixture() # 函数级别
def del_brand(xuefeng_brand_controller):
# xuefeng_brand_controller.del_brand()
yield
# 清楚自动化品牌数据
xuefeng_brand_controller.del_brand()
data = brand_controller['add_brand']['case_data']
@allure.feature('测试品牌模块')
@allure.story('增加品牌功能')
# @pytest.mark.smoke
# @pytest.mark.skip
class Test_add_brand1():
@pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例
def test_add_brand(self, case_data, xuefeng_brand_controller, del_brand):
allure.dynamic.title(f'title:{case_data["case_name"]}')
case_body = case_data['body']
ret = xuefeng_brand_controller.add_brand(name=case_body['name'],
note=case_body['note'],
id=case_body['id'],
iconFile=case_body['iconFile'],
pictureUrl=case_body['pictureUrl']
)
assert_api(ret, expect_data=case_data["expect"])
data = brand_controller['del_brand']['case_data']
@allure.feature('测试品牌模块')
@allure.story('删除品牌功能')
# @pytest.mark.smoke
# @pytest.mark.skip
class Test_del_brand1():
@pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例
def test_add_brand(self, case_data, xuefeng_brand_controller):
allure.dynamic.title(f'title:{case_data["case_name"]}')
case_body = case_data['body']
ret = xuefeng_brand_controller.del_brand(brand_name=case_body['name'])
assert ret and ret['message'] == 'success', f'删除品牌失败,请检查删除功能是否异常 或者 要删除的品牌:{case_body["name"]} 是否存在'
if __name__ == "__main__":
pytest.main(['-m', __file__])
# pytest.main(['-m smoke',__file__])
"""
brand_controller_content = """

@@ -1330,3 +984,3 @@ add_brand:

- case_name: case1_后台新增品牌,必填参数全部正确入参,新增成功
- case_name: case1_后台新增品牌,品牌名不填,其他必填参数全部正确入参,新增失败
body: {"name":"",

@@ -1349,4 +1003,6 @@ "note":"自动化新建",

expect: {'code': 200, 'message': 'success'}
"""
aamt_content = """
aamt_ini_content = """
[Token]

@@ -1362,3 +1018,5 @@ systerm_admin_token = eH-nczOHaPLWo4m9Lnk

[Environ]
active = test
;active = test
active = uat
"""

@@ -1485,153 +1143,34 @@ env_vars_test_yaml_content = """

"""
clear_results_content = """
import os
import shutil
# shutil.rmtree( src ) #递归删除一个目录以及目录内的所有内容
# os.makedirs() 方法用于递归创建目录。
# 解决allure报告缓存和Jenkins无文件报错
main_content = """
def clear_allure():
filepath = (os.path.abspath(os.path.dirname(os.path.dirname(__file__))) + "/report/allure-results/")
if os.path.exists(filepath):
shutil.rmtree("{}".format(filepath))
os.makedirs("{}".format(filepath))
else:
os.makedirs("{}".format(filepath))
path_report = (os.path.abspath(os.path.dirname(os.path.dirname(__file__))) + "/report/allure-report")
if os.path.exists(path_report):
shutil.rmtree("{}".format(path_report))
if __name__ == '__main__':
print(os.path.abspath(os.path.dirname(__file__)) + "/report/allure-results/")
"""
fake_content = """
from faker import Faker
from common.logger import Logger
fake = Faker("zh_CN")
class Faker_:
def __init__(self):
self.fake = Faker("zh_CN")
# 选择中文
def get_phone_number(self):
return self.fake.phone_number()
def get_name(self):
return self.fake.name()
def get_email(self):
return self.fake.email()
def md5(self):
return self.fake.md5()
if __name__ == "__main__":
test = Faker_()
print(test.get_name())
print(test.get_phone_number())
print(test.get_email())
print('年月日:', fake.date(pattern=' %Y-%m-%d'))
print('随机年份:', fake.year)
print('随机年份:', fake.year)
print('随机月份:', fake.month)
print('随机几号:', fake.day_of_month)
print('随机星期数:', fake.day_of_week)
print('时间:', fake.time(pattern='%H:%M:%S'))
# -30y是过去30年前为开始日期,end_date表示结束到今天
print('过去某一天:', fake.date_between(start_date="-30y", end_date="today"))
print('今天:', fake.date_between_dates) # 今天
print('日期和时间:', fake.date_time) # 2021-05-14 19:36:00
print('当前日期时间:', fake.date_time_between_dates)
# print('某个区间内随机日期时 间:', fake.date_time_between_dates(datetime_start=datetime(1999, 2, 2, 10, 30, 20), dat
# etime_end = datetime(2000, 2, 2, 10, 30, 20)))
print('未来的日期:', fake.future_date(end_date="+30d"), str(fake.future_date(end_date="+30d")))
print('未来的日期时间:', fake.future_datetime(end_date="+30d"),
f'类型是:{type(fake.future_datetime(end_date="+30d"))}') # 未来日期和时间)
print('过去的日期:', fake.past_date(start_date="-30m")) # 过去日期
print('过去的日期时间:', fake.past_datetime(start_date="-30d")) # 过去日期和时间
print('时间戳:', fake.unix_time(), f'类型是:{type(fake.unix_time())}')
print('时间戳:', fake.time(), f'类型是:{type(fake.time())}')
print('随机md5:', fake.md5(), f'类型是:{type(fake.md5())}')
"""
main_content = """
# -*- coding: UTF-8 -*-
import os
from random import randint
import pytest
from loguru import logger
from common.logger import Logger
from until.clear_results import clear_allure
if __name__ == '__main__':
Logger.info("Starting......... 走起 ......")
logger.info("Starting......... 走起 ......")
# # 执行用例
# # 执行shell(cmd)命令 (不生成报告文件) -- 串行
# os.system('pytest')
# 解决allure报告缓存和Jenkins无文件报错
# 执行shell(cmd)命令 (同时生成报告文件) -- 串行
os.system('pytest --aamt-reports')
# # 执行shell(cmd)命令 (同时生成报告文件) -- 并行
# os.system('pytest -n auto --aamt-reports')
clear_allure()
# pytest.main(['-m','smoke'])
# pytest.main(["-s",'--workers=1', '--tests-per-worker=4'])
# # --workers=n:多进程运行需要加此参数, n是进程数。默认为1
#   --tests-per-worker=n:多线程需要添加此参数,n是线程数
#   如果两个参数都配置了,就是进程并行,每个进程最多n个线程,总线程数:进程数*线程数
# 注意:pytest-parallel支持python3.6及以上版本,如果是想做多进程并发的需要在linux平台或mac上做,windows上不起作用即(workers永远=1),如果是做多线程的Linux/Mac/Windows平台都支持,进程数为workers设置的值。
pytest.main()
# 测试报告本地静态数据生成--allure generate ./allure-xml -o ./allure-result
os.system(r"allure generate ./report/allure-results -o ./report/allure-report --clean")
# # 执行结束后 自动启动web服务报告
# port = randint(1000, 9999)
# os.system('allure open ./report/allure-report --port {}'.format(port)) # 打开报告
# os.system('allure open ./report --port {}'.format(port)) # 打开报告
# 失败重试
# • 测试失败后要重新运行n次,要在重新运行之间添加延迟时 间,间隔n秒再运行。
# • 执行:
# • 安装:pip install pytest-rerunfailures
# • pytest -v - -reruns 5 --reruns-delay 1 —每次等1秒 重试5次
# 在windows下想用多进程的选pytest-xdist; 想用多线程的选pytest-parallel
"""
pytest_content = """
[pytest]
;addopts = -sq --strict -m smoke --alluredir ./report/allure-results
;addopts = -sq --reruns 0 --reruns-delay 1 --alluredir ./report/allure-results
;addopts = -sq --workers 1 --tests-per-worker 1 --alluredir ./report/allure-results
;addopts = -sq --workers 1 --tests-per-worker 1 --reruns 3 --reruns-delay 1 --alluredir ./report/allure-results
addopts = -sq --alluredir ./report/allure-results
testpaths = ./case
;python_files = test_order.py
;python_files = test_brand.py
python_files = test_offer_doctor.py
;python_files = test_offer_nurse.py
;python_files = test_*.py

@@ -1645,15 +1184,21 @@ ;python_classes = Test*

"""
requirements_content = """
allure-pytest==2.11.1
allure-python-commons==2.11.1
aamt==0.2.0
allure-pytest==2.12.0
allure-python-commons==2.12.0
attrs==22.1.0
certifi==2022.9.24
cffi==1.15.1
charset-normalizer==2.1.1
colorama==0.4.6
colorlog==6.7.0
cryptography==39.0.0
et-xmlfile==1.1.0
exceptiongroup==1.0.0
execnet==1.9.0
Faker==15.1.1
Faker==15.3.4
filelock==3.9.0
greenlet==2.0.1
idna==3.4

@@ -1663,2 +1208,3 @@ iniconfig==1.1.1

jsonpath==0.82
loguru==0.6.0
openpyxl==3.0.10

@@ -1668,2 +1214,3 @@ packaging==21.3

pycodestyle==2.9.1
pycparser==2.21
PyMySQL==1.0.2

@@ -1679,2 +1226,4 @@ pyparsing==3.0.9

six==1.16.0
SQLAlchemy==1.4.45
texttable==1.6.7
tomli==2.0.1

@@ -1685,29 +1234,134 @@ urllib3==1.26.12

"""
conftest_content = """
# -*- coding: utf-8 -*-
# @Software: PyCharm
# @File: conftest.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site: 51automate.cn
# @Time: 11月 25, 2022
import pytest
from aamt.plugin import aamt_plugins
# 加载aamt插件
from api.offer_doctor.doctor_need_api import Dictor_need
pytest_plugins = aamt_plugins()
def pytest_collection_modifyitems(items):
# 测试用例收集完成时,将收集到的item的name和nodeid的中文显示
for item in items:
item.name = item.name.encode("utf-8").decode("unicode_escape")
item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape")
"""
run_shell_content = """
#!/bin/bash
set -X
rm -rf /var/jenkins_home/workspace/api_auotmationtest/report/*
sleep 1
/var/jenkins_home/test_project/py3.9.15/bin/python3 /var/jenkins_home/workspace/api_auotmationtest/main.py
"""
README_content = """
安装最新版本
## AAMT 项目模版
> 用于生成 基于pytest的接口自动化脚手架
> pip install aamt
"""
structure_content = """项目结构说明:
api :各模块接口文件
指定版本安装
data:用例yaml数据
> pip install tep==0.1.0
case:测试用例;
升级aamt
fixtures:Pytest fixture,自动导入;
> pip install -U aamt
common:个人封装 比如邮件、mysql、project项目各种路径、token信息读取等
创建项目脚手架
report:测试报告;
> aamt startproject demo
conftest.py:Pytest挂载;
创建项目脚手架(自动创建虚拟环境)
file:待上传的文件目录;
> aamt startproject demo -venv
utils:工具包;
外网速度慢,pandas可能安装失败,推荐用国内镜像
pytest.ini:Pytest配置;
> pip --default-timeout=6000 install -i https://pypi.tuna.tsinghua.edu.cn/simple aamt
main.py:执行入口;
requirements.txt:项目依赖;
项目结构说明.txt
"""
fake_content = """
from faker import Faker
fake = Faker(locale="zh_CN")
print('姓名:', fake.name())
print('手机号:', fake.phone_number())
phones = [fake.phone_number() for _ in range(5)] # 列表推导,把生成的数据组成一个列表
print('批量生成手机号:', phones)
print('邮箱:', fake.email())
print('MD5:', fake.md5())
print(fake.items())
print('年月日:', fake.date(pattern=' %Y-%m-%d'))
print('随机年份:', fake.year())
print('随机月份:', fake.month())
print('随机几号:', fake.day_of_month())
print('随机星期数:', fake.day_of_week())
print('随机时间字符串1:', fake.time(pattern='%Y-%m-%d %H:%M:%S'))
print('随机时间字符串2:', fake.time(pattern='%H:%M:%S'))
# -30y是过去30年前为开始日期,end_date表示结束到今天
print('过去某一天:', fake.date_between(start_date="-30y", end_date="today"))
print('今天:', fake.date_between_dates()) # 今天
print('日期和时间:', fake.date_time()) # 2021-05-14 19:36:00
print('当前日期时间:', fake.date_time_between_dates())
# print('某个区间内随机日期时 间:', fake.date_time_between_dates(datetime_start=datetime(1999, 2, 2, 10, 30, 20), dat
# etime_end = datetime(2000, 2, 2, 10, 30, 20)))
print('未来30天内的随机日期:', fake.future_date(end_date="+30d"), f'类型是:{type(fake.future_date(end_date="+30d"))},', str(fake.future_date(end_date="+30d")),'已转成字符串')
print('未来30天内的随机日期时间:', fake.future_datetime(end_date="+30d"),
f'类型是:{type(fake.future_datetime(end_date="+30d"))}') # 未来日期和时间)
print('过去的日期:', fake.past_date(start_date="-30m")) # 过去日期
print('过去的日期时间:', fake.past_datetime(start_date="-30d")) # 过去日期和时间
print('时间戳:', fake.unix_time(), f'类型是:{type(fake.unix_time())}')
print('时间戳(时分秒):', fake.time(), f'类型是:{type(fake.time())}')
print('随机md5:', fake.md5(), f'类型是:{type(fake.md5())}')
"""
fastapi_mock_content = """#!/usr/bin/python

@@ -1764,3 +1418,2 @@ # encoding=utf-8

"""
mitm_content = """#!/usr/bin/python

@@ -1870,5 +1523,262 @@ # encoding=utf-8

"""
dao_conftent = """
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: dao.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site:
# @Time: 11月 23, 2022
structure_content = """项目结构说明:
待完善
from loguru import logger
try:
from sqlalchemy import create_engine
from texttable import Texttable
except ModuleNotFoundError:
pass
def mysql_engine(host, port, user, password, db):
# sqlalchemy create_engine
try:
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}")
except NameError:
return ""
return engine
def print_db_table(data_frame):
#以表格形式打印数据表
tb = Texttable()
tb.header(data_frame.columns.array)
tb.set_max_width(0)
# text * cols
tb.set_cols_dtype(['t'] * data_frame.shape[1])
tb.add_rows(data_frame.to_numpy(), header=False)
logger.info(tb.draw())
"""
client_content = """
import json
import time
from mimetypes import MimeTypes
from urllib.parse import urlencode
import allure
import jmespath
import requests
from requests import Response
from requests_toolbelt.multipart.encoder import MultipartEncoder
from common.project import *
from loguru import logger
class Body_type():
none = '无类型'
# 默认
json = 'json数据'
form_text = '表单数据 :纯json数据'
form_file = '表单数据 :文件二进制流+json数据'
binary = '文件直传:文件二进制流'
graphql = '其他类型'
# 缓存变量池
class Cache_vars:
def __init__(self):
self.var_ = {'init_data': '初始值'}
def put(self, key, value):
self.var_[key] = value
# logger.info(f'变量池 成功新增:{{{key}:{value}}}')
def get(self, key):
value = ""
try:
value = self.var_[key]
except KeyError:
logger.error(f'异常:获取 api缓存变量池的key:{key}不存在, 返回空字符串。类实例变量池:{self.var_}')
return value
class HttpClient(Body_type):
def __init__(self):
self.session = requests.Session()
# 缓存变量池实例
self.cache = Cache_vars()
# 获取当前使用的环境
self.environ_active = Operate_token().read_environ_active()
# 获取激活yaml的所有数据
self.env_vars_data = Read_yaml().get_env_vars_yaml()
self.host = self.env_vars_data['after_host']
self.default_header = {
"Content-Type": "application/json",
"language": "zh_CN"
}
def send(self, url: str = '', method='post', body={}, body_type: str = Body_type.json, x_token='',
file_key='picFile', file_path='', timeout=30, **kwargs):
start_time = time.time()
if not url.startswith(("http://", "https://")):
raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦")
# 用户传了headers,就用用户的,不传就用默认的
headers = kwargs.get("headers", self.default_header)
if x_token:
headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。
if method == "get":
result = self.session.request(url=url, method=method, params=body, headers=headers, timeout=timeout,
**kwargs)
elif method == "post":
logger.info(f'body_type类型:{body_type}')
if body_type == Body_type.json:
headers['Content-Type'] = 'application/json; charset=UTF-8'
result = self.session.request(url=url, method=method, json=body, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.form_file:
filename = file_path.split('\\\\')[-1] # xiaoxin.png
# 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143
fileType = MimeTypes().guess_type(file_path)[0]
# 没有识别到就不传 content_type
if fileType is None:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))}
files = {'file_key': (filename, open(file_path, 'rb'))}
# 也可以用已下的方法,不用加 filename
# files = {file_key: open(file_path, 'rb'))}
# files = {'file_key': open(file_path, 'rb')}
else:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')}
files = {file_key: (filename, open(file_path, 'rb'), fileType)}
body.update(files)
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
# :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。
result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.form_text:
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.binary:
files = {file_key: open(file_path, 'rb')}
# 文件流通过files 传给request的请求参数files
result = self.session.request(url=url, method=method, json=body, headers=headers, files=files,
timeout=timeout, **kwargs)
else:
raise ValueError(f"=====body_type没有定义,{body_type} 请确认====")
elif method == "patch":
result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers,
timeout=timeout, **kwargs)
elif method == "delete":
result = ''
elif method == "put":
result = ''
else:
raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====")
end_time = time.time()
# python 内置函数 保留4位小数
time_ = round((end_time - start_time), 4)
# # 处理
# result = AamtResponse(result)
try:
logger.info(
f'\\n请求日志:\\nurl: {url}\\nmethod: {method}\\nbody: \\n{body}\\nbody_type: {body_type}\\nheaders: \\n{headers}\\n**********************************************************************************')
logger.debug(
f'\\n响应日志:\\n响应码: {result.status_code}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n')
except AttributeError:
logger.error(
f'\\n无法获取响应码, 响应日志:\\n{result}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n')
except TypeError:
logger.warning(f'警告:{kwargs}')
self.__create_request_log(url, method, body, body_type, headers)
try:
self.__create_response_log(result.status_code, result.json(), time_)
return result.json()
except:
self.__create_response_log(result.status_code, result.text, time_)
logger.warning(f'\\n注意 响应内容:不可以序列化,具体响应如下:\\n{result.text}')
return result.text
def get_full_url(self, url, etc={}, replace={}, h=""):
if h:
host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格).
else:
host = self.host.rstrip('/')
url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。
full_url = host + "/" + url
# full_url += "?"
full_url += "?platform={}".format(self.environ_active)
if etc:
s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串
full_url += "&" + s
if replace:
full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
# full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
return full_url
# 目的就是 在allure显示,没什么实际意义
@allure.step("请求日志")
def __create_request_log(self, url, method, body, body_type, headers):
pass
# 目的就是 在allure显示
@allure.step('响应日志')
def __create_response_log(self, status_code, text,time_):
pass
class AamtResponse(Response):
# 包装requests.Response,简化jmespath写法
def __init__(self, response):
super().__init__()
for k, v in response.__dict__.items():
self.__dict__[k] = v
def jmespath(self, expression):
return jmespath.search(expression, self.json())
if __name__ == '__main__':
a = HttpClient()
a.send(url='http://www.baidu.com',method='get', body={}, body_type=Body_type.json)
"""

@@ -15,3 +15,3 @@ # -*- coding: utf-8 -*-

from aamt.sample import *
from aamt.logger import Logger
from loguru import logger

@@ -33,3 +33,3 @@

action="store_true",
help="Create virtual environment in the project, and install tep.",
help="Create virtual environment in the project, and install aamt.",
)

@@ -42,3 +42,3 @@ return sub_parser_scaffold

if os.path.isdir(project_name):
Logger.warning(
logger.warning(
f"Project folder {project_name} exists, please specify a new project name."

@@ -48,3 +48,3 @@ )

elif os.path.isfile(project_name):
Logger.warning(
logger.warning(
f"Project name {project_name} conflicts with existed file, please specify a new one."

@@ -54,3 +54,3 @@ )

Logger.info(f"Create new project: {project_name}")
logger.info(f"Create new project: {project_name}")
# os.getcwd() 获取当前目录

@@ -89,18 +89,19 @@ print(f"Project root dir: {os.path.join(os.getcwd(), project_name)}\n")

create_file(os.path.join(project_name, "fixtures", "xf", "fixture_xf.py"), file_content=fixture_xf_content)
create_folder(os.path.join(project_name , "fixtures", "zhangsan"))
create_file(os.path.join(project_name, "fixtures", "zhangsan", "__init__.py"))
create_file(os.path.join(project_name, "fixtures", "zhangsan", "fixture_zhangsan.py"), file_content=fixture_zhangsan_content)
create_folder(os.path.join(project_name, "case"))
create_file(os.path.join(project_name, "case", "__init__.py"))
create_file(os.path.join(project_name, "case", "conftest.py"),file_content=conftest_content)
create_file(os.path.join(project_name, "case", "conftest.py"),file_content=conftest_content1)
create_folder(os.path.join(project_name, "case", "test_brand"))
create_file(os.path.join(project_name, "case", "test_brand", "test_brand.py"),file_content=test_brand_content)
create_folder(os.path.join(project_name, "common"))
create_file(os.path.join(project_name, "common", "__init__.py"))
create_file(os.path.join(project_name, "common", "assert_api.py"),file_content=assert_api_content)
create_file(os.path.join(project_name, "common", "clear_results.py"),file_content=clear_results_content)
# create_file(os.path.join(project_name, "common", "config.py"),file_content=config_content)
create_file(os.path.join(project_name, "common", "logger.py"),file_content=logger_content)
create_file(os.path.join(project_name, "common", "mysqlhelper.py"),file_content=mysqlhelper_content)
create_file(os.path.join(project_name, "common", "emailhelper.py"),file_content=emailhelper_content)
create_file(os.path.join(project_name, "common", "read_token.py"),file_content=read_token_content)
create_file(os.path.join(project_name, "common", "project.py"),file_content=project_content)

@@ -115,3 +116,3 @@ create_folder(os.path.join(project_name, "data"))

create_folder(os.path.join(project_name, "resources"))
create_file(os.path.join(project_name, "resources", "aamt.ini"),file_content=aamt_content)
create_file(os.path.join(project_name, "resources", "aamt.ini"),file_content=aamt_ini_content)
create_folder(os.path.join(project_name, "resources", "env_vars"))

@@ -127,2 +128,3 @@ create_file(os.path.join(project_name, "resources", "env_vars", "env_vars_test.yaml"),file_content=env_vars_test_yaml_content)

create_file(os.path.join(project_name, "until", "mitm.py"), file_content=mitm_content)
create_file(os.path.join(project_name, "until", "dao.py"), file_content=dao_conftent)

@@ -133,3 +135,5 @@ create_file(os.path.join(project_name, "main.py"), file_content=main_content)

create_file(os.path.join(project_name, "README.md"), file_content=README_content)
create_file(os.path.join(project_name, "conftest.py"), file_content=conftest_content)
create_file(os.path.join(project_name, "项目结构说明.txt"), structure_content)
create_file(os.path.join(project_name, "run.sh"), run_shell_content)

@@ -136,0 +140,0 @@

Metadata-Version: 2.1
Name: aamt
Version: 0.1.9
Version: 0.2.0
Summary: 基于pytest的接口自动化测试工具模板

@@ -5,0 +5,0 @@ Home-page: https://github.com/xuefeng365/aamt-template.git

[tool.poetry]
name = "aamt"
version = "0.1.9"
version = "0.2.0"
description = "基于pytest的接口自动化测试工具模板"

@@ -5,0 +5,0 @@ authors = ["xuefeng365 <120158568@qq.com>"]

@@ -26,3 +26,3 @@ # -*- coding: utf-8 -*-

'name': 'aamt',
'version': '0.1.9',
'version': '0.2.0',
'description': '基于pytest的接口自动化测试工具模板',

@@ -29,0 +29,0 @@ 'long_description': '## AAMT 项目模版\n> 用于生成 基于pytest的接口自动化脚手架\n\npython 版本\n\n> 3.9\n\n安装最新版本\n\n> pip install aamt\n\n指定版本安装\n\n> pip install tep==0.1.0\n\n升级aamt\n\n> pip install -U aamt\n\n创建项目脚手架 \n\n> aamt startproject demo\n\n创建项目脚手架(自动创建虚拟环境)\n\n> aamt startproject demo -venv\n\n外网速度慢,pandas可能安装失败,推荐用国内镜像\n\n> pip --default-timeout=6000 install -i https://pypi.tuna.tsinghua.edu.cn/simple aamt\n\n\n\n',

import json
import time
from mimetypes import MimeTypes
from urllib.parse import urlencode
import allure
import jmespath
import requests
from requests import Response
from requests_toolbelt.multipart.encoder import MultipartEncoder
from aamt.config import *
from aamt.fixture import AamtVars
from aamt.logger import Logger
class Body_type():
none = '无类型'
# 默认
json = 'json数据'
form_text = '表单数据 :纯json数据'
form_file = '表单数据 :文件二进制流+json数据'
binary = '文件直传:文件二进制流'
graphql = '其他类型'
class HttpClient(Body_type):
def __init__(self):
self.session = requests.Session()
# 变量池实例
self.aamt_vars = AamtVars()
# 日志实例
self.logging = Logger
# 获取当前使用的环境
self.environ_active = Operate_config().read_environ_active()
# 获取激活yaml的所有数据
self.env_vars_data = Read_yaml().get_env_vars_yaml()
self.host = self.env_vars_data['after_host']
self.default_header = {
"Content-Type": "application/json",
"language": "zh_CN"
}
def send(self, url: str = '', method='post', body={}, body_type: str = Body_type.json, x_token='',
file_key='picFile', file_path='', timeout=30, **kwargs):
start_time = time.time()
if not url.startswith(("http://", "https://")):
raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦")
# 用户传了headers,就用用户的,不传就用默认的
headers = kwargs.get("headers", self.default_header)
if x_token:
headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。
if method == "get":
result = self.session.request(url=url, method=method, params=body, headers=headers, timeout=timeout,
**kwargs)
elif method == "post":
self.logging.info(f'body_type类型:{body_type}')
if body_type == Body_type.json:
headers['Content-Type'] = 'application/json; charset=UTF-8'
result = self.session.request(url=url, method=method, json=body, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.form_file:
filename = file_path.split('\\')[-1] # xiaoxin.png
# 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143
fileType = MimeTypes().guess_type(file_path)[0]
# 没有识别到就不传 content_type
if fileType is None:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))}
files = {'file_key': (filename, open(file_path, 'rb'))}
# 也可以用已下的方法,不用加 filename
# files = {file_key: open(file_path, 'rb'))}
# files = {'file_key': open(file_path, 'rb')}
else:
# files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')}
files = {file_key: (filename, open(file_path, 'rb'), fileType)}
body.update(files)
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
# :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。
result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.form_text:
# 把要传入的数据 转变为form_data格式
form_data = MultipartEncoder(body)
# 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。
headers['Content-Type'] = form_data.content_type
result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout,
**kwargs)
elif body_type == Body_type.binary:
files = {file_key: open(file_path, 'rb')}
# 文件流通过files 传给request的请求参数files
result = self.session.request(url=url, method=method, json=body, headers=headers, files=files,
timeout=timeout, **kwargs)
else:
raise ValueError(f"=====body_type没有定义,{body_type} 请确认====")
elif method == "patch":
result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers,
timeout=timeout, **kwargs)
elif method == "delete":
result = ''
elif method == "put":
result = ''
else:
raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====")
end_time = time.time()
# python 内置函数 保留4位小数
time_ = round((end_time - start_time), 4)
# 处理
result = AamtResponse(result)
try:
self.logging.info(
f'\n请求日志:\nurl: {url}\nmethod: {method}\nbody: \n{body}\nbody_type: {body_type}\nheaders: \n{headers}\n**********************************************************************************')
self.logging.debug(
f'\n响应日志:\n响应码: {result.status_code}\n请求>响应 时间开销: {time_}\n**********************************************************************************\n')
except AttributeError:
self.logging.error(
f'\n无法获取响应码, 响应日志:\n{result}\n请求>响应 时间开销: {time_}\n**********************************************************************************\n')
except TypeError:
self.logging.warning(f'警告:{kwargs}')
self.__create_request_log(url, method, body, body_type, headers)
try:
self.__create_response_log(result.status_code, result.json(), time_)
return result.json()
except:
self.__create_response_log(result.status_code, result.text, time_)
self.logging.warning(f'\n注意 响应内容:不可以序列化,具体响应如下:\n{result.text}')
return result.text
def get_full_url(self, url, etc={}, replace={}, h=""):
if h:
host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格).
else:
host = self.host.rstrip('/')
url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。
full_url = host + "/" + url
# full_url += "?"
full_url += "?platform={}".format(self.environ_active)
if etc:
s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串
full_url += "&" + s
if replace:
full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
# full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。
return full_url
# 目的就是 在allure显示,没什么实际意义
@allure.step("请求日志")
def __create_request_log(self, url, method, body, body_type, headers):
pass
# 目的就是 在allure显示
@allure.step('响应日志')
def __create_response_log(self, status_code, text, time_):
pass
class AamtResponse(Response):
# 包装requests.Response,简化jmespath写法
def __init__(self, response):
super().__init__()
for k, v in response.__dict__.items():
self.__dict__[k] = v
def jmespath(self, expression):
return jmespath.search(expression, self.json())
if __name__ == '__main__':
a = HttpClient()
a.send(url='http://www.baidu.com', method='get', body={}, body_type=Body_type.json)
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: dao.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site:
# @Time: 11月 23, 2022
from aamt.logger import Logger
try:
from sqlalchemy import create_engine
from texttable import Texttable
except ModuleNotFoundError:
pass
def mysql_engine(host, port, user, password, db):
# sqlalchemy create_engine
try:
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}")
except NameError:
return ""
return engine
def print_db_table(data_frame):
"""以表格形式打印数据表"""
tb = Texttable()
tb.header(data_frame.columns.array)
tb.set_max_width(0)
# text * cols
tb.set_cols_dtype(['t'] * data_frame.shape[1])
tb.add_rows(data_frame.to_numpy(), header=False)
Logger.info(tb.draw())
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: func.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site:
# @Time: 11月 23, 2022
import copy
import itertools
import json
import time
from sys import stdout
from aamt.logger import Logger
try:
import numpy as np
except ModuleNotFoundError:
pass
def current_time():
"""当前时间"""
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
def current_date():
"""当前日期"""
return time.strftime("%Y-%m-%d", time.localtime(time.time()))
def progress_bar(i):
"""进度条"""
c = int(i / 10)
progress = '\r %2d%% [%s%s]'
a = '■' * c
b = '□' * (10 - c)
msg = progress % (i, a, b)
stdout.write(msg)
stdout.flush()
def pairwise(option):
"""pairwise算法"""
cp = [] # 笛卡尔积
s = [] # 两两拆分
for x in eval('itertools.product' + str(tuple(option))):
cp.append(x)
s.append([i for i in itertools.combinations(x, 2)])
Logger.info('笛卡尔积:%s' % len(cp))
del_row = []
progress_bar(0)
s2 = copy.deepcopy(s)
for i in range(len(s)): # 对每行用例进行匹配
if (i % 100) == 0 or i == len(s) - 1:
progress_bar(int(100 * i / (len(s) - 1)))
t = 0
for j in range(len(s[i])): # 对每行用例的两两拆分进行判断,是否出现在其他行
flag = False
for i2 in [x for x in range(len(s2)) if s2[x] != s[i]]: # 找同一列
if s[i][j] == s2[i2][j]:
t = t + 1
flag = True
break
if not flag: # 同一列没找到,不用找剩余列了
break
if t == len(s[i]):
del_row.append(i)
s2.remove(s[i])
res = [cp[i] for i in range(len(cp)) if i not in del_row]
Logger.info('过滤后:%s' % len(res))
return res
class NpEncoder(json.JSONEncoder):
"""numpy类型转换"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.int64):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.float64):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
def textbox_input():
"""文本框输入数据"""
return ["你好", "HELLO", "hello", "8", "0", "-5", "8.5", "", " 123 ", " 你好 ", "你 好", "!", "@", "#", "$",
"$$", "^", "&", "*", "_", "null", "<html></html>", "'", "''", '"', '""', "[", "]", "[]", "{", "}",
"{}", "\r\n", "HE1",
""" 你好HEllo 10 -3 0 0.5 !@#$%^&*_、null、<html></html><html>、'、"、[]、{}、/r/n\\r\\n、ces """,
# 10
"1234567890",
# 50
"12345678901234567890123456789012345678901234567890",
# 100
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890",
# 200
"12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890",
# 500
"12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890",
# 1000
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890",
# 2000
"12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890",
# 4000
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
]
# -*- coding: UTF-8 -*-
# @Software: PyCharm
# @File: logger.py
# @Author: xuefeng365
# @E-mail: 120158568@qq.com,
# @Site:
# @Time: 11月 23, 2022
import logging
import time
from functools import wraps
import colorlog
from aamt.config import *
# logging模块中包含的类
# 用来自定义日志对象的规则(比如:设置日志输出格式、等级等)
# 常用子类:StreamHandler、FileHandler
# StreamHandler 控制台输出日志
# FileHandler 日志输出到文件
# BASE_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# print(BASE_PATH)
# 日志文件路径
LOG_PATH = os.path.join(Config.project_root_dir, "log")
if not os.path.exists(LOG_PATH):
os.mkdir(LOG_PATH)
class Logger():
def __init__(self):
log_colors_config = {
'DEBUG': 'white', # cyan white
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red'}
# 创建一个logger日志对象
self.logger = logging.getLogger("log")
# 创建日志格式(不带颜色)对象
self.file_formater = logging.Formatter(
fmt='[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 创建颜色格式((带颜色))对象
self.console_formatter = colorlog.ColoredFormatter(
fmt='%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
log_colors=log_colors_config)
# 输出到控制台
self.console = logging.StreamHandler()
self.logname = os.path.join(LOG_PATH, "{}.log".format(time.strftime("%Y%m%d"))) # 创建日志路径和时间
# 输出到文件:获取日志路径,并创建FileHandler对象()-- 设定写入方式:追加
self.filelogger = logging.FileHandler(self.logname, mode='a', encoding="UTF-8")
# 设置默认的日志级别 控制台logger 和 handler以最高级别为准,不同handler之间可以不一样,不相互影响
self.logger.setLevel(logging.INFO)
self.console.setLevel(logging.INFO)
self.filelogger.setLevel(logging.INFO)
# self.logger.setLevel(logging.INFO)
# self.logger.setLevel(logging.WARNING)
# 设置控制台日志的颜色
self.console.setFormatter(self.console_formatter)
# 设置文件日志的颜色
self.filelogger.setFormatter(self.file_formater)
# 重复日志问题:
# 1、防止多次addHandler;
# 2、loggername 保证每次添加的时候不一样;
# 3、显示完log之后调用removeHandler
if not self.logger.handlers:
self.logger.addHandler(self.console)
self.logger.addHandler(self.filelogger)
self.console.close()
self.filelogger.close()
Logger = Logger().logger
def decorate_log(func):
@wraps(func)
# Python装饰器中@wraps作用 https://blog.csdn.net/weixin_40576010/article/details/88639686
def log(*args, **kwargs):
Logger.info(f'-- 开始执行 {func.__name__} --')
try:
func(*args, **kwargs)
except Exception as e:
Logger.error(f'-- {func.__name__}执行失败,原因:{e} --')
raise e
else:
Logger.info(f'-- {func.__name__} 执行成功 --')
return log
# 调试用
@decorate_log
def xuefeng():
assert 2 == 2
if __name__ == '__main__':
xuefeng()
Logger.info("---测试开始---")
Logger.warning("---测试开始---")
Logger.error("---测试结束---")
Logger.debug("---测试结束---")