bptk-py
Advanced tools
| import datetime | ||
| import jsonpickle | ||
| import os | ||
| from .externalStateAdapter import ExternalStateAdapter, InstanceState | ||
| from ..util import statecompression | ||
| from ..logger import log | ||
| class FileAdapter(ExternalStateAdapter): | ||
| def __init__(self, compress: bool, path: str): | ||
| super().__init__(compress) | ||
| self.path = path | ||
| log(f"[INFO] FileAdapter initialized with path: {path}, compression: {compress}") | ||
| def _is_already_compressed_results(self, results_log): | ||
| """ | ||
| Check if results_log is already in compressed format. | ||
| Compressed format: {scenario_manager: {scenario: {constant: [values]}}} | ||
| Uncompressed format: {step: {scenario_manager: {scenario: {constant: value}}}} | ||
| """ | ||
| if not isinstance(results_log, dict) or not results_log: | ||
| return False | ||
| # Check if the first level keys look like scenario managers (strings) rather than steps (floats/step strings) | ||
| first_key = next(iter(results_log.keys())) | ||
| if isinstance(first_key, str) and not (first_key.replace('.', '').isdigit()): | ||
| # This looks like a scenario manager name, so probably compressed format | ||
| # Double-check by looking at the structure | ||
| try: | ||
| first_sm = results_log[first_key] | ||
| if isinstance(first_sm, dict): | ||
| first_scenario = next(iter(first_sm.values())) | ||
| if isinstance(first_scenario, dict): | ||
| first_constant = next(iter(first_scenario.values())) | ||
| # If the constant value is a list, it's likely compressed | ||
| return isinstance(first_constant, list) | ||
| except (StopIteration, KeyError, AttributeError): | ||
| pass | ||
| return False | ||
| def load_instance(self, instance_uuid: str) -> InstanceState: | ||
| """ | ||
| Override the base class method to handle compression/decompression internally. | ||
| The base class tries to decompress after we've already handled it. | ||
| """ | ||
| state = self._load_instance(instance_uuid) | ||
| # Apply scenario_cache numeric key restoration (no compression, just JSON key conversion fix) | ||
| if(state is not None and state.state is not None): | ||
| if "scenario_cache" in state.state: | ||
| state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"]) | ||
| return state | ||
| def save_instance(self, state: InstanceState): | ||
| """ | ||
| Override the base class method to handle compression internally. | ||
| The base class tries to compress before we handle it. | ||
| """ | ||
| return self._save_instance(state) | ||
| def _save_instance(self, state: InstanceState): | ||
| log(f"[INFO] FileAdapter _save_instance called for instance {state.instance_id if state else 'None'}") | ||
| # Apply compression for settings_log and results_log (scenario_cache compression is disabled) | ||
| if self.compress and state is not None and state.state is not None: | ||
| log(f"[INFO] Compression enabled, processing state for instance {state.instance_id}") | ||
| # Create a copy to avoid modifying the original state | ||
| state_copy = state.state.copy() | ||
| if "settings_log" in state_copy: | ||
| try: | ||
| log(f"[INFO] Compressing settings_log for instance {state.instance_id}") | ||
| state_copy["settings_log"] = statecompression.compress_settings(state_copy["settings_log"]) | ||
| log(f"[INFO] settings_log compressed successfully for instance {state.instance_id}") | ||
| except Exception as e: | ||
| log(f"[WARN] Failed to compress settings_log for instance {state.instance_id}: {str(e)}") | ||
| pass | ||
| # Keep original data if compression fails | ||
| if "results_log" in state_copy: | ||
| # Check if data is already in compressed format by looking at the structure | ||
| results_log = state_copy["results_log"] | ||
| if results_log and self._is_already_compressed_results(results_log): | ||
| log(f"[INFO] results_log already compressed for instance {state.instance_id}, skipping compression") | ||
| pass | ||
| else: | ||
| try: | ||
| log(f"[INFO] Compressing results_log for instance {state.instance_id}") | ||
| state_copy["results_log"] = statecompression.compress_results(state_copy["results_log"]) | ||
| log(f"[INFO] results_log compressed successfully for instance {state.instance_id}") | ||
| except Exception as e: | ||
| log(f"[WARN] Failed to compress results_log for instance {state.instance_id}: {str(e)}") | ||
| pass | ||
| # Keep original data if compression fails | ||
| else: | ||
| state_copy = state.state | ||
| data = { | ||
| "data": { | ||
| "state": jsonpickle.dumps(state_copy), | ||
| "instance_id": state.instance_id, | ||
| "time": state.time.isoformat() if isinstance(state.time, datetime.datetime) else str(state.time), | ||
| "timeout": state.timeout, | ||
| "step": state.step | ||
| } | ||
| } | ||
| file_path = os.path.join(self.path, str(state.instance_id) + ".json") | ||
| log(f"[INFO] Writing instance {state.instance_id} to file: {file_path}") | ||
| try: | ||
| f = open(file_path, "w") | ||
| f.write(jsonpickle.dumps(data)) | ||
| f.close() | ||
| log(f"[INFO] Instance {state.instance_id} saved successfully to {file_path}") | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to write instance {state.instance_id} to file {file_path}: {str(e)}") | ||
| raise | ||
| def _load_state(self) -> list[InstanceState]: | ||
| instances = [] | ||
| instance_paths = os.listdir(self.path) | ||
| for instance_uuid in instance_paths: | ||
| if instance_uuid.endswith('.json'): | ||
| uuid = instance_uuid.split(".")[0] | ||
| instance = self._load_instance(uuid) | ||
| if instance: | ||
| # Apply scenario_cache numeric key restoration (no compression, just JSON key conversion fix) | ||
| if(instance.state is not None): | ||
| if "scenario_cache" in instance.state: | ||
| instance.state["scenario_cache"] = self._restore_numeric_keys(instance.state["scenario_cache"]) | ||
| instances.append(instance) | ||
| return instances | ||
| def _load_instance(self, instance_uuid: str) -> InstanceState: | ||
| file_path = os.path.join(self.path, str(instance_uuid) + ".json") | ||
| try: | ||
| f = open(file_path, "r") | ||
| instance_data = jsonpickle.loads(f.read()) | ||
| f.close() | ||
| decoded_data = jsonpickle.loads(instance_data["data"]["state"]) | ||
| instance_id = instance_data["data"]["instance_id"] | ||
| time_str = instance_data["data"]["time"] | ||
| timeout = instance_data["data"]["timeout"] | ||
| step = instance_data["data"]["step"] | ||
| # Parse the time back from string | ||
| try: | ||
| if isinstance(time_str, str): | ||
| time = datetime.datetime.fromisoformat(time_str) | ||
| else: | ||
| time = time_str | ||
| except ValueError: | ||
| # Fallback for old format or parsing issues | ||
| time = datetime.datetime.now() | ||
| # Apply decompression for settings_log and results_log (scenario_cache compression is disabled) | ||
| if self.compress and decoded_data is not None: | ||
| if "settings_log" in decoded_data: | ||
| try: | ||
| log(f"[INFO] Decompressing settings_log for instance {instance_uuid}") | ||
| decoded_data["settings_log"] = statecompression.decompress_settings(decoded_data["settings_log"]) | ||
| log(f"[INFO] settings_log decompressed successfully for instance {instance_uuid}") | ||
| except Exception as e: | ||
| log(f"[WARN] Failed to decompress settings_log for instance {instance_uuid}: {str(e)}") | ||
| pass | ||
| # Keep original data if decompression fails | ||
| if "results_log" in decoded_data: | ||
| results_log = decoded_data["results_log"] | ||
| if self._is_already_compressed_results(results_log): | ||
| try: | ||
| log(f"[INFO] Decompressing results_log for instance {instance_uuid}") | ||
| decoded_data["results_log"] = statecompression.decompress_results(decoded_data["results_log"]) | ||
| log(f"[INFO] results_log decompressed successfully for instance {instance_uuid}") | ||
| except Exception as e: | ||
| log(f"[WARN] Failed to decompress results_log for instance {instance_uuid}: {str(e)}") | ||
| pass | ||
| else: | ||
| log(f"[INFO] Results_log doesn't appear to be compressed for instance {instance_uuid}, skipping decompression") | ||
| result = InstanceState(decoded_data, instance_id, time, timeout, step) | ||
| return result | ||
| except Exception as e: | ||
| log(f"[ERROR] Error loading instance {instance_uuid}: {str(e)}") | ||
| return None | ||
| def delete_instance(self, instance_uuid: str): | ||
| file_path = os.path.join(self.path, str(instance_uuid) + ".json") | ||
| log(f"[INFO] Deleting instance {instance_uuid}, file: {file_path}") | ||
| try: | ||
| os.remove(file_path) | ||
| log(f"[INFO] Instance {instance_uuid} deleted successfully") | ||
| except Exception as e: | ||
| log(f"[ERROR] Error deleting instance {instance_uuid}: {str(e)}") | ||
| raise |
| import datetime | ||
| import jsonpickle | ||
| import psycopg | ||
| from .externalStateAdapter import ExternalStateAdapter, InstanceState | ||
| from ..logger import log | ||
| # Postgres Create Script: | ||
| # | ||
| # CREATE TABLE "state" ( | ||
| # "state" text, | ||
| # "instance_id" text, | ||
| # "time" text, | ||
| # "timeout.weeks" bigint, | ||
| # "timeout.days" bigint, | ||
| # "timeout.hours" bigint, | ||
| # "timeout.minutes" bigint, | ||
| # "timeout.seconds" bigint, | ||
| # "timeout.milliseconds" bigint, | ||
| # "timeout.microseconds" bigint, | ||
| # "step" bigint | ||
| # ); | ||
| class PostgresAdapter(ExternalStateAdapter): | ||
| def __init__(self, postgres_client, compress: bool): | ||
| super().__init__(compress) | ||
| self._postgres_client = postgres_client | ||
| log(f"[INFO] PostgresAdapter initialized with compression: {compress}") | ||
| def _load_instance(self, instance_uuid: str) -> InstanceState: | ||
| log(f"[INFO] Loading instance {instance_uuid} from PostgreSQL") | ||
| try: | ||
| with self._postgres_client.cursor() as cur: | ||
| cur.execute("SELECT * FROM state WHERE instance_id = %s", (instance_uuid,)) | ||
| res = cur.fetchone() | ||
| if res is None: | ||
| log(f"[INFO] No data found in PostgreSQL for instance {instance_uuid}") | ||
| else: | ||
| log(f"[INFO] Data retrieved from PostgreSQL for instance {instance_uuid}") | ||
| result = self._tuple_to_state(res) | ||
| if result: | ||
| log(f"[INFO] Instance {instance_uuid} loaded successfully from PostgreSQL") | ||
| return result | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to load instance {instance_uuid} from PostgreSQL: {str(e)}") | ||
| raise | ||
| def delete_instance(self, instance_uuid: str): | ||
| log(f"[INFO] Deleting instance {instance_uuid} from PostgreSQL") | ||
| try: | ||
| with self._postgres_client.cursor() as cur: | ||
| cur.execute("DELETE FROM state WHERE instance_id = %s", (instance_uuid,)) | ||
| self._postgres_client.commit() | ||
| log(f"[INFO] Instance {instance_uuid} deleted successfully from PostgreSQL") | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to delete instance {instance_uuid} from PostgreSQL: {str(e)}") | ||
| raise | ||
| def _save_instance(self, instance_state: InstanceState): | ||
| log(f"[INFO] Saving instance {instance_state.instance_id if instance_state else 'None'} to PostgreSQL") | ||
| try: | ||
| with self._postgres_client.cursor() as cur: | ||
| cur.execute("SELECT * FROM state WHERE instance_id = %s", (instance_state.instance_id,)) | ||
| postgres_data = { | ||
| "state": jsonpickle.dumps(instance_state.state) if instance_state.state is not None else None, | ||
| "instance_id": instance_state.instance_id, | ||
| "time": str(instance_state.time), | ||
| "timeout.weeks": instance_state.timeout["weeks"], | ||
| "timeout.days": instance_state.timeout["days"], | ||
| "timeout.hours": instance_state.timeout["hours"], | ||
| "timeout.minutes": instance_state.timeout["minutes"], | ||
| "timeout.seconds": instance_state.timeout["seconds"], | ||
| "timeout.milliseconds": instance_state.timeout["milliseconds"], | ||
| "timeout.microseconds": instance_state.timeout["microseconds"], | ||
| "step": instance_state.step | ||
| } | ||
| res = cur.fetchone() | ||
| if res is None: | ||
| log(f"[INFO] Inserting new instance {instance_state.instance_id} into PostgreSQL") | ||
| cur.execute( | ||
| "INSERT INTO state (state, instance_id, time, \"timeout.weeks\", \"timeout.days\", \"timeout.hours\", \"timeout.minutes\", \"timeout.seconds\", \"timeout.milliseconds\", \"timeout.microseconds\", step) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", | ||
| (postgres_data["state"], postgres_data["instance_id"], postgres_data["time"], postgres_data["timeout.weeks"], postgres_data["timeout.days"], postgres_data["timeout.hours"], postgres_data["timeout.minutes"], postgres_data["timeout.seconds"], postgres_data["timeout.milliseconds"], postgres_data["timeout.microseconds"], postgres_data["step"]) | ||
| ) | ||
| self._postgres_client.commit() | ||
| log(f"[INFO] Instance {instance_state.instance_id} inserted successfully into PostgreSQL") | ||
| elif res[10] != instance_state.step: | ||
| log(f"[INFO] Updating existing instance {instance_state.instance_id} in PostgreSQL (step {res[10]} -> {instance_state.step})") | ||
| cur.execute("UPDATE state SET state = %s, time = %s, \"timeout.weeks\" = %s, \"timeout.days\" = %s, \"timeout.hours\" = %s, \"timeout.minutes\" = %s, \"timeout.seconds\" = %s, \"timeout.milliseconds\" = %s, \"timeout.microseconds\" = %s, step = %s WHERE instance_id = %s", (postgres_data["state"], postgres_data["time"], postgres_data["timeout.weeks"], postgres_data["timeout.days"], postgres_data["timeout.hours"], postgres_data["timeout.minutes"], postgres_data["timeout.seconds"], postgres_data["timeout.milliseconds"], postgres_data["timeout.microseconds"], postgres_data["step"], postgres_data["instance_id"])) | ||
| self._postgres_client.commit() | ||
| log(f"[INFO] Instance {instance_state.instance_id} updated successfully in PostgreSQL") | ||
| else: | ||
| log(f"[INFO] Instance {instance_state.instance_id} already up to date in PostgreSQL (step {res[10]})") | ||
| except (Exception, psycopg.Error) as error: | ||
| log(f"[ERROR] Failed to save instance {instance_state.instance_id if instance_state else 'None'} to PostgreSQL: {str(error)}") | ||
| raise | ||
| def _tuple_to_state(self, state_tuple: tuple) -> InstanceState: | ||
| if state_tuple is None: | ||
| return None | ||
| log(f"[INFO] Converting PostgreSQL tuple to InstanceState for instance {state_tuple[1] if state_tuple else 'None'}") | ||
| return InstanceState( | ||
| state=jsonpickle.loads(state_tuple[0]) if state_tuple[0] is not None else None, | ||
| instance_id=state_tuple[1], | ||
| time=datetime.datetime.strptime(state_tuple[2], "%Y-%m-%d %H:%M:%S.%f"), | ||
| timeout = { | ||
| "weeks": state_tuple[3], | ||
| "days": state_tuple[4], | ||
| "hours": state_tuple[5], | ||
| "minutes": state_tuple[6], | ||
| "seconds": state_tuple[7], | ||
| "milliseconds": state_tuple[8], | ||
| "microseconds":state_tuple[9] | ||
| }, | ||
| step=state_tuple[10] | ||
| ) | ||
| import datetime | ||
| import jsonpickle | ||
| import jsonpickle.handlers | ||
| import redis | ||
| import numpy as np | ||
| from .externalStateAdapter import ExternalStateAdapter, InstanceState | ||
| from ..logger import log | ||
| # Try to configure ujson backend, but fall back gracefully if not available | ||
| try: | ||
| import ujson | ||
| jsonpickle.load_backend('ujson', 'ujson', ValueError) | ||
| jsonpickle.set_preferred_backend('ujson') | ||
| jsonpickle.set_encoder_options('ujson', ensure_ascii=False, sort_keys=True) | ||
| jsonpickle.set_decoder_options('ujson', precise_float=True) | ||
| log("[INFO] jsonpickle configured to use ujson backend") | ||
| except (ImportError, AssertionError) as e: | ||
| # Fall back to default JSON backend if ujson is not available | ||
| log(f"[INFO] ujson not available ({e}), using default JSON backend") | ||
| pass | ||
| # Configure safer numpy serialization - convert to native Python types | ||
| # Use proper BaseHandler classes as per jsonpickle documentation | ||
| class NumpyFloatHandler(jsonpickle.handlers.BaseHandler): | ||
| def flatten(self, obj, data): | ||
| return float(obj) | ||
| class NumpyIntHandler(jsonpickle.handlers.BaseHandler): | ||
| def flatten(self, obj, data): | ||
| return int(obj) | ||
| class NumpyBoolHandler(jsonpickle.handlers.BaseHandler): | ||
| def flatten(self, obj, data): | ||
| return bool(obj) | ||
| class NumpyArrayHandler(jsonpickle.handlers.BaseHandler): | ||
| def flatten(self, obj, data): | ||
| return obj.tolist() | ||
| # Register handlers using proper jsonpickle handler classes | ||
| jsonpickle.handlers.register(np.float64, NumpyFloatHandler, base=True) | ||
| jsonpickle.handlers.register(np.float32, NumpyFloatHandler, base=True) | ||
| jsonpickle.handlers.register(np.int64, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.int32, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.int16, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.int8, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.uint64, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.uint32, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.uint16, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.uint8, NumpyIntHandler, base=True) | ||
| jsonpickle.handlers.register(np.bool_, NumpyBoolHandler, base=True) | ||
| jsonpickle.handlers.register(np.ndarray, NumpyArrayHandler, base=True) | ||
| class RedisAdapter(ExternalStateAdapter): | ||
| """ | ||
| Redis adapter for storing BPTK instance state in Redis. | ||
| Optimized for Upstash Redis but works with any Redis instance. | ||
| """ | ||
| def __init__(self, redis_client: redis.Redis, compress: bool = True, key_prefix: str = "bptk:state"): | ||
| """ | ||
| Initialize the Redis adapter. | ||
| Args: | ||
| redis_client: A configured Redis client (e.g., from redis.from_url()) | ||
| compress: Whether to compress state data (default: True) | ||
| key_prefix: Prefix for Redis keys (default: "bptk:state") | ||
| """ | ||
| super().__init__(compress) | ||
| self._redis_client = redis_client | ||
| self._key_prefix = key_prefix | ||
| log(f"[INFO] RedisAdapter initialized with key_prefix: {key_prefix}, compression: {compress}") | ||
| def _get_instance_key(self, instance_uuid: str) -> str: | ||
| """Generate Redis key for an instance.""" | ||
| return f"{self._key_prefix}:{instance_uuid}" | ||
| def _load_instance(self, instance_uuid: str) -> InstanceState: | ||
| """Load a single instance from Redis.""" | ||
| key = self._get_instance_key(instance_uuid) | ||
| log(f"[INFO] Loading instance {instance_uuid} from Redis key: {key}") | ||
| try: | ||
| data = self._redis_client.get(key) | ||
| if data is None: | ||
| log(f"[INFO] No data found in Redis for instance {instance_uuid}") | ||
| return None | ||
| log(f"[INFO] Data retrieved from Redis for instance {instance_uuid}") | ||
| instance_data = jsonpickle.decode(data) | ||
| state = jsonpickle.decode(instance_data["state"]) if instance_data["state"] is not None else None | ||
| log(f"[INFO] Decoding instance data for {instance_uuid}") | ||
| result = InstanceState( | ||
| state=state, | ||
| instance_id=instance_data["instance_id"], | ||
| time=datetime.datetime.fromisoformat(instance_data["time"]), | ||
| timeout=instance_data["timeout"], | ||
| step=instance_data["step"] | ||
| ) | ||
| log(f"[INFO] Instance {instance_uuid} loaded successfully from Redis") | ||
| return result | ||
| except (KeyError, ValueError, TypeError) as e: | ||
| log(f"[ERROR] Failed to load instance {instance_uuid} from Redis: {str(e)}") | ||
| return None | ||
| except Exception as e: | ||
| log(f"[ERROR] Unexpected error loading instance {instance_uuid} from Redis: {str(e)}") | ||
| return None | ||
| def load_instance(self, instance_uuid: str) -> InstanceState: | ||
| """ | ||
| Override the base class method to handle compression/decompression internally. | ||
| This prevents double decompression issues similar to FileAdapter. | ||
| """ | ||
| log(f"[INFO] RedisAdapter loading instance {instance_uuid}") | ||
| state = self._load_instance(instance_uuid) | ||
| # Apply scenario_cache numeric key restoration (no compression, just JSON key conversion fix) | ||
| if(state is not None and state.state is not None): | ||
| if "scenario_cache" in state.state: | ||
| log(f"[INFO] Restoring numeric keys in scenario_cache for instance {instance_uuid}") | ||
| state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"]) | ||
| log(f"[INFO] Numeric keys restored for instance {instance_uuid}") | ||
| return state | ||
| def save_instance(self, state: InstanceState): | ||
| """ | ||
| Override the base class method to handle compression internally. | ||
| This prevents double compression issues similar to FileAdapter. | ||
| """ | ||
| log(f"[INFO] RedisAdapter saving instance {state.instance_id if state else 'None'}") | ||
| return self._save_instance(state) | ||
| def delete_instance(self, instance_uuid: str): | ||
| """Delete an instance from Redis.""" | ||
| key = self._get_instance_key(instance_uuid) | ||
| log(f"[INFO] Deleting instance {instance_uuid} from Redis key: {key}") | ||
| try: | ||
| result = self._redis_client.delete(key) | ||
| if result > 0: | ||
| log(f"[INFO] Instance {instance_uuid} deleted successfully from Redis") | ||
| else: | ||
| log(f"[WARN] Instance {instance_uuid} not found in Redis") | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to delete instance {instance_uuid} from Redis: {str(e)}") | ||
| raise | ||
| def _save_instance(self, instance_state: InstanceState): | ||
| """Save a single instance to Redis.""" | ||
| if instance_state is None or instance_state.instance_id is None: | ||
| log("[WARN] Cannot save instance: instance_state or instance_id is None") | ||
| return | ||
| log(f"[INFO] _save_instance called for instance {instance_state.instance_id}") | ||
| try: | ||
| # Prepare data for storage with make_refs=False to prevent py/id issues | ||
| log(f"[INFO] Preparing data for Redis storage for instance {instance_state.instance_id}") | ||
| redis_data = { | ||
| "state": jsonpickle.encode(instance_state.state, make_refs=False) if instance_state.state is not None else None, | ||
| "instance_id": instance_state.instance_id, | ||
| "time": instance_state.time.isoformat(), | ||
| "timeout": instance_state.timeout, | ||
| "step": instance_state.step | ||
| } | ||
| key = self._get_instance_key(instance_state.instance_id) | ||
| log(f"[INFO] Storing instance {instance_state.instance_id} to Redis key: {key}") | ||
| # Store data with make_refs=False to prevent object reference issues | ||
| self._redis_client.set(key, jsonpickle.encode(redis_data, make_refs=False)) | ||
| log(f"[INFO] Instance {instance_state.instance_id} stored successfully in Redis") | ||
| # Set TTL based on timeout if specified | ||
| if instance_state.timeout: | ||
| log(f"[INFO] Setting TTL for instance {instance_state.instance_id} based on timeout: {instance_state.timeout}") | ||
| timeout_seconds = ( | ||
| instance_state.timeout.get("weeks", 0) * 7 * 24 * 3600 + | ||
| instance_state.timeout.get("days", 0) * 24 * 3600 + | ||
| instance_state.timeout.get("hours", 0) * 3600 + | ||
| instance_state.timeout.get("minutes", 0) * 60 + | ||
| instance_state.timeout.get("seconds", 0) + | ||
| instance_state.timeout.get("milliseconds", 0) / 1000 + | ||
| instance_state.timeout.get("microseconds", 0) / 1000000 | ||
| ) | ||
| if timeout_seconds > 0: | ||
| self._redis_client.expire(key, int(timeout_seconds)) | ||
| log(f"[INFO] TTL set to {int(timeout_seconds)} seconds for instance {instance_state.instance_id}") | ||
| except Exception as error: | ||
| log(f"[ERROR] Error saving instance {instance_state.instance_id} to Redis: {error}") | ||
| raise | ||
| # /`- | ||
| # _ _ _ /####`- | ||
| #| | | | (_) /########`- | ||
| #| |_ _ __ __ _ _ __ ___ ___ _ __ | |_ _ ___ /###########`- | ||
| #| __| '__/ _` | '_ \/ __|/ _ \ '_ \| __| / __| ____ -###########/ | ||
| #| |_| | | (_| | | | \__ \ __/ | | | |_| \__ \ | | `-#######/ | ||
| # \__|_| \__,_|_| |_|___/\___|_| |_|\__|_|___/ |____| `- # / | ||
| # | ||
| # Copyright (c) 2024 transentis labs GmbH | ||
| # MIT License | ||
| import datetime | ||
| try: | ||
| import logfire | ||
| LOGFIRE_AVAILABLE = True | ||
| except ImportError: | ||
| LOGFIRE_AVAILABLE = False | ||
| class LogfireAdapter: | ||
| """ | ||
| Adapter for routing BPTK logs to Pydantic Logfire. | ||
| """ | ||
| def __init__(self, **logfire_config): | ||
| """ | ||
| Initialize the Logfire adapter. | ||
| Args: | ||
| **logfire_config: Configuration parameters to pass to logfire.configure() | ||
| Common options include: | ||
| - project_name: Name of your Logfire project | ||
| - token: Your Logfire API token | ||
| - environment: Environment name (e.g., 'development', 'production') | ||
| """ | ||
| if not LOGFIRE_AVAILABLE: | ||
| raise ImportError( | ||
| "Pydantic Logfire is not installed. " | ||
| "Please install it with: pip install pydantic-logfire" | ||
| ) | ||
| self.configured = False | ||
| self.logfire_config = logfire_config | ||
| self._configure() | ||
| def _configure(self): | ||
| """Configure Logfire with the provided settings.""" | ||
| if not self.configured: | ||
| logfire.configure(**self.logfire_config) | ||
| self.configured = True | ||
| def log(self, message: str): | ||
| """ | ||
| Send a log message to Logfire. | ||
| Args: | ||
| message: The log message (may contain [ERROR], [WARN], [INFO], [DEBUG] prefixes) | ||
| """ | ||
| if not self.configured: | ||
| self._configure() | ||
| # Parse the log level from the message if it contains a bracket pattern | ||
| level = "INFO" # default | ||
| if "[ERROR]" in message: | ||
| level = "ERROR" | ||
| elif "[WARN]" in message: | ||
| level = "WARN" | ||
| elif "[INFO]" in message: | ||
| level = "INFO" | ||
| elif "[DEBUG]" in message: | ||
| level = "DEBUG" | ||
| # Clean the message by removing the level brackets if present | ||
| clean_message = message | ||
| for bracket_level in ["[ERROR]", "[WARN]", "[INFO]", "[DEBUG]"]: | ||
| clean_message = clean_message.replace(bracket_level, "").strip() | ||
| # Send to Logfire with appropriate level | ||
| if level == "ERROR": | ||
| logfire.error(clean_message) | ||
| elif level == "WARN": | ||
| logfire.warn(clean_message) | ||
| elif level == "DEBUG": | ||
| logfire.debug(clean_message) | ||
| else: # INFO or default | ||
| logfire.info(clean_message) |
| """ | ||
| Example usage of Pydantic Logfire integration with BPTK-Py | ||
| This demonstrates three different ways to enable Logfire logging in BPTK-Py. | ||
| """ | ||
| # Method 1: Configure Logfire when initializing BPTK | ||
| from BPTK_Py import bptk | ||
| # Initialize BPTK with Logfire configuration | ||
| bptk_instance = bptk( | ||
| loglevel="INFO", | ||
| configuration={ | ||
| "logfire_config": { | ||
| "project_name": "bptk_simulations", | ||
| "environment": "development", | ||
| # Add your Logfire token if needed | ||
| # "token": "your-logfire-token" | ||
| } | ||
| } | ||
| ) | ||
| # Method 2: Configure Logfire directly via the logger module | ||
| import BPTK_Py.logger.logger as logmod | ||
| # Configure Logfire (if not already done via bptk initialization) | ||
| logmod.configure_logfire( | ||
| project_name="bptk_simulations", | ||
| environment="production" | ||
| ) | ||
| # Later, you can disable Logfire if needed | ||
| # logmod.disable_logfire() | ||
| # Method 3: Use Logfire alongside file logging | ||
| # By default, logs go to both file and Logfire when configured | ||
| logmod.logmodes = ["logfile"] # File logging is still active | ||
| logmod.loglevel = "INFO" | ||
| # Now when you run simulations, logs will be sent to Logfire | ||
| # Example: | ||
| # bptk_instance.plot_scenarios( | ||
| # scenarios=["baseline", "optimized"], | ||
| # scenario_managers=["my_model"] | ||
| # ) | ||
| # The logs will include all INFO, WARN, and ERROR messages from: | ||
| # - Model loading and parsing | ||
| # - Scenario execution | ||
| # - File monitoring | ||
| # - Error handling | ||
| # - And more... | ||
| # To check if Logfire is available and enabled: | ||
| print(f"Logfire available: {logmod.LOGFIRE_AVAILABLE}") | ||
| print(f"Logfire enabled: {logmod.logfire_enabled}") |
| """ | ||
| Test configuration utilities for external state adapters. | ||
| Handles loading configuration from .env files and provides test helpers. | ||
| """ | ||
| import os | ||
| from pathlib import Path | ||
| from typing import Optional | ||
| # Try to load python-dotenv if available | ||
| try: | ||
| from dotenv import load_dotenv | ||
| load_dotenv(Path(__file__).parent / '.env') | ||
| except ImportError: | ||
| pass | ||
| class TestConfig: | ||
| """Configuration helper for external state adapter tests.""" | ||
| @staticmethod | ||
| def get_postgres_config() -> Optional[dict]: | ||
| """Get PostgreSQL configuration from environment variables.""" | ||
| if not TestConfig.postgres_tests_enabled(): | ||
| return None | ||
| return { | ||
| 'host': os.getenv('POSTGRES_HOST', 'localhost'), | ||
| 'port': int(os.getenv('POSTGRES_PORT', '5432')), | ||
| 'dbname': os.getenv('POSTGRES_DB', 'bptk_test'), | ||
| 'user': os.getenv('POSTGRES_USER'), | ||
| 'password': os.getenv('POSTGRES_PASSWORD') | ||
| } | ||
| @staticmethod | ||
| def get_redis_config() -> Optional[str]: | ||
| """Get Redis URL from environment variables.""" | ||
| if not TestConfig.redis_tests_enabled(): | ||
| return None | ||
| return os.getenv('REDIS_URL') | ||
| @staticmethod | ||
| def postgres_tests_enabled() -> bool: | ||
| """Check if PostgreSQL tests should be run.""" | ||
| return os.getenv('ENABLE_POSTGRES_TESTS', 'false').lower() == 'true' | ||
| @staticmethod | ||
| def redis_tests_enabled() -> bool: | ||
| """Check if Redis tests should be run.""" | ||
| return os.getenv('ENABLE_REDIS_TESTS', 'false').lower() == 'true' | ||
| @staticmethod | ||
| def get_test_timeout() -> int: | ||
| """Get test timeout in seconds.""" | ||
| return int(os.getenv('TEST_TIMEOUT', '30')) | ||
| @staticmethod | ||
| def get_cleanup_timeout() -> int: | ||
| """Get cleanup timeout in seconds.""" | ||
| return int(os.getenv('CLEANUP_TIMEOUT', '10')) | ||
| def requires_postgres(test_func): | ||
| """Decorator to skip tests that require PostgreSQL if not enabled.""" | ||
| import pytest | ||
| return pytest.mark.skipif( | ||
| not TestConfig.postgres_tests_enabled(), | ||
| reason="PostgreSQL tests not enabled. Set ENABLE_POSTGRES_TESTS=true in .env" | ||
| )(test_func) | ||
| def requires_redis(test_func): | ||
| """Decorator to skip tests that require Redis if not enabled.""" | ||
| import pytest | ||
| return pytest.mark.skipif( | ||
| not TestConfig.redis_tests_enabled(), | ||
| reason="Redis tests not enabled. Set ENABLE_REDIS_TESTS=true in .env" | ||
| )(test_func) |
| Metadata-Version: 2.4 | ||
| Name: bptk-py | ||
| Version: 2.1.1 | ||
| Version: 2.2.0 | ||
| Summary: A python simulation engine for System Dynamics & Agent based models | ||
@@ -32,4 +32,8 @@ Author-email: transentis <support@transentis.com> | ||
| Requires-Dist: jsonpickle==4.1.1 | ||
| Requires-Dist: psycopg==3.2.10 | ||
| Requires-Dist: redis==6.4.0 | ||
| Requires-Dist: ujson==5.10.0 | ||
| Provides-Extra: test | ||
| Requires-Dist: pytest; extra == "test" | ||
| Requires-Dist: python-dotenv; extra == "test" | ||
| Dynamic: license-file | ||
@@ -80,2 +84,9 @@ | ||
| ### 2.2.0 | ||
| * Add externalStateAdapters for Postgres and Redis | ||
| * Add option to externalize state completely, thus turning bptkServer into a stateless server | ||
| * Add pydantic logfire as logging backend | ||
| * Improve return message on bptkServer /stop-instance | ||
| ### 2.1.1 | ||
@@ -82,0 +93,0 @@ |
@@ -16,4 +16,8 @@ pandas==2.3.0 | ||
| jsonpickle==4.1.1 | ||
| psycopg==3.2.10 | ||
| redis==6.4.0 | ||
| ujson==5.10.0 | ||
| [test] | ||
| pytest | ||
| python-dotenv |
@@ -14,3 +14,8 @@ LICENSE | ||
| BPTK_Py/externalstateadapter/externalStateAdapter.py | ||
| BPTK_Py/externalstateadapter/file_adapter.py | ||
| BPTK_Py/externalstateadapter/postgres_adapter.py | ||
| BPTK_Py/externalstateadapter/redis_adapter.py | ||
| BPTK_Py/logger/__init__.py | ||
| BPTK_Py/logger/logfire_adapter.py | ||
| BPTK_Py/logger/logfire_usage_example.py | ||
| BPTK_Py/logger/logger.py | ||
@@ -100,2 +105,3 @@ BPTK_Py/modeling/__init__.py | ||
| tests/test_bptk.py | ||
| tests/test_config.py | ||
| tests/test_external_state.py | ||
@@ -102,0 +108,0 @@ tests/test_sddsl.py |
@@ -75,2 +75,11 @@ # /`- | ||
| # Pydantic Logfire configuration (optional) | ||
| # Set to None to disable, or provide a dict with Logfire config | ||
| "logfire_config": None, | ||
| # Example: | ||
| # "logfire_config": { | ||
| # "project_name": "bptk_simulations", | ||
| # "environment": "development" | ||
| # }, | ||
| "set_scenario_monitor": True, | ||
@@ -77,0 +86,0 @@ "set_model_monitor": True, |
@@ -1,1 +0,23 @@ | ||
| from .externalStateAdapter import ExternalStateAdapter, InstanceState, FileAdapter | ||
| from .externalStateAdapter import ExternalStateAdapter, InstanceState | ||
| from .file_adapter import FileAdapter | ||
| # Optional imports - only import if dependencies are available | ||
| try: | ||
| from .postgres_adapter import PostgresAdapter | ||
| except ImportError: | ||
| class PostgresAdapter: | ||
| def __init__(self, *args, **kwargs): | ||
| raise ImportError( | ||
| "PostgresAdapter requires 'psycopg' to be installed. " | ||
| "Install it with: pip install psycopg[binary]" | ||
| ) | ||
| try: | ||
| from .redis_adapter import RedisAdapter | ||
| except ImportError: | ||
| class RedisAdapter: | ||
| def __init__(self, *args, **kwargs): | ||
| raise ImportError( | ||
| "RedisAdapter requires 'redis' to be installed. " | ||
| "Install it with: pip install redis" | ||
| ) |
@@ -9,2 +9,3 @@ from abc import ABCMeta, abstractmethod | ||
| import os | ||
| from ..logger import log | ||
@@ -17,3 +18,3 @@ @dataclass | ||
| timeout: Any | ||
| step: int | ||
| step: Any | ||
@@ -24,38 +25,88 @@ class ExternalStateAdapter(metaclass=ABCMeta): | ||
| self.compress = compress | ||
| log(f"[INFO] ExternalStateAdapter initialized with compression: {compress}") | ||
| def save_state(self, state: list[InstanceState]): | ||
| if(self.compress): | ||
| for cur_state in state: | ||
| if(cur_state is not None and cur_state.state is not None): | ||
| cur_state.state["settings_log"] = statecompression.compress_settings(cur_state.state["settings_log"]) | ||
| cur_state.state["results_log"] = statecompression.compress_results(cur_state.state["results_log"]) | ||
| return self._save_state(state) | ||
| def save_instance(self, state: InstanceState): | ||
| if(self.compress and state is not None and state.state is not None): | ||
| log(f"[INFO] Saving instance {state.instance_id if state else 'None'}") | ||
| try: | ||
| if(self.compress and state is not None and state.state is not None): | ||
| log(f"[INFO] Compressing state for instance {state.instance_id}") | ||
| state.state["settings_log"] = statecompression.compress_settings(state.state["settings_log"]) | ||
| state.state["results_log"] = statecompression.compress_results(state.state["results_log"]) | ||
| return self._save_instance(state) | ||
| log(f"[INFO] State compression completed for instance {state.instance_id}") | ||
| result = self._save_instance(state) | ||
| log(f"[INFO] Instance {state.instance_id if state else 'None'} saved successfully") | ||
| return result | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to save instance {state.instance_id if state else 'None'}: {str(e)}") | ||
| raise | ||
| def load_state(self) -> list[InstanceState]: | ||
| state = self._load_state() | ||
| if(self.compress): | ||
| for cur_state in state: | ||
| if(cur_state is not None and cur_state.state is not None): | ||
| cur_state.state["settings_log"] = statecompression.decompress_settings(cur_state.state["settings_log"]) | ||
| cur_state.state["results_log"] = statecompression.decompress_results(cur_state.state["results_log"]) | ||
| return state | ||
| def load_instance(self, instance_uuid: str) -> InstanceState: | ||
| state = self._load_instance(instance_uuid) | ||
| if(self.compress and state is not None and state.state is not None): | ||
| state.state["settings_log"] = statecompression.decompress_settings(state.state["settings_log"]) | ||
| state.state["results_log"] = statecompression.decompress_results(state.state["results_log"]) | ||
| return state | ||
| log(f"[INFO] Loading instance {instance_uuid}") | ||
| try: | ||
| state = self._load_instance(instance_uuid) | ||
| if state is None: | ||
| log(f"[WARN] No state found for instance {instance_uuid}") | ||
| return state | ||
| @abstractmethod | ||
| def _save_state(self, state: list[InstanceState]): | ||
| pass | ||
| log(f"[INFO] State loaded for instance {instance_uuid}") | ||
| if(self.compress and state.state is not None): | ||
| log(f"[INFO] Decompressing state for instance {instance_uuid}") | ||
| state.state["settings_log"] = statecompression.decompress_settings(state.state["settings_log"]) | ||
| state.state["results_log"] = statecompression.decompress_results(state.state["results_log"]) | ||
| log(f"[INFO] State decompression completed for instance {instance_uuid}") | ||
| # Always restore numeric keys in scenario_cache (no compression, just JSON key conversion fix) | ||
| if(state.state is not None): | ||
| if "scenario_cache" in state.state: | ||
| log(f"[INFO] Restoring numeric keys in scenario_cache for instance {instance_uuid}") | ||
| state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"]) | ||
| log(f"[INFO] Numeric keys restored for instance {instance_uuid}") | ||
| log(f"[INFO] Instance {instance_uuid} loaded successfully") | ||
| return state | ||
| except Exception as e: | ||
| log(f"[ERROR] Failed to load instance {instance_uuid}: {str(e)}") | ||
| raise | ||
| def _restore_numeric_keys(self, data): | ||
| """ | ||
| Recursively restore numeric keys that were converted to strings during JSON serialization. | ||
| This handles the scenario_cache structure where floating point timesteps get converted to strings. | ||
| """ | ||
| if not isinstance(data, dict): | ||
| return data | ||
| restored = {} | ||
| for key, value in data.items(): | ||
| # Try to convert string keys back to numbers | ||
| new_key = key | ||
| if isinstance(key, str): | ||
| # Try to convert to float first (for timesteps like "1.0", "2.5") | ||
| try: | ||
| if '.' in key: | ||
| new_key = float(key) | ||
| log(f"[INFO] Converted string key '{key}' to float {new_key}") | ||
| else: | ||
| # Try integer conversion for whole numbers | ||
| new_key = int(key) | ||
| log(f"[INFO] Converted string key '{key}' to int {new_key}") | ||
| except ValueError: | ||
| # If conversion fails, keep as string | ||
| new_key = key | ||
| # Recursively process nested dictionaries | ||
| if isinstance(value, dict): | ||
| restored[new_key] = self._restore_numeric_keys(value) | ||
| else: | ||
| restored[new_key] = value | ||
| return restored | ||
| @abstractmethod | ||
@@ -66,6 +117,2 @@ def _save_instance(self, state: InstanceState): | ||
| @abstractmethod | ||
| def _load_state(self) -> list[InstanceState]: | ||
| pass | ||
| @abstractmethod | ||
| def _load_instance(self, instance_uuid: str) -> InstanceState: | ||
@@ -78,56 +125,1 @@ pass | ||
| class FileAdapter(ExternalStateAdapter): | ||
| def __init__(self, compress: bool, path: str): | ||
| super().__init__(compress) | ||
| self.path = path | ||
| def _save_state(self, instance_states: list[InstanceState]): | ||
| for state in instance_states: | ||
| self._save_instance(state) | ||
| def _save_instance(self, state: InstanceState): | ||
| data = { | ||
| "data": { | ||
| "state": jsonpickle.dumps(state.state), | ||
| "instance_id": state.instance_id, | ||
| "time": str(state.time), | ||
| "timeout": state.timeout, | ||
| "step": state.step | ||
| } | ||
| } | ||
| f = open(os.path.join(self.path, str(state.instance_id) + ".json"), "w") | ||
| f.write(jsonpickle.dumps(data)) | ||
| f.close() | ||
| def _load_state(self) -> list[InstanceState]: | ||
| instances = [] | ||
| instance_paths = os.listdir(self.path) | ||
| for instance_uuid in instance_paths: | ||
| instances.append(self._load_instance(instance_uuid.split(".")[0])) | ||
| return instances | ||
| def _load_instance(self, instance_uuid: str) -> InstanceState: | ||
| try: | ||
| f = open(os.path.join(self.path, str(instance_uuid) + ".json"), "r") | ||
| instance_data = jsonpickle.loads(f.read()) | ||
| decoded_data = jsonpickle.loads(instance_data["data"]["state"]) | ||
| instance_id = instance_data["data"]["instance_id"] | ||
| timeout = instance_data["data"]["timeout"] | ||
| step = instance_data["data"]["step"] | ||
| return InstanceState(decoded_data, instance_id, datetime.datetime.now(), timeout, step) | ||
| except Exception as e: | ||
| print("Error: " + str(e)) | ||
| return None | ||
| def delete_instance(self, instance_uuid: str): | ||
| try: | ||
| os.remove(os.path.join(self.path, str(instance_uuid) + ".json")) | ||
| except Exception as e: | ||
| print("Error: " + str(e)) |
@@ -14,2 +14,4 @@ # /`- | ||
| import logging | ||
| # Configuration variables | ||
| loglevel = "WARN" | ||
@@ -19,4 +21,60 @@ logfile = "bptk_py.log" | ||
| # Logfire adapter support | ||
| logfire_adapter = None | ||
| logfire_enabled = False | ||
| try: | ||
| from .logfire_adapter import LogfireAdapter, LOGFIRE_AVAILABLE | ||
| except ImportError: | ||
| LOGFIRE_AVAILABLE = False | ||
| LogfireAdapter = None | ||
| def configure_logfire(**logfire_config): | ||
| """ | ||
| Configure and enable Pydantic Logfire logging. | ||
| Args: | ||
| **logfire_config: Configuration parameters to pass to logfire.configure() | ||
| Common options include: | ||
| - project_name: Name of your Logfire project | ||
| - token: Your Logfire API token | ||
| - environment: Environment name (e.g., 'development', 'production') | ||
| Returns: | ||
| bool: True if Logfire was successfully configured, False otherwise | ||
| Example: | ||
| >>> import BPTK_Py.logger.logger as logmod | ||
| >>> logmod.configure_logfire(project_name="my_bptk_project") | ||
| """ | ||
| global logfire_adapter, logfire_enabled | ||
| if not LOGFIRE_AVAILABLE: | ||
| if "logfile" in logmodes: | ||
| with open(logfile, "a", encoding="UTF-8") as myfile: | ||
| myfile.write(f"{datetime.datetime.now()}, [WARN] Pydantic Logfire is not installed. " | ||
| "Install with: pip install pydantic-logfire\n") | ||
| return False | ||
| try: | ||
| logfire_adapter = LogfireAdapter(**logfire_config) | ||
| logfire_enabled = True | ||
| log("[INFO] Logfire logging enabled successfully") | ||
| return True | ||
| except Exception as e: | ||
| if "logfile" in logmodes: | ||
| with open(logfile, "a", encoding="UTF-8") as myfile: | ||
| myfile.write(f"{datetime.datetime.now()}, [ERROR] Failed to configure Logfire: {e}\n") | ||
| return False | ||
| def disable_logfire(): | ||
| """Disable Logfire logging.""" | ||
| global logfire_enabled | ||
| logfire_enabled = False | ||
| log("[INFO] Logfire logging disabled") | ||
| def log(message): | ||
@@ -40,1 +98,12 @@ """logs all log messages either to file or stdout""" | ||
| print(str(datetime.datetime.now()) + ", " + message) | ||
| # Send to Logfire if enabled | ||
| if logfire_enabled and logfire_adapter: | ||
| try: | ||
| logfire_adapter.log(message) | ||
| except Exception as e: | ||
| # Fail silently to avoid disrupting the main application | ||
| # Optionally log this error to file | ||
| if "logfile" in logmodes: | ||
| with open(logfile, "a", encoding="UTF-8") as myfile: | ||
| myfile.write(f"{datetime.datetime.now()}, [WARN] Failed to send log to Logfire: {e}\n") |
@@ -14,2 +14,3 @@ # /`- | ||
| from ..logger import log | ||
| from copy import deepcopy | ||
| ### | ||
@@ -115,2 +116,8 @@ | ||
| def _set_cache(self,cache): | ||
| self.model.memo = cache | ||
| def _get_cache(self): | ||
| return self.model.memo | ||
| def setup_constants(self): | ||
@@ -117,0 +124,0 @@ """ |
@@ -54,5 +54,8 @@ # /`- | ||
| instance = self._instances[instance_uuid] | ||
| session_state = copy.deepcopy(instance['instance'].session_state) | ||
| session_state["lock"] = False | ||
| return InstanceState(session_state, instance_uuid, instance["time"], instance["timeout"], session_state["step"]) | ||
| session_state = copy.deepcopy(instance['instance'].session_state) if instance['instance'].session_state is not None else None | ||
| step=None | ||
| if session_state is not None: | ||
| session_state["lock"] = False | ||
| step=session_state["step"] | ||
| return InstanceState(session_state, instance_uuid, instance["time"], instance["timeout"], step) | ||
@@ -151,3 +154,4 @@ def get_instance_states(self): | ||
| instance = self._make_bptk() | ||
| instance._set_state(session_state) | ||
| if session_state: | ||
| instance._set_state(session_state) | ||
@@ -196,3 +200,3 @@ instance_data = { | ||
| """ | ||
| def __init__(self, import_name, bptk_factory=None, external_state_adapter=None, bearer_token=None): | ||
| def __init__(self, import_name, bptk_factory=None, external_state_adapter=None, bearer_token=None, externalize_state_completely=False): | ||
| """ | ||
@@ -202,2 +206,3 @@ Initialize the server with the import name and the bptk. | ||
| :param bptk: simulations made by the bptk. | ||
| :param externalize_state_completely: if True and external_state_adapter is provided, instances are deleted after every use to ensure statelessness | ||
| """ | ||
@@ -209,9 +214,4 @@ super(BptkServer, self).__init__(import_name) | ||
| self._bearer_token = bearer_token | ||
| self._externalize_state_completely = externalize_state_completely | ||
| # Loading the full state on startup | ||
| if external_state_adapter != None: | ||
| result = self._external_state_adapter.load_state() | ||
| for instance_data in result: | ||
| self._instance_manager.reconstruct_instance(instance_data.instance_id, instance_data.timeout, instance_data.time, instance_data.state) | ||
| # specifying the routes and methods of the api | ||
@@ -236,4 +236,2 @@ self.route("/", methods=['GET'],strict_slashes=False)(self._home_resource) | ||
| self.route("/full-metrics", methods=['GET'], strict_slashes=False)(self._full_metrics_resource) | ||
| self.route("/save-state", methods=['GET'], strict_slashes=False)(self._save_state_resource) | ||
| self.route("/load-state", methods=['POST'], strict_slashes=False)(self._load_state_resource) | ||
| self.route("/<instance_uuid>/stop-instance", methods=['POST'], strict_slashes=False)(self._stop_instance_resource) | ||
@@ -252,3 +250,3 @@ | ||
| return resp | ||
| if token != self._bearer_token: | ||
@@ -261,2 +259,30 @@ resp = make_response('{"Unauthorized": "Authentication Token is wrong!"}', 401) | ||
| @staticmethod | ||
| def auto_cleanup_instance(f): | ||
| @wraps(f) | ||
| def decorated(self, instance_uuid, *args, **kwargs): | ||
| try: | ||
| result = f(self, instance_uuid, *args, **kwargs) | ||
| if self._external_state_adapter: | ||
| # Save instance state to external storage before deleting | ||
| instance_state = self._instance_manager._get_instance_state(instance_uuid) | ||
| if instance_state: | ||
| self._external_state_adapter.save_instance(instance_state) | ||
| if self._externalize_state_completely: | ||
| self._instance_manager._delete_instance(instance_uuid) | ||
| return result | ||
| except Exception as e: | ||
| if self._external_state_adapter: | ||
| # Save instance state to external storage before deleting, even on exception | ||
| try: | ||
| instance_state = self._instance_manager._get_instance_state(instance_uuid) | ||
| if instance_state and self._external_state_adapter: | ||
| self._external_state_adapter.save_instance(instance_state) | ||
| except: | ||
| pass # Don't let save errors mask the original exception | ||
| self._instance_manager._delete_instance(instance_uuid) | ||
| raise e | ||
| return decorated | ||
| @token_required | ||
@@ -268,3 +294,3 @@ def _stop_instance_resource(self, instance_uuid): | ||
| resp = make_response("Instance deleted.", 200) | ||
| resp = make_response('{"msg": "Instance deleted."}', 200) | ||
| resp.headers['Content-Type']='application/json' | ||
@@ -274,36 +300,3 @@ resp.headers['Access-Control-Allow-Origin']='*' | ||
| @token_required | ||
| def _save_state_resource(self): | ||
| """ | ||
| Save all instances with the provided external state adapter. | ||
| """ | ||
| if(self._external_state_adapter == None): | ||
| return | ||
| instance_states = self._instance_manager.get_instance_states() | ||
| self._external_state_adapter.save_state(instance_states) | ||
| resp = make_response(jsonpickle.dumps(instance_states), 200) | ||
| resp.headers['Content-Type']='application/json' | ||
| resp.headers['Access-Control-Allow-Origin']='*' | ||
| return resp | ||
| @token_required | ||
| def _load_state_resource(self): | ||
| """ | ||
| Loads all instances using the external state adapter | ||
| """ | ||
| if(self._external_state_adapter == None): | ||
| return | ||
| result = self._external_state_adapter.load_state() | ||
| for instance_data in result: | ||
| self._instance_manager.reconstruct_instance(instance_data.instance_id, instance_data.timeout, instance_data.time, instance_data.state) | ||
| resp = make_response("Success", 200) | ||
| resp.headers['Access-Control-Allow-Origin']='*' | ||
| return resp | ||
| def _metrics_resource(self): | ||
@@ -605,2 +598,9 @@ """ | ||
| if instance_uuid is not None: | ||
| # If externalizing state completely, save the new instance to external storage | ||
| if self._externalize_state_completely and self._external_state_adapter: | ||
| instance_state = self._instance_manager._get_instance_state(instance_uuid) | ||
| self._instance_manager._delete_instance(instance_uuid) | ||
| if instance_state: | ||
| self._external_state_adapter.save_instance(instance_state) | ||
| response_data = {"instance_uuid":instance_uuid,"timeout":timeout} | ||
@@ -633,3 +633,10 @@ resp = make_response(json.dumps(response_data), 200) | ||
| for i in range(instances): | ||
| instance_uuids.append(self._instance_manager.create_instance(**timeout)) | ||
| instance_uuid = self._instance_manager.create_instance(**timeout) | ||
| instance_uuids.append(instance_uuid) | ||
| # If externalizing state completely, save each new instance to external storage | ||
| if self._externalize_state_completely and self._external_state_adapter: | ||
| instance_state = self._instance_manager._get_instance_state(instance_uuid) | ||
| self._instance_manager._delete_instance(instance_uuid) | ||
| if instance_state: | ||
| self._external_state_adapter.save_instance(instance_state) | ||
@@ -644,2 +651,3 @@ response_data={"instance_uuids":instance_uuids,"timeout":timeout} | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _begin_session_resource(self, instance_uuid): | ||
@@ -730,2 +738,3 @@ """This endpoint starts a session for single step simulation. There can only be one session per instance at a time. | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _end_session_resource(self, instance_uuid): | ||
@@ -750,2 +759,3 @@ """This endpoint ends a session for single step simulation and resets the internal cache. | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _flat_session_results_resource(self,instance_uuid): | ||
@@ -764,2 +774,3 @@ """ | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _session_results_resource(self,instance_uuid,flat=False): | ||
@@ -769,3 +780,2 @@ """ | ||
| """ | ||
| if not self._ensure_instance_exists(instance_uuid): | ||
@@ -786,2 +796,3 @@ resp = make_response('{"error": "expecting a valid instance id to be given"}', 500) | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _run_step_resource(self, instance_uuid): | ||
@@ -827,5 +838,2 @@ """ | ||
| if self._external_state_adapter != None: | ||
| self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid)) | ||
| resp.headers['Content-Type'] = 'application/json' | ||
@@ -836,2 +844,3 @@ resp.headers['Access-Control-Allow-Origin']='*' | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _run_steps_resource(self, instance_uuid): | ||
@@ -890,5 +899,2 @@ """ | ||
| if self._external_state_adapter != None: | ||
| self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid)) | ||
| resp.headers['Content-Type'] = 'application/json' | ||
@@ -900,2 +906,3 @@ resp.headers['Access-Control-Allow-Origin']='*' | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _stream_steps_resource(self, instance_uuid): | ||
@@ -956,5 +963,3 @@ """ | ||
| instance.unlock() | ||
| if self._external_state_adapter != None: | ||
| self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid)) | ||
| resp = Response(streamer()) | ||
@@ -967,2 +972,3 @@ resp.headers['Content-Type'] = 'application/json' | ||
| @token_required | ||
| @auto_cleanup_instance | ||
| def _keep_alive_resource(self,instance_uuid): | ||
@@ -975,3 +981,3 @@ """ | ||
| if not self._instance_manager.is_valid_instance(instance_uuid): | ||
| if not self._ensure_instance_exists(instance_uuid): | ||
| resp = make_response('{"error": "expecting a valid instance id to be given"}', 500) | ||
@@ -978,0 +984,0 @@ else: |
@@ -123,3 +123,66 @@ def compress_settings(settings): | ||
| scenario_transformed[constant_name] = {step_str: constant[i - 1]} | ||
| return result | ||
| return result | ||
| def _compress_time_series_data(data): | ||
| """ | ||
| Helper function to compress time-series data similar to compress_settings logic. | ||
| """ | ||
| if not data: | ||
| return data | ||
| # Transform step-indexed data into compressed format | ||
| compressed = {} | ||
| for step in data.keys(): | ||
| step_data = data[step] | ||
| if not isinstance(step_data, dict): | ||
| continue | ||
| for key, value in step_data.items(): | ||
| if key not in compressed: | ||
| compressed[key] = [value] | ||
| else: | ||
| compressed[key].append(value) | ||
| return compressed | ||
| def _decompress_time_series_data(compressed_data): | ||
| """ | ||
| Helper function to decompress time-series data similar to decompress_settings logic. | ||
| """ | ||
| if not compressed_data: | ||
| return compressed_data | ||
| # Transform compressed format back to step-indexed data | ||
| result = {} | ||
| # Find the maximum length to determine number of steps | ||
| max_length = max(len(values) if isinstance(values, list) else 1 | ||
| for values in compressed_data.values()) if compressed_data else 0 | ||
| for i in range(max_length): | ||
| step_str = f"{i + 1:.1f}" | ||
| result[step_str] = {} | ||
| for key, values in compressed_data.items(): | ||
| if isinstance(values, list) and i < len(values): | ||
| result[step_str][key] = values[i] | ||
| else: | ||
| result[step_str][key] = values | ||
| return result | ||
| def _is_compressed_time_series_data(data): | ||
| """ | ||
| Helper function to detect if data looks like compressed time-series data. | ||
| """ | ||
| if not isinstance(data, dict): | ||
| return False | ||
| # Check if values are lists (indicating compressed time-series) | ||
| for value in data.values(): | ||
| if isinstance(value, list): | ||
| return True | ||
| return False |
+12
-1
| Metadata-Version: 2.4 | ||
| Name: bptk-py | ||
| Version: 2.1.1 | ||
| Version: 2.2.0 | ||
| Summary: A python simulation engine for System Dynamics & Agent based models | ||
@@ -32,4 +32,8 @@ Author-email: transentis <support@transentis.com> | ||
| Requires-Dist: jsonpickle==4.1.1 | ||
| Requires-Dist: psycopg==3.2.10 | ||
| Requires-Dist: redis==6.4.0 | ||
| Requires-Dist: ujson==5.10.0 | ||
| Provides-Extra: test | ||
| Requires-Dist: pytest; extra == "test" | ||
| Requires-Dist: python-dotenv; extra == "test" | ||
| Dynamic: license-file | ||
@@ -80,2 +84,9 @@ | ||
| ### 2.2.0 | ||
| * Add externalStateAdapters for Postgres and Redis | ||
| * Add option to externalize state completely, thus turning bptkServer into a stateless server | ||
| * Add pydantic logfire as logging backend | ||
| * Improve return message on bptkServer /stop-instance | ||
| ### 2.1.1 | ||
@@ -82,0 +93,0 @@ |
+11
-4
@@ -23,4 +23,6 @@ [build-system] | ||
| "flask==3.1.1", | ||
| "jsonpickle==4.1.1" | ||
| "jsonpickle==4.1.1", | ||
| "psycopg==3.2.10", | ||
| "redis==6.4.0", | ||
| "ujson==5.10.0" | ||
| ] | ||
@@ -45,3 +47,3 @@ requires-python = ">=3.11" | ||
| test = [ | ||
| "pytest", | ||
| "pytest","python-dotenv" | ||
| ] | ||
@@ -52,2 +54,7 @@ | ||
| Documentation = "https://bptk.transentis.com" | ||
| Repository = "https://github.com/transentis/bptk_py" | ||
| Repository = "https://github.com/transentis/bptk_py" | ||
| [tool.pytest.ini_options] | ||
| filterwarnings = [ | ||
| "ignore::DeprecationWarning:traitlets.*" | ||
| ] |
+7
-0
@@ -44,2 +44,9 @@ # Business Prototyping Toolkit for Python | ||
| ### 2.2.0 | ||
| * Add externalStateAdapters for Postgres and Redis | ||
| * Add option to externalize state completely, thus turning bptkServer into a stateless server | ||
| * Add pydantic logfire as logging backend | ||
| * Improve return message on bptkServer /stop-instance | ||
| ### 2.1.1 | ||
@@ -46,0 +53,0 @@ |
+1
-1
@@ -15,3 +15,3 @@ from setuptools import setup | ||
| def get_version(): | ||
| return '2.1.1' | ||
| return '2.2.0' | ||
@@ -18,0 +18,0 @@ setup(version=get_version(), |
+264
-72
@@ -6,16 +6,50 @@ from BPTK_Py.server import BptkServer | ||
| from BPTK_Py import FileAdapter | ||
| from BPTK_Py.externalstateadapter.redis_adapter import RedisAdapter | ||
| import os | ||
| from BPTK_Py import Model | ||
| import BPTK_Py | ||
| import redis | ||
| from dotenv import load_dotenv | ||
| from BPTK_Py import sd_functions as sd | ||
| @pytest.fixture(params=[True, False], ids=["externalize_completely", "no_externalize"]) | ||
| def externalize_state_completely(request): | ||
| """Fixture that provides both externalize_state_completely parameter values.""" | ||
| return request.param | ||
| def bptk_factory(): | ||
| model = Model(starttime=1.0,stoptime=5.0, dt=1.0, name="Test Model") | ||
| model.points["delay"] = [ | ||
| (1.0, 2.0), | ||
| (2.0, 2.0), | ||
| (3.0, 2.0), | ||
| (4.0, 2.0), | ||
| (5.0, 2.0) | ||
| ] | ||
| model.points["round"] = [ | ||
| (1.0, 1.0), | ||
| (2.0, 2.0), | ||
| (3.0, 3.0), | ||
| (4.0, 4.0), | ||
| (5.0, 5.0) | ||
| ] | ||
| stock = model.stock("stock") | ||
| flow = model.flow("flow") | ||
| constant = model.constant("constant") | ||
| stock.initial_value=0.0 | ||
| stock.equation=flow | ||
| flow.equation=constant | ||
| constant.equation=1.0 | ||
| inflow = model.flow("inflow") | ||
| outflow = model.flow("outflow") | ||
| input = model.constant("input") | ||
| converter = model.converter("converter") | ||
| round = model.converter("round") | ||
| round.equation = sd.lookup(sd.time(), "round") | ||
| delay = model.converter("delay") | ||
| delay.equation = sd.lookup(sd.time(), "delay") | ||
| stock.initial_value=400.0 | ||
| stock.equation=inflow-outflow | ||
| inflow.equation=input | ||
| outflow.equation=sd.delay(model,inflow,delay,100.0) | ||
| input.equation=100.0 | ||
| converter.equation=stock | ||
@@ -45,6 +79,6 @@ scenario_manager1={ | ||
| { | ||
| "1":{ | ||
| "scenario1":{ | ||
| "constants": | ||
| { | ||
| "constant":1.0 | ||
| "input":100.0 | ||
| } | ||
@@ -63,16 +97,17 @@ } | ||
| { | ||
| "1":{ | ||
| "scenario1":{ | ||
| "constants": | ||
| { | ||
| "constant":1.0 | ||
| "input":100.0 | ||
| } | ||
| }, | ||
| "2":{ | ||
| "scenario2":{ | ||
| "constants":{ | ||
| "constant":2.0 | ||
| "input":200.0 | ||
| } | ||
| }, | ||
| "3":{ | ||
| "scenario3":{ | ||
| "constants":{ | ||
| "constant":3.0 | ||
| "input":300.0 | ||
| } | ||
@@ -89,15 +124,219 @@ } | ||
| @pytest.fixture | ||
| def app(): | ||
| def file_app(externalize_state_completely): | ||
| import os | ||
| if not os.path.exists("state/"): | ||
| os.mkdir("state/") | ||
| adapter = FileAdapter(True, os.path.join(os.getcwd(), "state")) | ||
| flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter) | ||
| adapter = FileAdapter(compress=True, path=os.path.join(os.getcwd(), "state")) | ||
| flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter, externalize_state_completely=externalize_state_completely) | ||
| yield flask_app | ||
| # Cleanup after test | ||
| print("Tearing down Flask app...") | ||
| try: | ||
| # Clean up any remaining instances | ||
| if hasattr(flask_app, '_instance_manager'): | ||
| print("Cleaning up instance manager...") | ||
| # Force cleanup of all instances | ||
| if hasattr(flask_app._instance_manager, '_instances'): | ||
| for instance_id in list(flask_app._instance_manager._instances.keys()): | ||
| try: | ||
| flask_app._instance_manager._delete_instance(instance_id) | ||
| print(f"Cleaned up instance: {instance_id}") | ||
| except Exception as e: | ||
| print(f"Error cleaning up instance {instance_id}: {e}") | ||
| # Teardown Flask app context | ||
| with flask_app.app_context(): | ||
| pass # This ensures proper cleanup of app context | ||
| print("Flask app teardown complete") | ||
| except Exception as e: | ||
| print(f"Error during app teardown: {e}") | ||
| @pytest.fixture | ||
| def client(app): | ||
| return app.test_client() | ||
| def file_client_fixture(file_app): | ||
| return file_app.test_client() | ||
| def test_instance_timeouts(app, client): | ||
| def external_state_base(client): | ||
| response = client.post('/start-instance') | ||
| assert response.status_code == 200, "start-instance should return 200" | ||
| result = json.loads(response.data) | ||
| assert "instance_uuid" in result, "start_instance should return an instance id" | ||
| instance_uuid= result["instance_uuid"] | ||
| # Prepare content for begin-session | ||
| content = { | ||
| "scenario_managers": [ | ||
| "firstManager" | ||
| ], | ||
| "scenarios": [ | ||
| "scenario1" | ||
| ], | ||
| "equations": [ | ||
| "converter", | ||
| "delay", | ||
| "inflow", | ||
| "outflow", | ||
| "input", | ||
| "round" | ||
| ] | ||
| } | ||
| response = client.post(f'{instance_uuid}/begin-session', data=json.dumps(content), content_type='application/json') | ||
| assert response.status_code == 200, "begin-session should return 200" | ||
| # Prepare content for run-step | ||
| def make_run_content(value): | ||
| content={ | ||
| "settings":{ | ||
| "firstManager":{ | ||
| "scenario1":{ | ||
| "constants":{ | ||
| "input":value | ||
| } | ||
| }} | ||
| }, | ||
| "flatResults": False | ||
| } | ||
| return content | ||
| # run some steps | ||
| response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(100.0)), content_type='application/json') | ||
| assert response.status_code == 200, "run-step should return 200" | ||
| data= json.loads(response.data) | ||
| assert data["firstManager"]["scenario1"]["input"]["1.0"]==100.0 , "input should have value 100.0" | ||
| assert data["firstManager"]["scenario1"]["converter"]["1.0"]==400.0 , "converter should have value 400.0" | ||
| assert data["firstManager"]["scenario1"]["delay"]["1.0"]==2.0 , "delay should have value 2.0" | ||
| assert data["firstManager"]["scenario1"]["inflow"]["1.0"]==100.0 , "inflow should have value 100.0" | ||
| assert data["firstManager"]["scenario1"]["outflow"]["1.0"]==100.0 , "outflow should have value 100.0" | ||
| response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json') | ||
| assert response.status_code == 200, "run-step should return 200" | ||
| data= json.loads(response.data) | ||
| assert data["firstManager"]["scenario1"]["input"]["2.0"]==400.0 , "input should have value 100.0" | ||
| assert data["firstManager"]["scenario1"]["converter"]["2.0"]==400.0 , "converter should have value 400.0" | ||
| assert data["firstManager"]["scenario1"]["delay"]["2.0"]==2.0 , "delay should have value 2.0" | ||
| assert data["firstManager"]["scenario1"]["inflow"]["2.0"]==400.0 , "inflow should have value 100.0" | ||
| assert data["firstManager"]["scenario1"]["outflow"]["2.0"]==100.0 , "outflow should have value 100.0" | ||
| response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json') | ||
| assert response.status_code == 200, "run-step should return 200" | ||
| data= json.loads(response.data) | ||
| assert data["firstManager"]["scenario1"]["converter"]["3.0"]==700.0 , "converter should have value 700.0" | ||
| assert data["firstManager"]["scenario1"]["delay"]["3.0"]==2.0 , "delay should have value 2.0" | ||
| assert data["firstManager"]["scenario1"]["inflow"]["3.0"]==400.0 , "inflow should have value 100.0" | ||
| assert data["firstManager"]["scenario1"]["outflow"]["3.0"]==100.0 , "outflow should have value 100.0" | ||
| response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json') | ||
| assert response.status_code == 200, "run-step should return 200" | ||
| data= json.loads(response.data) | ||
| assert data["firstManager"]["scenario1"]["converter"]["4.0"]==1000.0 , "converter should have value 1000.0" | ||
| # Cleanup - stop the instance to properly clean up resources | ||
| print("Cleaning up instance...") | ||
| response = client.post(f'{instance_uuid}/stop-instance') | ||
| print("Stop-instance response:", response.status_code) | ||
| assert response.status_code == 200, "stop-instance should return 200" | ||
| def test_external_state_file(file_client_fixture): | ||
| external_state_base(file_client_fixture) | ||
| @pytest.fixture | ||
| def redis_app(externalize_state_completely): | ||
| """Create Flask app with Redis adapter""" | ||
| # Try multiple paths for .env file | ||
| env_paths = [ | ||
| os.path.join(os.getcwd(), ".env") | ||
| ] | ||
| redis_url = None | ||
| enable_redis_tests = False | ||
| for env_path in env_paths: | ||
| try: | ||
| if os.path.exists(env_path): | ||
| print(f"Loading environment from: {env_path}") | ||
| load_dotenv(env_path) | ||
| redis_url = os.getenv("REDIS_URL") | ||
| enable_redis_tests = os.getenv("ENABLE_REDIS_TESTS", "false").lower() == "true" | ||
| if redis_url: | ||
| break | ||
| except Exception as e: | ||
| print(f"Error loading {env_path}: {e}") | ||
| print(f"Redis URL configured: {bool(redis_url)}") | ||
| print(f"Redis tests enabled: {enable_redis_tests}") | ||
| if not redis_url or not enable_redis_tests: | ||
| pytest.skip("Redis tests disabled or REDIS_URL not configured") | ||
| try: | ||
| # Create Redis client | ||
| redis_client = redis.from_url(redis_url) | ||
| # Test connection | ||
| redis_client.ping() | ||
| print(f"Connected to Redis at: {redis_url}") | ||
| except Exception as e: | ||
| pytest.skip(f"Could not connect to Redis: {e}") | ||
| # Create Redis adapter | ||
| adapter = RedisAdapter(redis_client, compress=True, key_prefix="bptk:test") | ||
| flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter, externalize_state_completely=externalize_state_completely) | ||
| yield flask_app | ||
| # Cleanup after test | ||
| print("Tearing down Redis Flask app...") | ||
| try: | ||
| # Clean up any remaining instances | ||
| if hasattr(flask_app, '_instance_manager'): | ||
| print("Cleaning up instance manager...") | ||
| if hasattr(flask_app._instance_manager, '_instances'): | ||
| for instance_id in list(flask_app._instance_manager._instances.keys()): | ||
| try: | ||
| flask_app._instance_manager._delete_instance(instance_id) | ||
| print(f"Cleaned up instance: {instance_id}") | ||
| # Also clean up from Redis | ||
| adapter.delete_instance(instance_id) | ||
| print(f"Cleaned up Redis data for instance: {instance_id}") | ||
| except Exception as e: | ||
| print(f"Error cleaning up instance {instance_id}: {e}") | ||
| # Clean up any test keys from Redis | ||
| try: | ||
| pattern = "bptk:test:*" | ||
| for key in redis_client.scan_iter(match=pattern): | ||
| redis_client.delete(key) | ||
| print(f"Cleaned up Redis key: {key}") | ||
| except Exception as e: | ||
| print(f"Error cleaning up Redis keys: {e}") | ||
| # Teardown Flask app context | ||
| with flask_app.app_context(): | ||
| pass # This ensures proper cleanup of app context | ||
| print("Redis Flask app teardown complete") | ||
| except Exception as e: | ||
| print(f"Error during Redis app teardown: {e}") | ||
| @pytest.fixture | ||
| def redis_client_fixture(redis_app): | ||
| return redis_app.test_client() | ||
| def test_external_state_redis(redis_client_fixture): | ||
| """Test external state with Redis adapter - equivalent to file adapter test""" | ||
| external_state_base(redis_client_fixture) | ||
| def test_instance_timeouts(file_client_fixture): | ||
| client = file_client_fixture | ||
| def assert_in_full_metrics(instance_id, contains: bool): | ||
@@ -137,10 +376,10 @@ response = client.get('/full-metrics') | ||
| "scenarios": [ | ||
| "1" | ||
| "scenario1" | ||
| ], | ||
| "equations": [ | ||
| "constant" | ||
| "input" | ||
| ] | ||
| } | ||
| response = client.post(f'http://localhost:500/{instance_id}/begin-session', data=json.dumps(content), content_type='application/json') | ||
| response = client.post(f'http://localhost:5000/{instance_id}/begin-session', data=json.dumps(content), content_type='application/json') | ||
| assert response.status_code == 200, "begin-session should return 200" | ||
@@ -161,51 +400,4 @@ | ||
| assert_in_full_metrics(instance_id, True) | ||
| time.sleep(4) | ||
| assert_in_full_metrics(instance_id, False) | ||
| response = client.post(f'http://localhost:5000/{instance_id}/run-step', data=json.dumps(run_content), content_type='application/json') | ||
| assert response.status_code == 200, "run-step should return 200" | ||
| assert_in_full_metrics(instance_id, True) | ||
| time.sleep(4) | ||
| assert_in_full_metrics(instance_id, False) | ||
| response = client.post('http://localhost:5000/load-state') | ||
| assert response.status_code == 200, "load-state should return 200" | ||
| assert_in_full_metrics(instance_id, True) | ||
| time.sleep(4) | ||
| assert_in_full_metrics(instance_id, False) | ||
| response = client.post('http://localhost:5000/load-state') | ||
| assert response.status_code == 200, "load-state should return 200" | ||
| os.remove(os.path.join("state/", instance_id + ".json")) | ||
| response = client.get('http://localhost:5000/save-state') | ||
| assert response.status_code == 200, "save-state should return 200" | ||
| dir_content = os.listdir("state/") | ||
| assert instance_id + ".json" in dir_content | ||
| response = client.post('http://localhost:5000/load-state') | ||
| assert response.status_code == 200, "load-state should return 200" | ||
| assert_in_full_metrics(instance_id, True) | ||
| response = client.post(f'http://localhost:5000/{instance_id}/stop-instance') | ||
| assert response.status_code == 200, "stop-instance should return 200" | ||
| assert_in_full_metrics(instance_id, False) | ||
| response = client.get('http://localhost:5000/save-state') | ||
| assert response.status_code == 200, "save-state should return 200" | ||
| dir_content = os.listdir("state/") | ||
| assert not instance_id + ".json" in dir_content | ||
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
878358
6.15%109
5.83%16644
6.26%