Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a DemoSign in
Socket

bptk-py

Package Overview
Dependencies
Maintainers
1
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bptk-py - pypi Package Compare versions

Comparing version
2.1.1
to
2.2.0
+199
BPTK_Py/externalstateadapter/file_adapter.py
import datetime
import jsonpickle
import os
from .externalStateAdapter import ExternalStateAdapter, InstanceState
from ..util import statecompression
from ..logger import log
class FileAdapter(ExternalStateAdapter):
    """Persists instance state as one JSON file per instance in ``self.path``.

    Compression of ``settings_log``/``results_log`` is handled entirely inside
    this adapter (``load_instance``/``save_instance`` are overridden), so the
    base class does not compress/decompress a second time.
    """

    def __init__(self, compress: bool, path: str):
        """
        Args:
            compress: Whether settings_log/results_log are stored compressed.
            path: Directory where the ``<instance_id>.json`` files live.
        """
        super().__init__(compress)
        self.path = path
        log(f"[INFO] FileAdapter initialized with path: {path}, compression: {compress}")

    def _is_already_compressed_results(self, results_log) -> bool:
        """
        Check if results_log is already in compressed format.

        Compressed format:   {scenario_manager: {scenario: {constant: [values]}}}
        Uncompressed format: {step: {scenario_manager: {scenario: {constant: value}}}}
        """
        if not isinstance(results_log, dict) or not results_log:
            return False
        # Uncompressed top-level keys are steps (floats or numeric strings);
        # compressed top-level keys are scenario manager names.
        first_key = next(iter(results_log.keys()))
        if isinstance(first_key, str) and not first_key.replace('.', '').isdigit():
            # Looks like a scenario manager name -- double-check the nesting.
            try:
                first_sm = results_log[first_key]
                if isinstance(first_sm, dict):
                    first_scenario = next(iter(first_sm.values()))
                    if isinstance(first_scenario, dict):
                        first_constant = next(iter(first_scenario.values()))
                        # In the compressed format the constant maps to a list of values.
                        return isinstance(first_constant, list)
            except (StopIteration, KeyError, AttributeError):
                pass
        return False

    def load_instance(self, instance_uuid: str) -> InstanceState:
        """
        Override the base class method to handle compression/decompression
        internally -- the base class would otherwise decompress a second time.
        """
        state = self._load_instance(instance_uuid)
        # Restore numeric scenario_cache keys that JSON serialization turned
        # into strings (no compression involved, just a key-type fix).
        if state is not None and state.state is not None and "scenario_cache" in state.state:
            state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"])
        return state

    def save_instance(self, state: InstanceState):
        """
        Override the base class method to handle compression internally --
        the base class would otherwise compress before we do.
        """
        return self._save_instance(state)

    def _save_instance(self, state: InstanceState):
        """Serialize ``state`` to ``<path>/<instance_id>.json``.

        Compression failures are logged and the uncompressed data is kept;
        file write failures are logged and re-raised.
        """
        log(f"[INFO] FileAdapter _save_instance called for instance {state.instance_id if state else 'None'}")
        # Apply compression for settings_log and results_log (scenario_cache compression is disabled).
        if self.compress and state is not None and state.state is not None:
            log(f"[INFO] Compression enabled, processing state for instance {state.instance_id}")
            # Work on a shallow copy so the caller's state dict is not modified.
            state_copy = state.state.copy()
            if "settings_log" in state_copy:
                try:
                    log(f"[INFO] Compressing settings_log for instance {state.instance_id}")
                    state_copy["settings_log"] = statecompression.compress_settings(state_copy["settings_log"])
                    log(f"[INFO] settings_log compressed successfully for instance {state.instance_id}")
                except Exception as e:
                    # Keep original data if compression fails.
                    log(f"[WARN] Failed to compress settings_log for instance {state.instance_id}: {str(e)}")
            if "results_log" in state_copy:
                # Skip compression when the data is already in compressed shape.
                results_log = state_copy["results_log"]
                if results_log and self._is_already_compressed_results(results_log):
                    log(f"[INFO] results_log already compressed for instance {state.instance_id}, skipping compression")
                else:
                    try:
                        log(f"[INFO] Compressing results_log for instance {state.instance_id}")
                        state_copy["results_log"] = statecompression.compress_results(state_copy["results_log"])
                        log(f"[INFO] results_log compressed successfully for instance {state.instance_id}")
                    except Exception as e:
                        # Keep original data if compression fails.
                        log(f"[WARN] Failed to compress results_log for instance {state.instance_id}: {str(e)}")
        else:
            state_copy = state.state
        data = {
            "data": {
                "state": jsonpickle.dumps(state_copy),
                "instance_id": state.instance_id,
                "time": state.time.isoformat() if isinstance(state.time, datetime.datetime) else str(state.time),
                "timeout": state.timeout,
                "step": state.step
            }
        }
        file_path = os.path.join(self.path, str(state.instance_id) + ".json")
        log(f"[INFO] Writing instance {state.instance_id} to file: {file_path}")
        try:
            # Context manager guarantees the handle is closed even if the
            # write raises (the previous open/write/close leaked it then).
            with open(file_path, "w") as f:
                f.write(jsonpickle.dumps(data))
            log(f"[INFO] Instance {state.instance_id} saved successfully to {file_path}")
        except Exception as e:
            log(f"[ERROR] Failed to write instance {state.instance_id} to file {file_path}: {str(e)}")
            raise

    def _load_state(self) -> list[InstanceState]:
        """Load every persisted instance found in ``self.path``."""
        instances = []
        for file_name in os.listdir(self.path):
            if not file_name.endswith('.json'):
                continue
            uuid = file_name.split(".")[0]
            instance = self._load_instance(uuid)
            if instance:
                # Restore numeric scenario_cache keys (JSON key conversion fix).
                if instance.state is not None and "scenario_cache" in instance.state:
                    instance.state["scenario_cache"] = self._restore_numeric_keys(instance.state["scenario_cache"])
                instances.append(instance)
        return instances

    def _load_instance(self, instance_uuid: str) -> InstanceState:
        """Read and deserialize one instance file.

        Returns None (after logging) on any error -- callers treat a missing
        or unreadable file as "no such instance".
        """
        file_path = os.path.join(self.path, str(instance_uuid) + ".json")
        try:
            # Context manager closes the file even when jsonpickle parsing
            # raises (the previous open/read/close pair leaked the handle).
            with open(file_path, "r") as f:
                instance_data = jsonpickle.loads(f.read())
            decoded_data = jsonpickle.loads(instance_data["data"]["state"])
            instance_id = instance_data["data"]["instance_id"]
            time_str = instance_data["data"]["time"]
            timeout = instance_data["data"]["timeout"]
            step = instance_data["data"]["step"]
            # Parse the time back from its ISO string representation.
            try:
                if isinstance(time_str, str):
                    time = datetime.datetime.fromisoformat(time_str)
                else:
                    time = time_str
            except ValueError:
                # Fallback for old format or parsing issues.
                time = datetime.datetime.now()
            # Apply decompression for settings_log and results_log (scenario_cache compression is disabled).
            if self.compress and decoded_data is not None:
                if "settings_log" in decoded_data:
                    try:
                        log(f"[INFO] Decompressing settings_log for instance {instance_uuid}")
                        decoded_data["settings_log"] = statecompression.decompress_settings(decoded_data["settings_log"])
                        log(f"[INFO] settings_log decompressed successfully for instance {instance_uuid}")
                    except Exception as e:
                        # Keep original data if decompression fails.
                        log(f"[WARN] Failed to decompress settings_log for instance {instance_uuid}: {str(e)}")
                if "results_log" in decoded_data:
                    results_log = decoded_data["results_log"]
                    if self._is_already_compressed_results(results_log):
                        try:
                            log(f"[INFO] Decompressing results_log for instance {instance_uuid}")
                            decoded_data["results_log"] = statecompression.decompress_results(decoded_data["results_log"])
                            log(f"[INFO] results_log decompressed successfully for instance {instance_uuid}")
                        except Exception as e:
                            log(f"[WARN] Failed to decompress results_log for instance {instance_uuid}: {str(e)}")
                    else:
                        log(f"[INFO] Results_log doesn't appear to be compressed for instance {instance_uuid}, skipping decompression")
            return InstanceState(decoded_data, instance_id, time, timeout, step)
        except Exception as e:
            log(f"[ERROR] Error loading instance {instance_uuid}: {str(e)}")
            return None

    def delete_instance(self, instance_uuid: str):
        """Delete the JSON file backing ``instance_uuid``; re-raises on failure."""
        file_path = os.path.join(self.path, str(instance_uuid) + ".json")
        log(f"[INFO] Deleting instance {instance_uuid}, file: {file_path}")
        try:
            os.remove(file_path)
            log(f"[INFO] Instance {instance_uuid} deleted successfully")
        except Exception as e:
            log(f"[ERROR] Error deleting instance {instance_uuid}: {str(e)}")
            raise
import datetime
import jsonpickle
import psycopg
from .externalStateAdapter import ExternalStateAdapter, InstanceState
from ..logger import log
# Postgres Create Script:
#
# CREATE TABLE "state" (
# "state" text,
# "instance_id" text,
# "time" text,
# "timeout.weeks" bigint,
# "timeout.days" bigint,
# "timeout.hours" bigint,
# "timeout.minutes" bigint,
# "timeout.seconds" bigint,
# "timeout.milliseconds" bigint,
# "timeout.microseconds" bigint,
# "step" bigint
# );
class PostgresAdapter(ExternalStateAdapter):
    """Persists instance state in a PostgreSQL ``state`` table.

    Expects the table layout from the create script above: the state payload
    is stored as a jsonpickle string and the timeout components live in
    separate ``timeout.*`` columns.
    """

    def __init__(self, postgres_client, compress: bool):
        """
        Args:
            postgres_client: An open psycopg connection.
            compress: Passed to the base class compression handling.
        """
        super().__init__(compress)
        self._postgres_client = postgres_client
        log(f"[INFO] PostgresAdapter initialized with compression: {compress}")

    def _load_instance(self, instance_uuid: str) -> InstanceState:
        """Fetch one row by instance_id and convert it to an InstanceState.

        Returns None when no row exists; re-raises on database errors.
        """
        log(f"[INFO] Loading instance {instance_uuid} from PostgreSQL")
        try:
            with self._postgres_client.cursor() as cur:
                cur.execute("SELECT * FROM state WHERE instance_id = %s", (instance_uuid,))
                res = cur.fetchone()
                if res is None:
                    log(f"[INFO] No data found in PostgreSQL for instance {instance_uuid}")
                else:
                    log(f"[INFO] Data retrieved from PostgreSQL for instance {instance_uuid}")
                # _tuple_to_state(None) returns None, so a missing row falls through.
                result = self._tuple_to_state(res)
                if result:
                    log(f"[INFO] Instance {instance_uuid} loaded successfully from PostgreSQL")
                return result
        except Exception as e:
            log(f"[ERROR] Failed to load instance {instance_uuid} from PostgreSQL: {str(e)}")
            raise

    def delete_instance(self, instance_uuid: str):
        """Delete the row for ``instance_uuid``; re-raises on failure."""
        log(f"[INFO] Deleting instance {instance_uuid} from PostgreSQL")
        try:
            with self._postgres_client.cursor() as cur:
                cur.execute("DELETE FROM state WHERE instance_id = %s", (instance_uuid,))
                self._postgres_client.commit()
                log(f"[INFO] Instance {instance_uuid} deleted successfully from PostgreSQL")
        except Exception as e:
            log(f"[ERROR] Failed to delete instance {instance_uuid} from PostgreSQL: {str(e)}")
            raise

    def _save_instance(self, instance_state: InstanceState):
        """Insert or update the row for ``instance_state``.

        The update is skipped when the stored step already matches, so
        unchanged instances cause no write.
        """
        log(f"[INFO] Saving instance {instance_state.instance_id if instance_state else 'None'} to PostgreSQL")
        try:
            with self._postgres_client.cursor() as cur:
                cur.execute("SELECT * FROM state WHERE instance_id = %s", (instance_state.instance_id,))
                postgres_data = {
                    "state": jsonpickle.dumps(instance_state.state) if instance_state.state is not None else None,
                    "instance_id": instance_state.instance_id,
                    "time": str(instance_state.time),
                    "timeout.weeks": instance_state.timeout["weeks"],
                    "timeout.days": instance_state.timeout["days"],
                    "timeout.hours": instance_state.timeout["hours"],
                    "timeout.minutes": instance_state.timeout["minutes"],
                    "timeout.seconds": instance_state.timeout["seconds"],
                    "timeout.milliseconds": instance_state.timeout["milliseconds"],
                    "timeout.microseconds": instance_state.timeout["microseconds"],
                    "step": instance_state.step
                }
                res = cur.fetchone()
                if res is None:
                    log(f"[INFO] Inserting new instance {instance_state.instance_id} into PostgreSQL")
                    cur.execute(
                        "INSERT INTO state (state, instance_id, time, \"timeout.weeks\", \"timeout.days\", \"timeout.hours\", \"timeout.minutes\", \"timeout.seconds\", \"timeout.milliseconds\", \"timeout.microseconds\", step) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                        (postgres_data["state"], postgres_data["instance_id"], postgres_data["time"], postgres_data["timeout.weeks"], postgres_data["timeout.days"], postgres_data["timeout.hours"], postgres_data["timeout.minutes"], postgres_data["timeout.seconds"], postgres_data["timeout.milliseconds"], postgres_data["timeout.microseconds"], postgres_data["step"])
                    )
                    self._postgres_client.commit()
                    log(f"[INFO] Instance {instance_state.instance_id} inserted successfully into PostgreSQL")
                elif res[10] != instance_state.step:
                    log(f"[INFO] Updating existing instance {instance_state.instance_id} in PostgreSQL (step {res[10]} -> {instance_state.step})")
                    cur.execute("UPDATE state SET state = %s, time = %s, \"timeout.weeks\" = %s, \"timeout.days\" = %s, \"timeout.hours\" = %s, \"timeout.minutes\" = %s, \"timeout.seconds\" = %s, \"timeout.milliseconds\" = %s, \"timeout.microseconds\" = %s, step = %s WHERE instance_id = %s", (postgres_data["state"], postgres_data["time"], postgres_data["timeout.weeks"], postgres_data["timeout.days"], postgres_data["timeout.hours"], postgres_data["timeout.minutes"], postgres_data["timeout.seconds"], postgres_data["timeout.milliseconds"], postgres_data["timeout.microseconds"], postgres_data["step"], postgres_data["instance_id"]))
                    self._postgres_client.commit()
                    log(f"[INFO] Instance {instance_state.instance_id} updated successfully in PostgreSQL")
                else:
                    log(f"[INFO] Instance {instance_state.instance_id} already up to date in PostgreSQL (step {res[10]})")
        # psycopg.Error subclasses Exception, so one clause covers both.
        except Exception as error:
            log(f"[ERROR] Failed to save instance {instance_state.instance_id if instance_state else 'None'} to PostgreSQL: {str(error)}")
            raise

    def _tuple_to_state(self, state_tuple: tuple) -> InstanceState:
        """Convert a raw ``state`` row tuple into an InstanceState (or None)."""
        if state_tuple is None:
            return None
        log(f"[INFO] Converting PostgreSQL tuple to InstanceState for instance {state_tuple[1] if state_tuple else 'None'}")
        return InstanceState(
            state=jsonpickle.loads(state_tuple[0]) if state_tuple[0] is not None else None,
            instance_id=state_tuple[1],
            # fromisoformat also parses times without a fractional part;
            # strptime with "%f" raised when microseconds were 0 on save.
            time=datetime.datetime.fromisoformat(state_tuple[2]),
            timeout={
                "weeks": state_tuple[3],
                "days": state_tuple[4],
                "hours": state_tuple[5],
                "minutes": state_tuple[6],
                "seconds": state_tuple[7],
                "milliseconds": state_tuple[8],
                "microseconds": state_tuple[9]
            },
            step=state_tuple[10]
        )
import datetime
import jsonpickle
import jsonpickle.handlers
import redis
import numpy as np
from .externalStateAdapter import ExternalStateAdapter, InstanceState
from ..logger import log
# Try to configure ujson backend, but fall back gracefully if not available
try:
    import ujson

    jsonpickle.load_backend('ujson', 'ujson', ValueError)
    jsonpickle.set_preferred_backend('ujson')
    jsonpickle.set_encoder_options('ujson', ensure_ascii=False, sort_keys=True)
    jsonpickle.set_decoder_options('ujson', precise_float=True)
    log("[INFO] jsonpickle configured to use ujson backend")
except (ImportError, AssertionError) as e:
    # Fall back to default JSON backend if ujson is not available
    log(f"[INFO] ujson not available ({e}), using default JSON backend")


# Serialize numpy scalars/arrays as native Python values so the stored state
# stays portable. Implemented as proper jsonpickle BaseHandler subclasses.
class NumpyFloatHandler(jsonpickle.handlers.BaseHandler):
    def flatten(self, obj, data):
        return float(obj)


class NumpyIntHandler(jsonpickle.handlers.BaseHandler):
    def flatten(self, obj, data):
        return int(obj)


class NumpyBoolHandler(jsonpickle.handlers.BaseHandler):
    def flatten(self, obj, data):
        return bool(obj)


class NumpyArrayHandler(jsonpickle.handlers.BaseHandler):
    def flatten(self, obj, data):
        return obj.tolist()


# Register every numpy type with its matching handler (base=True so that
# subclasses of these types are also covered).
_NUMPY_HANDLER_TABLE = (
    (NumpyFloatHandler, (np.float64, np.float32)),
    (NumpyIntHandler, (np.int64, np.int32, np.int16, np.int8,
                       np.uint64, np.uint32, np.uint16, np.uint8)),
    (NumpyBoolHandler, (np.bool_,)),
    (NumpyArrayHandler, (np.ndarray,)),
)
for _handler_cls, _np_types in _NUMPY_HANDLER_TABLE:
    for _np_type in _np_types:
        jsonpickle.handlers.register(_np_type, _handler_cls, base=True)
class RedisAdapter(ExternalStateAdapter):
    """
    Redis adapter for storing BPTK instance state in Redis.

    Optimized for Upstash Redis but works with any Redis instance. Each
    instance lives under ``<key_prefix>:<instance_id>`` and, when a timeout
    is configured, gets a matching Redis TTL.
    """

    def __init__(self, redis_client: redis.Redis, compress: bool = True, key_prefix: str = "bptk:state"):
        """
        Initialize the Redis adapter.

        Args:
            redis_client: A configured Redis client (e.g., from redis.from_url())
            compress: Whether to compress state data (default: True)
            key_prefix: Prefix for Redis keys (default: "bptk:state")
        """
        super().__init__(compress)
        self._redis_client = redis_client
        self._key_prefix = key_prefix
        log(f"[INFO] RedisAdapter initialized with key_prefix: {key_prefix}, compression: {compress}")

    def _get_instance_key(self, instance_uuid: str) -> str:
        """Generate Redis key for an instance."""
        return f"{self._key_prefix}:{instance_uuid}"

    def _load_instance(self, instance_uuid: str) -> InstanceState:
        """Load a single instance from Redis; returns None on any failure."""
        key = self._get_instance_key(instance_uuid)
        log(f"[INFO] Loading instance {instance_uuid} from Redis key: {key}")
        try:
            data = self._redis_client.get(key)
            if data is None:
                log(f"[INFO] No data found in Redis for instance {instance_uuid}")
                return None
            log(f"[INFO] Data retrieved from Redis for instance {instance_uuid}")
            # Outer decode yields the envelope dict; the state payload is a
            # separately encoded jsonpickle string inside it.
            instance_data = jsonpickle.decode(data)
            state = jsonpickle.decode(instance_data["state"]) if instance_data["state"] is not None else None
            log(f"[INFO] Decoding instance data for {instance_uuid}")
            result = InstanceState(
                state=state,
                instance_id=instance_data["instance_id"],
                time=datetime.datetime.fromisoformat(instance_data["time"]),
                timeout=instance_data["timeout"],
                step=instance_data["step"]
            )
            log(f"[INFO] Instance {instance_uuid} loaded successfully from Redis")
            return result
        except (KeyError, ValueError, TypeError) as e:
            # Malformed/partial payloads are treated as "no instance".
            log(f"[ERROR] Failed to load instance {instance_uuid} from Redis: {str(e)}")
            return None
        except Exception as e:
            log(f"[ERROR] Unexpected error loading instance {instance_uuid} from Redis: {str(e)}")
            return None

    def load_instance(self, instance_uuid: str) -> InstanceState:
        """
        Override the base class method to handle compression/decompression internally.
        This prevents double decompression issues similar to FileAdapter.
        """
        log(f"[INFO] RedisAdapter loading instance {instance_uuid}")
        state = self._load_instance(instance_uuid)
        # Restore numeric scenario_cache keys that JSON serialization turned
        # into strings (no compression involved, just a key-type fix).
        if state is not None and state.state is not None and "scenario_cache" in state.state:
            log(f"[INFO] Restoring numeric keys in scenario_cache for instance {instance_uuid}")
            state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"])
            log(f"[INFO] Numeric keys restored for instance {instance_uuid}")
        return state

    def save_instance(self, state: InstanceState):
        """
        Override the base class method to handle compression internally.
        This prevents double compression issues similar to FileAdapter.
        """
        log(f"[INFO] RedisAdapter saving instance {state.instance_id if state else 'None'}")
        return self._save_instance(state)

    def delete_instance(self, instance_uuid: str):
        """Delete an instance from Redis; re-raises on client errors."""
        key = self._get_instance_key(instance_uuid)
        log(f"[INFO] Deleting instance {instance_uuid} from Redis key: {key}")
        try:
            result = self._redis_client.delete(key)
            if result > 0:
                log(f"[INFO] Instance {instance_uuid} deleted successfully from Redis")
            else:
                log(f"[WARN] Instance {instance_uuid} not found in Redis")
        except Exception as e:
            log(f"[ERROR] Failed to delete instance {instance_uuid} from Redis: {str(e)}")
            raise

    def _save_instance(self, instance_state: InstanceState):
        """Save a single instance to Redis and apply its timeout as a TTL."""
        if instance_state is None or instance_state.instance_id is None:
            log("[WARN] Cannot save instance: instance_state or instance_id is None")
            return
        log(f"[INFO] _save_instance called for instance {instance_state.instance_id}")
        try:
            # Prepare data for storage with make_refs=False to prevent py/id issues
            log(f"[INFO] Preparing data for Redis storage for instance {instance_state.instance_id}")
            redis_data = {
                "state": jsonpickle.encode(instance_state.state, make_refs=False) if instance_state.state is not None else None,
                "instance_id": instance_state.instance_id,
                "time": instance_state.time.isoformat(),
                "timeout": instance_state.timeout,
                "step": instance_state.step
            }
            key = self._get_instance_key(instance_state.instance_id)
            log(f"[INFO] Storing instance {instance_state.instance_id} to Redis key: {key}")
            # Store data with make_refs=False to prevent object reference issues
            self._redis_client.set(key, jsonpickle.encode(redis_data, make_refs=False))
            log(f"[INFO] Instance {instance_state.instance_id} stored successfully in Redis")
            # Set TTL based on timeout if specified
            if instance_state.timeout:
                log(f"[INFO] Setting TTL for instance {instance_state.instance_id} based on timeout: {instance_state.timeout}")
                # timedelta does the unit arithmetic; missing keys default to 0.
                timeout_seconds = datetime.timedelta(
                    weeks=instance_state.timeout.get("weeks", 0),
                    days=instance_state.timeout.get("days", 0),
                    hours=instance_state.timeout.get("hours", 0),
                    minutes=instance_state.timeout.get("minutes", 0),
                    seconds=instance_state.timeout.get("seconds", 0),
                    milliseconds=instance_state.timeout.get("milliseconds", 0),
                    microseconds=instance_state.timeout.get("microseconds", 0),
                ).total_seconds()
                if timeout_seconds > 0:
                    self._redis_client.expire(key, int(timeout_seconds))
                    log(f"[INFO] TTL set to {int(timeout_seconds)} seconds for instance {instance_state.instance_id}")
        except Exception as error:
            log(f"[ERROR] Error saving instance {instance_state.instance_id} to Redis: {error}")
            raise
# /`-
# _ _ _ /####`-
#| | | | (_) /########`-
#| |_ _ __ __ _ _ __ ___ ___ _ __ | |_ _ ___ /###########`-
#| __| '__/ _` | '_ \/ __|/ _ \ '_ \| __| / __| ____ -###########/
#| |_| | | (_| | | | \__ \ __/ | | | |_| \__ \ | | `-#######/
# \__|_| \__,_|_| |_|___/\___|_| |_|\__|_|___/ |____| `- # /
#
# Copyright (c) 2024 transentis labs GmbH
# MIT License
import datetime
# Optional dependency: the Logfire integration only activates when the
# ``logfire`` package can be imported; otherwise LogfireAdapter refuses to
# construct (it checks this flag).
try:
    import logfire
    LOGFIRE_AVAILABLE = True
except ImportError:
    LOGFIRE_AVAILABLE = False
class LogfireAdapter:
    """
    Adapter for routing BPTK logs to Pydantic Logfire.

    Parses the ``[LEVEL]`` prefix BPTK puts into log messages and forwards
    the cleaned message to the matching Logfire method.
    """

    # Checked in priority order: the first tag found in a message wins.
    _LEVELS = ("ERROR", "WARN", "INFO", "DEBUG")

    def __init__(self, **logfire_config):
        """
        Initialize the Logfire adapter.

        Args:
            **logfire_config: Configuration parameters to pass to logfire.configure()
                Common options include:
                - project_name: Name of your Logfire project
                - token: Your Logfire API token
                - environment: Environment name (e.g., 'development', 'production')

        Raises:
            ImportError: If the ``logfire`` package is not installed.
        """
        if not LOGFIRE_AVAILABLE:
            # The PyPI distribution is named "logfire" (not "pydantic-logfire").
            raise ImportError(
                "Pydantic Logfire is not installed. "
                "Please install it with: pip install logfire"
            )
        self.configured = False
        self.logfire_config = logfire_config
        self._configure()

    def _configure(self):
        """Configure Logfire with the provided settings (runs at most once)."""
        if not self.configured:
            logfire.configure(**self.logfire_config)
            self.configured = True

    def log(self, message: str):
        """
        Send a log message to Logfire.

        Args:
            message: The log message (may contain [ERROR], [WARN], [INFO], [DEBUG] prefixes)
        """
        if not self.configured:
            self._configure()
        # Determine the level from the first matching bracket tag; default INFO.
        level = "INFO"
        for candidate in self._LEVELS:
            if f"[{candidate}]" in message:
                level = candidate
                break
        # Strip all level tags from the message before forwarding.
        clean_message = message
        for candidate in self._LEVELS:
            clean_message = clean_message.replace(f"[{candidate}]", "").strip()
        # Dispatch to the Logfire method matching the parsed level.
        if level == "ERROR":
            logfire.error(clean_message)
        elif level == "WARN":
            logfire.warn(clean_message)
        elif level == "DEBUG":
            logfire.debug(clean_message)
        else:  # INFO or default
            logfire.info(clean_message)
"""
Example usage of Pydantic Logfire integration with BPTK-Py
This demonstrates three different ways to enable Logfire logging in BPTK-Py.
"""
# Method 1: Configure Logfire when initializing BPTK
from BPTK_Py import bptk
# Initialize BPTK with Logfire configuration
bptk_instance = bptk(
loglevel="INFO",
configuration={
"logfire_config": {
"project_name": "bptk_simulations",
"environment": "development",
# Add your Logfire token if needed
# "token": "your-logfire-token"
}
}
)
# Method 2: Configure Logfire directly via the logger module
import BPTK_Py.logger.logger as logmod
# Configure Logfire (if not already done via bptk initialization)
logmod.configure_logfire(
project_name="bptk_simulations",
environment="production"
)
# Later, you can disable Logfire if needed
# logmod.disable_logfire()
# Method 3: Use Logfire alongside file logging
# By default, logs go to both file and Logfire when configured
logmod.logmodes = ["logfile"] # File logging is still active
logmod.loglevel = "INFO"
# Now when you run simulations, logs will be sent to Logfire
# Example:
# bptk_instance.plot_scenarios(
# scenarios=["baseline", "optimized"],
# scenario_managers=["my_model"]
# )
# The logs will include all INFO, WARN, and ERROR messages from:
# - Model loading and parsing
# - Scenario execution
# - File monitoring
# - Error handling
# - And more...
# To check if Logfire is available and enabled:
print(f"Logfire available: {logmod.LOGFIRE_AVAILABLE}")
print(f"Logfire enabled: {logmod.logfire_enabled}")
"""
Test configuration utilities for external state adapters.
Handles loading configuration from .env files and provides test helpers.
"""
import os
from pathlib import Path
from typing import Optional
# Try to load python-dotenv if available
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / '.env')
except ImportError:
pass
class TestConfig:
    """Configuration helper for external state adapter tests.

    All values come from environment variables (optionally loaded from a
    sibling .env file); adapters are opt-in via ENABLE_*_TESTS flags.
    """

    @staticmethod
    def postgres_tests_enabled() -> bool:
        """Check if PostgreSQL tests should be run."""
        return os.getenv('ENABLE_POSTGRES_TESTS', 'false').lower() == 'true'

    @staticmethod
    def redis_tests_enabled() -> bool:
        """Check if Redis tests should be run."""
        return os.getenv('ENABLE_REDIS_TESTS', 'false').lower() == 'true'

    @staticmethod
    def get_postgres_config() -> Optional[dict]:
        """Get PostgreSQL configuration from environment variables."""
        if not TestConfig.postgres_tests_enabled():
            return None
        env = os.getenv
        return {
            'host': env('POSTGRES_HOST', 'localhost'),
            'port': int(env('POSTGRES_PORT', '5432')),
            'dbname': env('POSTGRES_DB', 'bptk_test'),
            'user': env('POSTGRES_USER'),
            'password': env('POSTGRES_PASSWORD'),
        }

    @staticmethod
    def get_redis_config() -> Optional[str]:
        """Get Redis URL from environment variables."""
        return os.getenv('REDIS_URL') if TestConfig.redis_tests_enabled() else None

    @staticmethod
    def get_test_timeout() -> int:
        """Get test timeout in seconds."""
        return int(os.getenv('TEST_TIMEOUT', '30'))

    @staticmethod
    def get_cleanup_timeout() -> int:
        """Get cleanup timeout in seconds."""
        return int(os.getenv('CLEANUP_TIMEOUT', '10'))
def requires_postgres(test_func):
    """Decorator to skip tests that require PostgreSQL if not enabled."""
    import pytest
    skip_marker = pytest.mark.skipif(
        not TestConfig.postgres_tests_enabled(),
        reason="PostgreSQL tests not enabled. Set ENABLE_POSTGRES_TESTS=true in .env"
    )
    return skip_marker(test_func)


def requires_redis(test_func):
    """Decorator to skip tests that require Redis if not enabled."""
    import pytest
    skip_marker = pytest.mark.skipif(
        not TestConfig.redis_tests_enabled(),
        reason="Redis tests not enabled. Set ENABLE_REDIS_TESTS=true in .env"
    )
    return skip_marker(test_func)
+12
-1
Metadata-Version: 2.4
Name: bptk-py
Version: 2.1.1
Version: 2.2.0
Summary: A python simulation engine for System Dynamics & Agent based models

@@ -32,4 +32,8 @@ Author-email: transentis <support@transentis.com>

Requires-Dist: jsonpickle==4.1.1
Requires-Dist: psycopg==3.2.10
Requires-Dist: redis==6.4.0
Requires-Dist: ujson==5.10.0
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: python-dotenv; extra == "test"
Dynamic: license-file

@@ -80,2 +84,9 @@

### 2.2.0
* Add externalStateAdapters for Postgres and Redis
* Add option to externalize state completely, thus turning bptkServer into a stateless server
* Add pydantic logfire as logging backend
* Improve return message on bptkServer /stop-instance
### 2.1.1

@@ -82,0 +93,0 @@

@@ -16,4 +16,8 @@ pandas==2.3.0

jsonpickle==4.1.1
psycopg==3.2.10
redis==6.4.0
ujson==5.10.0
[test]
pytest
python-dotenv

@@ -14,3 +14,8 @@ LICENSE

BPTK_Py/externalstateadapter/externalStateAdapter.py
BPTK_Py/externalstateadapter/file_adapter.py
BPTK_Py/externalstateadapter/postgres_adapter.py
BPTK_Py/externalstateadapter/redis_adapter.py
BPTK_Py/logger/__init__.py
BPTK_Py/logger/logfire_adapter.py
BPTK_Py/logger/logfire_usage_example.py
BPTK_Py/logger/logger.py

@@ -100,2 +105,3 @@ BPTK_Py/modeling/__init__.py

tests/test_bptk.py
tests/test_config.py
tests/test_external_state.py

@@ -102,0 +108,0 @@ tests/test_sddsl.py

@@ -75,2 +75,11 @@ # /`-

# Pydantic Logfire configuration (optional)
# Set to None to disable, or provide a dict with Logfire config
"logfire_config": None,
# Example:
# "logfire_config": {
# "project_name": "bptk_simulations",
# "environment": "development"
# },
"set_scenario_monitor": True,

@@ -77,0 +86,0 @@ "set_model_monitor": True,

@@ -1,1 +0,23 @@

from .externalStateAdapter import ExternalStateAdapter, InstanceState, FileAdapter
from .externalStateAdapter import ExternalStateAdapter, InstanceState
from .file_adapter import FileAdapter
# Optional imports - only import if dependencies are available
try:
from .postgres_adapter import PostgresAdapter
except ImportError:
class PostgresAdapter:
def __init__(self, *args, **kwargs):
raise ImportError(
"PostgresAdapter requires 'psycopg' to be installed. "
"Install it with: pip install psycopg[binary]"
)
try:
from .redis_adapter import RedisAdapter
except ImportError:
class RedisAdapter:
def __init__(self, *args, **kwargs):
raise ImportError(
"RedisAdapter requires 'redis' to be installed. "
"Install it with: pip install redis"
)
+77
-85

@@ -9,2 +9,3 @@ from abc import ABCMeta, abstractmethod

import os
from ..logger import log

@@ -17,3 +18,3 @@ @dataclass

timeout: Any
step: int
step: Any

@@ -24,38 +25,88 @@ class ExternalStateAdapter(metaclass=ABCMeta):

self.compress = compress
log(f"[INFO] ExternalStateAdapter initialized with compression: {compress}")
def save_state(self, state: list[InstanceState]):
if(self.compress):
for cur_state in state:
if(cur_state is not None and cur_state.state is not None):
cur_state.state["settings_log"] = statecompression.compress_settings(cur_state.state["settings_log"])
cur_state.state["results_log"] = statecompression.compress_results(cur_state.state["results_log"])
return self._save_state(state)
def save_instance(self, state: InstanceState):
if(self.compress and state is not None and state.state is not None):
log(f"[INFO] Saving instance {state.instance_id if state else 'None'}")
try:
if(self.compress and state is not None and state.state is not None):
log(f"[INFO] Compressing state for instance {state.instance_id}")
state.state["settings_log"] = statecompression.compress_settings(state.state["settings_log"])
state.state["results_log"] = statecompression.compress_results(state.state["results_log"])
return self._save_instance(state)
log(f"[INFO] State compression completed for instance {state.instance_id}")
result = self._save_instance(state)
log(f"[INFO] Instance {state.instance_id if state else 'None'} saved successfully")
return result
except Exception as e:
log(f"[ERROR] Failed to save instance {state.instance_id if state else 'None'}: {str(e)}")
raise
def load_state(self) -> list[InstanceState]:
state = self._load_state()
if(self.compress):
for cur_state in state:
if(cur_state is not None and cur_state.state is not None):
cur_state.state["settings_log"] = statecompression.decompress_settings(cur_state.state["settings_log"])
cur_state.state["results_log"] = statecompression.decompress_results(cur_state.state["results_log"])
return state
def load_instance(self, instance_uuid: str) -> InstanceState:
state = self._load_instance(instance_uuid)
if(self.compress and state is not None and state.state is not None):
state.state["settings_log"] = statecompression.decompress_settings(state.state["settings_log"])
state.state["results_log"] = statecompression.decompress_results(state.state["results_log"])
return state
log(f"[INFO] Loading instance {instance_uuid}")
try:
state = self._load_instance(instance_uuid)
if state is None:
log(f"[WARN] No state found for instance {instance_uuid}")
return state
@abstractmethod
def _save_state(self, state: list[InstanceState]):
pass
log(f"[INFO] State loaded for instance {instance_uuid}")
if(self.compress and state.state is not None):
log(f"[INFO] Decompressing state for instance {instance_uuid}")
state.state["settings_log"] = statecompression.decompress_settings(state.state["settings_log"])
state.state["results_log"] = statecompression.decompress_results(state.state["results_log"])
log(f"[INFO] State decompression completed for instance {instance_uuid}")
# Always restore numeric keys in scenario_cache (no compression, just JSON key conversion fix)
if(state.state is not None):
if "scenario_cache" in state.state:
log(f"[INFO] Restoring numeric keys in scenario_cache for instance {instance_uuid}")
state.state["scenario_cache"] = self._restore_numeric_keys(state.state["scenario_cache"])
log(f"[INFO] Numeric keys restored for instance {instance_uuid}")
log(f"[INFO] Instance {instance_uuid} loaded successfully")
return state
except Exception as e:
log(f"[ERROR] Failed to load instance {instance_uuid}: {str(e)}")
raise
def _restore_numeric_keys(self, data):
"""
Recursively restore numeric keys that were converted to strings during JSON serialization.
This handles the scenario_cache structure where floating point timesteps get converted to strings.
"""
if not isinstance(data, dict):
return data
restored = {}
for key, value in data.items():
# Try to convert string keys back to numbers
new_key = key
if isinstance(key, str):
# Try to convert to float first (for timesteps like "1.0", "2.5")
try:
if '.' in key:
new_key = float(key)
log(f"[INFO] Converted string key '{key}' to float {new_key}")
else:
# Try integer conversion for whole numbers
new_key = int(key)
log(f"[INFO] Converted string key '{key}' to int {new_key}")
except ValueError:
# If conversion fails, keep as string
new_key = key
# Recursively process nested dictionaries
if isinstance(value, dict):
restored[new_key] = self._restore_numeric_keys(value)
else:
restored[new_key] = value
return restored
@abstractmethod

@@ -66,6 +117,2 @@ def _save_instance(self, state: InstanceState):

@abstractmethod
def _load_state(self) -> list[InstanceState]:
pass
@abstractmethod
def _load_instance(self, instance_uuid: str) -> InstanceState:

@@ -78,56 +125,1 @@ pass

class FileAdapter(ExternalStateAdapter):
    """External state adapter that persists each instance as a JSON file.

    Every instance is stored as ``<path>/<instance_id>.json`` containing a
    jsonpickle-encoded envelope with the instance state, id, timestamp,
    timeout and current step. Compression (if any) is handled by the base
    class before/after these methods run.
    """

    def __init__(self, compress: bool, path: str):
        """
        :param compress: whether the base class compresses/decompresses state
        :param path: directory in which the instance JSON files live
        """
        super().__init__(compress)
        self.path = path

    def _save_state(self, instance_states: list[InstanceState]):
        """Persist all given instance states, one file per instance."""
        for state in instance_states:
            self._save_instance(state)

    def _save_instance(self, state: InstanceState):
        """Write a single instance state to ``<path>/<instance_id>.json``."""
        data = {
            "data": {
                "state": jsonpickle.dumps(state.state),
                "instance_id": state.instance_id,
                "time": str(state.time),
                "timeout": state.timeout,
                "step": state.step
            }
        }
        # Fix: use a context manager so the handle is closed even if the write fails.
        with open(os.path.join(self.path, str(state.instance_id) + ".json"), "w") as f:
            f.write(jsonpickle.dumps(data))

    def _load_state(self) -> list[InstanceState]:
        """Load every instance found in the storage directory.

        NOTE: entries may be None when a file could not be read or parsed
        (_load_instance returns None on error); callers must handle that.
        """
        instances = []
        instance_paths = os.listdir(self.path)
        for instance_uuid in instance_paths:
            instances.append(self._load_instance(instance_uuid.split(".")[0]))
        return instances

    def _load_instance(self, instance_uuid: str) -> InstanceState:
        """Read one instance file and rebuild its InstanceState.

        Best-effort: returns None when the file is missing or unparsable.
        The stored timestamp is ignored; the load time is used instead.
        """
        try:
            # Fix: context manager guarantees the handle is closed on parse errors.
            with open(os.path.join(self.path, str(instance_uuid) + ".json"), "r") as f:
                instance_data = jsonpickle.loads(f.read())
            decoded_data = jsonpickle.loads(instance_data["data"]["state"])
            instance_id = instance_data["data"]["instance_id"]
            timeout = instance_data["data"]["timeout"]
            step = instance_data["data"]["step"]
            return InstanceState(decoded_data, instance_id, datetime.datetime.now(), timeout, step)
        except Exception as e:
            print("Error: " + str(e))
            return None

    def delete_instance(self, instance_uuid: str):
        """Best-effort removal of the instance file; errors are printed, not raised."""
        try:
            os.remove(os.path.join(self.path, str(instance_uuid) + ".json"))
        except Exception as e:
            print("Error: " + str(e))

@@ -14,2 +14,4 @@ # /`-

import logging
# Configuration variables
loglevel = "WARN"

@@ -19,4 +21,60 @@ logfile = "bptk_py.log"

# Logfire adapter support
# logfire_adapter holds the active LogfireAdapter instance (None until
# configure_logfire() succeeds); logfire_enabled gates forwarding in log().
logfire_adapter = None
logfire_enabled = False
try:
    from .logfire_adapter import LogfireAdapter, LOGFIRE_AVAILABLE
except ImportError:
    # Optional dependency: fall back to "unavailable" so log() keeps working
    # without pydantic-logfire installed.
    LOGFIRE_AVAILABLE = False
    LogfireAdapter = None
def configure_logfire(**logfire_config):
    """
    Enable Pydantic Logfire as an additional logging backend.

    All keyword arguments are forwarded verbatim to ``logfire.configure()``.
    Common options include:
        - project_name: Name of your Logfire project
        - token: Your Logfire API token
        - environment: Environment name (e.g., 'development', 'production')

    Returns:
        bool: True when Logfire was configured successfully, False when the
        package is not installed or configuration failed. Failures are
        appended to the logfile when file logging is active.

    Example:
        >>> import BPTK_Py.logger.logger as logmod
        >>> logmod.configure_logfire(project_name="my_bptk_project")
    """
    global logfire_adapter, logfire_enabled

    def _append_to_logfile(text):
        # Only touch the logfile when file logging is enabled for this module.
        if "logfile" in logmodes:
            with open(logfile, "a", encoding="UTF-8") as myfile:
                myfile.write(text)

    if not LOGFIRE_AVAILABLE:
        _append_to_logfile(f"{datetime.datetime.now()}, [WARN] Pydantic Logfire is not installed. "
                           "Install with: pip install pydantic-logfire\n")
        return False

    try:
        logfire_adapter = LogfireAdapter(**logfire_config)
        logfire_enabled = True
        log("[INFO] Logfire logging enabled successfully")
        return True
    except Exception as e:
        _append_to_logfile(f"{datetime.datetime.now()}, [ERROR] Failed to configure Logfire: {e}\n")
        return False
def disable_logfire():
    """Stop forwarding log messages to Logfire.

    Only flips the module-level flag; the configured adapter object is left
    untouched.
    """
    global logfire_enabled
    logfire_enabled = False
    log("[INFO] Logfire logging disabled")
def log(message):

@@ -40,1 +98,12 @@ """logs all log messages either to file or stdout"""

print(str(datetime.datetime.now()) + ", " + message)
# Send to Logfire if enabled
if logfire_enabled and logfire_adapter:
try:
logfire_adapter.log(message)
except Exception as e:
# Fail silently to avoid disrupting the main application
# Optionally log this error to file
if "logfile" in logmodes:
with open(logfile, "a", encoding="UTF-8") as myfile:
myfile.write(f"{datetime.datetime.now()}, [WARN] Failed to send log to Logfire: {e}\n")

@@ -14,2 +14,3 @@ # /`-

from ..logger import log
from copy import deepcopy
###

@@ -115,2 +116,8 @@

def _set_cache(self,cache):
    # Install an externally supplied memoization cache on the underlying model.
    self.model.memo = cache
def _get_cache(self):
    # Return the underlying model's memoization cache.
    return self.model.memo
def setup_constants(self):

@@ -117,0 +124,0 @@ """

@@ -54,5 +54,8 @@ # /`-

instance = self._instances[instance_uuid]
session_state = copy.deepcopy(instance['instance'].session_state)
session_state["lock"] = False
return InstanceState(session_state, instance_uuid, instance["time"], instance["timeout"], session_state["step"])
session_state = copy.deepcopy(instance['instance'].session_state) if instance['instance'].session_state is not None else None
step=None
if session_state is not None:
session_state["lock"] = False
step=session_state["step"]
return InstanceState(session_state, instance_uuid, instance["time"], instance["timeout"], step)

@@ -151,3 +154,4 @@ def get_instance_states(self):

instance = self._make_bptk()
instance._set_state(session_state)
if session_state:
instance._set_state(session_state)

@@ -196,3 +200,3 @@ instance_data = {

"""
def __init__(self, import_name, bptk_factory=None, external_state_adapter=None, bearer_token=None):
def __init__(self, import_name, bptk_factory=None, external_state_adapter=None, bearer_token=None, externalize_state_completely=False):
"""

@@ -202,2 +206,3 @@ Initialize the server with the import name and the bptk.

:param bptk: simulations made by the bptk.
:param externalize_state_completely: if True and external_state_adapter is provided, instances are deleted after every use to ensure statelessness
"""

@@ -209,9 +214,4 @@ super(BptkServer, self).__init__(import_name)

self._bearer_token = bearer_token
self._externalize_state_completely = externalize_state_completely
# Loading the full state on startup
if external_state_adapter != None:
result = self._external_state_adapter.load_state()
for instance_data in result:
self._instance_manager.reconstruct_instance(instance_data.instance_id, instance_data.timeout, instance_data.time, instance_data.state)
# specifying the routes and methods of the api

@@ -236,4 +236,2 @@ self.route("/", methods=['GET'],strict_slashes=False)(self._home_resource)

self.route("/full-metrics", methods=['GET'], strict_slashes=False)(self._full_metrics_resource)
self.route("/save-state", methods=['GET'], strict_slashes=False)(self._save_state_resource)
self.route("/load-state", methods=['POST'], strict_slashes=False)(self._load_state_resource)
self.route("/<instance_uuid>/stop-instance", methods=['POST'], strict_slashes=False)(self._stop_instance_resource)

@@ -252,3 +250,3 @@

return resp
if token != self._bearer_token:

@@ -261,2 +259,30 @@ resp = make_response('{"Unauthorized": "Authentication Token is wrong!"}', 401)

@staticmethod
def auto_cleanup_instance(f):
    """Decorator for instance-scoped endpoint handlers.

    After the wrapped handler runs, the instance state is persisted via the
    external state adapter (when one is configured); when the server is set
    up to externalize state completely, the in-memory instance is then
    deleted so the server stays stateless. On handler failure the state is
    still saved best-effort, the in-memory instance is dropped, and the
    original exception is re-raised.

    Fixes: bare ``except:`` narrowed to ``except Exception``; ``raise e``
    replaced with bare ``raise`` to keep the original traceback; removed a
    redundant duplicated adapter check.
    """
    @wraps(f)
    def decorated(self, instance_uuid, *args, **kwargs):
        try:
            result = f(self, instance_uuid, *args, **kwargs)
        except Exception:
            if self._external_state_adapter:
                # Save instance state before deleting, even on exception;
                # save errors must not mask the original exception.
                try:
                    instance_state = self._instance_manager._get_instance_state(instance_uuid)
                    if instance_state:
                        self._external_state_adapter.save_instance(instance_state)
                except Exception:
                    pass
            self._instance_manager._delete_instance(instance_uuid)
            # Re-raise with the original traceback intact.
            raise
        if self._external_state_adapter:
            # Save instance state to external storage before (optionally) deleting.
            instance_state = self._instance_manager._get_instance_state(instance_uuid)
            if instance_state:
                self._external_state_adapter.save_instance(instance_state)
            if self._externalize_state_completely:
                self._instance_manager._delete_instance(instance_uuid)
        return result
    return decorated
@token_required

@@ -268,3 +294,3 @@ def _stop_instance_resource(self, instance_uuid):

resp = make_response("Instance deleted.", 200)
resp = make_response('{"msg": "Instance deleted."}', 200)
resp.headers['Content-Type']='application/json'

@@ -274,36 +300,3 @@ resp.headers['Access-Control-Allow-Origin']='*'

@token_required
def _save_state_resource(self):
    """
    Save all instance states via the configured external state adapter and
    return them serialized as JSON.

    Fix: previously this handler returned None when no adapter was
    configured, which makes Flask raise a server error with no useful body;
    it now returns an explicit JSON error response instead.
    """
    if(self._external_state_adapter == None):
        resp = make_response('{"error": "no external state adapter configured"}', 500)
        resp.headers['Content-Type']='application/json'
        resp.headers['Access-Control-Allow-Origin']='*'
        return resp
    instance_states = self._instance_manager.get_instance_states()
    self._external_state_adapter.save_state(instance_states)
    resp = make_response(jsonpickle.dumps(instance_states), 200)
    resp.headers['Content-Type']='application/json'
    resp.headers['Access-Control-Allow-Origin']='*'
    return resp
@token_required
def _load_state_resource(self):
    """
    Load all instances from the external state adapter and reconstruct them
    in the instance manager.

    Fixes: previously returned None when no adapter was configured (a Flask
    server error) — now returns an explicit JSON error response; also skips
    None entries, which FileAdapter's load can produce for unreadable files.
    """
    if(self._external_state_adapter == None):
        resp = make_response('{"error": "no external state adapter configured"}', 500)
        resp.headers['Access-Control-Allow-Origin']='*'
        return resp
    result = self._external_state_adapter.load_state()
    for instance_data in result:
        # _load_instance returns None for unreadable/corrupt files; skip those.
        if instance_data is None:
            continue
        self._instance_manager.reconstruct_instance(instance_data.instance_id, instance_data.timeout, instance_data.time, instance_data.state)
    resp = make_response("Success", 200)
    resp.headers['Access-Control-Allow-Origin']='*'
    return resp
def _metrics_resource(self):

@@ -605,2 +598,9 @@ """

if instance_uuid is not None:
# If externalizing state completely, save the new instance to external storage
if self._externalize_state_completely and self._external_state_adapter:
instance_state = self._instance_manager._get_instance_state(instance_uuid)
self._instance_manager._delete_instance(instance_uuid)
if instance_state:
self._external_state_adapter.save_instance(instance_state)
response_data = {"instance_uuid":instance_uuid,"timeout":timeout}

@@ -633,3 +633,10 @@ resp = make_response(json.dumps(response_data), 200)

for i in range(instances):
instance_uuids.append(self._instance_manager.create_instance(**timeout))
instance_uuid = self._instance_manager.create_instance(**timeout)
instance_uuids.append(instance_uuid)
# If externalizing state completely, save each new instance to external storage
if self._externalize_state_completely and self._external_state_adapter:
instance_state = self._instance_manager._get_instance_state(instance_uuid)
self._instance_manager._delete_instance(instance_uuid)
if instance_state:
self._external_state_adapter.save_instance(instance_state)

@@ -644,2 +651,3 @@ response_data={"instance_uuids":instance_uuids,"timeout":timeout}

@token_required
@auto_cleanup_instance
def _begin_session_resource(self, instance_uuid):

@@ -730,2 +738,3 @@ """This endpoint starts a session for single step simulation. There can only be one session per instance at a time.

@token_required
@auto_cleanup_instance
def _end_session_resource(self, instance_uuid):

@@ -750,2 +759,3 @@ """This endpoint ends a session for single step simulation and resets the internal cache.

@token_required
@auto_cleanup_instance
def _flat_session_results_resource(self,instance_uuid):

@@ -764,2 +774,3 @@ """

@token_required
@auto_cleanup_instance
def _session_results_resource(self,instance_uuid,flat=False):

@@ -769,3 +780,2 @@ """

"""
if not self._ensure_instance_exists(instance_uuid):

@@ -786,2 +796,3 @@ resp = make_response('{"error": "expecting a valid instance id to be given"}', 500)

@token_required
@auto_cleanup_instance
def _run_step_resource(self, instance_uuid):

@@ -827,5 +838,2 @@ """

if self._external_state_adapter != None:
self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid))
resp.headers['Content-Type'] = 'application/json'

@@ -836,2 +844,3 @@ resp.headers['Access-Control-Allow-Origin']='*'

@token_required
@auto_cleanup_instance
def _run_steps_resource(self, instance_uuid):

@@ -890,5 +899,2 @@ """

if self._external_state_adapter != None:
self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid))
resp.headers['Content-Type'] = 'application/json'

@@ -900,2 +906,3 @@ resp.headers['Access-Control-Allow-Origin']='*'

@token_required
@auto_cleanup_instance
def _stream_steps_resource(self, instance_uuid):

@@ -956,5 +963,3 @@ """

instance.unlock()
if self._external_state_adapter != None:
self._external_state_adapter.save_instance(self._instance_manager._get_instance_state(instance_uuid))
resp = Response(streamer())

@@ -967,2 +972,3 @@ resp.headers['Content-Type'] = 'application/json'

@token_required
@auto_cleanup_instance
def _keep_alive_resource(self,instance_uuid):

@@ -975,3 +981,3 @@ """

if not self._instance_manager.is_valid_instance(instance_uuid):
if not self._ensure_instance_exists(instance_uuid):
resp = make_response('{"error": "expecting a valid instance id to be given"}', 500)

@@ -978,0 +984,0 @@ else:

@@ -123,3 +123,66 @@ def compress_settings(settings):

scenario_transformed[constant_name] = {step_str: constant[i - 1]}
return result
return result
def _compress_time_series_data(data):
"""
Helper function to compress time-series data similar to compress_settings logic.
"""
if not data:
return data
# Transform step-indexed data into compressed format
compressed = {}
for step in data.keys():
step_data = data[step]
if not isinstance(step_data, dict):
continue
for key, value in step_data.items():
if key not in compressed:
compressed[key] = [value]
else:
compressed[key].append(value)
return compressed
def _decompress_time_series_data(compressed_data):
"""
Helper function to decompress time-series data similar to decompress_settings logic.
"""
if not compressed_data:
return compressed_data
# Transform compressed format back to step-indexed data
result = {}
# Find the maximum length to determine number of steps
max_length = max(len(values) if isinstance(values, list) else 1
for values in compressed_data.values()) if compressed_data else 0
for i in range(max_length):
step_str = f"{i + 1:.1f}"
result[step_str] = {}
for key, values in compressed_data.items():
if isinstance(values, list) and i < len(values):
result[step_str][key] = values[i]
else:
result[step_str][key] = values
return result
def _is_compressed_time_series_data(data):
"""
Helper function to detect if data looks like compressed time-series data.
"""
if not isinstance(data, dict):
return False
# Check if values are lists (indicating compressed time-series)
for value in data.values():
if isinstance(value, list):
return True
return False
Metadata-Version: 2.4
Name: bptk-py
Version: 2.1.1
Version: 2.2.0
Summary: A python simulation engine for System Dynamics & Agent based models

@@ -32,4 +32,8 @@ Author-email: transentis <support@transentis.com>

Requires-Dist: jsonpickle==4.1.1
Requires-Dist: psycopg==3.2.10
Requires-Dist: redis==6.4.0
Requires-Dist: ujson==5.10.0
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: python-dotenv; extra == "test"
Dynamic: license-file

@@ -80,2 +84,9 @@

### 2.2.0
* Add externalStateAdapters for Postgres and Redis
* Add option to externalize state completely, thus turning bptkServer into a stateless server
* Add pydantic logfire as logging backend
* Improve return message on bptkServer /stop-instance
### 2.1.1

@@ -82,0 +93,0 @@

@@ -23,4 +23,6 @@ [build-system]

"flask==3.1.1",
"jsonpickle==4.1.1"
"jsonpickle==4.1.1",
"psycopg==3.2.10",
"redis==6.4.0",
"ujson==5.10.0"
]

@@ -45,3 +47,3 @@ requires-python = ">=3.11"

test = [
"pytest",
"pytest","python-dotenv"
]

@@ -52,2 +54,7 @@

Documentation = "https://bptk.transentis.com"
Repository = "https://github.com/transentis/bptk_py"
Repository = "https://github.com/transentis/bptk_py"
[tool.pytest.ini_options]
filterwarnings = [
"ignore::DeprecationWarning:traitlets.*"
]

@@ -44,2 +44,9 @@ # Business Prototyping Toolkit for Python

### 2.2.0
* Add externalStateAdapters for Postgres and Redis
* Add option to externalize state completely, thus turning bptkServer into a stateless server
* Add pydantic logfire as logging backend
* Improve return message on bptkServer /stop-instance
### 2.1.1

@@ -46,0 +53,0 @@

@@ -15,3 +15,3 @@ from setuptools import setup

def get_version():
return '2.1.1'
return '2.2.0'

@@ -18,0 +18,0 @@ setup(version=get_version(),

@@ -6,16 +6,50 @@ from BPTK_Py.server import BptkServer

from BPTK_Py import FileAdapter
from BPTK_Py.externalstateadapter.redis_adapter import RedisAdapter
import os
from BPTK_Py import Model
import BPTK_Py
import redis
from dotenv import load_dotenv
from BPTK_Py import sd_functions as sd
@pytest.fixture(params=[True, False], ids=["externalize_completely", "no_externalize"])
def externalize_state_completely(request):
    """Fixture that provides both externalize_state_completely parameter values."""
    # Parametrized so every dependent test runs once per server mode.
    return request.param
def bptk_factory():
model = Model(starttime=1.0,stoptime=5.0, dt=1.0, name="Test Model")
model.points["delay"] = [
(1.0, 2.0),
(2.0, 2.0),
(3.0, 2.0),
(4.0, 2.0),
(5.0, 2.0)
]
model.points["round"] = [
(1.0, 1.0),
(2.0, 2.0),
(3.0, 3.0),
(4.0, 4.0),
(5.0, 5.0)
]
stock = model.stock("stock")
flow = model.flow("flow")
constant = model.constant("constant")
stock.initial_value=0.0
stock.equation=flow
flow.equation=constant
constant.equation=1.0
inflow = model.flow("inflow")
outflow = model.flow("outflow")
input = model.constant("input")
converter = model.converter("converter")
round = model.converter("round")
round.equation = sd.lookup(sd.time(), "round")
delay = model.converter("delay")
delay.equation = sd.lookup(sd.time(), "delay")
stock.initial_value=400.0
stock.equation=inflow-outflow
inflow.equation=input
outflow.equation=sd.delay(model,inflow,delay,100.0)
input.equation=100.0
converter.equation=stock

@@ -45,6 +79,6 @@ scenario_manager1={

{
"1":{
"scenario1":{
"constants":
{
"constant":1.0
"input":100.0
}

@@ -63,16 +97,17 @@ }

{
"1":{
"scenario1":{
"constants":
{
"constant":1.0
"input":100.0
}
},
"2":{
"scenario2":{
"constants":{
"constant":2.0
"input":200.0
}
},
"3":{
"scenario3":{
"constants":{
"constant":3.0
"input":300.0
}

@@ -89,15 +124,219 @@ }

@pytest.fixture
def app():
def file_app(externalize_state_completely):
import os
if not os.path.exists("state/"):
os.mkdir("state/")
adapter = FileAdapter(True, os.path.join(os.getcwd(), "state"))
flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter)
adapter = FileAdapter(compress=True, path=os.path.join(os.getcwd(), "state"))
flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter, externalize_state_completely=externalize_state_completely)
yield flask_app
# Cleanup after test
print("Tearing down Flask app...")
try:
# Clean up any remaining instances
if hasattr(flask_app, '_instance_manager'):
print("Cleaning up instance manager...")
# Force cleanup of all instances
if hasattr(flask_app._instance_manager, '_instances'):
for instance_id in list(flask_app._instance_manager._instances.keys()):
try:
flask_app._instance_manager._delete_instance(instance_id)
print(f"Cleaned up instance: {instance_id}")
except Exception as e:
print(f"Error cleaning up instance {instance_id}: {e}")
# Teardown Flask app context
with flask_app.app_context():
pass # This ensures proper cleanup of app context
print("Flask app teardown complete")
except Exception as e:
print(f"Error during app teardown: {e}")
@pytest.fixture
def client(app):
return app.test_client()
def file_client_fixture(file_app):
return file_app.test_client()
def test_instance_timeouts(app, client):
def external_state_base(client):
    """
    Shared end-to-end scenario for external-state adapter tests.

    Starts an instance, begins a step-wise session, runs four simulation
    steps (raising the 'input' constant from 100 to 400 after step 1) and
    checks the delayed outflow behavior of the model, then stops the
    instance. Works against any Flask test client backed by a BptkServer,
    regardless of which external state adapter it uses.

    Fix: assertion messages for the step-2/step-3 input and inflow checks
    claimed "100.0" while asserting 400.0 — they now match the asserted value.
    """
    response = client.post('/start-instance')
    assert response.status_code == 200, "start-instance should return 200"
    result = json.loads(response.data)
    assert "instance_uuid" in result, "start_instance should return an instance id"
    instance_uuid = result["instance_uuid"]
    # Prepare content for begin-session
    content = {
        "scenario_managers": [
            "firstManager"
        ],
        "scenarios": [
            "scenario1"
        ],
        "equations": [
            "converter",
            "delay",
            "inflow",
            "outflow",
            "input",
            "round"
        ]
    }
    response = client.post(f'{instance_uuid}/begin-session', data=json.dumps(content), content_type='application/json')
    assert response.status_code == 200, "begin-session should return 200"

    # Prepare content for run-step
    def make_run_content(value):
        # Settings payload that sets the 'input' constant for the next step.
        content = {
            "settings": {
                "firstManager": {
                    "scenario1": {
                        "constants": {
                            "input": value
                        }
                    }
                }
            },
            "flatResults": False
        }
        return content

    # Step 1: input=100 -> stock still at its initial value (converter=400).
    response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(100.0)), content_type='application/json')
    assert response.status_code == 200, "run-step should return 200"
    data = json.loads(response.data)
    assert data["firstManager"]["scenario1"]["input"]["1.0"] == 100.0, "input should have value 100.0"
    assert data["firstManager"]["scenario1"]["converter"]["1.0"] == 400.0, "converter should have value 400.0"
    assert data["firstManager"]["scenario1"]["delay"]["1.0"] == 2.0, "delay should have value 2.0"
    assert data["firstManager"]["scenario1"]["inflow"]["1.0"] == 100.0, "inflow should have value 100.0"
    assert data["firstManager"]["scenario1"]["outflow"]["1.0"] == 100.0, "outflow should have value 100.0"

    # Step 2: raise input to 400; the delayed outflow still reflects step 1's inflow.
    response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json')
    assert response.status_code == 200, "run-step should return 200"
    data = json.loads(response.data)
    assert data["firstManager"]["scenario1"]["input"]["2.0"] == 400.0, "input should have value 400.0"
    assert data["firstManager"]["scenario1"]["converter"]["2.0"] == 400.0, "converter should have value 400.0"
    assert data["firstManager"]["scenario1"]["delay"]["2.0"] == 2.0, "delay should have value 2.0"
    assert data["firstManager"]["scenario1"]["inflow"]["2.0"] == 400.0, "inflow should have value 400.0"
    assert data["firstManager"]["scenario1"]["outflow"]["2.0"] == 100.0, "outflow should have value 100.0"

    # Step 3: stock grows by the step-2 net flow (400 in, 100 out).
    response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json')
    assert response.status_code == 200, "run-step should return 200"
    data = json.loads(response.data)
    assert data["firstManager"]["scenario1"]["converter"]["3.0"] == 700.0, "converter should have value 700.0"
    assert data["firstManager"]["scenario1"]["delay"]["3.0"] == 2.0, "delay should have value 2.0"
    assert data["firstManager"]["scenario1"]["inflow"]["3.0"] == 400.0, "inflow should have value 400.0"
    assert data["firstManager"]["scenario1"]["outflow"]["3.0"] == 100.0, "outflow should have value 100.0"

    # Step 4: stock keeps growing while outflow stays delayed.
    response = client.post(f'{instance_uuid}/run-step', data=json.dumps(make_run_content(400.0)), content_type='application/json')
    assert response.status_code == 200, "run-step should return 200"
    data = json.loads(response.data)
    assert data["firstManager"]["scenario1"]["converter"]["4.0"] == 1000.0, "converter should have value 1000.0"

    # Cleanup - stop the instance to properly clean up resources
    print("Cleaning up instance...")
    response = client.post(f'{instance_uuid}/stop-instance')
    print("Stop-instance response:", response.status_code)
    assert response.status_code == 200, "stop-instance should return 200"
def test_external_state_file(file_client_fixture):
    # Run the shared external-state scenario against the FileAdapter-backed server.
    external_state_base(file_client_fixture)
@pytest.fixture
def redis_app(externalize_state_completely):
    """Create a Flask app backed by the Redis adapter.

    Skips (rather than fails) when Redis tests are disabled in .env or the
    Redis server is unreachable. Yields the app, then tears down all
    in-memory instances and removes test keys ("bptk:test:*") from Redis.
    """
    # Try multiple paths for .env file
    env_paths = [
        os.path.join(os.getcwd(), ".env")
    ]
    redis_url = None
    enable_redis_tests = False
    for env_path in env_paths:
        try:
            if os.path.exists(env_path):
                print(f"Loading environment from: {env_path}")
                load_dotenv(env_path)
                redis_url = os.getenv("REDIS_URL")
                enable_redis_tests = os.getenv("ENABLE_REDIS_TESTS", "false").lower() == "true"
                if redis_url:
                    break
        except Exception as e:
            print(f"Error loading {env_path}: {e}")
    print(f"Redis URL configured: {bool(redis_url)}")
    print(f"Redis tests enabled: {enable_redis_tests}")
    if not redis_url or not enable_redis_tests:
        pytest.skip("Redis tests disabled or REDIS_URL not configured")
    try:
        # Create Redis client
        redis_client = redis.from_url(redis_url)
        # Test connection
        redis_client.ping()
        print(f"Connected to Redis at: {redis_url}")
    except Exception as e:
        pytest.skip(f"Could not connect to Redis: {e}")
    # Create Redis adapter
    adapter = RedisAdapter(redis_client, compress=True, key_prefix="bptk:test")
    flask_app = BptkServer(__name__, bptk_factory, external_state_adapter=adapter, externalize_state_completely=externalize_state_completely)
    yield flask_app
    # Cleanup after test
    print("Tearing down Redis Flask app...")
    try:
        # Clean up any remaining instances
        if hasattr(flask_app, '_instance_manager'):
            print("Cleaning up instance manager...")
            if hasattr(flask_app._instance_manager, '_instances'):
                for instance_id in list(flask_app._instance_manager._instances.keys()):
                    try:
                        flask_app._instance_manager._delete_instance(instance_id)
                        print(f"Cleaned up instance: {instance_id}")
                        # Also clean up from Redis
                        adapter.delete_instance(instance_id)
                        print(f"Cleaned up Redis data for instance: {instance_id}")
                    except Exception as e:
                        print(f"Error cleaning up instance {instance_id}: {e}")
        # Clean up any test keys from Redis
        try:
            pattern = "bptk:test:*"
            for key in redis_client.scan_iter(match=pattern):
                redis_client.delete(key)
                print(f"Cleaned up Redis key: {key}")
        except Exception as e:
            print(f"Error cleaning up Redis keys: {e}")
        # Teardown Flask app context
        with flask_app.app_context():
            pass  # This ensures proper cleanup of app context
        print("Redis Flask app teardown complete")
    except Exception as e:
        print(f"Error during Redis app teardown: {e}")
@pytest.fixture
def redis_client_fixture(redis_app):
    # Flask test client bound to the Redis-backed app.
    return redis_app.test_client()
def test_external_state_redis(redis_client_fixture):
    """Test external state with Redis adapter - equivalent to file adapter test"""
    # Skipped automatically by the redis_app fixture when Redis is unavailable.
    external_state_base(redis_client_fixture)
def test_instance_timeouts(file_client_fixture):
client = file_client_fixture
def assert_in_full_metrics(instance_id, contains: bool):

@@ -137,10 +376,10 @@ response = client.get('/full-metrics')

"scenarios": [
"1"
"scenario1"
],
"equations": [
"constant"
"input"
]
}
response = client.post(f'http://localhost:500/{instance_id}/begin-session', data=json.dumps(content), content_type='application/json')
response = client.post(f'http://localhost:5000/{instance_id}/begin-session', data=json.dumps(content), content_type='application/json')
assert response.status_code == 200, "begin-session should return 200"

@@ -161,51 +400,4 @@

assert_in_full_metrics(instance_id, True)
time.sleep(4)
assert_in_full_metrics(instance_id, False)
response = client.post(f'http://localhost:5000/{instance_id}/run-step', data=json.dumps(run_content), content_type='application/json')
assert response.status_code == 200, "run-step should return 200"
assert_in_full_metrics(instance_id, True)
time.sleep(4)
assert_in_full_metrics(instance_id, False)
response = client.post('http://localhost:5000/load-state')
assert response.status_code == 200, "load-state should return 200"
assert_in_full_metrics(instance_id, True)
time.sleep(4)
assert_in_full_metrics(instance_id, False)
response = client.post('http://localhost:5000/load-state')
assert response.status_code == 200, "load-state should return 200"
os.remove(os.path.join("state/", instance_id + ".json"))
response = client.get('http://localhost:5000/save-state')
assert response.status_code == 200, "save-state should return 200"
dir_content = os.listdir("state/")
assert instance_id + ".json" in dir_content
response = client.post('http://localhost:5000/load-state')
assert response.status_code == 200, "load-state should return 200"
assert_in_full_metrics(instance_id, True)
response = client.post(f'http://localhost:5000/{instance_id}/stop-instance')
assert response.status_code == 200, "stop-instance should return 200"
assert_in_full_metrics(instance_id, False)
response = client.get('http://localhost:5000/save-state')
assert response.status_code == 200, "save-state should return 200"
dir_content = os.listdir("state/")
assert not instance_id + ".json" in dir_content

Sorry, the diff of this file is too big to display