aamt
Advanced tools
+1
-1
@@ -1,2 +0,2 @@ | ||
| __version__ = "0.1.9" | ||
| __version__ = "0.2.0" | ||
| __description__ = "aamt is a api-automation-testing tool to help you write pytest more easily" |
+3
-62
@@ -12,4 +12,4 @@ # -*- coding: UTF-8 -*- | ||
| import os | ||
| import yaml | ||
| from loguru import logger | ||
@@ -19,65 +19,4 @@ class Config(): | ||
| project_root_dir = '' | ||
| # project_root_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) | ||
| class Operate_config(Config): | ||
| def __init__(self): | ||
| super().__init__() | ||
| self.env_configpath = os.path.join(self.project_root_dir, "resources", "aamt.ini") | ||
| # 实例化configParser对象 | ||
| self.conf = configparser.ConfigParser() | ||
| self.conf.read(self.env_configpath, encoding="utf-8") | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_token(self, section='Token', key=''): | ||
| return self.conf.get(section, key) | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_environ_active(self, section='Environ', key='active'): | ||
| return self.conf.get(section, key) | ||
| # 将value的值写入配置文件中 | ||
| def write(self, section='Token', key='', value=''): | ||
| self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com) | ||
| # 写入文件 | ||
| with open(self.env_configpath, 'w', encoding="utf-8") as configfile: | ||
| self.conf.write(configfile) | ||
| class Read_yaml(Operate_config): | ||
| def __init__(self): | ||
| super().__init__() | ||
| def get_env_vars_yaml(self): | ||
| env_active = self.read_environ_active() | ||
| env_filename = f'env_vars_{env_active}.yaml' | ||
| with open( | ||
| os.path.join(self.project_root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_test_yaml(self, filepath): | ||
| ''' | ||
| filepath="/brand/brand_controller.yaml" | ||
| ''' | ||
| # 测试用例数据 | ||
| test_data_path = f"{Config.project_root_dir}/data/{filepath}".replace("\\", "/").replace("//", "/") | ||
| with open(test_data_path, encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_file_path(file_name, middle='file'): | ||
| ''' | ||
| file_name: 文件名,比如 xiaoxin.png | ||
| ''' | ||
| filePath = f"{Config.project_root_dir}/{middle}/{file_name}".replace("\\", "/").replace("//", "/") | ||
| return filePath | ||
| def fixture_paths(root_path=Config.project_root_dir): | ||
@@ -98,5 +37,7 @@ ''' | ||
| paths.append("fixtures" + import_path) | ||
| logger.info(f'1、项目下的fixtures:{paths}') | ||
| # aamt下的fixture | ||
| paths.append("aamt.fixture") | ||
| logger.info(f'2、aamt下的fixtures:{[paths[-1]]}') | ||
| return paths | ||
+26
-77
@@ -10,88 +10,37 @@ # -*- coding: UTF-8 -*- | ||
| import pytest | ||
| from aamt.logger import Logger | ||
| from faker import Faker | ||
| import json | ||
| import os | ||
| from aamt.config import * | ||
| from filelock import FileLock | ||
| from loguru import logger | ||
| class Project: | ||
| dir = "" | ||
| def _project_dir(session): | ||
| # 从缓存中获取项目根目录 | ||
| project_dir = session.config.cache.get("project_dir", None) | ||
| if not project_dir: | ||
| # 第一次运行没有.pytest_cache | ||
| cwd = os.getcwd() | ||
| tests = cwd.find("tests") | ||
| samples = cwd.find("samples") | ||
| if tests > 0: | ||
| project_dir = cwd[:cwd.find("tests")] | ||
| elif samples > 0: | ||
| project_dir = cwd[:cwd.find("samples")] | ||
| else: | ||
| project_dir = cwd | ||
| return project_dir | ||
| def pytest_sessionstart(session): | ||
| Project.dir = _project_dir(session) | ||
| @pytest.fixture(scope="session") | ||
| def faker_ch(): | ||
| """中文造数据""" | ||
| return Faker(locale="zh_CN") | ||
| def aamt_context_manager(tmp_path_factory, worker_id): | ||
| """ | ||
| aamt上下文管理器,在xdist分布式执行时,多个session也只执行一次 | ||
| 参考:https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once | ||
| 命令不带-n auto也能正常执行,不受影响 | ||
| """ | ||
| def inner(produce_expensive_data, *args, **kwargs): | ||
| if worker_id == "master": | ||
| # not executing in with multiple workers, just produce the data and let | ||
| # pytest's fixture caching do its job | ||
| return produce_expensive_data(*args, **kwargs) | ||
| @pytest.fixture(scope="session") | ||
| def faker_en(): | ||
| """英文造数据""" | ||
| return Faker() | ||
| # get the temp directory shared by all workers | ||
| root_tmp_dir = tmp_path_factory.getbasetemp().parent | ||
| fn = root_tmp_dir / "data.json" | ||
| with FileLock(str(fn) + ".lock"): | ||
| if fn.is_file(): | ||
| data = json.loads(fn.read_text()) | ||
| else: | ||
| data = produce_expensive_data(*args, **kwargs) | ||
| fn.write_text(json.dumps(data)) | ||
| return data | ||
| @pytest.fixture(scope="session") | ||
| def pd(): | ||
| """pandas库""" | ||
| try: | ||
| import pandas | ||
| return pandas | ||
| except ModuleNotFoundError: | ||
| pass | ||
| @pytest.fixture(scope="session") | ||
| def file_dir(): | ||
| """file目录的路径""" | ||
| return os.path.join(Project.dir, "file") | ||
| @pytest.fixture(scope="session") | ||
| def env_vars(): | ||
| """读取激活环境下的yaml文件里的配置信息(账号、密码、数据库等)""" | ||
| return Read_yaml().get_env_vars_yaml() | ||
| class AamtVars: | ||
| # 全局变量池 | ||
| def __init__(self): | ||
| self.vars_ = {'a': '初始值'} | ||
| def put(self, key, value): | ||
| self.vars_[key] = value | ||
| # Logger.info(f'变量池 成功新增:{{{key}:{value}}}') | ||
| def get(self, key): | ||
| value = "" | ||
| try: | ||
| value = self.vars_[key] | ||
| except KeyError: | ||
| Logger.error(f'异常:获取 {key} 失败, 当前变量池:{self.vars_}') | ||
| return value | ||
| return inner |
+91
-34
@@ -7,3 +7,3 @@ # -*- coding: utf-8 -*- | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
@@ -24,17 +24,25 @@ | ||
| # allure临时目录 | ||
| allure_temp = tempfile.mkdtemp() | ||
| # allure源文件临时目录,那一堆json文件,生成HTML报告会删除 | ||
| allure_source_path = ".allure.source.temp" | ||
| def _aamt_reports(config): | ||
| """ | ||
| --aamt-reports命令行参数不能和allure命令行参数同时使用,否则可能出错。判断参数是否生效,防止跟allure自带参数冲突 | ||
| """ | ||
| if config.getoption("--aamt-reports") and not config.getoption("allure_report_dir"): | ||
| return True | ||
| return False | ||
| def aamt_plugins(): | ||
| # conftest 加载插件时,从调用堆栈列表里获取信息,再组装成所需路径 | ||
| caller = inspect.stack()[1] | ||
| # 保存项目根路径 | ||
| Config.project_root_dir = os.path.dirname(caller.filename) | ||
| # 获取所有fixture路径,1、项目下的fixtures;2、aamt下的fixture; | ||
| plugins_path = fixture_paths(root_path=Config.project_root_dir) # +[其他插件] | ||
| return plugins_path | ||
| def _is_master(config): | ||
| """ | ||
| pytest-xdist分布式执行时,判断是主节点master还是子节点 | ||
| 主节点没有workerinput属性 | ||
| """ | ||
| return not hasattr(config, 'workerinput') | ||
| class Plugin: | ||
| reports_path = os.path.join(Config.project_root_dir, "reports") | ||
| @staticmethod | ||
@@ -51,12 +59,10 @@ def pytest_addoption(parser): | ||
| @staticmethod | ||
| def _aamt_reports(config): | ||
| # 判断参数是否生效,防止跟allure自带参数冲突 | ||
| if config.getoption("--aamt-reports") and not config.getoption("allure_report_dir"): | ||
| return True | ||
| else: | ||
| return False | ||
| @staticmethod | ||
| def pytest_configure(config): | ||
| if Plugin._aamt_reports(config): | ||
| """ | ||
| 这段代码源自:https://github.com/allure-framework/allure-python/blob/master/allure-pytest/src/plugin.py | ||
| 目的是生成allure源文件,用于生成HTML报告 | ||
| """ | ||
| if _aamt_reports(config): | ||
| if os.path.exists(allure_source_path): | ||
| shutil.rmtree(allure_source_path) | ||
| test_listener = AllureListener(config) | ||
@@ -68,3 +74,3 @@ config.pluginmanager.register(test_listener) | ||
| clean = config.option.clean_alluredir | ||
| file_logger = AllureFileLogger(allure_temp, clean) | ||
| file_logger = AllureFileLogger(allure_source_path, clean) # allure_source | ||
| allure_commons.plugin_manager.register(file_logger) | ||
@@ -75,16 +81,67 @@ config.add_cleanup(cleanup_factory(file_logger)) | ||
| def pytest_sessionfinish(session): | ||
| # 测试运行结束后生成allure报告 | ||
| if Plugin._aamt_reports(session.config): | ||
| reports_dir = os.path.join(Config.project_root_dir, "reports") | ||
| current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time())) | ||
| new_report = os.path.join(reports_dir, "report-" + current_time) | ||
| if os.path.exists(reports_dir): | ||
| # 复制历史报告,填充allure趋势图数据 | ||
| his_reports = os.listdir(reports_dir) | ||
| if his_reports: | ||
| latest_report_history = os.path.join(reports_dir, his_reports[-1], "history") | ||
| shutil.copytree(latest_report_history, os.path.join(allure_temp, "history")) | ||
| os.system(f"allure generate {allure_temp} -o {new_report} --clean") | ||
| shutil.rmtree(allure_temp) | ||
| """ | ||
| 测试运行结束后生成allure报告 | ||
| """ | ||
| reports_path = os.path.join(Config.project_root_dir, "report") | ||
| if _aamt_reports(session.config): | ||
| if _is_master(session.config): # 只在master节点才生成报告 | ||
| # 最近一份报告的历史数据,填充allure趋势图 | ||
| if os.path.exists(reports_path): | ||
| his_reports = os.listdir(reports_path) | ||
| # 判断 report文件夹下是否有历史报告 | ||
| if his_reports: | ||
| # 如果有历史报告,就复制报告的历史数据(json文件) | ||
| latest_report_history = os.path.join(reports_path, "history") | ||
| # 将历史报告数据复制到新报告的临时文件里 | ||
| shutil.copytree(latest_report_history, os.path.join(allure_source_path, "history")) | ||
| # 删除历史报告 | ||
| shutil.rmtree(reports_path) | ||
| # html文件 存放路径 | ||
| html_report_name = reports_path | ||
| # allure_source_path : allure源文件临时目录,那一堆json文件,生成HTML报告会删除 | ||
| os.system(f"allure generate {allure_source_path} -o {html_report_name} --clean") | ||
| shutil.rmtree(allure_source_path) | ||
| # @staticmethod | ||
| # def pytest_sessionfinish(session): | ||
| # """ | ||
| # 测试运行结束后生成allure报告 | ||
| # """ | ||
| # reports_path = os.path.join(Config.project_root_dir, "reports") | ||
| # if _aamt_reports(session.config): | ||
| # if _is_master(session.config): # 只在master节点才生成报告 | ||
| # # 最近一份报告的历史数据,填充allure趋势图 | ||
| # if os.path.exists(reports_path): | ||
| # his_reports = os.listdir(reports_path) | ||
| # if his_reports: | ||
| # # print(f'报告目录文件有:{his_reports}') | ||
| # # 清除多余的报告,只保留最近的一份报告 | ||
| # for i in his_reports[:-1]: | ||
| # shutil.rmtree(os.path.join(Config.project_root_dir, 'reports', i)) | ||
| # # print(f'操作只保留最近一份报告目录:{his_reports}') | ||
| # latest_report_history = os.path.join(reports_path, his_reports[-1], "history") | ||
| # # 复制最近一份报告的记录 | ||
| # shutil.copytree(latest_report_history, os.path.join(allure_source_path, "history")) | ||
| # # 删除最近一份报告的记录,下面会生成新的 | ||
| # shutil.rmtree(os.path.join(Config.project_root_dir, 'reports', his_reports[-1])) | ||
| # | ||
| # current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time())) | ||
| # # html_report_name = os.path.join(reports_path, "report") | ||
| # html_report_name = os.path.join(reports_path, "report-" + current_time) | ||
| # os.system(f"allure generate {allure_source_path} -o {html_report_name} --clean") | ||
| # shutil.rmtree(allure_source_path) | ||
| # | ||
| def aamt_plugins(): | ||
| # conftest 加载插件时,从调用堆栈列表里获取信息,再组装成所需路径 | ||
| caller = inspect.stack()[1] | ||
| # 保存项目根路径 | ||
| Config.project_root_dir = os.path.dirname(caller.filename) | ||
| # 获取所有fixture路径,1、项目下的fixtures;2、aamt下的fixture; | ||
| plugins_path = fixture_paths(root_path=Config.project_root_dir) # +[其他插件] | ||
| return plugins_path | ||
+695
-785
@@ -18,193 +18,60 @@ # -*- coding: utf-8 -*- | ||
| client_content = """ | ||
| import json | ||
| import time | ||
| from mimetypes import MimeTypes | ||
| from urllib.parse import urlencode | ||
| logintoken_content = """ | ||
| # -*- coding: utf-8 -*- | ||
| import allure | ||
| import jmespath | ||
| import requests | ||
| from requests import Response | ||
| from requests_toolbelt.multipart.encoder import MultipartEncoder | ||
| from aamt.config import * | ||
| from aamt.fixture import AamtVars | ||
| from aamt.logger import Logger | ||
| import hashlib | ||
| from until.client import * | ||
| from common.project import * | ||
| from loguru import logger | ||
| class Body_type(): | ||
| none = '无类型' | ||
| # 默认 | ||
| json = 'json数据' | ||
| form_text = '表单数据 :纯json数据' | ||
| form_file = '表单数据 :文件二进制流+json数据' | ||
| binary = '文件直传:文件二进制流' | ||
| graphql = '其他类型' | ||
| # *********** 后台登录开始 *********** | ||
| class Login_after(HttpClient): | ||
| class HttpClient(Body_type): | ||
| def __init__(self, user_info, token:str): | ||
| # 传入token(coolie 或者 session)字段:目的为了写入配置文件 | ||
| super().__init__() # 继承上个类的ini | ||
| # ----------- 密码进行MD5加密 ------------ # | ||
| mima = str(user_info['password']) | ||
| # md5加密对象 | ||
| md5 = hashlib.md5() | ||
| # 填入要加密的字符串 | ||
| md5.update(mima.encode('utf-8')) | ||
| # 转化为16进制字符串 | ||
| new_mima = md5.hexdigest() | ||
| # ----------- MD5加密结束 ------------ # | ||
| def __init__(self): | ||
| self.username = user_info['account'] | ||
| self.password = new_mima | ||
| self.token_key = token | ||
| self.session = requests.Session() | ||
| # 变量池实例 | ||
| self.aamt_vars = AamtVars() | ||
| # 日志实例 | ||
| self.logging = Logger | ||
| # 获取当前使用的环境 | ||
| self.environ_active = Operate_config().read_environ_active() | ||
| # 获取激活yaml的所有数据 | ||
| self.env_vars_data = Read_yaml().get_env_vars_yaml() | ||
| self.host = self.env_vars_data['after_host'] | ||
| self.default_header = { | ||
| "Content-Type": "application/json", | ||
| "language": "zh_CN" | ||
| def login_(self): | ||
| etc = { | ||
| "account": self.username | ||
| , "password": self.password | ||
| } | ||
| url = '/system/userInfo/login' | ||
| method = 'get' | ||
| url = self.get_full_url(url, etc=etc, h=self.env_vars_data['after_host']) | ||
| result = self.send(method=method, url=url) | ||
| assert jmespath.search('code', result) < 400, f"系统管理登录失败,接口响应: {result}" | ||
| def send(self, url:str='', method='post', body={}, body_type:str=Body_type.json,x_token='', file_key='picFile',file_path='',timeout=30, **kwargs): | ||
| token_data = result['result']['token'] | ||
| logger.info(f'账号:{self.username} 登录成功,新token:{token_data}') | ||
| # 把token值写入配置文件中 | ||
| Operate_token().write_token(section="Token", key=self.token_key, value=token_data) | ||
| return result | ||
| start_time = time.time() | ||
| # *********** 后台登录结束 *********** | ||
| if not url.startswith(("http://", "https://")): | ||
| raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦") | ||
| # 用户传了headers,就用用户的,不传就用默认的 | ||
| headers = kwargs.get("headers", self.default_header) | ||
| # *********** 前端登录开始 *********** | ||
| # XXXXXXXX 支持扩展 XXXXXXXXX | ||
| # *********** 前端登录结束 ************* | ||
| if x_token: | ||
| headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。 | ||
| if method == "get": | ||
| result = self.session.request(url=url, method=method, params=body, headers=headers,timeout=timeout,**kwargs) | ||
| elif method == "post": | ||
| self.logging.info(f'body_type类型:{body_type}') | ||
| if body_type == Body_type.json: | ||
| headers['Content-Type'] = 'application/json; charset=UTF-8' | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers,timeout=timeout,**kwargs) | ||
| elif body_type == Body_type.form_file: | ||
| filename = file_path.split('\\\\')[-1] # xiaoxin.png | ||
| # 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143 | ||
| fileType = MimeTypes().guess_type(file_path)[0] | ||
| # 没有识别到就不传 content_type | ||
| if fileType is None: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))} | ||
| files = {'file_key': (filename, open(file_path, 'rb'))} | ||
| # 也可以用已下的方法,不用加 filename | ||
| # files = {file_key: open(file_path, 'rb'))} | ||
| # files = {'file_key': open(file_path, 'rb')} | ||
| else: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')} | ||
| files = {file_key: (filename, open(file_path, 'rb'), fileType)} | ||
| body.update(files) | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| # :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。 | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers,timeout=timeout,**kwargs) | ||
| elif body_type == Body_type.form_text: | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers,timeout=timeout,**kwargs) | ||
| elif body_type == Body_type.binary: | ||
| files = {file_key: open(file_path, 'rb')} | ||
| # 文件流通过files 传给request的请求参数files | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers, files=files,timeout=timeout,**kwargs) | ||
| else: | ||
| raise ValueError(f"=====body_type没有定义,{body_type} 请确认====") | ||
| elif method == "patch": | ||
| result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers, timeout=timeout,**kwargs) | ||
| elif method == "delete": | ||
| result = '' | ||
| elif method == "put": | ||
| result = '' | ||
| else: | ||
| raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====") | ||
| end_time = time.time() | ||
| # python 内置函数 保留4位小数 | ||
| time_ = round((end_time-start_time), 4) | ||
| # 处理 | ||
| result = AamtResponse(result) | ||
| try: | ||
| self.logging.info(f'\\n请求日志:\\nurl: {url}\\nmethod: {method}\\nbody: \\n{body}\\nbody_type: {body_type}\\nheaders: \\n{headers}\\n**********************************************************************************') | ||
| self.logging.debug(f'\\n响应日志:\\n响应码: {result.status_code}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n') | ||
| except AttributeError: | ||
| self.logging.error( | ||
| f'\\n无法获取响应码, 响应日志:\\n{result}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n') | ||
| except TypeError: | ||
| self.logging.warning(f'警告:{kwargs}') | ||
| self.__create_request_log(url, method, body, body_type, headers) | ||
| try: | ||
| self.__create_response_log(result.status_code, result.json(),time_) | ||
| return result.json() | ||
| except: | ||
| self.__create_response_log(result.status_code, result.text,time_) | ||
| self.logging.warning(f'\\n注意 响应内容:不可以序列化,具体响应如下:\\n{result.text}') | ||
| return result.text | ||
| def get_full_url(self, url, etc={}, replace={}, h=""): | ||
| if h: | ||
| host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格). | ||
| else: | ||
| host = self.host.rstrip('/') | ||
| url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。 | ||
| full_url = host + "/" + url | ||
| # full_url += "?" | ||
| full_url += "?platform={}".format(self.environ_active) | ||
| if etc: | ||
| s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串 | ||
| full_url += "&" + s | ||
| if replace: | ||
| full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| # full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| return full_url | ||
| # 目的就是 在allure显示,没什么实际意义 | ||
| @allure.step("请求日志") | ||
| def __create_request_log(self, url, method, body, body_type, headers): | ||
| pass | ||
| # 目的就是 在allure显示 | ||
| @allure.step('响应日志') | ||
| def __create_response_log(self, status_code, text,time_): | ||
| pass | ||
| class AamtResponse(Response): | ||
| # 包装requests.Response,简化jmespath写法 | ||
| def __init__(self, response): | ||
| super().__init__() | ||
| for k, v in response.__dict__.items(): | ||
| self.__dict__[k] = v | ||
| def jmespath(self, expression): | ||
| return jmespath.search(expression, self.json()) | ||
| if __name__ == '__main__': | ||
| a = HttpClient() | ||
| a.send(url='http://www.baidu.com',method='get', body={}, body_type=Body_type.json) | ||
| """ | ||
@@ -219,13 +86,7 @@ public_api_content = """ | ||
| # @Time: 5月 05, 2022 | ||
| import sys | ||
| import allure | ||
| import jmespath | ||
| from aamt.client import HttpClient | ||
| from aamt.config import * | ||
| from aamt.logger import Logger | ||
| from until.client import * | ||
| from common.project import * | ||
| from common.mysqlhelper import MysqlHelper | ||
| sys.path.append("..") | ||
@@ -243,4 +104,5 @@ | ||
| self.token = xin_token | ||
| Logger.warning(f'读取到的最新token:{xin_token}') | ||
| # logger.warning(f'读取到的最新token:{xin_token}') | ||
| @allure.step('新增全新公司:{name}') | ||
@@ -289,18 +151,5 @@ def add_company(self, name='xuefeng', cid=''): | ||
| @allure.step('新增公司模板:{name},并给公司模板分配所有权限') | ||
| def company_template_assign_permissions(self, name='auto_template', resourceList=[]): | ||
| body = {"isDefault": "0", "name": name, "note": "自动化", "resourceList": resourceList} | ||
| url = 'system/resourceTemplate/saveTemplate' | ||
| method = 'post' | ||
| url = self.get_full_url(url, h=self.host) | ||
| ret = self.send(url=url, method=method, body=body, body_type=self.json, x_token=self.token) | ||
| if jmespath.search('code', ret) == 200: | ||
| print(f'公司模板 {name} 新增并分配权限成功') | ||
| elif jmespath.search('code', ret) == 500: | ||
| assert '存在' in ret['message'], f'异常, 新增公司模板 {name} 功能异常,请检查' | ||
| else: | ||
| assert False, f'异常,公司模板分配权限异常,接口响应信息;{ret}' | ||
| """ | ||
| """ | ||
| brand_controller_api_content = """ | ||
@@ -313,7 +162,5 @@ | ||
| from api.brand.route import * | ||
| from aamt.client import * | ||
| from until.client import * | ||
| sys.path.append("..") | ||
| class Brand_ControllerApi(HttpClient): | ||
@@ -400,8 +247,9 @@ ''' 初始化-传入配置文件里的token值 然后调用 依赖token的其他方法 ,比如加购物车 查看下订单等等 ''' | ||
| brand_route_content = """ | ||
| from aamt.config import * | ||
| from common.project import * | ||
| "后台品牌接口路径(前台)" | ||
| brand_controller = Read_yaml().get_test_yaml(filepath="/brand/brand_controller.yaml") | ||
| brand_controller = Read_yaml().get_test_yaml(filepath="brand/brand_controller.yaml") | ||
| """ | ||
| fixture_admin_content = """ | ||
@@ -414,3 +262,3 @@ # -*- coding: utf-8 -*- | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: www.51automate.cn | ||
| # @Site: 51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
@@ -420,3 +268,4 @@ | ||
| import pytest | ||
| from aamt.config import * | ||
| from common.project import * | ||
| from loguru import logger | ||
@@ -426,4 +275,7 @@ # 管理员维护 | ||
| def env_vars_data(): | ||
| return Read_yaml().get_env_vars_yaml() | ||
| read_yaml = Read_yaml() | ||
| logger.info(f'当前激活的环境是:{read_yaml.read_environ_active()}') | ||
| return read_yaml.get_env_vars_yaml() | ||
| """ | ||
@@ -437,3 +289,3 @@ fixture_xf_content = """ | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: www.51automate.cn | ||
| # @Site: 51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
@@ -443,3 +295,3 @@ | ||
| import pytest | ||
| from aamt.logger import Logger | ||
| from loguru import logger | ||
@@ -449,6 +301,11 @@ from api.logintoken import Login_after | ||
| @pytest.fixture(scope="session") | ||
| def systerm_admin_login(env_vars_data): | ||
| Login_after(env_vars_data['systerm'], 'systerm_admin_token') | ||
| def systerm_admin_login(aamt_context_manager, env_vars_data): | ||
| # aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器 | ||
| # 该login只会在整个运行期间执行一次 | ||
| def produce_expensive_data(variable): | ||
| # 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理 | ||
| return Login_after(variable['systerm'], 'systerm_admin_token').login_() | ||
| return aamt_context_manager(produce_expensive_data, env_vars_data) | ||
@@ -459,3 +316,3 @@ | ||
| def init_admin(systerm_admin_login): | ||
| Logger.info('调用公共方法 准备基础数据,返回一个实例') | ||
| logger.info('调用公共方法 准备基础数据,返回一个实例') | ||
| pub_d = Public(token='systerm_admin_token') | ||
@@ -466,14 +323,21 @@ yield pub_d | ||
| # --------------- xuefeng 医生1 登录及其数据初始化 开始 ----------- | ||
| @pytest.fixture(scope="session") | ||
| def login_doctor1(env_vars_data): | ||
| Login_after(env_vars_data['xuefeng_doctor1'], 'xuefeng_doctor1_token') | ||
| def login_doctor1(aamt_context_manager, env_vars_data): | ||
| # aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器 | ||
| # 该login只会在整个运行期间执行一次 | ||
| def produce_expensive_data(variable): | ||
| # 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理 | ||
| return Login_after(variable['xuefeng_doctor1'], 'xuefeng_doctor1_token').login_() | ||
| return aamt_context_manager(produce_expensive_data, env_vars_data) | ||
| @pytest.fixture(scope="session", autouse=True) # 自动运行 | ||
| # @pytest.fixture(scope="session") # 调用才运行 | ||
| def init_doctor1(login_doctor1): | ||
| # @pytest.fixture(scope="session", autouse=True) # 自动运行 | ||
| @pytest.fixture(scope="session") # 调用才运行 | ||
| def init_admin(systerm_admin_login): | ||
| logger.info('调用公共方法 准备基础数据,返回一个实例') | ||
| pub_d = Public(token='xuefeng_doctor1_token') | ||
| yield pub_d | ||
| # --------------- xuefeng 医生1 登录及其数据初始化 结束 ----------- | ||
@@ -483,14 +347,3 @@ | ||
| # --------------- xuefeng 医生2 登录及其数据初始化 开始 ----------- | ||
| @pytest.fixture(scope="session") | ||
| def login_doctor2(env_vars_data): | ||
| Login_after(env_vars_data['xuefeng_doctor2'], 'xuefeng_doctor2_token') | ||
| # @pytest.fixture(scope="session", autouse=True) # 自动运行 | ||
| @pytest.fixture(scope="session") # 调用才运行 | ||
| def init_doctor2(login_doctor2): | ||
| pub_d = Public(token='xuefeng_doctor2_token') | ||
| yield pub_d | ||
| 内容同上 | ||
| # --------------- xuefeng 医生2 登录及其数据初始化 结束 ----------- | ||
@@ -500,14 +353,20 @@ | ||
| # --------------- xuefeng2 护士1 登录及其数据初始化 开始 ----------- | ||
| @pytest.fixture(scope="session") | ||
| def login_nurse1(env_vars_data): | ||
| print('xuefeng2_nurse1 登录') | ||
| Login_after(env_vars_data['xuefeng2_nurse1'], 'xuefeng2_nurse1_token') | ||
| # @pytest.fixture(scope="session", autouse=True) | ||
| @pytest.fixture(scope="session") | ||
| def init_nurse1(login_nurse1): | ||
| pub_n = Public(token='xuefeng2_nurse1_token') | ||
| yield pub_n | ||
| def login_nurse1(aamt_context_manager, env_vars_data): | ||
| # aamt_context_manager是为了兼容pytest-xdist分布式执行的上下文管理器 | ||
| # 该login只会在整个运行期间执行一次 | ||
| def produce_expensive_data(variable): | ||
| # 这里的登录方法 ,抽离出来,1、方便换不同的账号登录 2、遇到前后台不同的方法登录,也可以有效的区分开,统一管理 | ||
| return Login_after(variable['xuefeng2_nurse1'], 'xuefeng2_nurse1_token').login_() | ||
| return aamt_context_manager(produce_expensive_data, env_vars_data) | ||
| # @pytest.fixture(scope="session", autouse=True) # 自动运行 | ||
| @pytest.fixture(scope="session") # 调用才运行 | ||
| def init_admin(systerm_admin_login): | ||
| logger.info('调用公共方法 准备基础数据,返回一个实例') | ||
| pub_d = Public(token='xuefeng2_nurse1_token') | ||
| yield pub_d | ||
@@ -518,120 +377,37 @@ # --------------- xuefeng2 护士1 登录及其数据初始化 结束 ----------- | ||
| # --------------- xuefeng2 护士2 登录及其数据初始化 开始 ----------- | ||
| @pytest.fixture(scope="session") | ||
| def login_nurse2(env_vars_data): | ||
| print('xuefeng2_nurse2 登录') | ||
| Login_after(env_vars_data['xuefeng2_nurse2'], 'xuefeng2_nurse2_token') | ||
| # @pytest.fixture(scope="session", autouse=True) | ||
| @pytest.fixture(scope="session") | ||
| def init_nurse2(login_nurse2): | ||
| pub_n = Public(token='xuefeng2_nurse2_token') | ||
| yield pub_n | ||
| 内容同上 | ||
| # --------------- xuefeng2 护士2 登录及其数据初始化 结束 ----------- | ||
| """ | ||
| conftest_content = """ | ||
| fixture_zhangsan_content = """ | ||
| # -*- coding: utf-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: conftest.py | ||
| # @File: fixture_zhangsan.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: www.51automate.cn | ||
| # @Site: 51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
| # import sys | ||
| # sys.path.append("..") | ||
| from aamt.plugin import aamt_plugins | ||
| # 内容参考 fixture_zhangsan.py | ||
| # 加载aamt插件 | ||
| pytest_plugins = aamt_plugins() | ||
| """ | ||
| test_brand_content = """ | ||
| # -*- coding: UTF-8 -*- | ||
| import sys | ||
| conftest_content1 = """ | ||
| # -*- coding: utf-8 -*- | ||
| import allure | ||
| import pytest | ||
| # @Software: PyCharm | ||
| # @File: conftest.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: 51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
| from api.brand.brand_controller_api import Brand_ControllerApi | ||
| from api.brand.route import * | ||
| from common.assert_api import assert_api | ||
| # 占位 | ||
| sys.path.append("..") | ||
| """ | ||
| # 后台身份会话 | ||
| @pytest.fixture(scope="session") | ||
| def xuefeng_brand_controller(): | ||
| return Brand_ControllerApi("gz_doctor1_token") | ||
| # 数据清理 -- 接口 | ||
| @pytest.fixture() # 函数级别 | ||
| def del_brand(xuefeng_brand_controller): | ||
| # xuefeng_brand_controller.del_brand() | ||
| yield | ||
| # 清楚自动化品牌数据 | ||
| xuefeng_brand_controller.del_brand() | ||
| data = brand_controller['add_brand']['case_data'] | ||
| @allure.feature('测试品牌模块') | ||
| @allure.story('增加品牌功能') | ||
| # @pytest.mark.smoke | ||
| # @pytest.mark.skip | ||
| class Test_add_brand1(): | ||
| @pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例 | ||
| def test_add_brand(self, case_data, xuefeng_brand_controller, del_brand): | ||
| allure.dynamic.title(f'title:{case_data["case_name"]}') | ||
| case_body = case_data['body'] | ||
| ret = xuefeng_brand_controller.add_brand(name=case_body['name'], | ||
| note=case_body['note'], | ||
| id=case_body['id'], | ||
| iconFile=case_body['iconFile'], | ||
| pictureUrl=case_body['pictureUrl'] | ||
| ) | ||
| assert_api(ret, expect_data=case_data["expect"]) | ||
| data = brand_controller['del_brand']['case_data'] | ||
| @allure.feature('测试品牌模块') | ||
| @allure.story('删除品牌功能') | ||
| # @pytest.mark.smoke | ||
| # @pytest.mark.skip | ||
| class Test_del_brand1(): | ||
| @pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例 | ||
| def test_add_brand(self, case_data, xuefeng_brand_controller): | ||
| allure.dynamic.title(f'title:{case_data["case_name"]}') | ||
| case_body = case_data['body'] | ||
| ret = xuefeng_brand_controller.del_brand(brand_name=case_body['name']) | ||
| assert ret and ret['message'] == 'success', f'删除品牌失败,请检查删除功能是否异常 或者 要删除的品牌:{case_body["name"]} 是否存在' | ||
| def test1(): | ||
| pass | ||
| if __name__ == "__main__": | ||
| pytest.main(['-m', __file__]) | ||
| # pytest.main(['-m smoke',__file__]) | ||
| """ | ||
| assert_api_content = """ | ||
| from common.logger import * | ||
| from loguru import logger | ||
@@ -657,276 +433,4 @@ # @decorate_log | ||
| """ | ||
| config_content = """ | ||
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: config.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| import configparser | ||
| import os | ||
| import yaml | ||
| class Config(): | ||
| # 全局项目根目录 | ||
| project_root_dir = '' | ||
| # project_root_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) | ||
| class Operate_config(Config): | ||
| def __init__(self): | ||
| super().__init__() | ||
| self.env_configpath = os.path.join(self.project_root_dir, "resources", "aamt.ini") | ||
| # 实例化configParser对象 | ||
| self.conf = configparser.ConfigParser() | ||
| self.conf.read(self.env_configpath, encoding="utf-8") | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_token(self, section='Token', key=''): | ||
| return self.conf.get(section, key) | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_environ_active(self, section='Environ', key='active'): | ||
| return self.conf.get(section, key) | ||
| # 将value的值写入配置文件中 | ||
| def write(self, section='Token', key='', value=''): | ||
| self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com) | ||
| # 写入文件 | ||
| with open(self.env_configpath, 'w', encoding="utf-8") as configfile: | ||
| self.conf.write(configfile) | ||
| class Read_yaml(Operate_config): | ||
| def __init__(self): | ||
| super().__init__() | ||
| def get_env_vars_yaml(self): | ||
| env_active = self.read_environ_active() | ||
| env_filename = f'env_vars_{env_active}.yaml' | ||
| with open( | ||
| os.path.join(self.project_root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_test_yaml(self, filepath): | ||
| ''' | ||
| filepath="/brand/brand_controller.yaml" | ||
| ''' | ||
| # 测试用例数据 | ||
| test_data_path = f"{Config.project_root_dir}/data/{filepath}".replace("\\\\", "/").replace("//", "/") | ||
| with open(test_data_path, encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_file_path(file_name, middle='file'): | ||
| ''' | ||
| file_name: 文件名,比如 xiaoxin.png | ||
| ''' | ||
| filePath = f"{Config.project_root_dir}/{middle}/{file_name}".replace("\\\\", "/").replace("//", "/") | ||
| return filePath | ||
| def fixture_paths(root_path=Config.project_root_dir): | ||
| ''' | ||
| fixture路径,1、项目下的fixtures;2、aamt下的fixture; | ||
| :return: | ||
| ''' | ||
| _fixtures_dir = os.path.join(root_path, "fixtures") | ||
| paths = [] | ||
| # 项目下的fixtures | ||
| for root, _, files in os.walk(_fixtures_dir): | ||
| for file in files: | ||
| if file.startswith("fixture_") and file.endswith(".py"): | ||
| full_path = os.path.join(root, file) | ||
| import_path = full_path.replace(_fixtures_dir, "").replace("\\\\", ".") | ||
| import_path = import_path.replace("/", ".").replace(".py", "") | ||
| paths.append("fixtures" + import_path) | ||
| # aamt下的fixture | ||
| paths.append("aamt.fixture") | ||
| return paths | ||
| """ | ||
| logger_content = """ | ||
| import logging | ||
| import os | ||
| import time | ||
| from functools import wraps | ||
| import colorlog | ||
| from aamt.config import * | ||
| # logging模块中包含的类 | ||
| # 用来自定义日志对象的规则(比如:设置日志输出格式、等级等) | ||
| # 常用子类:StreamHandler、FileHandler | ||
| # StreamHandler 控制台输出日志 | ||
| # FileHandler 日志输出到文件 | ||
| # BASE_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) | ||
| # print(BASE_PATH) | ||
| # 日志文件路径 | ||
| LOG_PATH = os.path.join(Config.project_root_dir, "log") | ||
| if not os.path.exists(LOG_PATH): | ||
| os.mkdir(LOG_PATH) | ||
| class Logger(): | ||
| def __init__(self): | ||
| log_colors_config = { | ||
| 'DEBUG': 'white', # cyan white | ||
| 'INFO': 'green', | ||
| 'WARNING': 'yellow', | ||
| 'ERROR': 'red', | ||
| 'CRITICAL': 'bold_red'} | ||
| # 创建一个logger日志对象 | ||
| self.logger = logging.getLogger("log") | ||
| # 创建日志格式(不带颜色)对象 | ||
| self.file_formater = logging.Formatter( | ||
| fmt='[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s', | ||
| datefmt='%Y-%m-%d %H:%M:%S') | ||
| # 创建颜色格式((带颜色))对象 | ||
| self.console_formatter = colorlog.ColoredFormatter( | ||
| fmt='%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s', | ||
| datefmt='%Y-%m-%d %H:%M:%S', | ||
| log_colors=log_colors_config) | ||
| # 输出到控制台 | ||
| self.console = logging.StreamHandler() | ||
| self.logname = os.path.join(LOG_PATH, "{}.log".format(time.strftime("%Y%m%d"))) # 创建日志路径和时间 | ||
| # 输出到文件:获取日志路径,并创建FileHandler对象()-- 设定写入方式:追加 | ||
| self.filelogger = logging.FileHandler(self.logname, mode='a', encoding="UTF-8") | ||
| # 设置默认的日志级别 控制台logger 和 handler以最高级别为准,不同handler之间可以不一样,不相互影响 | ||
| self.logger.setLevel(logging.INFO) | ||
| self.console.setLevel(logging.INFO) | ||
| self.filelogger.setLevel(logging.INFO) | ||
| # self.logger.setLevel(logging.INFO) | ||
| # self.logger.setLevel(logging.WARNING) | ||
| # 设置控制台日志的颜色 | ||
| self.console.setFormatter(self.console_formatter) | ||
| # 设置文件日志的颜色 | ||
| self.filelogger.setFormatter(self.file_formater) | ||
| # 重复日志问题: | ||
| # 1、防止多次addHandler; | ||
| # 2、loggername 保证每次添加的时候不一样; | ||
| # 3、显示完log之后调用removeHandler | ||
| if not self.logger.handlers: | ||
| self.logger.addHandler(self.console) | ||
| self.logger.addHandler(self.filelogger) | ||
| self.console.close() | ||
| self.filelogger.close() | ||
| Logger = Logger().logger | ||
| def decorate_log(func): | ||
| @wraps(func) | ||
| # Python装饰器中@wraps作用 https://blog.csdn.net/weixin_40576010/article/details/88639686 | ||
| def log(*args, **kwargs): | ||
| Logger.info(f'-- 开始执行 {func.__name__} --') | ||
| try: | ||
| func(*args, **kwargs) | ||
| except Exception as e: | ||
| Logger.error(f'-- {func.__name__}执行失败,原因:{e} --') | ||
| raise e | ||
| else: | ||
| Logger.info(f'-- {func.__name__} 执行成功 --') | ||
| return log | ||
| # 调试用 | ||
| @decorate_log | ||
| def xuefeng(): | ||
| assert 2 == 2 | ||
| if __name__ == '__main__': | ||
| xuefeng() | ||
| Logger.info("---测试开始---") | ||
| Logger.warning("---测试开始---") | ||
| Logger.error("---测试结束---") | ||
| Logger.debug("---测试结束---") | ||
| """ | ||
| logintoken_content = """ | ||
| # -*- coding: utf-8 -*- | ||
| import hashlib | ||
| import jmespath | ||
| from aamt.client import HttpClient | ||
| from aamt.config import * | ||
| from aamt.logger import Logger | ||
| # *********** 后台登录开始 *********** | ||
| class Login_after(HttpClient): | ||
| def __init__(self, user_info, token): | ||
| # 传入token(coolie 或者 session)字段:目的为了写入配置文件 | ||
| super().__init__() # 继承上个类的ini | ||
| self.username = user_info['account'] | ||
| # ----------- 密码进行MD5加密 ------------ # | ||
| mima = str(user_info['password']) | ||
| # md5加密对象 | ||
| md5 = hashlib.md5() | ||
| # 填入要加密的字符串 | ||
| md5.update(mima.encode('utf-8')) | ||
| # 转化为16进制字符串 | ||
| new_mima = md5.hexdigest() | ||
| # ----------- MD5加密结束 ------------ # | ||
| self.password = new_mima | ||
| result = self.login_after(username=self.username, password=self.password) | ||
| assert jmespath.search('code', result) == 200, f"系统管理登录失败,接口响应 \\n {result}\\n" | ||
| self.token = result['result']['token'] | ||
| Logger.info(f'账号:{self.username} 登录成功,新token:{self.token}') | ||
| # 把token值写入配置文件中 | ||
| Operate_config().write(section="Token", key=token, value=self.token) | ||
| def login_after(self, username, password): | ||
| etc = { | ||
| "account": username | ||
| , "password": password | ||
| } | ||
| url = '/system/userInfo/login' | ||
| method = 'get' | ||
| url = self.get_full_url(url, etc=etc, h=self.env_vars_data['after_host']) | ||
| return self.send(method=method, url=url) | ||
| # *********** 后台登录结束 *********** | ||
| # *********** 前端登录开始 *********** | ||
| # XXXXXXXX 支持扩展 XXXXXXXXX | ||
| # *********** 前端登录结束 ************* | ||
| """ | ||
| mysqlhelper_content = """ | ||
@@ -1284,32 +788,182 @@ import json | ||
| """ | ||
| read_token_content = """ | ||
| # -*- coding: utf-8 -*- | ||
| project_content = """ | ||
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: read_token.py | ||
| # @File: project.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: www.51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| import os | ||
| import configparser | ||
| import yaml | ||
| from loguru import logger | ||
| from aamt.config import Operate_config | ||
| class Config: | ||
| # 全局项目根目录 | ||
| # root_dir = os.path.dirname(os.path.dirname(os.getcwd())) | ||
| root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | ||
| # logger.info(f'全局项目根目录:{root_dir}') | ||
| def read_token(): | ||
| # 获取配置文件中的key值 (token值) | ||
| class Clazz: | ||
| Get = Operate_config() | ||
| systerm_admin_token = Get.read_token(key='systerm_admin_token') | ||
| xuefeng2_doctor1_token = Get.read_token(key='xuefeng_doctor1_token') | ||
| xuefeng2_doctor2_token = Get.read_token(key='xuefeng_doctor2_token') | ||
| xuefeng_nurse1_token = Get.read_token(key='xuefeng2_nurse1_token') | ||
| xuefeng_nurse2_token = Get.read_token(key='xuefeng2_nurse2_token') | ||
| data_dir = os.path.join(root_dir, "data") | ||
| file_dir = os.path.join(root_dir, "file") | ||
| gz_doctor1_token = Get.read_token(key='gz_doctor1_token') | ||
| gz_nurse1_token = Get.read_token(key='gz_nurse1_token') | ||
| settings = Config() | ||
| return Clazz | ||
| class Operate_token(): | ||
| def __init__(self): | ||
| super().__init__() | ||
| self.env_configpath = os.path.join(settings.root_dir, "resources", "aamt.ini") | ||
| # 实例化configParser对象 | ||
| self.conf = configparser.ConfigParser() | ||
| self.conf.read(self.env_configpath, encoding="utf-8") | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_token(self, section='Token', key=''): | ||
| return self.conf.get(section, key) | ||
| # 将value的值写入配置文件中 | ||
| def write_token(self, section='Token', key='', value=''): | ||
| self.conf.set(section, key, value) # 给iphone分组设置 key:value (iphone_url:www.xxx.com) | ||
| # 写入文件 | ||
| with open(self.env_configpath, 'w', encoding="utf-8") as configfile: | ||
| self.conf.write(configfile) | ||
| # 读取配置文件中的key值 (读取) | ||
| def read_environ_active(self, section='Environ', key='active'): | ||
| return self.conf.get(section, key) | ||
| class Read_yaml(Operate_token): | ||
| def __init__(self): | ||
| super().__init__() | ||
| def get_env_vars_yaml(self): | ||
| # 获取当前激活的环境 | ||
| env_active = self.read_environ_active() | ||
| env_filename = f'env_vars_{env_active}.yaml' | ||
| with open( | ||
| os.path.join(settings.root_dir, "resources", "env_vars", env_filename), encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_test_yaml(self, filepath): | ||
| ''' | ||
| filepath="/brand/brand_controller.yaml" | ||
| ''' | ||
| # 测试用例数据路径 | ||
| test_data_path = os.path.join(settings.data_dir, filepath) | ||
| # logger.info(f'测试用例数据路径:{test_data_path}') | ||
| with open(test_data_path, encoding="utf-8") as f: | ||
| return yaml.load(f.read(), Loader=yaml.FullLoader) | ||
| def get_file_path(file_name, middle='file'): | ||
| ''' | ||
| file_name: 文件名,比如 xiaoxin.png | ||
| ''' | ||
| filePath = os.path.join(settings.root_dir, middle,file_name) | ||
| return filePath | ||
| if __name__ == '__main__': | ||
| root_dir = os.path.dirname(os.path.dirname(os.getcwd())) | ||
| print(f'项目根路径:{settings.root_dir}') | ||
| test_data_path = os.path.join(settings.data_dir, "./brand/brand_controller.yaml") | ||
| print(f'数据文件路径:{test_data_path}') | ||
| # 在Windows系统下 路径可以是`D:\\12-12\\xx`, 也可以是`D:/12-12/xx` | ||
| """ | ||
| test_brand_content = """ | ||
| # -*- coding: UTF-8 -*- | ||
| import allure | ||
| import pytest | ||
| from api.brand.brand_controller_api import Brand_ControllerApi | ||
| from api.brand.route import * | ||
| from common.assert_api import assert_api | ||
| from loguru import logger | ||
| # 后台身份会话 | ||
| @pytest.fixture(scope="session") | ||
| def xuefeng_brand_controller(): | ||
| return Brand_ControllerApi("xuefeng_doctor1_token") | ||
| # 数据清理 -- 接口 | ||
| @pytest.fixture() # 函数级别 | ||
| def del_brand(xuefeng_brand_controller): | ||
| # xuefeng_brand_controller.del_brand() | ||
| yield | ||
| # 清楚自动化品牌数据 | ||
| xuefeng_brand_controller.del_brand() | ||
| data = brand_controller['add_brand']['case_data'] | ||
| @allure.feature('测试品牌模块') | ||
| @allure.story('增加品牌功能') | ||
| # @pytest.mark.smoke | ||
| # @pytest.mark.skip | ||
| class Test_add_brand1(): | ||
| @pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例 | ||
| def test_add_brand(self, case_data, xuefeng_brand_controller, del_brand): | ||
| allure.dynamic.title(f'title:{case_data["case_name"]}') | ||
| case_body = case_data['body'] | ||
| ret = xuefeng_brand_controller.add_brand(name=case_body['name'], | ||
| note=case_body['note'], | ||
| id=case_body['id'], | ||
| iconFile=case_body['iconFile'], | ||
| pictureUrl=case_body['pictureUrl'] | ||
| ) | ||
| assert_api(ret, expect_data=case_data["expect"]) | ||
| data = brand_controller['del_brand']['case_data'] | ||
| @allure.feature('测试品牌模块') | ||
| @allure.story('删除品牌功能') | ||
| # @pytest.mark.smoke | ||
| # @pytest.mark.skip | ||
| class Test_del_brand1(): | ||
| @pytest.mark.parametrize("case_data", data, ids=[data[i]['case_name'] for i in range(len(data))]) # 参数化测试用例 | ||
| def test_add_brand(self, case_data, xuefeng_brand_controller): | ||
| allure.dynamic.title(f'title:{case_data["case_name"]}') | ||
| case_body = case_data['body'] | ||
| ret = xuefeng_brand_controller.del_brand(brand_name=case_body['name']) | ||
| assert ret and ret['message'] == 'success', f'删除品牌失败,请检查删除功能是否异常 或者 要删除的品牌:{case_body["name"]} 是否存在' | ||
| if __name__ == "__main__": | ||
| pytest.main(['-m', __file__]) | ||
| # pytest.main(['-m smoke',__file__]) | ||
| """ | ||
| brand_controller_content = """ | ||
@@ -1330,3 +984,3 @@ add_brand: | ||
| - case_name: case1_后台新增品牌,必填参数全部正确入参,新增成功 | ||
| - case_name: case1_后台新增品牌,品牌名不填,其他必填参数全部正确入参,新增失败 | ||
| body: {"name":"", | ||
@@ -1349,4 +1003,6 @@ "note":"自动化新建", | ||
| expect: {'code': 200, 'message': 'success'} | ||
| """ | ||
| aamt_content = """ | ||
| aamt_ini_content = """ | ||
| [Token] | ||
@@ -1362,3 +1018,5 @@ systerm_admin_token = eH-nczOHaPLWo4m9Lnk | ||
| [Environ] | ||
| active = test | ||
| ;active = test | ||
| active = uat | ||
| """ | ||
@@ -1485,153 +1143,34 @@ env_vars_test_yaml_content = """ | ||
| """ | ||
| clear_results_content = """ | ||
| import os | ||
| import shutil | ||
| # shutil.rmtree( src ) #递归删除一个目录以及目录内的所有内容 | ||
| # os.makedirs() 方法用于递归创建目录。 | ||
| # 解决allure报告缓存和Jenkins无文件报错 | ||
| main_content = """ | ||
| def clear_allure(): | ||
| filepath = (os.path.abspath(os.path.dirname(os.path.dirname(__file__))) + "/report/allure-results/") | ||
| if os.path.exists(filepath): | ||
| shutil.rmtree("{}".format(filepath)) | ||
| os.makedirs("{}".format(filepath)) | ||
| else: | ||
| os.makedirs("{}".format(filepath)) | ||
| path_report = (os.path.abspath(os.path.dirname(os.path.dirname(__file__))) + "/report/allure-report") | ||
| if os.path.exists(path_report): | ||
| shutil.rmtree("{}".format(path_report)) | ||
| if __name__ == '__main__': | ||
| print(os.path.abspath(os.path.dirname(__file__)) + "/report/allure-results/") | ||
| """ | ||
| fake_content = """ | ||
| from faker import Faker | ||
| from common.logger import Logger | ||
| fake = Faker("zh_CN") | ||
| class Faker_: | ||
| def __init__(self): | ||
| self.fake = Faker("zh_CN") | ||
| # 选择中文 | ||
| def get_phone_number(self): | ||
| return self.fake.phone_number() | ||
| def get_name(self): | ||
| return self.fake.name() | ||
| def get_email(self): | ||
| return self.fake.email() | ||
| def md5(self): | ||
| return self.fake.md5() | ||
| if __name__ == "__main__": | ||
| test = Faker_() | ||
| print(test.get_name()) | ||
| print(test.get_phone_number()) | ||
| print(test.get_email()) | ||
| print('年月日:', fake.date(pattern=' %Y-%m-%d')) | ||
| print('随机年份:', fake.year) | ||
| print('随机年份:', fake.year) | ||
| print('随机月份:', fake.month) | ||
| print('随机几号:', fake.day_of_month) | ||
| print('随机星期数:', fake.day_of_week) | ||
| print('时间:', fake.time(pattern='%H:%M:%S')) | ||
| # -30y是过去30年前为开始日期,end_date表示结束到今天 | ||
| print('过去某一天:', fake.date_between(start_date="-30y", end_date="today")) | ||
| print('今天:', fake.date_between_dates) # 今天 | ||
| print('日期和时间:', fake.date_time) # 2021-05-14 19:36:00 | ||
| print('当前日期时间:', fake.date_time_between_dates) | ||
| # print('某个区间内随机日期时 间:', fake.date_time_between_dates(datetime_start=datetime(1999, 2, 2, 10, 30, 20), dat | ||
| # etime_end = datetime(2000, 2, 2, 10, 30, 20))) | ||
| print('未来的日期:', fake.future_date(end_date="+30d"), str(fake.future_date(end_date="+30d"))) | ||
| print('未来的日期时间:', fake.future_datetime(end_date="+30d"), | ||
| f'类型是:{type(fake.future_datetime(end_date="+30d"))}') # 未来日期和时间) | ||
| print('过去的日期:', fake.past_date(start_date="-30m")) # 过去日期 | ||
| print('过去的日期时间:', fake.past_datetime(start_date="-30d")) # 过去日期和时间 | ||
| print('时间戳:', fake.unix_time(), f'类型是:{type(fake.unix_time())}') | ||
| print('时间戳:', fake.time(), f'类型是:{type(fake.time())}') | ||
| print('随机md5:', fake.md5(), f'类型是:{type(fake.md5())}') | ||
| """ | ||
| main_content = """ | ||
| # -*- coding: UTF-8 -*- | ||
| import os | ||
| from random import randint | ||
| import pytest | ||
| from loguru import logger | ||
| from common.logger import Logger | ||
| from until.clear_results import clear_allure | ||
| if __name__ == '__main__': | ||
| Logger.info("Starting......... 走起 ......") | ||
| logger.info("Starting......... 走起 ......") | ||
| # # 执行用例 | ||
| # # 执行shell(cmd)命令 (不生成报告文件) -- 串行 | ||
| # os.system('pytest') | ||
| # 解决allure报告缓存和Jenkins无文件报错 | ||
| # 执行shell(cmd)命令 (同时生成报告文件) -- 串行 | ||
| os.system('pytest --aamt-reports') | ||
| # # 执行shell(cmd)命令 (同时生成报告文件) -- 并行 | ||
| # os.system('pytest -n auto --aamt-reports') | ||
| clear_allure() | ||
| # pytest.main(['-m','smoke']) | ||
| # pytest.main(["-s",'--workers=1', '--tests-per-worker=4']) | ||
| # # --workers=n:多进程运行需要加此参数, n是进程数。默认为1 | ||
| # --tests-per-worker=n:多线程需要添加此参数,n是线程数 | ||
| # 如果两个参数都配置了,就是进程并行,每个进程最多n个线程,总线程数:进程数*线程数 | ||
| # 注意:pytest-parallel支持python3.6及以上版本,如果是想做多进程并发的需要在linux平台或mac上做,windows上不起作用即(workers永远=1),如果是做多线程的Linux/Mac/Windows平台都支持,进程数为workers设置的值。 | ||
| pytest.main() | ||
| # 测试报告本地静态数据生成--allure generate ./allure-xml -o ./allure-result | ||
| os.system(r"allure generate ./report/allure-results -o ./report/allure-report --clean") | ||
| # # 执行结束后 自动启动web服务报告 | ||
| # port = randint(1000, 9999) | ||
| # os.system('allure open ./report/allure-report --port {}'.format(port)) # 打开报告 | ||
| # os.system('allure open ./report --port {}'.format(port)) # 打开报告 | ||
| # 失败重试 | ||
| # • 测试失败后要重新运行n次,要在重新运行之间添加延迟时 间,间隔n秒再运行。 | ||
| # • 执行: | ||
| # • 安装:pip install pytest-rerunfailures | ||
| # • pytest -v - -reruns 5 --reruns-delay 1 —每次等1秒 重试5次 | ||
| # 在windows下想用多进程的选pytest-xdist; 想用多线程的选pytest-parallel | ||
| """ | ||
| pytest_content = """ | ||
| [pytest] | ||
| ;addopts = -sq --strict -m smoke --alluredir ./report/allure-results | ||
| ;addopts = -sq --reruns 0 --reruns-delay 1 --alluredir ./report/allure-results | ||
| ;addopts = -sq --workers 1 --tests-per-worker 1 --alluredir ./report/allure-results | ||
| ;addopts = -sq --workers 1 --tests-per-worker 1 --reruns 3 --reruns-delay 1 --alluredir ./report/allure-results | ||
| addopts = -sq --alluredir ./report/allure-results | ||
| testpaths = ./case | ||
| ;python_files = test_order.py | ||
| ;python_files = test_brand.py | ||
| python_files = test_offer_doctor.py | ||
| ;python_files = test_offer_nurse.py | ||
| ;python_files = test_*.py | ||
@@ -1645,15 +1184,21 @@ ;python_classes = Test* | ||
| """ | ||
| requirements_content = """ | ||
| allure-pytest==2.11.1 | ||
| allure-python-commons==2.11.1 | ||
| aamt==0.2.0 | ||
| allure-pytest==2.12.0 | ||
| allure-python-commons==2.12.0 | ||
| attrs==22.1.0 | ||
| certifi==2022.9.24 | ||
| cffi==1.15.1 | ||
| charset-normalizer==2.1.1 | ||
| colorama==0.4.6 | ||
| colorlog==6.7.0 | ||
| cryptography==39.0.0 | ||
| et-xmlfile==1.1.0 | ||
| exceptiongroup==1.0.0 | ||
| execnet==1.9.0 | ||
| Faker==15.1.1 | ||
| Faker==15.3.4 | ||
| filelock==3.9.0 | ||
| greenlet==2.0.1 | ||
| idna==3.4 | ||
@@ -1663,2 +1208,3 @@ iniconfig==1.1.1 | ||
| jsonpath==0.82 | ||
| loguru==0.6.0 | ||
| openpyxl==3.0.10 | ||
@@ -1668,2 +1214,3 @@ packaging==21.3 | ||
| pycodestyle==2.9.1 | ||
| pycparser==2.21 | ||
| PyMySQL==1.0.2 | ||
@@ -1679,2 +1226,4 @@ pyparsing==3.0.9 | ||
| six==1.16.0 | ||
| SQLAlchemy==1.4.45 | ||
| texttable==1.6.7 | ||
| tomli==2.0.1 | ||
@@ -1685,29 +1234,134 @@ urllib3==1.26.12 | ||
| """ | ||
| conftest_content = """ | ||
| # -*- coding: utf-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: conftest.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: 51automate.cn | ||
| # @Time: 11月 25, 2022 | ||
| import pytest | ||
| from aamt.plugin import aamt_plugins | ||
| # 加载aamt插件 | ||
| from api.offer_doctor.doctor_need_api import Dictor_need | ||
| pytest_plugins = aamt_plugins() | ||
| def pytest_collection_modifyitems(items): | ||
| # 测试用例收集完成时,将收集到的item的name和nodeid的中文显示 | ||
| for item in items: | ||
| item.name = item.name.encode("utf-8").decode("unicode_escape") | ||
| item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape") | ||
| """ | ||
| run_shell_content = """ | ||
| #!/bin/bash | ||
| set -X | ||
| rm -rf /var/jenkins_home/workspace/api_auotmationtest/report/* | ||
| sleep 1 | ||
| /var/jenkins_home/test_project/py3.9.15/bin/python3 /var/jenkins_home/workspace/api_auotmationtest/main.py | ||
| """ | ||
| README_content = """ | ||
| 安装最新版本 | ||
| ## AAMT 项目模版 | ||
| > 用于生成 基于pytest的接口自动化脚手架 | ||
| > pip install aamt | ||
| """ | ||
| structure_content = """项目结构说明: | ||
| api :各模块接口文件 | ||
| 指定版本安装 | ||
| data:用例yaml数据 | ||
| > pip install tep==0.1.0 | ||
| case:测试用例; | ||
| 升级aamt | ||
| fixtures:Pytest fixture,自动导入; | ||
| > pip install -U aamt | ||
| common:个人封装 比如邮件、mysql、project项目各种路径、token信息读取等 | ||
| 创建项目脚手架 | ||
| report:测试报告; | ||
| > aamt startproject demo | ||
| conftest.py:Pytest挂载; | ||
| 创建项目脚手架(自动创建虚拟环境) | ||
| file:待上传的文件目录; | ||
| > aamt startproject demo -venv | ||
| utils:工具包; | ||
| 外网速度慢,pandas可能安装失败,推荐用国内镜像 | ||
| pytest.ini:Pytest配置; | ||
| > pip --default-timeout=6000 install -i https://pypi.tuna.tsinghua.edu.cn/simple aamt | ||
| main.py:执行入口; | ||
| requirements.txt:项目依赖; | ||
| 项目结构说明.txt | ||
| """ | ||
| fake_content = """ | ||
| from faker import Faker | ||
| fake = Faker(locale="zh_CN") | ||
| print('姓名:', fake.name()) | ||
| print('手机号:', fake.phone_number()) | ||
| phones = [fake.phone_number() for _ in range(5)] # 列表推导,把生成的数据组成一个列表 | ||
| print('批量生成手机号:', phones) | ||
| print('邮箱:', fake.email()) | ||
| print('MD5:', fake.md5()) | ||
| print(fake.items()) | ||
| print('年月日:', fake.date(pattern=' %Y-%m-%d')) | ||
| print('随机年份:', fake.year()) | ||
| print('随机月份:', fake.month()) | ||
| print('随机几号:', fake.day_of_month()) | ||
| print('随机星期数:', fake.day_of_week()) | ||
| print('随机时间字符串1:', fake.time(pattern='%Y-%m-%d %H:%M:%S')) | ||
| print('随机时间字符串2:', fake.time(pattern='%H:%M:%S')) | ||
| # -30y是过去30年前为开始日期,end_date表示结束到今天 | ||
| print('过去某一天:', fake.date_between(start_date="-30y", end_date="today")) | ||
| print('今天:', fake.date_between_dates()) # 今天 | ||
| print('日期和时间:', fake.date_time()) # 2021-05-14 19:36:00 | ||
| print('当前日期时间:', fake.date_time_between_dates()) | ||
| # print('某个区间内随机日期时 间:', fake.date_time_between_dates(datetime_start=datetime(1999, 2, 2, 10, 30, 20), dat | ||
| # etime_end = datetime(2000, 2, 2, 10, 30, 20))) | ||
| print('未来30天内的随机日期:', fake.future_date(end_date="+30d"), f'类型是:{type(fake.future_date(end_date="+30d"))},', str(fake.future_date(end_date="+30d")),'已转成字符串') | ||
| print('未来30天内的随机日期时间:', fake.future_datetime(end_date="+30d"), | ||
| f'类型是:{type(fake.future_datetime(end_date="+30d"))}') # 未来日期和时间) | ||
| print('过去的日期:', fake.past_date(start_date="-30m")) # 过去日期 | ||
| print('过去的日期时间:', fake.past_datetime(start_date="-30d")) # 过去日期和时间 | ||
| print('时间戳:', fake.unix_time(), f'类型是:{type(fake.unix_time())}') | ||
| print('时间戳(时分秒):', fake.time(), f'类型是:{type(fake.time())}') | ||
| print('随机md5:', fake.md5(), f'类型是:{type(fake.md5())}') | ||
| """ | ||
| fastapi_mock_content = """#!/usr/bin/python | ||
@@ -1764,3 +1418,2 @@ # encoding=utf-8 | ||
| """ | ||
| mitm_content = """#!/usr/bin/python | ||
@@ -1870,5 +1523,262 @@ # encoding=utf-8 | ||
| """ | ||
| dao_conftent = """ | ||
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: dao.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| structure_content = """项目结构说明: | ||
| 待完善 | ||
| from loguru import logger | ||
| try: | ||
| from sqlalchemy import create_engine | ||
| from texttable import Texttable | ||
| except ModuleNotFoundError: | ||
| pass | ||
| def mysql_engine(host, port, user, password, db): | ||
| # sqlalchemy create_engine | ||
| try: | ||
| engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}") | ||
| except NameError: | ||
| return "" | ||
| return engine | ||
| def print_db_table(data_frame): | ||
| #以表格形式打印数据表 | ||
| tb = Texttable() | ||
| tb.header(data_frame.columns.array) | ||
| tb.set_max_width(0) | ||
| # text * cols | ||
| tb.set_cols_dtype(['t'] * data_frame.shape[1]) | ||
| tb.add_rows(data_frame.to_numpy(), header=False) | ||
| logger.info(tb.draw()) | ||
| """ | ||
| client_content = """ | ||
| import json | ||
| import time | ||
| from mimetypes import MimeTypes | ||
| from urllib.parse import urlencode | ||
| import allure | ||
| import jmespath | ||
| import requests | ||
| from requests import Response | ||
| from requests_toolbelt.multipart.encoder import MultipartEncoder | ||
| from common.project import * | ||
| from loguru import logger | ||
| class Body_type(): | ||
| none = '无类型' | ||
| # 默认 | ||
| json = 'json数据' | ||
| form_text = '表单数据 :纯json数据' | ||
| form_file = '表单数据 :文件二进制流+json数据' | ||
| binary = '文件直传:文件二进制流' | ||
| graphql = '其他类型' | ||
| # 缓存变量池 | ||
| class Cache_vars: | ||
| def __init__(self): | ||
| self.var_ = {'init_data': '初始值'} | ||
| def put(self, key, value): | ||
| self.var_[key] = value | ||
| # logger.info(f'变量池 成功新增:{{{key}:{value}}}') | ||
| def get(self, key): | ||
| value = "" | ||
| try: | ||
| value = self.var_[key] | ||
| except KeyError: | ||
| logger.error(f'异常:获取 api缓存变量池的key:{key}不存在, 返回空字符串。类实例变量池:{self.var_}') | ||
| return value | ||
| class HttpClient(Body_type): | ||
| def __init__(self): | ||
| self.session = requests.Session() | ||
| # 缓存变量池实例 | ||
| self.cache = Cache_vars() | ||
| # 获取当前使用的环境 | ||
| self.environ_active = Operate_token().read_environ_active() | ||
| # 获取激活yaml的所有数据 | ||
| self.env_vars_data = Read_yaml().get_env_vars_yaml() | ||
| self.host = self.env_vars_data['after_host'] | ||
| self.default_header = { | ||
| "Content-Type": "application/json", | ||
| "language": "zh_CN" | ||
| } | ||
| def send(self, url: str = '', method='post', body={}, body_type: str = Body_type.json, x_token='', | ||
| file_key='picFile', file_path='', timeout=30, **kwargs): | ||
| start_time = time.time() | ||
| if not url.startswith(("http://", "https://")): | ||
| raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦") | ||
| # 用户传了headers,就用用户的,不传就用默认的 | ||
| headers = kwargs.get("headers", self.default_header) | ||
| if x_token: | ||
| headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。 | ||
| if method == "get": | ||
| result = self.session.request(url=url, method=method, params=body, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif method == "post": | ||
| logger.info(f'body_type类型:{body_type}') | ||
| if body_type == Body_type.json: | ||
| headers['Content-Type'] = 'application/json; charset=UTF-8' | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.form_file: | ||
| filename = file_path.split('\\\\')[-1] # xiaoxin.png | ||
| # 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143 | ||
| fileType = MimeTypes().guess_type(file_path)[0] | ||
| # 没有识别到就不传 content_type | ||
| if fileType is None: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))} | ||
| files = {'file_key': (filename, open(file_path, 'rb'))} | ||
| # 也可以用已下的方法,不用加 filename | ||
| # files = {file_key: open(file_path, 'rb'))} | ||
| # files = {'file_key': open(file_path, 'rb')} | ||
| else: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')} | ||
| files = {file_key: (filename, open(file_path, 'rb'), fileType)} | ||
| body.update(files) | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| # :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。 | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.form_text: | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.binary: | ||
| files = {file_key: open(file_path, 'rb')} | ||
| # 文件流通过files 传给request的请求参数files | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers, files=files, | ||
| timeout=timeout, **kwargs) | ||
| else: | ||
| raise ValueError(f"=====body_type没有定义,{body_type} 请确认====") | ||
| elif method == "patch": | ||
| result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers, | ||
| timeout=timeout, **kwargs) | ||
| elif method == "delete": | ||
| result = '' | ||
| elif method == "put": | ||
| result = '' | ||
| else: | ||
| raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====") | ||
| end_time = time.time() | ||
| # python 内置函数 保留4位小数 | ||
| time_ = round((end_time - start_time), 4) | ||
| # # 处理 | ||
| # result = AamtResponse(result) | ||
| try: | ||
| logger.info( | ||
| f'\\n请求日志:\\nurl: {url}\\nmethod: {method}\\nbody: \\n{body}\\nbody_type: {body_type}\\nheaders: \\n{headers}\\n**********************************************************************************') | ||
| logger.debug( | ||
| f'\\n响应日志:\\n响应码: {result.status_code}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n') | ||
| except AttributeError: | ||
| logger.error( | ||
| f'\\n无法获取响应码, 响应日志:\\n{result}\\n请求>响应 时间开销: {time_}\\n**********************************************************************************\\n') | ||
| except TypeError: | ||
| logger.warning(f'警告:{kwargs}') | ||
| self.__create_request_log(url, method, body, body_type, headers) | ||
| try: | ||
| self.__create_response_log(result.status_code, result.json(), time_) | ||
| return result.json() | ||
| except: | ||
| self.__create_response_log(result.status_code, result.text, time_) | ||
| logger.warning(f'\\n注意 响应内容:不可以序列化,具体响应如下:\\n{result.text}') | ||
| return result.text | ||
| def get_full_url(self, url, etc={}, replace={}, h=""): | ||
| if h: | ||
| host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格). | ||
| else: | ||
| host = self.host.rstrip('/') | ||
| url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。 | ||
| full_url = host + "/" + url | ||
| # full_url += "?" | ||
| full_url += "?platform={}".format(self.environ_active) | ||
| if etc: | ||
| s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串 | ||
| full_url += "&" + s | ||
| if replace: | ||
| full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| # full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| return full_url | ||
| # 目的就是 在allure显示,没什么实际意义 | ||
| @allure.step("请求日志") | ||
| def __create_request_log(self, url, method, body, body_type, headers): | ||
| pass | ||
| # 目的就是 在allure显示 | ||
| @allure.step('响应日志') | ||
| def __create_response_log(self, status_code, text,time_): | ||
| pass | ||
| class AamtResponse(Response): | ||
| # 包装requests.Response,简化jmespath写法 | ||
| def __init__(self, response): | ||
| super().__init__() | ||
| for k, v in response.__dict__.items(): | ||
| self.__dict__[k] = v | ||
| def jmespath(self, expression): | ||
| return jmespath.search(expression, self.json()) | ||
| if __name__ == '__main__': | ||
| a = HttpClient() | ||
| a.send(url='http://www.baidu.com',method='get', body={}, body_type=Body_type.json) | ||
| """ | ||
+15
-11
@@ -15,3 +15,3 @@ # -*- coding: utf-8 -*- | ||
| from aamt.sample import * | ||
| from aamt.logger import Logger | ||
| from loguru import logger | ||
@@ -33,3 +33,3 @@ | ||
| action="store_true", | ||
| help="Create virtual environment in the project, and install tep.", | ||
| help="Create virtual environment in the project, and install aamt.", | ||
| ) | ||
@@ -42,3 +42,3 @@ return sub_parser_scaffold | ||
| if os.path.isdir(project_name): | ||
| Logger.warning( | ||
| logger.warning( | ||
| f"Project folder {project_name} exists, please specify a new project name." | ||
@@ -48,3 +48,3 @@ ) | ||
| elif os.path.isfile(project_name): | ||
| Logger.warning( | ||
| logger.warning( | ||
| f"Project name {project_name} conflicts with existed file, please specify a new one." | ||
@@ -54,3 +54,3 @@ ) | ||
| Logger.info(f"Create new project: {project_name}") | ||
| logger.info(f"Create new project: {project_name}") | ||
| # os.getcwd() 获取当前目录 | ||
@@ -89,18 +89,19 @@ print(f"Project root dir: {os.path.join(os.getcwd(), project_name)}\n") | ||
| create_file(os.path.join(project_name, "fixtures", "xf", "fixture_xf.py"), file_content=fixture_xf_content) | ||
| create_folder(os.path.join(project_name , "fixtures", "zhangsan")) | ||
| create_file(os.path.join(project_name, "fixtures", "zhangsan", "__init__.py")) | ||
| create_file(os.path.join(project_name, "fixtures", "zhangsan", "fixture_zhangsan.py"), file_content=fixture_zhangsan_content) | ||
| create_folder(os.path.join(project_name, "case")) | ||
| create_file(os.path.join(project_name, "case", "__init__.py")) | ||
| create_file(os.path.join(project_name, "case", "conftest.py"),file_content=conftest_content) | ||
| create_file(os.path.join(project_name, "case", "conftest.py"),file_content=conftest_content1) | ||
| create_folder(os.path.join(project_name, "case", "test_brand")) | ||
| create_file(os.path.join(project_name, "case", "test_brand", "test_brand.py"),file_content=test_brand_content) | ||
| create_folder(os.path.join(project_name, "common")) | ||
| create_file(os.path.join(project_name, "common", "__init__.py")) | ||
| create_file(os.path.join(project_name, "common", "assert_api.py"),file_content=assert_api_content) | ||
| create_file(os.path.join(project_name, "common", "clear_results.py"),file_content=clear_results_content) | ||
| # create_file(os.path.join(project_name, "common", "config.py"),file_content=config_content) | ||
| create_file(os.path.join(project_name, "common", "logger.py"),file_content=logger_content) | ||
| create_file(os.path.join(project_name, "common", "mysqlhelper.py"),file_content=mysqlhelper_content) | ||
| create_file(os.path.join(project_name, "common", "emailhelper.py"),file_content=emailhelper_content) | ||
| create_file(os.path.join(project_name, "common", "read_token.py"),file_content=read_token_content) | ||
| create_file(os.path.join(project_name, "common", "project.py"),file_content=project_content) | ||
@@ -115,3 +116,3 @@ create_folder(os.path.join(project_name, "data")) | ||
| create_folder(os.path.join(project_name, "resources")) | ||
| create_file(os.path.join(project_name, "resources", "aamt.ini"),file_content=aamt_content) | ||
| create_file(os.path.join(project_name, "resources", "aamt.ini"),file_content=aamt_ini_content) | ||
| create_folder(os.path.join(project_name, "resources", "env_vars")) | ||
@@ -127,2 +128,3 @@ create_file(os.path.join(project_name, "resources", "env_vars", "env_vars_test.yaml"),file_content=env_vars_test_yaml_content) | ||
| create_file(os.path.join(project_name, "until", "mitm.py"), file_content=mitm_content) | ||
| create_file(os.path.join(project_name, "until", "dao.py"), file_content=dao_conftent) | ||
@@ -133,3 +135,5 @@ create_file(os.path.join(project_name, "main.py"), file_content=main_content) | ||
| create_file(os.path.join(project_name, "README.md"), file_content=README_content) | ||
| create_file(os.path.join(project_name, "conftest.py"), file_content=conftest_content) | ||
| create_file(os.path.join(project_name, "项目结构说明.txt"), structure_content) | ||
| create_file(os.path.join(project_name, "run.sh"), run_shell_content) | ||
@@ -136,0 +140,0 @@ |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: aamt | ||
| Version: 0.1.9 | ||
| Version: 0.2.0 | ||
| Summary: 基于pytest的接口自动化测试工具模板 | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/xuefeng365/aamt-template.git |
+1
-1
| [tool.poetry] | ||
| name = "aamt" | ||
| version = "0.1.9" | ||
| version = "0.2.0" | ||
| description = "基于pytest的接口自动化测试工具模板" | ||
@@ -5,0 +5,0 @@ authors = ["xuefeng365 <120158568@qq.com>"] |
+1
-1
@@ -26,3 +26,3 @@ # -*- coding: utf-8 -*- | ||
| 'name': 'aamt', | ||
| 'version': '0.1.9', | ||
| 'version': '0.2.0', | ||
| 'description': '基于pytest的接口自动化测试工具模板', | ||
@@ -29,0 +29,0 @@ 'long_description': '## AAMT 项目模版\n> 用于生成 基于pytest的接口自动化脚手架\n\npython 版本\n\n> 3.9\n\n安装最新版本\n\n> pip install aamt\n\n指定版本安装\n\n> pip install tep==0.1.0\n\n升级aamt\n\n> pip install -U aamt\n\n创建项目脚手架 \n\n> aamt startproject demo\n\n创建项目脚手架(自动创建虚拟环境)\n\n> aamt startproject demo -venv\n\n外网速度慢,pandas可能安装失败,推荐用国内镜像\n\n> pip --default-timeout=6000 install -i https://pypi.tuna.tsinghua.edu.cn/simple aamt\n\n\n\n', |
-194
| import json | ||
| import time | ||
| from mimetypes import MimeTypes | ||
| from urllib.parse import urlencode | ||
| import allure | ||
| import jmespath | ||
| import requests | ||
| from requests import Response | ||
| from requests_toolbelt.multipart.encoder import MultipartEncoder | ||
| from aamt.config import * | ||
| from aamt.fixture import AamtVars | ||
| from aamt.logger import Logger | ||
| class Body_type(): | ||
| none = '无类型' | ||
| # 默认 | ||
| json = 'json数据' | ||
| form_text = '表单数据 :纯json数据' | ||
| form_file = '表单数据 :文件二进制流+json数据' | ||
| binary = '文件直传:文件二进制流' | ||
| graphql = '其他类型' | ||
| class HttpClient(Body_type): | ||
| def __init__(self): | ||
| self.session = requests.Session() | ||
| # 变量池实例 | ||
| self.aamt_vars = AamtVars() | ||
| # 日志实例 | ||
| self.logging = Logger | ||
| # 获取当前使用的环境 | ||
| self.environ_active = Operate_config().read_environ_active() | ||
| # 获取激活yaml的所有数据 | ||
| self.env_vars_data = Read_yaml().get_env_vars_yaml() | ||
| self.host = self.env_vars_data['after_host'] | ||
| self.default_header = { | ||
| "Content-Type": "application/json", | ||
| "language": "zh_CN" | ||
| } | ||
| def send(self, url: str = '', method='post', body={}, body_type: str = Body_type.json, x_token='', | ||
| file_key='picFile', file_path='', timeout=30, **kwargs): | ||
| start_time = time.time() | ||
| if not url.startswith(("http://", "https://")): | ||
| raise Exception("请输入正确的url, 记得带上http:// 或者 https:// 哦") | ||
| # 用户传了headers,就用用户的,不传就用默认的 | ||
| headers = kwargs.get("headers", self.default_header) | ||
| if x_token: | ||
| headers["_token_"] = x_token.strip('"') # strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。 | ||
| if method == "get": | ||
| result = self.session.request(url=url, method=method, params=body, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif method == "post": | ||
| self.logging.info(f'body_type类型:{body_type}') | ||
| if body_type == Body_type.json: | ||
| headers['Content-Type'] = 'application/json; charset=UTF-8' | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.form_file: | ||
| filename = file_path.split('\\')[-1] # xiaoxin.png | ||
| # 通过 mimetypes 来自动识别文件类型 : https://cloud.tencent.com/developer/section/1369143 | ||
| fileType = MimeTypes().guess_type(file_path)[0] | ||
| # 没有识别到就不传 content_type | ||
| if fileType is None: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'))} | ||
| files = {'file_key': (filename, open(file_path, 'rb'))} | ||
| # 也可以用已下的方法,不用加 filename | ||
| # files = {file_key: open(file_path, 'rb'))} | ||
| # files = {'file_key': open(file_path, 'rb')} | ||
| else: | ||
| # files = {file_key: ('xiaoxin.png',open(file_path, 'rb'),'image/png')} | ||
| files = {file_key: (filename, open(file_path, 'rb'), fileType)} | ||
| body.update(files) | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| # :param data:(可选)字典,元组列表,字节,或文件类对象发送到:class: 'Request'的主体中。 | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.form_text: | ||
| # 把要传入的数据 转变为form_data格式 | ||
| form_data = MultipartEncoder(body) | ||
| # 以下命令自动 转变 headers 中的 Content-Type 为:'multipart/form-data; boundary=。。。。。。。。。。。 | ||
| headers['Content-Type'] = form_data.content_type | ||
| result = self.session.request(url=url, method=method, data=form_data, headers=headers, timeout=timeout, | ||
| **kwargs) | ||
| elif body_type == Body_type.binary: | ||
| files = {file_key: open(file_path, 'rb')} | ||
| # 文件流通过files 传给request的请求参数files | ||
| result = self.session.request(url=url, method=method, json=body, headers=headers, files=files, | ||
| timeout=timeout, **kwargs) | ||
| else: | ||
| raise ValueError(f"=====body_type没有定义,{body_type} 请确认====") | ||
| elif method == "patch": | ||
| result = self.session.request(url=url, method=method, data=json.dumps(body), headers=headers, | ||
| timeout=timeout, **kwargs) | ||
| elif method == "delete": | ||
| result = '' | ||
| elif method == "put": | ||
| result = '' | ||
| else: | ||
| raise ValueError(f"=====大兄弟===暂不支持{method} 请求呢====需要就自己补充吧====") | ||
| end_time = time.time() | ||
| # python 内置函数 保留4位小数 | ||
| time_ = round((end_time - start_time), 4) | ||
| # 处理 | ||
| result = AamtResponse(result) | ||
| try: | ||
| self.logging.info( | ||
| f'\n请求日志:\nurl: {url}\nmethod: {method}\nbody: \n{body}\nbody_type: {body_type}\nheaders: \n{headers}\n**********************************************************************************') | ||
| self.logging.debug( | ||
| f'\n响应日志:\n响应码: {result.status_code}\n请求>响应 时间开销: {time_}\n**********************************************************************************\n') | ||
| except AttributeError: | ||
| self.logging.error( | ||
| f'\n无法获取响应码, 响应日志:\n{result}\n请求>响应 时间开销: {time_}\n**********************************************************************************\n') | ||
| except TypeError: | ||
| self.logging.warning(f'警告:{kwargs}') | ||
| self.__create_request_log(url, method, body, body_type, headers) | ||
| try: | ||
| self.__create_response_log(result.status_code, result.json(), time_) | ||
| return result.json() | ||
| except: | ||
| self.__create_response_log(result.status_code, result.text, time_) | ||
| self.logging.warning(f'\n注意 响应内容:不可以序列化,具体响应如下:\n{result.text}') | ||
| return result.text | ||
| def get_full_url(self, url, etc={}, replace={}, h=""): | ||
| if h: | ||
| host = h.rstrip('/') # rstrip() 删除 string 字符串末尾的指定字符(默认为空格). | ||
| else: | ||
| host = self.host.rstrip('/') | ||
| url = url.lstrip('/') # lstrip() 方法用于截掉字符串左边的空格或指定字符。 | ||
| full_url = host + "/" + url | ||
| # full_url += "?" | ||
| full_url += "?platform={}".format(self.environ_active) | ||
| if etc: | ||
| s = urlencode(etc) # urlencode urllib库里面有个urlencode函数,可以把key-value这样的键值对转换成我们想要的格式,返回的是a=1&b=2这样的字符串 | ||
| full_url += "&" + s | ||
| if replace: | ||
| full_url = full_url.format(replace) # str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| # full_url = str.format(full_url,replace) #str.format() 方法通过字符串中的花括号 {} 来识别替换字段 replacement field,从而完成字符串的格式化。 | ||
| return full_url | ||
| # 目的就是 在allure显示,没什么实际意义 | ||
| @allure.step("请求日志") | ||
| def __create_request_log(self, url, method, body, body_type, headers): | ||
| pass | ||
| # 目的就是 在allure显示 | ||
| @allure.step('响应日志') | ||
| def __create_response_log(self, status_code, text, time_): | ||
| pass | ||
| class AamtResponse(Response): | ||
| # 包装requests.Response,简化jmespath写法 | ||
| def __init__(self, response): | ||
| super().__init__() | ||
| for k, v in response.__dict__.items(): | ||
| self.__dict__[k] = v | ||
| def jmespath(self, expression): | ||
| return jmespath.search(expression, self.json()) | ||
| if __name__ == '__main__': | ||
| a = HttpClient() | ||
| a.send(url='http://www.baidu.com', method='get', body={}, body_type=Body_type.json) |
-35
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: dao.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| from aamt.logger import Logger | ||
| try: | ||
| from sqlalchemy import create_engine | ||
| from texttable import Texttable | ||
| except ModuleNotFoundError: | ||
| pass | ||
| def mysql_engine(host, port, user, password, db): | ||
| # sqlalchemy create_engine | ||
| try: | ||
| engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}") | ||
| except NameError: | ||
| return "" | ||
| return engine | ||
| def print_db_table(data_frame): | ||
| """以表格形式打印数据表""" | ||
| tb = Texttable() | ||
| tb.header(data_frame.columns.array) | ||
| tb.set_max_width(0) | ||
| # text * cols | ||
| tb.set_cols_dtype(['t'] * data_frame.shape[1]) | ||
| tb.add_rows(data_frame.to_numpy(), header=False) | ||
| Logger.info(tb.draw()) |
-116
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: func.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| import copy | ||
| import itertools | ||
| import json | ||
| import time | ||
| from sys import stdout | ||
| from aamt.logger import Logger | ||
| try: | ||
| import numpy as np | ||
| except ModuleNotFoundError: | ||
| pass | ||
| def current_time(): | ||
| """当前时间""" | ||
| return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) | ||
| def current_date(): | ||
| """当前日期""" | ||
| return time.strftime("%Y-%m-%d", time.localtime(time.time())) | ||
| def progress_bar(i): | ||
| """进度条""" | ||
| c = int(i / 10) | ||
| progress = '\r %2d%% [%s%s]' | ||
| a = '■' * c | ||
| b = '□' * (10 - c) | ||
| msg = progress % (i, a, b) | ||
| stdout.write(msg) | ||
| stdout.flush() | ||
| def pairwise(option): | ||
| """pairwise算法""" | ||
| cp = [] # 笛卡尔积 | ||
| s = [] # 两两拆分 | ||
| for x in eval('itertools.product' + str(tuple(option))): | ||
| cp.append(x) | ||
| s.append([i for i in itertools.combinations(x, 2)]) | ||
| Logger.info('笛卡尔积:%s' % len(cp)) | ||
| del_row = [] | ||
| progress_bar(0) | ||
| s2 = copy.deepcopy(s) | ||
| for i in range(len(s)): # 对每行用例进行匹配 | ||
| if (i % 100) == 0 or i == len(s) - 1: | ||
| progress_bar(int(100 * i / (len(s) - 1))) | ||
| t = 0 | ||
| for j in range(len(s[i])): # 对每行用例的两两拆分进行判断,是否出现在其他行 | ||
| flag = False | ||
| for i2 in [x for x in range(len(s2)) if s2[x] != s[i]]: # 找同一列 | ||
| if s[i][j] == s2[i2][j]: | ||
| t = t + 1 | ||
| flag = True | ||
| break | ||
| if not flag: # 同一列没找到,不用找剩余列了 | ||
| break | ||
| if t == len(s[i]): | ||
| del_row.append(i) | ||
| s2.remove(s[i]) | ||
| res = [cp[i] for i in range(len(cp)) if i not in del_row] | ||
| Logger.info('过滤后:%s' % len(res)) | ||
| return res | ||
| class NpEncoder(json.JSONEncoder): | ||
| """numpy类型转换""" | ||
| def default(self, obj): | ||
| if isinstance(obj, np.integer): | ||
| return int(obj) | ||
| elif isinstance(obj, np.int64): | ||
| return int(obj) | ||
| elif isinstance(obj, np.floating): | ||
| return float(obj) | ||
| elif isinstance(obj, np.float64): | ||
| return float(obj) | ||
| elif isinstance(obj, np.ndarray): | ||
| return obj.tolist() | ||
| else: | ||
| return super(NpEncoder, self).default(obj) | ||
| def textbox_input(): | ||
| """文本框输入数据""" | ||
| return ["你好", "HELLO", "hello", "8", "0", "-5", "8.5", "", " 123 ", " 你好 ", "你 好", "!", "@", "#", "$", | ||
| "$$", "^", "&", "*", "_", "null", "<html></html>", "'", "''", '"', '""', "[", "]", "[]", "{", "}", | ||
| "{}", "\r\n", "HE1", | ||
| """ 你好HEllo 10 -3 0 0.5 !@#$%^&*_、null、<html></html><html>、'、"、[]、{}、/r/n\\r\\n、ces """, | ||
| # 10 | ||
| "1234567890", | ||
| # 50 | ||
| "12345678901234567890123456789012345678901234567890", | ||
| # 100 | ||
| "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890", | ||
| # 200 | ||
| "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890", | ||
| # 500 | ||
| "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890", | ||
| # 1000 | ||
| "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890", | ||
| # 2000 | ||
| "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890", | ||
| # 4000 | ||
| "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" | ||
| ] |
-119
| # -*- coding: UTF-8 -*- | ||
| # @Software: PyCharm | ||
| # @File: logger.py | ||
| # @Author: xuefeng365 | ||
| # @E-mail: 120158568@qq.com, | ||
| # @Site: | ||
| # @Time: 11月 23, 2022 | ||
| import logging | ||
| import time | ||
| from functools import wraps | ||
| import colorlog | ||
| from aamt.config import * | ||
| # logging模块中包含的类 | ||
| # 用来自定义日志对象的规则(比如:设置日志输出格式、等级等) | ||
| # 常用子类:StreamHandler、FileHandler | ||
| # StreamHandler 控制台输出日志 | ||
| # FileHandler 日志输出到文件 | ||
| # BASE_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) | ||
| # print(BASE_PATH) | ||
| # 日志文件路径 | ||
| LOG_PATH = os.path.join(Config.project_root_dir, "log") | ||
| if not os.path.exists(LOG_PATH): | ||
| os.mkdir(LOG_PATH) | ||
| class Logger(): | ||
| def __init__(self): | ||
| log_colors_config = { | ||
| 'DEBUG': 'white', # cyan white | ||
| 'INFO': 'green', | ||
| 'WARNING': 'yellow', | ||
| 'ERROR': 'red', | ||
| 'CRITICAL': 'bold_red'} | ||
| # 创建一个logger日志对象 | ||
| self.logger = logging.getLogger("log") | ||
| # 创建日志格式(不带颜色)对象 | ||
| self.file_formater = logging.Formatter( | ||
| fmt='[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s', | ||
| datefmt='%Y-%m-%d %H:%M:%S') | ||
| # 创建颜色格式((带颜色))对象 | ||
| self.console_formatter = colorlog.ColoredFormatter( | ||
| fmt='%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s', | ||
| datefmt='%Y-%m-%d %H:%M:%S', | ||
| log_colors=log_colors_config) | ||
| # 输出到控制台 | ||
| self.console = logging.StreamHandler() | ||
| self.logname = os.path.join(LOG_PATH, "{}.log".format(time.strftime("%Y%m%d"))) # 创建日志路径和时间 | ||
| # 输出到文件:获取日志路径,并创建FileHandler对象()-- 设定写入方式:追加 | ||
| self.filelogger = logging.FileHandler(self.logname, mode='a', encoding="UTF-8") | ||
| # 设置默认的日志级别 控制台logger 和 handler以最高级别为准,不同handler之间可以不一样,不相互影响 | ||
| self.logger.setLevel(logging.INFO) | ||
| self.console.setLevel(logging.INFO) | ||
| self.filelogger.setLevel(logging.INFO) | ||
| # self.logger.setLevel(logging.INFO) | ||
| # self.logger.setLevel(logging.WARNING) | ||
| # 设置控制台日志的颜色 | ||
| self.console.setFormatter(self.console_formatter) | ||
| # 设置文件日志的颜色 | ||
| self.filelogger.setFormatter(self.file_formater) | ||
| # 重复日志问题: | ||
| # 1、防止多次addHandler; | ||
| # 2、loggername 保证每次添加的时候不一样; | ||
| # 3、显示完log之后调用removeHandler | ||
| if not self.logger.handlers: | ||
| self.logger.addHandler(self.console) | ||
| self.logger.addHandler(self.filelogger) | ||
| self.console.close() | ||
| self.filelogger.close() | ||
| Logger = Logger().logger | ||
| def decorate_log(func): | ||
| @wraps(func) | ||
| # Python装饰器中@wraps作用 https://blog.csdn.net/weixin_40576010/article/details/88639686 | ||
| def log(*args, **kwargs): | ||
| Logger.info(f'-- 开始执行 {func.__name__} --') | ||
| try: | ||
| func(*args, **kwargs) | ||
| except Exception as e: | ||
| Logger.error(f'-- {func.__name__}执行失败,原因:{e} --') | ||
| raise e | ||
| else: | ||
| Logger.info(f'-- {func.__name__} 执行成功 --') | ||
| return log | ||
| # 调试用 | ||
| @decorate_log | ||
| def xuefeng(): | ||
| assert 2 == 2 | ||
| if __name__ == '__main__': | ||
| xuefeng() | ||
| Logger.info("---测试开始---") | ||
| Logger.warning("---测试开始---") | ||
| Logger.error("---测试结束---") | ||
| Logger.debug("---测试结束---") |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
76924
-26.87%11
-26.67%1732
-22.51%