onvif-python
Advanced tools
| # onvif/utils/parser.py | ||
| import logging | ||
| from typing import List, Dict, Optional | ||
| from zeep import Plugin | ||
| logger = logging.getLogger(__name__) | ||
| logger.addHandler(logging.NullHandler()) | ||
| class ONVIFParser(Plugin): | ||
| """ | ||
| Lightweight Zeep plugin to extract XML elements from SOAP responses using XPath. | ||
| This plugin extracts specific XML elements from raw SOAP responses before zeep | ||
| parses them into Python objects. This is useful for extracting data that zeep | ||
| doesn't parse correctly, such as simpleContent elements with attributes (e.g., Topic). | ||
| Unlike XMLCapturePlugin which stores full XML history in memory, ONVIFParser only | ||
| extracts and caches specified element texts from the last response. | ||
| Usage Example 1 - Extract Topic from event notifications: | ||
| >>> from onvif import ONVIFClient | ||
| >>> from onvif.utils import ONVIFParser | ||
| >>> | ||
| >>> # Create parser to extract Topic elements | ||
| >>> parser = ONVIFParser(extract_xpaths={ | ||
| ... 'topic': './/{http://docs.oasis-open.org/wsn/b-2}Topic' | ||
| ... }) | ||
| >>> | ||
| >>> # Pass parser to client as plugin | ||
| >>> client = ONVIFClient(host, port, user, pass, plugins=[parser]) | ||
| >>> | ||
| >>> # Make SOAP call | ||
| >>> pullpoint = client.pullpoint(subscription) | ||
| >>> msgs = pullpoint.PullMessages(Timeout="PT5S", MessageLimit=10) | ||
| >>> | ||
| >>> # Extract topic texts (cache auto-cleared on next SOAP call) | ||
| >>> topics = parser.get_extracted_texts('topic', count=10) | ||
| >>> for topic in topics: | ||
| ... print(f"Topic: {topic}") | ||
| Usage Example 2 - Extract multiple elements: | ||
| >>> parser = ONVIFParser(extract_xpaths={ | ||
| ... 'topic': './/{http://docs.oasis-open.org/wsn/b-2}Topic', | ||
| ... 'custom': './/ns:CustomElement' | ||
| ... }) | ||
| >>> client = ONVIFClient(host, port, user, pass, plugins=[parser]) | ||
| >>> | ||
| >>> # After SOAP call | ||
| >>> topics = parser.get_extracted_texts('topic', count=5) | ||
| >>> customs = parser.get_extracted_texts('custom', count=5) | ||
| Notes: | ||
| - Uses ingress() hook to access raw XML before zeep parsing | ||
| - Cache automatically cleared on each SOAP response | ||
| - Thread-safe for single client usage | ||
| - Works with any XPath expression | ||
| """ | ||
| def __init__(self, extract_xpaths: Dict[str, str]): | ||
| """ | ||
| Initialize XML element parser. | ||
| Args: | ||
| extract_xpaths: Dictionary mapping names to XPath expressions. XPath expressions will be used to find elements in SOAP response. | ||
| Example: { | ||
| 'topic': './/{http://docs.oasis-open.org/wsn/b-2}Topic', | ||
| 'custom': './/ns:CustomElement' | ||
| } | ||
| """ | ||
| self.extract_xpaths = extract_xpaths | ||
| self._extracted_elements = {} | ||
| logger.debug( | ||
| f"ONVIFParser initialized with XPaths: {list(extract_xpaths.keys())}" | ||
| ) | ||
| def ingress(self, envelope, http_headers, operation): | ||
| """ | ||
| Zeep plugin hook - called when SOAP response is received. | ||
| Extracts element texts from raw XML envelope using configured XPath expressions. | ||
| The envelope at this stage is an lxml Element tree, allowing XPath queries. | ||
| Cache is automatically cleared before extracting new elements to prevent memory accumulation. | ||
| Args: | ||
| envelope: lxml Element representing SOAP envelope | ||
| http_headers: HTTP response headers | ||
| operation: Zeep operation being executed | ||
| Returns: | ||
| Tuple of (envelope, http_headers) to pass to next plugin | ||
| """ | ||
| # Auto-clear cache from previous response | ||
| self._extracted_elements = {} | ||
| try: | ||
| # Extract elements using XPath from raw XML envelope | ||
| for name, xpath in self.extract_xpaths.items(): | ||
| elements = envelope.findall(xpath) | ||
| # Extract text content from found elements | ||
| texts = [elem.text for elem in elements if elem.text] | ||
| if texts: | ||
| self._extracted_elements[name] = texts | ||
| logger.debug( | ||
| f"ONVIFParser: Extracted {len(texts)} '{name}' elements" | ||
| ) | ||
| except Exception as e: | ||
| logger.warning(f"ONVIFParser: Failed to extract elements: {e}") | ||
| return envelope, http_headers | ||
| def get_extracted_texts(self, name: str, count: int) -> List[Optional[str]]: | ||
| """ | ||
| Get extracted element texts by name. | ||
| Args: | ||
| name: Name of the extracted elements (key from extract_xpaths dict) | ||
| count: Number of elements to return | ||
| Returns: | ||
| List of element text values, padded with None if fewer elements were found. | ||
| Example: If 3 elements found but count=5, returns [text1, text2, text3, None, None] | ||
| """ | ||
| texts = self._extracted_elements.get(name, [])[:count] | ||
| # Pad with None if not enough elements | ||
| while len(texts) < count: | ||
| texts.append(None) | ||
| return texts |
| Metadata-Version: 2.4 | ||
| Name: onvif-python | ||
| Version: 0.2.1 | ||
| Version: 0.2.2 | ||
| Summary: A modern Python library for ONVIF-compliant devices | ||
@@ -50,3 +50,3 @@ Author-email: Nirsimetri Technologies® <open@nirsimetri.com> | ||
| [](https://deepwiki.com/nirsimetri/onvif-python) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://clickpy.clickhouse.com/dashboard/onvif-python) | ||
@@ -360,3 +360,3 @@ <br> | ||
| ONVIF Terminal Client — v0.2.1 | ||
| ONVIF Terminal Client — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -434,3 +434,3 @@ | ||
| ```bash | ||
| ONVIF Interactive Shell — v0.2.1 | ||
| ONVIF Interactive Shell — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -1114,5 +1114,5 @@ | ||
| - **Sub-bindings:** | ||
| - `JWTBinding` | ||
| - `AuthorizationServerBinding` | ||
| - `KeystoreBinding` | ||
| - `JWTBinding` | ||
| - `Dot1XBinding` | ||
@@ -1125,5 +1125,5 @@ - `TLSServerBinding` | ||
| client.security() # root binding | ||
| client.jwt() # sub-binding accessor | ||
| client.authorizationserver(xaddr) # sub-binding accessor (requires xAddr) | ||
| client.keystore(xaddr) # .. | ||
| client.jwt(xaddr) | ||
| client.dot1x(xaddr) | ||
@@ -1130,0 +1130,0 @@ client.tlsserver(xaddr) |
@@ -57,2 +57,3 @@ LICENSE.md | ||
| onvif/utils/exceptions.py | ||
| onvif/utils/parser.py | ||
| onvif/utils/service.py | ||
@@ -59,0 +60,0 @@ onvif/utils/wsdl.py |
@@ -11,2 +11,3 @@ # onvif/__init__.py | ||
| ONVIFDiscovery, | ||
| ONVIFParser, | ||
| ) | ||
@@ -25,2 +26,3 @@ from .cli import main as ONVIFCLI | ||
| "ONVIFDiscovery", | ||
| "ONVIFParser", | ||
| ] |
@@ -197,3 +197,3 @@ # onvif/cli/interactive.py | ||
| "/ /_/ / /| / | |/ // // __/ ", | ||
| "\\____/_/ |_/ |___/___/_/ v0.2.1", | ||
| "\\____/_/ |_/ |___/___/_/ v0.2.2", | ||
| " ", | ||
@@ -1404,3 +1404,3 @@ ] | ||
| help_text = f""" | ||
| {colorize('ONVIF Interactive Shell — v0.2.1', 'cyan')}\n{colorize('https://github.com/nirsimetri/onvif-python', 'white')} | ||
| {colorize('ONVIF Interactive Shell — v0.2.2', 'cyan')}\n{colorize('https://github.com/nirsimetri/onvif-python', 'white')} | ||
@@ -1407,0 +1407,0 @@ {colorize('Basic Commands:', 'yellow')} |
@@ -24,3 +24,3 @@ # onvif/cli/main.py | ||
| prog="onvif", | ||
| description=f"{colorize('ONVIF Terminal Client', 'yellow')} — v0.2.1\nhttps://github.com/nirsimetri/onvif-python", | ||
| description=f"{colorize('ONVIF Terminal Client', 'yellow')} — v0.2.2\nhttps://github.com/nirsimetri/onvif-python", | ||
| formatter_class=argparse.RawDescriptionHelpFormatter, | ||
@@ -465,3 +465,3 @@ epilog=f""" | ||
| if args.version: | ||
| print(colorize("0.2.1", "yellow")) | ||
| print(colorize("0.2.2", "yellow")) | ||
| sys.exit(0) | ||
@@ -468,0 +468,0 @@ |
+95
-1
@@ -621,3 +621,3 @@ # onvif/cli/utils.py | ||
| [ | ||
| "advancedsecurity", | ||
| "security", | ||
| "jwt", | ||
@@ -632,2 +632,96 @@ "keystore", | ||
| # Additional check: Try to call security.GetServiceCapabilities() to verify availability | ||
| # This is necessary because Security service might not be reported in GetServices/GetCapabilities | ||
| # Use caching to avoid calling GetServiceCapabilities repeatedly | ||
| if "security" not in available_services: | ||
| # Check if we've already tried to get security capabilities | ||
| if not hasattr(client, "_security_capabilities_checked"): | ||
| client._security_capabilities_checked = False | ||
| client._security_capabilities = None | ||
| if not client._security_capabilities_checked: | ||
| # First time check - call GetServiceCapabilities and cache result | ||
| try: | ||
| security_service = client.security() | ||
| # Try to call GetServiceCapabilities to verify the service is actually available | ||
| caps = security_service.GetServiceCapabilities() | ||
| # Cache the capabilities for future use | ||
| client._security_capabilities = caps | ||
| client._security_capabilities_checked = True | ||
| except Exception: | ||
| # Security service not available, mark as checked | ||
| client._security_capabilities_checked = True | ||
| client._security_capabilities = None | ||
| # Use cached capabilities | ||
| if client._security_capabilities is not None: | ||
| caps = client._security_capabilities | ||
| # If successful, add main security service | ||
| available_services.append("security") | ||
| # Check each sub-service capability to determine availability | ||
| # Only add sub-services if their corresponding capability is not None | ||
| if ( | ||
| hasattr(caps, "KeystoreCapabilities") | ||
| and caps.KeystoreCapabilities is not None | ||
| ): | ||
| available_services.append("keystore") | ||
| if ( | ||
| hasattr(caps, "TLSServerCapabilities") | ||
| and caps.TLSServerCapabilities is not None | ||
| ): | ||
| available_services.append("tlsserver") | ||
| if ( | ||
| hasattr(caps, "Dot1XCapabilities") | ||
| and caps.Dot1XCapabilities is not None | ||
| ): | ||
| available_services.append("dot1x") | ||
| if ( | ||
| hasattr(caps, "AuthorizationServer") | ||
| and caps.AuthorizationServer is not None | ||
| ): | ||
| available_services.append("authorizationserver") | ||
| if hasattr(caps, "MediaSigning") and caps.MediaSigning is not None: | ||
| available_services.append("mediasigning") | ||
| # Additional check for JWT service: Try to call jwt.GetJWTConfiguration() | ||
| # JWT doesn't have a capability in GetServiceCapabilities, so we need to test it directly | ||
| if "jwt" not in available_services: | ||
| # Check if we've already tried to get JWT availability | ||
| if not hasattr(client, "_jwt_checked"): | ||
| client._jwt_checked = False | ||
| client._jwt_available = None | ||
| if not client._jwt_checked: | ||
| # First time check - try to call GetJWTConfiguration and cache result | ||
| try: | ||
| # JWT service requires xaddr from security service | ||
| # Try to construct xaddr from security service endpoint | ||
| protocol = "https" if client.common_args["use_https"] else "http" | ||
| default_xaddr = f"{protocol}://{client.common_args['host']}:{client.common_args['port']}/onvif/AdvancedSecurity" | ||
| # Try to call GetJWTConfiguration to verify JWT is available | ||
| jwt_service = client.jwt(xaddr=default_xaddr) | ||
| jwt_service.GetJWTConfiguration() | ||
| # If successful, JWT is available | ||
| client._jwt_available = True | ||
| client._jwt_checked = True | ||
| except Exception: | ||
| # JWT service not available, mark as checked | ||
| client._jwt_checked = True | ||
| client._jwt_available = False | ||
| # Use cached JWT availability | ||
| if client._jwt_available: | ||
| available_services.append("jwt") | ||
| return sorted(list(set(available_services))) # Remove duplicates and sort | ||
@@ -634,0 +728,0 @@ |
+21
-5
@@ -114,2 +114,3 @@ # onvif/client.py | ||
| wsdl_dir: str = None, | ||
| plugins: list = None, | ||
| ): | ||
@@ -135,2 +136,11 @@ logger.info(f"Initializing ONVIF client for {host}:{port}") | ||
| # Merge user plugins with xml_plugin | ||
| all_plugins = [] | ||
| if plugins: | ||
| logger.debug(f"Adding {len(plugins)} user-provided plugins") | ||
| all_plugins.extend(plugins) | ||
| if self.xml_plugin: | ||
| logger.debug("Adding XML capture plugin") | ||
| all_plugins.append(self.xml_plugin) | ||
| # Store custom WSDL directory if provided | ||
@@ -153,3 +163,3 @@ self.wsdl_dir = wsdl_dir | ||
| "apply_patch": apply_patch, | ||
| "plugins": [self.xml_plugin] if self.xml_plugin else None, | ||
| "plugins": all_plugins if all_plugins else None, | ||
| } | ||
@@ -168,2 +178,10 @@ | ||
| # Cache for security service capabilities (lazy loaded) | ||
| self._security_capabilities = None | ||
| self._security_capabilities_checked = False | ||
| # Cache for JWT service availability (lazy loaded) | ||
| self._jwt_available = None | ||
| self._jwt_checked = False | ||
| try: | ||
@@ -729,3 +747,2 @@ # Try GetServices first (preferred method) | ||
| self._security = AdvancedSecurity( | ||
| xaddr=self._get_xaddr("advancedsecurity", "Security"), | ||
| **self.common_args, | ||
@@ -736,7 +753,6 @@ ) | ||
| @service | ||
| def jwt(self, xaddr): | ||
| def jwt(self): | ||
| if self._jwt is None: | ||
| logger.debug("Initializing JWT service") | ||
| xaddr = self._rewrite_xaddr_if_needed(xaddr) | ||
| self._jwt = JWT(xaddr=xaddr, **self.common_args) | ||
| self._jwt = JWT(**self.common_args) | ||
| return self._jwt | ||
@@ -743,0 +759,0 @@ |
@@ -18,3 +18,3 @@ # onvif/services/security/advancedsecurity.py | ||
| binding=f"{{{definition['namespace']}}}{definition['binding']}", | ||
| service_path="Security", # fallback | ||
| service_path="device_service", # fallback | ||
| xaddr=xaddr, | ||
@@ -21,0 +21,0 @@ **kwargs, |
@@ -10,2 +10,3 @@ # onvif/utils/__init__.py | ||
| from .service import ONVIFService | ||
| from .parser import ONVIFParser | ||
@@ -21,2 +22,3 @@ | ||
| "ONVIFService", | ||
| "ONVIFParser", | ||
| ] |
@@ -25,11 +25,2 @@ # onvif/utils/xml_capture.py | ||
| Key Features: | ||
| - Pretty-printed XML with proper indentation | ||
| - Last request/response quick access | ||
| - Full transaction history storage | ||
| - HTTP header capture | ||
| - Operation name tracking | ||
| - File export capability | ||
| - Secure XML parsing (XXE protection) | ||
| Use Cases: | ||
@@ -36,0 +27,0 @@ 1. **Debugging**: See exact SOAP messages being sent/received |
+38
-3
@@ -363,4 +363,3 @@ # onvif/utils/zeep.py | ||
| if should_flatten: | ||
| # This is truly a wrapper - flatten by copying fields to parent | ||
| # But still set the tag_name field to the inner_content | ||
| # This is truly a wrapper - flatten by extracting children to parent | ||
| # Convert zeep object to dict to preserve manually added attributes | ||
@@ -373,4 +372,19 @@ if hasattr(inner_content, "__values__") or hasattr( | ||
| ) | ||
| # Set the wrapper field itself | ||
| values[tag_name] = inner_content | ||
| # Don't copy fields up to parent - keep them in the structured object | ||
| # If inner_content is a dict with children, copy them to parent too | ||
| if isinstance(inner_content, dict): | ||
| for child_key, child_val in inner_content.items(): | ||
| if child_key.startswith("_"): | ||
| continue | ||
| # Copy children to parent level if field exists and is None | ||
| if ( | ||
| child_key in values | ||
| and values[child_key] is None | ||
| ): | ||
| values[child_key] = child_val | ||
| elif child_key not in values: | ||
| values[child_key] = child_val | ||
| else: | ||
@@ -453,2 +467,16 @@ # Not a wrapper - just set the field directly | ||
| setattr(obj, tag_name, inner_content) | ||
| # If inner_content is a dict with children, copy them to parent too | ||
| if isinstance(inner_content, dict): | ||
| for child_key, child_val in inner_content.items(): | ||
| if child_key.startswith("_"): | ||
| continue | ||
| # Copy children to parent level if field exists and is None | ||
| if ( | ||
| hasattr(obj, child_key) | ||
| and getattr(obj, child_key) is None | ||
| ): | ||
| setattr(obj, child_key, child_val) | ||
| elif not hasattr(obj, child_key): | ||
| setattr(obj, child_key, child_val) | ||
| else: | ||
@@ -617,3 +645,10 @@ # Not a wrapper - just set the field directly | ||
| parsed_result[tag_name] = child_result | ||
| elif xmlelement.attrib: | ||
| # Element has attributes but no children - parse attributes | ||
| parsed_result[tag_name] = { | ||
| k: ZeepPatcher.parse_text_value(v) | ||
| for k, v in xmlelement.attrib.items() | ||
| } | ||
| else: | ||
| # Element has no attributes and no children - just text content | ||
| parsed_result[tag_name] = ZeepPatcher.parse_text_value(xmlelement.text) | ||
@@ -620,0 +655,0 @@ |
+6
-6
| Metadata-Version: 2.4 | ||
| Name: onvif-python | ||
| Version: 0.2.1 | ||
| Version: 0.2.2 | ||
| Summary: A modern Python library for ONVIF-compliant devices | ||
@@ -50,3 +50,3 @@ Author-email: Nirsimetri Technologies® <open@nirsimetri.com> | ||
| [](https://deepwiki.com/nirsimetri/onvif-python) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://clickpy.clickhouse.com/dashboard/onvif-python) | ||
@@ -360,3 +360,3 @@ <br> | ||
| ONVIF Terminal Client — v0.2.1 | ||
| ONVIF Terminal Client — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -434,3 +434,3 @@ | ||
| ```bash | ||
| ONVIF Interactive Shell — v0.2.1 | ||
| ONVIF Interactive Shell — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -1114,5 +1114,5 @@ | ||
| - **Sub-bindings:** | ||
| - `JWTBinding` | ||
| - `AuthorizationServerBinding` | ||
| - `KeystoreBinding` | ||
| - `JWTBinding` | ||
| - `Dot1XBinding` | ||
@@ -1125,5 +1125,5 @@ - `TLSServerBinding` | ||
| client.security() # root binding | ||
| client.jwt() # sub-binding accessor | ||
| client.authorizationserver(xaddr) # sub-binding accessor (requires xAddr) | ||
| client.keystore(xaddr) # .. | ||
| client.jwt(xaddr) | ||
| client.dot1x(xaddr) | ||
@@ -1130,0 +1130,0 @@ client.tlsserver(xaddr) |
+1
-1
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "onvif-python" | ||
| version = "0.2.1" | ||
| version = "0.2.2" | ||
| description = "A modern Python library for ONVIF-compliant devices" | ||
@@ -10,0 +10,0 @@ readme = "README.md" |
+5
-5
@@ -7,3 +7,3 @@ <h1 align="center">ONVIF Python</h1> | ||
| [](https://deepwiki.com/nirsimetri/onvif-python) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://pypi.org/project/onvif-python/) | ||
| [](https://clickpy.clickhouse.com/dashboard/onvif-python) | ||
@@ -317,3 +317,3 @@ <br> | ||
| ONVIF Terminal Client — v0.2.1 | ||
| ONVIF Terminal Client — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -391,3 +391,3 @@ | ||
| ```bash | ||
| ONVIF Interactive Shell — v0.2.1 | ||
| ONVIF Interactive Shell — v0.2.2 | ||
| https://github.com/nirsimetri/onvif-python | ||
@@ -1071,5 +1071,5 @@ | ||
| - **Sub-bindings:** | ||
| - `JWTBinding` | ||
| - `AuthorizationServerBinding` | ||
| - `KeystoreBinding` | ||
| - `JWTBinding` | ||
| - `Dot1XBinding` | ||
@@ -1082,5 +1082,5 @@ - `TLSServerBinding` | ||
| client.security() # root binding | ||
| client.jwt() # sub-binding accessor | ||
| client.authorizationserver(xaddr) # sub-binding accessor (requires xAddr) | ||
| client.keystore(xaddr) # .. | ||
| client.jwt(xaddr) | ||
| client.dot1x(xaddr) | ||
@@ -1087,0 +1087,0 @@ client.tlsserver(xaddr) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
85014781
0.01%133
0.76%10714
2.16%