cloudshell-core
Advanced tools
| import traceback | ||
| class ErrorHandlingContext(object): | ||
| def __init__(self, logger): | ||
| """ | ||
| Initializes an instance of ErrorHandlingContext | ||
| Allows proper logging on exception | ||
| :param logger: Logger | ||
| :type logger: Logger | ||
| """ | ||
| self.logger = logger | ||
| def __enter__(self): | ||
| """ | ||
| Called upon block start. Should not be called explicitly | ||
| :return: ErrorHandlingContext | ||
| :rtype ErrorHandlingContext | ||
| """ | ||
| return self | ||
| def __exit__(self, exc_type, exc_value, exc_traceback): | ||
| """ | ||
| Called upon block end. Should not be called explicitly | ||
| In case of exception during the block execution logs the error with debug severity | ||
| and allows the same exception to be thrown | ||
| :return: True means exception handles, otherwise false | ||
| :rtype: bool | ||
| """ | ||
| if not exc_value: | ||
| return True | ||
| lines = traceback.format_exception(exc_type, exc_value, exc_traceback) | ||
| self.logger.error('Error occurred: ' + ''.join(lines)) | ||
| return False |
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) |
| import unittest | ||
| if __name__ == '__main__': | ||
| suites = unittest.TestLoader().discover(start_dir=".", pattern="test_*.py") | ||
| unittest.TextTestRunner(verbosity=2).run(suites) |
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) |
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) |
| [Logging] | ||
| LOG_LEVEL='INFO' | ||
| LOG_FORMAT= '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s' | ||
| TIME_FORMAT= '%d-%b-%Y--%H-%M-%S' | ||
| LOG_PATH='../../Logs' |
| #!/usr/bin/python | ||
| # -*- coding: utf-8 -*- | ||
| """ | ||
| Tests for cloudshell.core.logger.qs_config_parser | ||
| """ | ||
| import os | ||
| from unittest import TestCase | ||
| from cloudshell.core.logger.qs_config_parser import QSConfigParser | ||
| CUR_DIR = os.path.dirname(__file__) | ||
| class TestQSConfigParser(TestCase): | ||
| exp_response = {'Logging': | ||
| {'time_format': '%d-%b-%Y--%H-%M-%S', | ||
| 'log_path': '../../Logs', | ||
| 'log_format': '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s', | ||
| 'log_level': 'INFO'}} | ||
| def setUp(self): | ||
| """ Recreate parser before each suite and manage environment variable """ | ||
| # backup existing environment variable | ||
| self.qs_conf = os.getenv("QS_CONFIG") | ||
| os.environ["QS_CONFIG"] = os.path.join(CUR_DIR, "config/test_qs_config.ini") | ||
| self.parser = QSConfigParser() | ||
| def tearDown(self): | ||
| """ Restore environment variable """ | ||
| if self.qs_conf: | ||
| os.environ["QS_CONFIG"] = self.qs_conf | ||
| else: | ||
| del os.environ["QS_CONFIG"] | ||
| def test_01_get_dict(self): | ||
| """ Test suite for get_dict method """ | ||
| self.assertEqual(self.parser.get_dict(), self.exp_response) | ||
| QSConfigParser._configDict = None | ||
| self.assertEqual(self.parser.get_dict(), self.exp_response) | ||
| self.assertEqual(self.parser.get_dict("Logging"), self.exp_response["Logging"]) | ||
| self.assertIsNone(self.parser.get_dict("wrong_section_name")) | ||
| os.environ["QS_CONFIG"] = os.path.join(CUR_DIR, "config/wrong_conf_file_path.ini") | ||
| QSConfigParser._configDict = None | ||
| self.assertEqual(self.parser.get_dict(), {}) | ||
| self.assertIsNone(self.parser.get_dict("Logging")) | ||
| def test_02_get_setting(self): | ||
| """ Test suite for get_setting method """ | ||
| self.assertIsNone(self.parser.get_setting()) | ||
| self.assertIsNone(self.parser.get_setting(dict_section="wrong_section_name")) | ||
| self.assertIsNone(self.parser.get_setting(dict_section="Logging", dict_key="wrong_setting_name")) | ||
| self.assertEqual(self.parser.get_setting(dict_section="Logging", dict_key="log_level"), | ||
| self.exp_response["Logging"]["log_level"]) |
| #!/usr/bin/python | ||
| # -*- coding: utf-8 -*- | ||
| """ | ||
| Tests for cloudshell.core.logger.qs_logger | ||
| """ | ||
| import logging | ||
| import os | ||
| import re | ||
| import shutil | ||
| from mock import MagicMock | ||
| from unittest import TestCase | ||
| from cloudshell.core.logger import qs_logger | ||
| from cloudshell.core.logger.interprocess_logger import MultiProcessingLog | ||
| CUR_DIR = os.path.dirname(__file__) | ||
| full_settings = MagicMock(return_value={'LOG_PATH': '../../Logs', | ||
| 'TIME_FORMAT': '%d-%b-%Y--%H-%M-%S', | ||
| 'LOG_LEVEL': 'INFO', | ||
| 'FORMAT': '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s'}) | ||
| cut_settings = MagicMock(return_value={'TIME_FORMAT': '%d-%b-%Y--%H-%M-%S', | ||
| 'LOG_LEVEL': 'INFO', | ||
| 'FORMAT': '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s'}) | ||
| wrong_settings = MagicMock(return_value={'LOG_PATH': None, | ||
| 'TIME_FORMAT': '%d-%b-%Y--%H-%M-%S', | ||
| 'LOG_LEVEL': 'INFO', | ||
| 'FORMAT': '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s'}) | ||
| class TestQSLogger(TestCase): | ||
| _LOGS_PATH = os.path.join(os.path.dirname(__file__), "../../Logs") | ||
| def setUp(self): | ||
| """ Remove all existing test Logs folders before each suite """ | ||
| self.get_settings = qs_logger.get_settings | ||
| self.qs_conf = os.getenv("QS_CONFIG") | ||
| self.log_path = os.getenv("LOG_PATH") | ||
| os.environ["QS_CONFIG"] = os.path.join(CUR_DIR, "config/test_qs_config.ini") | ||
| os.environ["LOG_PATH"] = os.path.join(CUR_DIR, "../../Logs") | ||
| if os.path.exists(self._LOGS_PATH): | ||
| shutil.rmtree(self._LOGS_PATH) | ||
| def tearDown(self): | ||
| """ Close all existing logging handlers after each suite """ | ||
| if self.qs_conf: | ||
| os.environ["QS_CONFIG"] = self.qs_conf | ||
| elif "QS_CONFIG" in os.environ: | ||
| del os.environ["QS_CONFIG"] | ||
| if self.log_path: | ||
| os.putenv("LOG_PATH", self.log_path) | ||
| elif "LOG_PATH" in os.environ: | ||
| del os.environ["LOG_PATH"] | ||
| for logger in qs_logger._LOGGER_CONTAINER.values(): | ||
| for handler in logger.handlers: | ||
| handler.close() | ||
| qs_logger.get_settings = self.get_settings | ||
| def test_get_settings(self): | ||
| """ Test suite for get_settings method """ | ||
| exp_response = {'LOG_PATH': '../../Logs', | ||
| 'TIME_FORMAT': '%d-%b-%Y--%H-%M-%S', | ||
| 'LOG_LEVEL': 'INFO', | ||
| 'FORMAT': '%(asctime)s [%(levelname)s]: %(name)s %(module)s - %(funcName)-20s %(message)s'} | ||
| self.assertEqual(qs_logger.get_settings(), exp_response) | ||
| def test_get_accessible_log_path_default_params(self): | ||
| """ Test suite for get_accessible_log_path method """ | ||
| path = qs_logger.get_accessible_log_path() | ||
| self.assertTrue(re.search(r"Logs[\\/]Autoload[\\/]default--\d{2}-\w+-\d{4}--\d{2}-\d{2}-\d{2}\.log", path)) | ||
| self.assertTrue(os.path.dirname(path)) | ||
| def test_get_accessible_log_path_path_creation(self): | ||
| """ Test suite for get_accessible_log_path method """ | ||
| path = qs_logger.get_accessible_log_path() | ||
| self.assertTrue(os.path.dirname(path)) | ||
| def test_get_accessible_log_path(self): | ||
| """ Test suite for get_accessible_log_path method """ | ||
| path = qs_logger.get_accessible_log_path(qs_logger.get_accessible_log_path("reservation_id", "handler_name")) | ||
| self.assertTrue(re.search(r"Logs[\\/]reservation_id[\\/]handler_name--\d{2}-\w+-\d{4}--\d{2}-\d{2}-\d{2}\.log", path)) | ||
| def test_get_accessible_log_path_log_path_setting_missing(self): | ||
| """ Test suite for get_accessible_log_path method """ | ||
| if "LOG_PATH" in os.environ: | ||
| del os.environ["LOG_PATH"] | ||
| qs_logger.get_settings = cut_settings | ||
| self.assertIsNone(qs_logger.get_accessible_log_path()) | ||
| def test_get_accessible_log_path_log_path_is_none(self): | ||
| """ Test suite for get_accessible_log_path method """ | ||
| if "LOG_PATH" in os.environ: | ||
| del os.environ["LOG_PATH"] | ||
| qs_logger.get_settings = wrong_settings | ||
| self.assertIsNone(qs_logger.get_accessible_log_path()) | ||
| def test_get_qs_logger_full_settings_default_params(self): | ||
| """ Test suite for get_qs_logger method """ | ||
| qs_logger.get_settings = full_settings | ||
| self.assertTrue(isinstance(qs_logger.get_qs_logger().handlers[0], MultiProcessingLog)) | ||
| def test_get_qs_logger_full_settings(self): | ||
| """ Test suite for get_qs_logger method """ | ||
| qs_logger.get_settings = full_settings | ||
| self.assertTrue(isinstance(qs_logger.get_qs_logger(log_group='test1').handlers[0], MultiProcessingLog)) | ||
| def test_get_qs_logger_stream_handler(self): | ||
| """ Test suite for get_qs_logger method """ | ||
| if "LOG_PATH" in os.environ: | ||
| del os.environ["LOG_PATH"] | ||
| qs_logger.get_settings = cut_settings | ||
| self.assertTrue(isinstance(qs_logger.get_qs_logger(log_group='test2').handlers[0], logging.StreamHandler)) | ||
| def test_get_qs_logger_container_filling(self): | ||
| """ Test suite for get_qs_logger method """ | ||
| qs_logger.get_settings = full_settings | ||
| qs_logger.get_qs_logger() | ||
| qs_logger.get_qs_logger(log_group='test1') | ||
| if "LOG_PATH" in os.environ: | ||
| del os.environ["LOG_PATH"] | ||
| qs_logger.get_settings = cut_settings | ||
| qs_logger.get_qs_logger(log_group='test2') | ||
| self.assertEqual(sorted(qs_logger._LOGGER_CONTAINER.keys()), sorted(["Ungrouped", "test1", "test2"])) | ||
| def test_normalize_buffer_decolorize(self): | ||
| """ Test suite for normalize_buffer method """ | ||
| self.assertEqual(qs_logger.normalize_buffer("\033[1;32;40mGreenOnWhiteBack " | ||
| "\033[4;31mRedUnderscore " | ||
| "\033[93mYellow"), "GreenOnWhiteBack RedUnderscore Yellow") | ||
| def test_normalize_buffer_remove_hex_symbols(self): | ||
| """ Test suite for normalize_buffer method """ | ||
| self.assertEqual(qs_logger.normalize_buffer("\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff"), "---") | ||
| def test_normalize_buffer_carriage_return_replacing(self): | ||
| """ Test suite for normalize_buffer method """ | ||
| self.assertEqual(qs_logger.normalize_buffer("\r\n \n\r"), "\n \n\r") | ||
| def test_normalize_buffer_converts_tuple_to_string(self): | ||
| """ Test suite for normalize_buffer method """ | ||
| self.assertEqual(qs_logger.normalize_buffer(("test", "tuple")), "('test', 'tuple')") | ||
| def test_normalize_buffer_converts_dict_to_string(self): | ||
| """ Test suite for normalize_buffer method """ | ||
| self.assertEqual(qs_logger.normalize_buffer({"test": "dict"}), "{'test': 'dict'}") |
| Metadata-Version: 1.1 | ||
| Name: cloudshell-core | ||
| Version: 2.0.136 | ||
| Version: 2.1.153 | ||
| Summary: Core package for CloudShell Python orchestration and automation. This package contains commoncode for CloudShell packages, including logging, basic interfaces and other utilities | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/QualiSystems/cloudshell-core |
@@ -17,3 +17,3 @@ MANIFEST.in | ||
| cloudshell/core/context/__init__.py | ||
| cloudshell/core/context/context_service.py | ||
| cloudshell/core/context/error_handling_context.py | ||
| cloudshell/core/logger/__init__.py | ||
@@ -27,2 +27,9 @@ cloudshell/core/logger/interprocess_logger.py | ||
| cloudshell_core.egg-info/dependency_links.txt | ||
| cloudshell_core.egg-info/top_level.txt | ||
| cloudshell_core.egg-info/top_level.txt | ||
| tests/__init__.py | ||
| tests/cloudshell_core_tests.py | ||
| tests/core/__init__.py | ||
| tests/core/logger/__init__.py | ||
| tests/core/logger/test_qs_config_parser.py | ||
| tests/core/logger/test_qs_logger.py | ||
| tests/core/logger/config/test_qs_config.ini |
| cloudshell | ||
| tests |
@@ -1,3 +0,2 @@ | ||
| __author__ = 'g8y3e' | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) | ||
| __path__ = extend_path(__path__, __name__) |
@@ -1,3 +0,2 @@ | ||
| __author__ = 'coye' | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) | ||
| __path__ = extend_path(__path__, __name__) |
@@ -7,2 +7,2 @@ class ActionRequest(object): | ||
| self.connectioId = '' | ||
| self.connectionParams = [] | ||
| self.connectionParams = [] |
@@ -1,2 +0,1 @@ | ||
| class ActionResult(object): | ||
@@ -3,0 +2,0 @@ |
@@ -6,2 +6,1 @@ class ConnectorAttribute(object): | ||
| self.attributeValue = '' | ||
| class DriverResponseRoot(object): | ||
| def __init__(self): | ||
| self.driverResponse = None | ||
| self.driverResponse = None |
| class DriverResponse(object): | ||
| def __init__(self): | ||
| self.actionResults = [] | ||
@@ -1,3 +0,2 @@ | ||
| __author__ = 'coye' | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) | ||
| __path__ = extend_path(__path__, __name__) |
| from logging.handlers import RotatingFileHandler | ||
| import multiprocessing, threading, logging, sys, traceback | ||
| class MultiProcessingLog(logging.Handler): | ||
@@ -5,0 +6,0 @@ def __init__(self, name, mode='a', maxsize=0, rotate=0): |
@@ -0,3 +1,5 @@ | ||
| #!/usr/bin/python | ||
| # -*- coding: utf-8 -*- | ||
| import ConfigParser | ||
| import os | ||
@@ -15,5 +17,2 @@ | ||
| file_path = os.path.dirname(__file__) | ||
| # index = file_path.rfind('\\') | ||
| # if index != -1: | ||
| # file_path = file_path[:index + 1] | ||
@@ -32,3 +31,2 @@ self._config_file = os.getenv('QS_CONFIG', os.path.join(file_path, DEFAULT_CONFIG_PATH)) | ||
| def _create_dict(self): | ||
| # if QSConfigParser._configDict is None: | ||
| config_dict = {} | ||
@@ -46,3 +44,3 @@ for section in self._config_parser.sections(): | ||
| if dict_section: | ||
| if dict_section in QSConfigParser._configDict.keys(): | ||
| if dict_section in QSConfigParser._configDict: | ||
| return QSConfigParser._configDict[dict_section] | ||
@@ -56,4 +54,4 @@ else: | ||
| settings_dict = QSConfigParser.get_dict(dict_section) | ||
| if settings_dict and dict_key.lower() in settings_dict.keys(): | ||
| if settings_dict and dict_key and dict_key.lower() in settings_dict: | ||
| return settings_dict[dict_key.lower()] | ||
| return None |
@@ -7,2 +7,3 @@ #!/usr/bin/python | ||
| import threading | ||
| import traceback | ||
@@ -35,4 +36,8 @@ import os | ||
| def get_settings(): | ||
| """Read configuration settings from config or use DEFAULTS | ||
| def get_settings(): | ||
| :return: config obj | ||
| """ | ||
| config = {} | ||
@@ -60,2 +65,9 @@ # Level | ||
| def get_accessible_log_path(reservation_id='Autoload', handler='default'): | ||
| """Generate log path for the logger and verify that it's accessible using LOG_PATH/reservation_id/handler-%timestamp%.log | ||
| :param reservation_id: part of log path | ||
| :param handler: handler name for logger | ||
| :return: generated log path | ||
| """ | ||
| accessible_log_path = None | ||
@@ -66,3 +78,3 @@ config = get_settings() | ||
| log_path = os.environ['LOG_PATH'] | ||
| elif config['LOG_PATH']: | ||
| elif 'LOG_PATH' in config and config['LOG_PATH']: | ||
| log_path = config['LOG_PATH'] | ||
@@ -72,6 +84,4 @@ else: | ||
| curent_path = os.path.dirname(__file__) | ||
| if log_path.startswith('..'): | ||
| log_path = os.path.join(curent_path, log_path) | ||
| log_path = os.path.join(os.path.dirname(__file__), log_path) | ||
@@ -81,3 +91,2 @@ time_format = config['TIME_FORMAT'] or DEFAULT_TIME_FORMAT | ||
| log_file_name = '{0}--{1}.log'.format(handler, datetime.now().strftime(time_format)) | ||
| # log_file_name = '{0}.log'.format(handler) | ||
| log_path = os.path.join(log_path, reservation_id) | ||
@@ -102,4 +111,5 @@ | ||
| def log_execution_info(logger_hdlr, exec_info): | ||
| '''Log provided execution infomrmation into provided logger on 'INFO' level | ||
| ''' | ||
| """Log provided execution information into provided logger on 'INFO' level | ||
| """ | ||
| if not hasattr(logger_hdlr, 'info_logged'): | ||
@@ -113,5 +123,5 @@ logger_hdlr.info_logged = True | ||
| def get_qs_logger(log_group='Ungrouped', log_category ='QS', log_file_prefix='QS'): | ||
| def get_qs_logger(log_group='Ungrouped', log_category='QS', log_file_prefix='QS'): | ||
| """Create cloudshell specific singleton logger | ||
| """ | ||
| :param log_group: This folder will be grouped under this name. The default implementation of the group is a folder | ||
@@ -121,5 +131,7 @@ under the logs directory. According to the CloudShell logging standard pass the reservation id as this value when | ||
| :type log_group: str | ||
| :param log_category: All messages to this logger will be prefixed by the category name. The category name should be | ||
| the name of the shell/driver | ||
| :type log_category: str | ||
| :param log_file_prefix: The log file generated by this logger will have this specified prefix. According to the | ||
@@ -129,5 +141,7 @@ logging standard the prefix should be the name of the resource the command is executing on. For environment commands | ||
| :type log_file_prefix: str | ||
| :return: the logger object | ||
| :rtype: logging.Logger | ||
| """ | ||
| _LOGGER_LOCK.acquire() | ||
@@ -147,3 +161,4 @@ try: | ||
| def _create_logger(log_group, log_category, log_file_prefix): | ||
| """ | ||
| """Create logging handler | ||
| :param log_group: This folder will be grouped under this name. The default implementation of the group is a folder | ||
@@ -153,5 +168,7 @@ under the logs directory. According to the CloudShell logging standard pass the reservation id as this value when | ||
| :type log_group: str | ||
| :param log_category: All messages to this logger will be prefixed by the category name. The category name should be | ||
| the name of the shell/driver | ||
| :type log_category: str | ||
| :param log_file_prefix: The log file generated by this logger will have this specified prefix. According to the | ||
@@ -161,5 +178,8 @@ logging standard the prefix should be the name of the resource the command is executing on. For environment commands | ||
| :type log_file_prefix: str | ||
| :return: the logger object | ||
| :rtype: logging.Logger | ||
| """ | ||
| log_file_prefix = re.sub(' ', '_', log_file_prefix) | ||
@@ -182,3 +202,2 @@ log_category = '%s.%s' % (log_category, log_file_prefix) | ||
| if log_path: | ||
| # print("Logger log path: %s" % log_path) | ||
| hdlr = MultiProcessingLog(log_path, mode='a') | ||
@@ -200,5 +219,4 @@ #print 'Logger File Handler is: {0}'.format(hdlr.baseFilename) | ||
| def qs_time_this(func): | ||
| ''' | ||
| Decorator that reports the execution time. | ||
| ''' | ||
| """Decorator that reports the execution time. | ||
| """ | ||
@@ -225,15 +243,49 @@ @wraps(func) | ||
| def normalize_buffer(input_buffer): | ||
| """Clear color from input_buffer and special characters | ||
| :param str input_buffer: input buffer string from device | ||
| :return: str | ||
| """ | ||
| # \033[1;32;40m | ||
| # \033[ - Escape code | ||
| # 1 - style | ||
| # 32 - text color | ||
| # 40 - Background colour | ||
| color_pattern = re.compile(r'\[(\d+;){0,2}?\d+m|\b|' + chr(27)) # 27 - ESC character | ||
| result_buffer = '' | ||
| if not isinstance(input_buffer, basestring): | ||
| input_buffer = str(input_buffer) | ||
| match_iter = color_pattern.finditer(input_buffer) | ||
| current_index = 0 | ||
| for match_color in match_iter: | ||
| match_range = match_color.span() | ||
| result_buffer += input_buffer[current_index:match_range[0]] | ||
| current_index = match_range[1] | ||
| result_buffer += input_buffer[current_index:] | ||
| result_buffer = result_buffer.replace('\r\n', '\n') | ||
| return re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', result_buffer) | ||
| class MultiLineFormatter(logging.Formatter): | ||
| """Log Formatter. | ||
| """Log Formatter, Append log header to each line. | ||
| """ | ||
| Appends log header to each line. | ||
| """ | ||
| MAX_SPLIT = 1 | ||
| def format(self, record): | ||
| '''formatting for one or multi-line message | ||
| """formatting for one or multi-line message | ||
| :param record: | ||
| :return: | ||
| ''' | ||
| """ | ||
| s = '' | ||
@@ -245,2 +297,3 @@ | ||
| try: | ||
| record.msg = normalize_buffer(record.msg) | ||
| s = logging.Formatter.format(self, record) | ||
@@ -250,4 +303,5 @@ header, footer = s.rsplit(record.message, self.MAX_SPLIT) | ||
| except Exception, e: | ||
| print traceback.format_exc() | ||
| print 'logger.format: Unexpected error: ' + str(e) | ||
| print 'record = %s<<<' % record | ||
| print 'record = {}<<<'.format(record.message) | ||
| return s | ||
@@ -257,3 +311,5 @@ | ||
| class Loggable(object): | ||
| """Interface for Instances which uses Logging""" | ||
| """Interface for Instances which uses Logging | ||
| """ | ||
| LOG_LEVEL = LOG_LEVELS['WARN'] # Default Level that will be reported | ||
@@ -268,6 +324,7 @@ LOG_INFO = LOG_LEVELS['INFO'] | ||
| def setup_logger(self): | ||
| '''Setup local logger instance | ||
| """Setup local logger instance | ||
| :return: | ||
| ''' | ||
| """ | ||
| self.logger = get_qs_logger(self.__class__.__name__) | ||
@@ -274,0 +331,0 @@ self.logger.setLevel(self.LOG_LEVEL) |
+1
-1
| Metadata-Version: 1.1 | ||
| Name: cloudshell-core | ||
| Version: 2.0.136 | ||
| Version: 2.1.153 | ||
| Summary: Core package for CloudShell Python orchestration and automation. This package contains commoncode for CloudShell packages, including logging, basic interfaces and other utilities | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/QualiSystems/cloudshell-core |
+1
-1
@@ -1,1 +0,1 @@ | ||
| 2.0.136 | ||
| 2.1.153 |
| from abc import abstractmethod | ||
| class ContextBasedService(object): | ||
| @abstractmethod | ||
| def get_objects(self): | ||
| pass | ||
| @abstractmethod | ||
| def context_started(self): | ||
| pass | ||
| @abstractmethod | ||
| def context_ended(self, exc_type, exc_val, exc_tb): | ||
| pass | ||
| def __enter__(self): | ||
| self.context_started() | ||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| self.context_ended(exc_type, exc_val, exc_tb) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
29347
68.95%35
25%604
58.53%