Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

camera-ui-python-common

Package Overview
Dependencies
Maintainers
1
Versions
16
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

camera-ui-python-common - pypi Package Compare versions

Comparing version
2.0.2
to
3.0.0
+5
camera_ui_python_common/camera_utils/__init__.py
"""Camera utilities module."""
from .utils import build_target_url
__all__ = ["build_target_url"]
"""Camera utility functions."""
from __future__ import annotations
from urllib.parse import urlparse
from camera_ui_python_types import RTSPUrlOptions
def build_target_url(rtsp_url: str, options: RTSPUrlOptions | None = None) -> str:
"""
Build a target URL with streaming options.
Args:
rtsp_url: The base RTSP URL
options: Optional streaming options
Returns:
The constructed URL with query parameters
"""
if options is None:
options = {}
parsed = urlparse(rtsp_url)
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
# Extract options with defaults
video = options.get("video", True)
audio = options.get("audio", True)
audio_single_track = options.get("audioSingleTrack", True)
backchannel = options.get("backchannel", False)
timeout = options.get("timeout", 15)
gop = options.get("gop", True)
prebuffer = options.get("prebuffer", False)
# Validate timeout (5-30 seconds)
validated_timeout = min(max(5, timeout), 30)
params: list[str] = []
# Video parameter
if video:
params.append("video")
# Audio parameter
if audio:
if isinstance(audio, bool):
params.append("audio")
elif isinstance(audio, list):
if audio_single_track:
# Single track with multiple codecs
params.append(f"audio={','.join(audio)}")
else:
# Multiple tracks
for codec in audio:
params.append(f"audio={codec}")
else:
params.append(f"audio={audio}")
# Backchannel parameter
if backchannel:
params.append("backchannel=1")
# GOP parameter
if gop:
params.append("gop=1")
# Prebuffer parameter
if prebuffer:
params.append("prebuffer=5")
# Timeout parameter
params.append(f"timeout={validated_timeout}")
return f"{base_url}?{'&'.join(params)}"
from .object_path import ObjectPath, Path
from .promise import Deferred
from .reactive import ReactiveProperty
from .signal_handler import SignalHandler
from .subscribed import Subscribed
from .task import TaskSet
from .thread import to_thread
from .utils import make_sync, merge, merge_with
__all__ = [
"Deferred",
"make_sync",
"merge",
"merge_with",
"ObjectPath",
"Path",
"ReactiveProperty",
"SignalHandler",
"Subscribed",
"TaskSet",
"to_thread",
]
"""Object path utilities for nested object access."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, cast
Path = Sequence[int | str] | str | int
def is_empty(value: list[int] | list[str] | Sequence[int | str] | None) -> bool:
"""Check if a sequence is empty or None."""
if value is None:
return True
return len(value) == 0
def get_key(key: str | int) -> int | str:
"""Convert a string key to int if it's a digit."""
if isinstance(key, int):
return key
if key.isdigit():
return int(key)
return key
class ObjectPath:
"""Utility class for accessing and modifying nested objects."""
@staticmethod
def delete(obj: dict[Any, Any] | list[Any], path: Path) -> dict[Any, Any] | list[Any]:
"""Delete a value at the specified path."""
if isinstance(path, int):
path = [path]
elif isinstance(path, str):
path = path.split(".")
if is_empty(path):
return obj
current_path = get_key(path[0])
if isinstance(obj, list):
if isinstance(current_path, int) and 0 <= current_path < len(obj):
if len(path) == 1:
obj.pop(current_path)
else:
next_obj = obj[current_path]
if isinstance(next_obj, (dict, list)):
ObjectPath.delete(cast(Any, next_obj), path[1:])
elif current_path in obj:
if len(path) == 1:
del obj[current_path]
else:
next_obj = obj[current_path]
if isinstance(next_obj, (dict, list)):
ObjectPath.delete(cast(Any, next_obj), path[1:])
return obj
@staticmethod
def has(obj: dict[Any, Any] | list[Any], path: Path) -> bool:
"""Check if a path exists in the object."""
if isinstance(path, int):
path = [path]
elif isinstance(path, str):
path = path.split(".")
if not path:
return bool(obj)
current: Any = obj
for item in path:
key = get_key(item)
if isinstance(current, list):
if isinstance(key, int) and 0 <= key < len(cast(list[str], current)):
current = current[key]
else:
return False
elif isinstance(current, dict):
if key in current:
current = current[key]
else:
return False
else:
return False
return True
@staticmethod
def get(obj: dict[Any, Any] | list[Any], path: Path, default_value: Any = None) -> Any:
"""Get a value at the specified path."""
# Handle empty paths
if isinstance(path, str) and not path or isinstance(path, Sequence) and not path:
return obj
# Convert single numbers or strings to a list
if isinstance(path, int):
path = [path]
elif isinstance(path, str):
path = path.split(".")
current_path = get_key(path[0])
next_obj = None
if isinstance(obj, list):
if isinstance(current_path, int) and 0 <= current_path < len(obj):
next_obj = obj[current_path]
else:
next_obj = obj.get(current_path)
if next_obj is None:
return default_value
if len(path) == 1:
return next_obj
return ObjectPath.get(next_obj, path[1:], default_value)
@staticmethod
def set(
obj: dict[Any, Any] | list[Any], path: Path, value: Any, do_not_replace: bool = False
) -> Any:
"""Set a value at the specified path."""
if isinstance(path, int):
path = [path]
elif isinstance(path, str):
path = [get_key(key) for key in path.split(".") if key]
if not path:
return obj
current_path = path[0]
if isinstance(obj, list):
if isinstance(current_path, int):
while len(obj) <= current_path:
obj.append(None)
if len(path) == 1:
if not do_not_replace or obj[current_path] is None:
obj[current_path] = value
else:
if obj[current_path] is None:
obj[current_path] = [] if isinstance(path[1], int) else {}
return ObjectPath.set(obj[current_path], path[1:], value, do_not_replace)
return None
if len(path) == 1:
if not do_not_replace or current_path not in obj:
obj[current_path] = value
return obj.get(current_path)
if current_path not in obj:
obj[current_path] = [] if isinstance(path[1], int) else {}
return ObjectPath.set(obj[current_path], path[1:], value, do_not_replace)
@staticmethod
def push(obj: dict[Any, Any] | list[Any], path: Path, *items: Any) -> None:
"""Push items to an array at the specified path."""
target = ObjectPath.get(obj, path)
if not isinstance(target, list):
target = []
ObjectPath.set(obj, path, target)
target.extend(items)
@staticmethod
def coalesce(
obj: dict[Any, Any] | list[Any],
paths: Path | list[Path],
default_value: Any = None,
) -> Any:
"""Return the first non-None value from the specified paths."""
paths_list = paths if isinstance(paths, list) else [paths]
for path in paths_list:
value = ObjectPath.get(obj, path)
if value is not None:
return value
return default_value
@staticmethod
def empty(obj: dict[Any, Any] | list[Any], path: Path) -> Any:
"""Empty the value at the specified path."""
value = ObjectPath.get(obj, path)
if value is None:
return None
if isinstance(value, str):
return ObjectPath.set(obj, path, "")
if isinstance(value, bool):
return ObjectPath.set(obj, path, False)
if isinstance(value, (int, float)):
return ObjectPath.set(obj, path, 0)
if isinstance(value, (list, dict)):
value.clear()
return cast(Any, value)
return ObjectPath.set(obj, path, None)
@staticmethod
def ensure_exists(obj: dict[Any, Any] | list[Any], path: Path, value: Any) -> Any:
"""Ensure a value exists at the specified path."""
return ObjectPath.set(obj, path, value, True)
@staticmethod
def insert(obj: dict[Any, Any] | list[Any], path: Path, value: Any, at: int = 0) -> None:
"""Insert a value into an array at the specified path."""
target = ObjectPath.get(obj, path)
if not isinstance(target, list):
target = []
ObjectPath.set(obj, path, target)
target.insert(at, value)
"""Promise/Deferred utilities for async operations."""
from __future__ import annotations
import asyncio
from typing import Generic, TypeVar
T = TypeVar("T")
class Deferred(Generic[T]):
"""A deferred promise that can be resolved or rejected externally."""
def __init__(self) -> None:
self._loop = asyncio.get_event_loop()
self._future: asyncio.Future[T] = self._loop.create_future()
@property
def promise(self) -> asyncio.Future[T]:
"""The underlying future/promise."""
return self._future
@property
def is_done(self) -> bool:
"""Whether the deferred has been resolved or rejected."""
return self._future.done()
def resolve(self, value: T) -> None:
"""Resolve the deferred with a value."""
if not self._future.done():
self._future.set_result(value)
def reject(self, reason: Exception | None = None) -> None:
"""Reject the deferred with an exception."""
if not self._future.done():
self._future.set_exception(reason or Exception("Rejected"))
def cancel(self, reason: str = "Cancelled") -> None:
"""Cancel the deferred."""
if not self._future.done():
self._future.set_exception(asyncio.CancelledError(reason))
"""Reactive property utilities."""
from __future__ import annotations
from copy import deepcopy
from typing import Generic, TypeVar, cast
from camera_ui_python_types import HybridObservable
from reactivex import operators as ops
from reactivex.subject import BehaviorSubject
T = TypeVar("T")
class ReactiveProperty(Generic[T]):
"""A reactive property that wraps a BehaviorSubject with observable access."""
__subject: BehaviorSubject[T]
observable: HybridObservable[T]
def __init__(
self, initial_value: T | BehaviorSubject[T], observable: HybridObservable[T] | None = None
) -> None:
if isinstance(initial_value, BehaviorSubject):
self.__subject = cast(BehaviorSubject[T], initial_value)
else:
self.__subject = BehaviorSubject(initial_value)
self.observable = observable or self.__create_state_observable(self.__subject)
@property
def value(self) -> T:
val = self.__subject.value
if isinstance(val, (dict, list)) or hasattr(val, "__dict__"):
return deepcopy(val)
return val
def next(self, value: T) -> None:
self.__subject.on_next(value)
def complete(self) -> None:
self.__subject.on_completed()
def __create_state_observable(self, state_subject: BehaviorSubject[T]) -> HybridObservable[T]:
return HybridObservable(state_subject.pipe(ops.distinct_until_changed(), ops.share()))
"""Signal handling utilities for graceful shutdown."""
from __future__ import annotations
import asyncio
import contextlib
import signal
import traceback
from collections.abc import Callable, Coroutine
from typing import Any, TypedDict
from camera_ui_python_types import LoggerService
class SignalHandlerOptions(TypedDict):
"""Options for configuring the signal handler."""
display_name: str
logger: LoggerService
timeout_duration: int
close_function: Callable[..., Coroutine[Any, Any, Any]]
class SignalHandler:
"""Handles system signals for graceful shutdown."""
display_name: str
logger: LoggerService
timeout_duration: int
close_function: Callable[..., Coroutine[Any, Any, Any]]
is_shutting_down: bool
shutdown_event: asyncio.Event
def __init__(self, options: SignalHandlerOptions) -> None:
self.display_name: str = options["display_name"]
self.logger = options["logger"]
self.timeout_duration = options.get("timeout_duration", 5)
self.close_function = options["close_function"]
self.is_shutting_down = False
self.shutdown_event = asyncio.Event()
def setup_handlers(self) -> None:
loop = asyncio.get_running_loop()
with contextlib.suppress(NotImplementedError):
for signame in ("SIGINT", "SIGTERM"):
def signal_callback() -> None:
with contextlib.suppress(asyncio.CancelledError):
asyncio.create_task(self.gracefully_close(signame))
sig = getattr(signal, signame)
loop.add_signal_handler(sig, signal_callback)
loop.set_exception_handler(self.handle_exception)
async def gracefully_close(self, signame: str) -> None:
if self.is_shutting_down:
return
self.is_shutting_down = True
self.logger.log(f"{self.display_name} Received {signame}. Stopping...")
try:
close_task = asyncio.create_task(self.close_function())
await asyncio.wait_for(close_task, timeout=self.timeout_duration)
except asyncio.TimeoutError:
self.logger.warn(
f"{self.display_name} Failed to gracefully close before timeout. Force quitting!"
)
except Exception as e:
self.logger.error(f"Error during shutdown: {str(e)}\n{traceback.format_exc()}")
finally:
self.shutdown_event.set()
def handle_exception(self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None:
exception = context.get("exception")
message = context.get("message")
task = context.get("task") # Das Task Objekt
if isinstance(exception, asyncio.CancelledError) and self.is_shutting_down:
return
error_message = f"{self.display_name} Caught exception: {message}"
if task:
# Task Stack ausgeben
stack = task.get_stack()
if stack:
error_message += "\nTask Stack:"
for frame in stack:
error_message += f"\n File {frame.f_code.co_filename}, line {frame.f_lineno}, in {frame.f_code.co_name}"
if exception:
error_message += f"\n{type(exception).__name__}: {str(exception)}"
if not isinstance(exception, asyncio.CancelledError):
error_message += f"\n{traceback.format_exc()}"
self.logger.error(error_message)
if not self.is_shutting_down:
asyncio.create_task(self.gracefully_close("uncaughtException"))
"""Subscription management utilities."""
from __future__ import annotations
from typing import Protocol, runtime_checkable
@runtime_checkable
class Unsubscribable(Protocol):
"""Protocol for objects that can be disposed/unsubscribed."""
def dispose(self) -> None:
"""Dispose of the subscription."""
...
class Subscribed:
"""Base class for managing subscriptions."""
_subscriptions: list[Unsubscribable]
_additional_subscriptions: list[Unsubscribable]
def __init__(self) -> None:
self._subscriptions = []
self._additional_subscriptions = []
def add_subscriptions(self, *subscriptions: Unsubscribable) -> None:
"""Add subscriptions to be managed."""
self._subscriptions.extend(subscriptions)
def unsubscribe(self) -> None:
"""Dispose all managed subscriptions."""
for subscription in self._subscriptions:
subscription.dispose()
"""Task set utilities for managing async tasks."""
from __future__ import annotations
import asyncio
from collections.abc import Coroutine
from typing import Any
class TaskSet:
"""A set of async tasks that can be managed together."""
tasks: set[asyncio.Task[Any]]
name: str | None
def __init__(self, name: str | None = None) -> None:
self.tasks = set[asyncio.Task[Any]]()
self.name = name
def log_prefix(self) -> str:
return f"[{self.name}]"
def add(self, coroutine: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
task = asyncio.create_task(coroutine, name=self.name)
self.tasks.add(task)
task.add_done_callback(lambda _: self.remove(task))
return task
def remove(self, task: asyncio.Task[Any]) -> None:
if task in self.tasks:
if not task.cancelled():
task.cancel()
self.tasks.remove(task)
def remove_all(self) -> None:
tasks = self.tasks.copy()
for task in tasks:
self.remove(task)
self.tasks.clear()
def __await__(self) -> Any:
return asyncio.gather(*self.tasks).__await__()
"""Thread utilities for running blocking operations."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import TypeVar
T = TypeVar("T")
toThreadExecutor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker")
async def to_thread(f: Callable[[], T]) -> T:
"""Run a blocking function in a thread pool."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(toThreadExecutor, f)
"""Common utility functions."""
from __future__ import annotations
import asyncio
import copy
from collections.abc import Callable, Coroutine
from typing import Any, ParamSpec, TypeVar, cast
TSource = TypeVar("TSource", bound=dict[Any, Any] | list[Any] | Any)
TTarget = TypeVar("TTarget", bound=dict[Any, Any] | list[Any] | Any)
TKey = str | int | None
Customizer = Callable[[Any, Any, TKey, TSource, TTarget, list[dict[Any, Any]]], Any]
R = TypeVar("R")
P = ParamSpec("P")
def make_sync(async_func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, R]:
"""Convert an async function to a sync function."""
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
loop = asyncio.get_running_loop()
return asyncio.run_coroutine_threadsafe(async_func(*args, **kwargs), loop).result()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(async_func(*args, **kwargs))
return wrapper
def merge_with(
source_object: TSource,
target_object: TTarget | None,
customizer: Customizer[Any, Any] | None = None,
stack: list[dict[Any, Any]] | None = None,
) -> TSource | TTarget:
"""Merge target_object into source_object with optional customizer."""
if stack is None:
stack = []
if isinstance(source_object, list) and isinstance(target_object, list):
customized_value = (
customizer(source_object, target_object, None, source_object, target_object, stack)
if customizer
else None
)
if customized_value is not None:
return cast(TSource | TTarget, customized_value)
else:
return cast(TSource | TTarget, source_object + target_object)
if isinstance(source_object, dict) and isinstance(target_object, dict):
for key in target_object:
obj_value = source_object.get(key)
src_value = target_object[key]
stack.append({"key": key, "source_object": source_object, "target_object": target_object})
customized_value = (
customizer(obj_value, src_value, key, source_object, target_object, stack)
if customizer
else None
)
if customized_value is not None:
source_object[key] = customized_value
elif isinstance(obj_value, (dict, list)) and isinstance(src_value, (dict, list)):
source_object[key] = merge_with(
copy.deepcopy(cast(Any, obj_value)),
cast(Any, src_value),
customizer,
stack,
)
else:
source_object[key] = src_value
stack.pop()
else:
if not isinstance(target_object, (dict, list)):
return source_object
return source_object
def merge(
source: Any,
target: Any,
key: TKey,
source_object: dict[Any, Any] | list[Any] | Any,
target_object: dict[Any, Any] | list[Any] | Any,
stack: list[Any],
) -> Any:
"""Default merge customizer that replaces lists instead of concatenating."""
if isinstance(source, list):
return target
return None
class AnsicolorMethods:
start_code: str
end_code: str
def __init__(self, start_code: str, end_code: str = "39") -> None:
self.start_code = start_code
self.end_code = end_code
def __call__(self, text: str) -> str:
return f"\033[{self.start_code}m{text}\033[{self.end_code}m"
class Ansicolor:
# Foreground colors
default: AnsicolorMethods = AnsicolorMethods("39")
white: AnsicolorMethods = AnsicolorMethods("97")
black: AnsicolorMethods = AnsicolorMethods("30")
red: AnsicolorMethods = AnsicolorMethods("31")
green: AnsicolorMethods = AnsicolorMethods("32")
yellow: AnsicolorMethods = AnsicolorMethods("33")
blue: AnsicolorMethods = AnsicolorMethods("34")
magenta: AnsicolorMethods = AnsicolorMethods("35")
cyan: AnsicolorMethods = AnsicolorMethods("36")
darkGray: AnsicolorMethods = AnsicolorMethods("90")
lightGray: AnsicolorMethods = AnsicolorMethods("37")
lightRed: AnsicolorMethods = AnsicolorMethods("91")
lightGreen: AnsicolorMethods = AnsicolorMethods("92")
lightYellow: AnsicolorMethods = AnsicolorMethods("93")
lightBlue: AnsicolorMethods = AnsicolorMethods("94")
lightMagenta: AnsicolorMethods = AnsicolorMethods("95")
lightCyan: AnsicolorMethods = AnsicolorMethods("96")
# Text styles
bright: AnsicolorMethods = AnsicolorMethods("1", "22")
dim: AnsicolorMethods = AnsicolorMethods("2", "22")
italic: AnsicolorMethods = AnsicolorMethods("3", "23")
underline: AnsicolorMethods = AnsicolorMethods("4", "24")
inverse: AnsicolorMethods = AnsicolorMethods("7", "27")
# Background colors
bgDefault: AnsicolorMethods = AnsicolorMethods("49", "49")
bgWhite: AnsicolorMethods = AnsicolorMethods("107", "49")
bgBlack: AnsicolorMethods = AnsicolorMethods("40", "49")
bgRed: AnsicolorMethods = AnsicolorMethods("41", "49")
bgGreen: AnsicolorMethods = AnsicolorMethods("42", "49")
bgYellow: AnsicolorMethods = AnsicolorMethods("43", "49")
bgBlue: AnsicolorMethods = AnsicolorMethods("44", "49")
bgMagenta: AnsicolorMethods = AnsicolorMethods("45", "49")
bgCyan: AnsicolorMethods = AnsicolorMethods("46", "49")
bgDarkGray: AnsicolorMethods = AnsicolorMethods("100", "49")
bgLightGray: AnsicolorMethods = AnsicolorMethods("47", "49")
bgLightRed: AnsicolorMethods = AnsicolorMethods("101", "49")
bgLightGreen: AnsicolorMethods = AnsicolorMethods("102", "49")
bgLightYellow: AnsicolorMethods = AnsicolorMethods("103", "49")
bgLightBlue: AnsicolorMethods = AnsicolorMethods("104", "49")
bgLightMagenta: AnsicolorMethods = AnsicolorMethods("105", "49")
bgLightCyan: AnsicolorMethods = AnsicolorMethods("106", "49")
ansicolor: Ansicolor = Ansicolor()
"""Logger service for structured logging."""
from __future__ import annotations
import json
import os
import time
import traceback
from datetime import datetime
from typing import Any, Literal, TypedDict, cast
from .ansicolor import ansicolor
LogLevel = Literal["log", "warn", "error", "debug", "trace", "success", "attention", "raw"]
LogTargetType = Literal["camera", "plugin", "system"]
LogSource = Literal["main", "child"]
class LogEntry(TypedDict):
"""A structured log entry."""
timestamp: int
level: LogLevel
prefix: str
suffix: str | None
message: str
targetId: str | None
targetType: LogTargetType | None
pluginId: str | None
source: LogSource
processId: int
class ChildLogMessage(TypedDict):
"""Message format for child process logs."""
type: Literal["log"]
entry: LogEntry
class LoggerOptions(TypedDict, total=False):
"""Options for configuring the logger."""
prefix: str | None
suffix: str | None
disable_prefix: bool | None
disable_timestamps: bool | None
debug_enabled: bool | None
trace_enabled: bool | None
target_id: str | None
target_type: LogTargetType | None
plugin_id: str | None
class LoggerService:
"""A structured logging service with support for child process mode."""
prefix: str
suffix: str | None
target_id: str | None
target_type: LogTargetType | None
disable_prefix: bool | None
plugin_id: str | None
disable_timestamps: bool
debug_enabled: bool
trace_enabled: bool
# Whether to output JSON (for parent process) or formatted text (for local debugging)
_is_child_process: bool = True
def __init__(self, options: LoggerOptions | None = None) -> None:
self.prefix = cast(str, options.get("prefix", "camera.ui") if options else "camera.ui")
self.suffix = options.get("suffix") if options else None
self.target_id = options.get("target_id") if options else None
self.target_type = options.get("target_type") if options else None
self.disable_prefix = options.get("disable_prefix", False) if options else False
self.plugin_id = options.get("plugin_id") if options else None
self.disable_timestamps = cast(bool, options.get("disable_timestamps", False) if options else False)
self.debug_enabled = cast(bool, options.get("debug_enabled", False) if options else False)
self.trace_enabled = cast(bool, options.get("trace_enabled", False) if options else False)
def set_child_process_mode(self, enabled: bool) -> None:
"""
Set whether this logger runs in child process mode.
In child mode, logs are output as JSON to stdout.
"""
self._is_child_process = enabled
def create_logger(self, options: LoggerOptions | None = None) -> LoggerService:
"""Create a child logger that inherits settings from this logger."""
logger_options: LoggerOptions = {
"prefix": options.get("prefix", self.prefix) if options else self.prefix,
"suffix": options.get("suffix", self.suffix) if options else self.suffix,
"disable_timestamps": options.get("disable_timestamps", self.disable_timestamps)
if options
else self.disable_timestamps,
"debug_enabled": options.get("debug_enabled", self.debug_enabled)
if options
else self.debug_enabled,
"trace_enabled": options.get("trace_enabled", self.trace_enabled)
if options
else self.trace_enabled,
"target_id": options.get("target_id", self.target_id) if options else self.target_id,
"target_type": options.get("target_type", self.target_type) if options else self.target_type,
"disable_prefix": options.get("disable_prefix", False) if options else False,
"plugin_id": options.get("plugin_id", self.plugin_id) if options else self.plugin_id,
}
logger = LoggerService(logger_options)
logger._is_child_process = self._is_child_process
return logger
def log(self, *args: Any) -> None:
self._write_log("log", args)
def error(self, *args: Any) -> None:
self._write_log("error", args)
def warn(self, *args: Any) -> None:
self._write_log("warn", args)
def success(self, *args: Any) -> None:
self._write_log("success", args)
def attention(self, *args: Any) -> None:
self._write_log("attention", args)
def debug(self, *args: Any) -> None:
if not self.debug_enabled:
return
self._write_log("debug", args)
def trace(self, *args: Any) -> None:
if not self.trace_enabled:
return
self._write_log("trace", args)
def raw(self, *args: Any) -> None:
self._write_log("raw", args)
def _write_log(self, level: LogLevel, args: tuple[Any, ...]) -> None:
"""Core logging method that creates entry and routes it."""
entry = self._create_entry(level, args)
if self._is_child_process:
# Child process: write JSON to stdout
self._write_to_stdout(entry)
else:
# Local mode: write formatted to console
self._write_to_console(entry)
def _create_entry(self, level: LogLevel, args: tuple[Any, ...]) -> LogEntry:
"""Create a structured log entry."""
formatted_args = self._format_args(args)
message = " ".join(formatted_args)
return LogEntry(
timestamp=int(time.time() * 1000),
level=level,
prefix=self.prefix,
suffix=self.suffix,
message=message,
targetId=self.target_id,
targetType=self.target_type,
pluginId=self.plugin_id,
source="child",
processId=os.getpid(),
)
def _write_to_stdout(self, entry: LogEntry) -> None:
"""Write log entry to stdout as JSON."""
message: ChildLogMessage = {"type": "log", "entry": entry}
json_line = json.dumps(message, ensure_ascii=False)
print(json_line, flush=True)
def _write_to_console(self, entry: LogEntry) -> None:
"""Write formatted log to console."""
formatted = self._format_for_console(entry)
print(*formatted)
def _format_for_console(self, entry: LogEntry) -> list[str]:
"""Format a log entry for console output with ANSI colors."""
parts: list[str] = []
# Timestamp
if not self.disable_timestamps:
parts.append(f"[{self._format_timestamp(entry['timestamp'])}]")
# Prefix
if not self.disable_prefix and entry.get("prefix"):
parts.append(ansicolor.blue(f"[{entry['prefix']}]"))
# Suffix
if entry.get("suffix"):
parts.append(ansicolor.cyan(f"[{entry['suffix']}]"))
# Level prefix and colored message
level = entry["level"]
level_prefix = self._get_level_prefix(level)
if level_prefix:
parts.append(level_prefix)
# Message with appropriate color
colored_message = self._colorize_message(level, entry["message"])
parts.append(colored_message)
return parts
def _get_level_prefix(self, level: LogLevel) -> str:
"""Get the level prefix badge."""
prefixes = {
"error": ansicolor.bgRed(" ERROR "),
"warn": ansicolor.bgYellow(" WARN "),
"success": ansicolor.bgGreen(" SUCCESS "),
"attention": ansicolor.bgMagenta(" ATTENTION "),
"trace": ansicolor.bgDarkGray(" TRACE "),
"debug": "",
"log": "",
"raw": "",
}
return prefixes.get(level, "")
def _colorize_message(self, level: LogLevel, message: str) -> str:
"""Apply color to message based on log level."""
if level in ("debug", "trace"):
return ansicolor.darkGray(message)
elif level == "warn":
return ansicolor.yellow(message)
elif level == "error":
return ansicolor.red(message)
elif level == "success":
return ansicolor.green(message)
elif level == "attention":
return ansicolor.magenta(message)
return message
def _format_args(self, args: tuple[Any, ...]) -> list[str]:
"""Format arguments for logging."""
formatted_args: list[str] = []
for arg in args:
if isinstance(arg, BaseException):
message = str(arg).split("\n")[0].strip() if str(arg) else "Unknown error"
error_traceback = traceback.format_exc().rstrip()
formatted_args.append(message)
if error_traceback and error_traceback != "NoneType: None":
formatted_args.append(f"\n{error_traceback}")
elif isinstance(arg, (dict, list)):
try:
formatted_args.append(json.dumps(arg, indent=2, ensure_ascii=False))
except (TypeError, ValueError):
formatted_args.append(str(arg)) # pyright: ignore[reportUnknownArgumentType]
else:
formatted_args.append(str(arg))
return formatted_args
def _format_timestamp(self, timestamp: int) -> str:
"""Format timestamp for display."""
try:
dt = datetime.fromtimestamp(timestamp / 1000)
return dt.strftime("%d.%m.%Y, %H:%M:%S")
except Exception:
return "Unknown"
@staticmethod
def format_as_text(entry: LogEntry) -> str:
"""Format a log entry as plain text (for file output)."""
parts: list[str] = []
# Timestamp
try:
dt = datetime.fromtimestamp(entry["timestamp"] / 1000)
parts.append(f"[{dt.strftime('%d.%m.%Y, %H:%M:%S')}]")
except Exception:
parts.append("[Unknown]")
# Prefix
if entry.get("prefix"):
parts.append(f"[{entry['prefix']}]")
# Suffix
if entry.get("suffix"):
parts.append(f"[{entry['suffix']}]")
# Level prefix
level_prefixes = {
"error": " ERROR ",
"warn": " WARN ",
"success": " SUCCESS ",
"attention": " ATTENTION ",
"trace": " TRACE ",
"debug": "",
"log": "",
"raw": "",
}
level_prefix = level_prefixes.get(entry["level"], "")
if level_prefix:
parts.append(level_prefix)
# Message
parts.append(entry["message"])
return " ".join(parts)
@staticmethod
def parse_child_log(line: str) -> LogEntry | None:
"""Parse a JSON log line from a child process."""
try:
parsed = json.loads(line)
if parsed.get("type") == "log" and parsed.get("entry"):
return cast(LogEntry, parsed["entry"])
except (json.JSONDecodeError, KeyError):
pass
return None
+172
-7
Metadata-Version: 2.4
Name: camera-ui-python-common
Version: 2.0.2
Version: 3.0.0
Summary: camera.ui python utilities

@@ -15,10 +15,8 @@ Author-email: seydx <dev@seydx.com>

Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.10
Requires-Python: >=3.11
Description-Content-Type: text/markdown

@@ -28,6 +26,173 @@ License-File: LICENSE.txt

Requires-Dist: reactivex>=4.0.4
Requires-Dist: camera-ui-python-types>=2.0.3
Requires-Dist: pyeecui>=11.1.2
Requires-Dist: camera-ui-python-types>=3.0.0
Dynamic: license-file
## @camera.ui/python-common
# camera-ui-python-common
Shared utilities for Python-based camera.ui plugins including logging, reactive programming, and common helpers.
## Installation
```bash
pip install camera-ui-python-common
```
## Core Features
- **Logging** - Colored, structured logging
- **Reactive Programming** - Observable properties
- **Signal Handling** - Graceful shutdown
- **Object Utilities** - Deep object manipulation
- **Async Helpers** - Task and thread management
## Logging
```python
from camera_ui_python_common import LoggerService
logger = LoggerService({
"prefix": "MyPlugin",
"debug_enabled": True
})
logger.log("Plugin started")
logger.error("Something went wrong")
logger.success("Operation completed")
logger.debug("Debug information")
```
## Reactive Programming
```python
from camera_ui_python_common import ReactiveProperty
# Create reactive state
state = ReactiveProperty({"count": 0, "active": True})
# Subscribe to changes
state.observable.subscribe(lambda value: print(f"State: {value}"))
# Update state
state.next({"count": 1, "active": True})
```
## Signal Handling
```python
from camera_ui_python_common import SignalHandler
async def cleanup():
print("Cleaning up...")
signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": logger,
"close_function": cleanup
})
signal_handler.setup_handlers()
```
## Object Path Operations
```python
from camera_ui_python_common import ObjectPath
data = {"camera": {"settings": {"resolution": "1080p"}}}
# Get nested values
resolution = ObjectPath.get(data, "camera.settings.resolution")
# Set nested values
ObjectPath.set(data, "camera.settings.bitrate", 5000)
# Check if path exists
has_setting = ObjectPath.has(data, "camera.settings.fps")
```
## Async Utilities
### Task Management
```python
from camera_ui_python_common import TaskSet
tasks = TaskSet("MyTasks")
tasks.add(my_async_function())
await tasks
```
### Thread Operations
```python
from camera_ui_python_common import to_thread
# Run blocking operation in thread
result = await to_thread(blocking_function)
```
### Sync/Async Bridge
```python
from camera_ui_python_common import make_sync
# Convert async function to sync
sync_version = make_sync(async_function)
result = sync_version(args)
```
## RTSP Utilities
```python
from camera_ui_python_common import build_target_url
url = build_target_url("rtsp://camera.local/stream", {
"video": True,
"audio": ["pcma", "opus"],
"timeout": 30
})
```
## Complete Example
```python
import asyncio
from camera_ui_python_common import LoggerService, ReactiveProperty, SignalHandler
class MyPlugin:
def __init__(self):
self.logger = LoggerService({"prefix": "MyPlugin"})
self.state = ReactiveProperty({"active": False})
self.signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": self.logger,
"close_function": self.cleanup
})
async def start(self):
self.logger.log("Starting plugin...")
self.signal_handler.setup_handlers()
# Update state
self.state.next({"active": True})
# Keep running
await asyncio.Future()
async def cleanup(self):
self.logger.log("Stopping plugin...")
# Usage
plugin = MyPlugin()
asyncio.run(plugin.start())
```
## Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.
## License
MIT
---
*Part of the camera.ui ecosystem - A comprehensive camera management solution.*
+1
-2
typing_extensions>=4.12.2
reactivex>=4.0.4
camera-ui-python-types>=2.0.3
pyeecui>=11.1.2
camera-ui-python-types>=3.0.0

@@ -5,3 +5,2 @@ LICENSE.txt

camera_ui_python_common/__init__.py
camera_ui_python_common/__init__.pyi
camera_ui_python_common/py.typed

@@ -12,2 +11,16 @@ camera_ui_python_common.egg-info/PKG-INFO

camera_ui_python_common.egg-info/requires.txt
camera_ui_python_common.egg-info/top_level.txt
camera_ui_python_common.egg-info/top_level.txt
camera_ui_python_common/camera_utils/__init__.py
camera_ui_python_common/camera_utils/utils.py
camera_ui_python_common/common_utils/__init__.py
camera_ui_python_common/common_utils/object_path.py
camera_ui_python_common/common_utils/promise.py
camera_ui_python_common/common_utils/reactive.py
camera_ui_python_common/common_utils/signal_handler.py
camera_ui_python_common/common_utils/subscribed.py
camera_ui_python_common/common_utils/task.py
camera_ui_python_common/common_utils/thread.py
camera_ui_python_common/common_utils/utils.py
camera_ui_python_common/logger_service/__init__.py
camera_ui_python_common/logger_service/ansicolor.py
camera_ui_python_common/logger_service/logger.py

@@ -1,3 +0,3 @@

from .camera_utils.utils import build_target_url
from .common_utils.object_path import ObjectPath, Path
from .common_utils.promise import Deferred
from .common_utils.reactive import ReactiveProperty

@@ -13,3 +13,3 @@ from .common_utils.signal_handler import SignalHandler, SignalHandlerOptions

__all__ = [
"build_target_url",
"Deferred",
"ObjectPath",

@@ -16,0 +16,0 @@ "Path",

+172
-7
Metadata-Version: 2.4
Name: camera-ui-python-common
Version: 2.0.2
Version: 3.0.0
Summary: camera.ui python utilities

@@ -15,10 +15,8 @@ Author-email: seydx <dev@seydx.com>

Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.10
Requires-Python: >=3.11
Description-Content-Type: text/markdown

@@ -28,6 +26,173 @@ License-File: LICENSE.txt

Requires-Dist: reactivex>=4.0.4
Requires-Dist: camera-ui-python-types>=2.0.3
Requires-Dist: pyeecui>=11.1.2
Requires-Dist: camera-ui-python-types>=3.0.0
Dynamic: license-file
## @camera.ui/python-common
# camera-ui-python-common
Shared utilities for Python-based camera.ui plugins including logging, reactive programming, and common helpers.
## Installation
```bash
pip install camera-ui-python-common
```
## Core Features
- **Logging** - Colored, structured logging
- **Reactive Programming** - Observable properties
- **Signal Handling** - Graceful shutdown
- **Object Utilities** - Deep object manipulation
- **Async Helpers** - Task and thread management
## Logging
```python
from camera_ui_python_common import LoggerService
logger = LoggerService({
"prefix": "MyPlugin",
"debug_enabled": True
})
logger.log("Plugin started")
logger.error("Something went wrong")
logger.success("Operation completed")
logger.debug("Debug information")
```
## Reactive Programming
```python
from camera_ui_python_common import ReactiveProperty
# Create reactive state
state = ReactiveProperty({"count": 0, "active": True})
# Subscribe to changes
state.observable.subscribe(lambda value: print(f"State: {value}"))
# Update state
state.next({"count": 1, "active": True})
```
## Signal Handling
```python
from camera_ui_python_common import SignalHandler
async def cleanup():
print("Cleaning up...")
signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": logger,
"close_function": cleanup
})
signal_handler.setup_handlers()
```
## Object Path Operations
```python
from camera_ui_python_common import ObjectPath
data = {"camera": {"settings": {"resolution": "1080p"}}}
# Get nested values
resolution = ObjectPath.get(data, "camera.settings.resolution")
# Set nested values
ObjectPath.set(data, "camera.settings.bitrate", 5000)
# Check if path exists
has_setting = ObjectPath.has(data, "camera.settings.fps")
```
## Async Utilities
### Task Management
```python
from camera_ui_python_common import TaskSet
tasks = TaskSet("MyTasks")
tasks.add(my_async_function())
await tasks
```
### Thread Operations
```python
from camera_ui_python_common import to_thread
# Run blocking operation in thread
result = await to_thread(blocking_function)
```
### Sync/Async Bridge
```python
from camera_ui_python_common import make_sync
# Convert async function to sync
sync_version = make_sync(async_function)
result = sync_version(args)
```
## RTSP Utilities
```python
from camera_ui_python_common import build_target_url
url = build_target_url("rtsp://camera.local/stream", {
"video": True,
"audio": ["pcma", "opus"],
"timeout": 30
})
```
## Complete Example
```python
import asyncio
from camera_ui_python_common import LoggerService, ReactiveProperty, SignalHandler
class MyPlugin:
def __init__(self):
self.logger = LoggerService({"prefix": "MyPlugin"})
self.state = ReactiveProperty({"active": False})
self.signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": self.logger,
"close_function": self.cleanup
})
async def start(self):
self.logger.log("Starting plugin...")
self.signal_handler.setup_handlers()
# Update state
self.state.next({"active": True})
# Keep running
await asyncio.Future()
async def cleanup(self):
self.logger.log("Stopping plugin...")
# Usage
plugin = MyPlugin()
asyncio.run(plugin.start())
```
## Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.
## License
MIT
---
*Part of the camera.ui ecosystem - A comprehensive camera management solution.*

@@ -7,3 +7,3 @@ [build-system]

name = "camera-ui-python-common"
version = "2.0.2"
version = "3.0.0"
description = "camera.ui python utilities"

@@ -15,10 +15,8 @@ keywords = ["camera.ui", "python", "utilities"]

maintainers = [{ name = "seydx", email = "dev@seydx.com" }]
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",

@@ -28,3 +26,3 @@ "Programming Language :: Python :: 3.12",

]
dependencies = ["typing_extensions>=4.12.2", "reactivex>=4.0.4", "camera-ui-python-types>=2.0.3", "pyeecui>=11.1.2"]
dependencies = ["typing_extensions>=4.12.2", "reactivex>=4.0.4", "camera-ui-python-types>=3.0.0"]

@@ -38,4 +36,7 @@ [project.urls]

[tool.setuptools]
packages = ["camera_ui_python_common"]
include-package-data = true
[tool.setuptools.packages.find]
where = ["."]
include = ["camera_ui_python_common*"]
[tool.setuptools.package-data]
camera_ui_python_common = ["py.typed"]
+169
-1

@@ -1,1 +0,169 @@

## @camera.ui/python-common
# camera-ui-python-common
Shared utilities for Python-based camera.ui plugins including logging, reactive programming, and common helpers.
## Installation
```bash
pip install camera-ui-python-common
```
## Core Features
- **Logging** - Colored, structured logging
- **Reactive Programming** - Observable properties
- **Signal Handling** - Graceful shutdown
- **Object Utilities** - Deep object manipulation
- **Async Helpers** - Task and thread management
## Logging
```python
from camera_ui_python_common import LoggerService
logger = LoggerService({
"prefix": "MyPlugin",
"debug_enabled": True
})
logger.log("Plugin started")
logger.error("Something went wrong")
logger.success("Operation completed")
logger.debug("Debug information")
```
## Reactive Programming
```python
from camera_ui_python_common import ReactiveProperty
# Create reactive state
state = ReactiveProperty({"count": 0, "active": True})
# Subscribe to changes
state.observable.subscribe(lambda value: print(f"State: {value}"))
# Update state
state.next({"count": 1, "active": True})
```
## Signal Handling
```python
from camera_ui_python_common import SignalHandler
async def cleanup():
print("Cleaning up...")
signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": logger,
"close_function": cleanup
})
signal_handler.setup_handlers()
```
## Object Path Operations
```python
from camera_ui_python_common import ObjectPath
data = {"camera": {"settings": {"resolution": "1080p"}}}
# Get nested values
resolution = ObjectPath.get(data, "camera.settings.resolution")
# Set nested values
ObjectPath.set(data, "camera.settings.bitrate", 5000)
# Check if path exists
has_setting = ObjectPath.has(data, "camera.settings.fps")
```
## Async Utilities
### Task Management
```python
from camera_ui_python_common import TaskSet
tasks = TaskSet("MyTasks")
tasks.add(my_async_function())
await tasks
```
### Thread Operations
```python
from camera_ui_python_common import to_thread
# Run blocking operation in thread
result = await to_thread(blocking_function)
```
### Sync/Async Bridge
```python
from camera_ui_python_common import make_sync
# Convert async function to sync
sync_version = make_sync(async_function)
result = sync_version(args)
```
## RTSP Utilities
```python
from camera_ui_python_common import build_target_url
url = build_target_url("rtsp://camera.local/stream", {
"video": True,
"audio": ["pcma", "opus"],
"timeout": 30
})
```
## Complete Example
```python
import asyncio
from camera_ui_python_common import LoggerService, ReactiveProperty, SignalHandler
class MyPlugin:
def __init__(self):
self.logger = LoggerService({"prefix": "MyPlugin"})
self.state = ReactiveProperty({"active": False})
self.signal_handler = SignalHandler({
"display_name": "MyPlugin",
"logger": self.logger,
"close_function": self.cleanup
})
async def start(self):
self.logger.log("Starting plugin...")
self.signal_handler.setup_handlers()
# Update state
self.state.next({"active": True})
# Keep running
await asyncio.Future()
async def cleanup(self):
self.logger.log("Stopping plugin...")
# Usage
plugin = MyPlugin()
asyncio.run(plugin.start())
```
## Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.
## License
MIT
---
*Part of the camera.ui ecosystem - A comprehensive camera management solution.*
from .camera_utils.utils import build_target_url as build_target_url
from .common_utils.object_path import ObjectPath as ObjectPath, Path as Path
from .common_utils.reactive import ReactiveProperty as ReactiveProperty
from .common_utils.signal_handler import SignalHandler as SignalHandler, SignalHandlerOptions as SignalHandlerOptions
from .common_utils.subscribed import Subscribed as Subscribed
from .common_utils.task import TaskSet as TaskSet
from .common_utils.thread import to_thread as to_thread
from .common_utils.utils import make_sync as make_sync, merge as merge, merge_with as merge_with
from .logger_service.ansicolor import Ansicolor as Ansicolor
from .logger_service.logger import LoggerOptions as LoggerOptions, LoggerService as LoggerService
__all__ = ['build_target_url', 'ObjectPath', 'Path', 'SignalHandler', 'SignalHandlerOptions', 'Subscribed', 'TaskSet', 'to_thread', 'make_sync', 'merge', 'merge_with', 'ReactiveProperty', 'Ansicolor', 'LoggerOptions', 'LoggerService']