bigraph-viz
Advanced tools
| from plum import dispatch | ||
| from bigraph_schema import is_schema_key, hierarchy_depth | ||
| from bigraph_schema.schema import ( | ||
| Node, | ||
| Atom, | ||
| Empty, | ||
| Union, | ||
| Tuple, | ||
| Boolean, | ||
| Or, | ||
| And, | ||
| Xor, | ||
| Number, | ||
| Integer, | ||
| Float, | ||
| Delta, | ||
| Nonnegative, | ||
| String, | ||
| Enum, | ||
| Wrap, | ||
| Maybe, | ||
| Overwrite, | ||
| List, | ||
| Map, | ||
| Tree, | ||
| Array, | ||
| Key, | ||
| Path, | ||
| Wires, | ||
| Schema, | ||
| Link, | ||
| ) | ||
| from process_bigraph import CompositeLink | ||
| from bigraph_viz.dict_utils import absolute_path | ||
| PROCESS_SCHEMA_KEYS = [ | ||
| 'config', 'address', 'interval', 'inputs', 'outputs', 'instance', 'bridge'] | ||
| def add_place_edge(graph, parent, child): | ||
| place_edges = graph.setdefault('place_edges', []) | ||
| if not any(e['parent'] == parent and e['child'] == child for e in place_edges): | ||
| place_edges.append({'parent': parent, 'child': child}) | ||
| # Don't create a state_node for something that is already a process_node | ||
| child_path = tuple(child) | ||
| process_paths = {tuple(n['path']) for n in graph.get('process_nodes', [])} | ||
| if child_path in process_paths: | ||
| return | ||
| state_nodes = graph.setdefault('state_nodes', []) | ||
| if not any(tuple(n['path']) == child_path for n in state_nodes): | ||
| state_nodes.append({ | ||
| 'name': child_path[-1], | ||
| 'path': child_path, | ||
| 'value': None, | ||
| 'type': None, | ||
| }) | ||
| def get_single_wire(edge_path, graph_dict, port, schema_key, wire): | ||
| """ | ||
| Add a connection from a port to its wire target. | ||
| Parameters: | ||
| edge_path (tuple): Path to the process | ||
| graph_dict (dict): Current graph dict | ||
| port (str): Name of the port | ||
| schema_key (str): Either 'inputs' or 'outputs' | ||
| wire (str|list): Wire connection(s) | ||
| Returns: | ||
| Updated graph_dict | ||
| """ | ||
| if isinstance(wire, str): | ||
| wire = [wire] | ||
| else: | ||
| wire = [item for item in wire if isinstance(item, str)] | ||
| target_path = absolute_path(edge_path[:-1], tuple(wire)) | ||
| # if wire points to an internal attribute, add a place edge | ||
| if len(target_path) > 1: | ||
| add_place_edge(graph_dict, target_path[:-1], target_path) | ||
| # add the edge | ||
| edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges' | ||
| graph_dict[edge_key].append({ | ||
| 'edge_path': edge_path, | ||
| 'target_path': target_path, | ||
| 'port': port, | ||
| 'type': schema_key | ||
| }) | ||
| return graph_dict | ||
| def get_graph_wires(ports_schema, wires, graph_dict, schema_key, edge_path, bridge_wires=None): | ||
| """ | ||
| Traverse the port wiring and append wire edges or disconnected ports to graph_dict. | ||
| Parameters: | ||
| ports_schema (dict): Schema for ports (inputs or outputs) | ||
| wires (dict): Wiring structure from the process | ||
| graph_dict (dict): Accumulated graph | ||
| schema_key (str): Either 'inputs' or 'outputs' | ||
| edge_path (tuple): Path of the process node | ||
| bridge_wires (dict, optional): Optional rewiring via 'bridge' dict | ||
| Returns: | ||
| graph_dict (dict): Updated graph dict | ||
| """ | ||
| wires = wires or {} | ||
| ports_schema = ports_schema or {} | ||
| inferred_ports = set(ports_schema.keys()) | set(wires.keys()) | ||
| for port in inferred_ports: | ||
| wire = wires.get(port) | ||
| bridge = bridge_wires.get(port) if bridge_wires else None | ||
| if not wire: | ||
| # If not connected, mark as disconnected | ||
| edge_type = 'disconnected_input_edges' if schema_key == 'inputs' else 'disconnected_output_edges' | ||
| graph_dict[edge_type].append({ | ||
| 'edge_path': edge_path, | ||
| 'port': port, | ||
| 'type': schema_key | ||
| }) | ||
| elif isinstance(wire, (list, tuple, str)): | ||
| graph_dict = get_single_wire(edge_path, graph_dict, port, schema_key, wire) | ||
| elif isinstance(wire, dict): | ||
| for subpath, subwire in hierarchy_depth(wires).items(): | ||
| subport = '/'.join(subpath) | ||
| graph_dict = get_single_wire(edge_path, graph_dict, subport, schema_key, subwire) | ||
| else: | ||
| raise ValueError(f"Unexpected wire type: {wires}") | ||
| # Handle optional bridge wiring | ||
| if bridge: | ||
| target_path = absolute_path(edge_path, tuple(bridge)) | ||
| edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges' | ||
| graph_dict[edge_key].append({ | ||
| 'edge_path': edge_path, | ||
| 'target_path': target_path, | ||
| 'port': f'bridge_{port}', | ||
| 'type': f'bridge_{schema_key}' | ||
| }) | ||
| return graph_dict | ||
| # ---- Graphviz generation methods ---- | ||
| def graphviz_map(core, schema, state, path, options, graph): | ||
| """Visualize mappings by traversing key–value pairs.""" | ||
| value_type = schema._value | ||
| # value_type = core._find_parameter(schema, 'value') | ||
| # Add node for the map container itself | ||
| if path: | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': None, | ||
| 'type': core.render(schema) | ||
| } | ||
| graph['state_nodes'].append(node_spec) | ||
| # Add place edge to parent | ||
| if len(path) > 1: | ||
| add_place_edge(graph, path[:-1], path) | ||
| if isinstance(state, dict): | ||
| for key, value in state.items(): | ||
| if not is_schema_key(key): | ||
| graph = core.call_method('generate_graph_dict', | ||
| value_type, | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_link(core, schema: Link, state, path, options, graph): | ||
| """Visualize a process node with input/output/bridge wiring.""" | ||
| show_process_config = options.get('show_process_config', False) if options else False | ||
| schema = schema or {} | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': None, | ||
| 'type': core.render(schema) | ||
| } | ||
| # if state.get('address') == 'local:Composite' and node_spec not in graph['process_nodes']: | ||
| # graph['process_nodes'].append(node_spec) | ||
| # return graphviz_composite(core, schema, state, path, options, graph) | ||
| graph['process_nodes'].append(node_spec) | ||
| # Wiring | ||
| config = state.get('config', {}) | ||
| graph = get_graph_wires(schema._inputs, state.get('inputs', {}), graph, 'inputs', path, | ||
| config.get('bridge', {}).get('inputs', {})) | ||
| graph = get_graph_wires(schema._outputs, state.get('outputs', {}), graph, 'outputs', path, | ||
| config.get('bridge', {}).get('outputs', {})) | ||
| # Merge bidirectional edges | ||
| def key(edge): | ||
| return (tuple(edge['edge_path']), tuple(edge['target_path']), edge['port']) | ||
| input_set = {key(e): e for e in graph['input_edges']} | ||
| output_set = {key(e): e for e in graph['output_edges']} | ||
| shared_keys = input_set.keys() & output_set.keys() | ||
| for k in shared_keys: | ||
| graph['bidirectional_edges'].append({ | ||
| 'edge_path': k[0], 'target_path': k[1], 'port': k[2], | ||
| 'type': (input_set[k]['type'], output_set[k]['type']) | ||
| }) | ||
| graph['input_edges'] = [e for k, e in input_set.items() if k not in shared_keys] | ||
| graph['output_edges'] = [e for k, e in output_set.items() if k not in shared_keys] | ||
| if len(path) > 1: | ||
| add_place_edge(graph, path[:-1], path) | ||
| if show_process_config: | ||
| config = state.get('config', {}) | ||
| graph = core.call_method('generate_graph_dict', | ||
| # schema.get(key), | ||
| {}, | ||
| config, | ||
| path + ('config',), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_composite(core, schema, state, path, options, graph): | ||
| """Visualize composite nodes by recursing into their internal structure.""" | ||
| graph = graphviz_link(core, schema, state, path, options, graph) | ||
| inner_state = state.get('config', {}).get('state', {}) # or state | ||
| inner_schema = state.get('config', {}).get('schema', {}) # or schema | ||
| inner_schema, inner_state = core.realize(inner_schema, inner_state) | ||
| # inner_schema, inner_state = core.generate(inner_schema, inner_state) | ||
| if len(path) > 1: | ||
| add_place_edge(graph, path[:-1], path) | ||
| for key, value in inner_state.items(): | ||
| if not is_schema_key(key) and key not in PROCESS_SCHEMA_KEYS: | ||
| graph = core.call_method('generate_graph_dict', | ||
| inner_schema.get(key), | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_node(core, schema: Node, state, path, options, graph): | ||
| """Visualize any type (generic node).""" | ||
| schema = schema or {} | ||
| if path: | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': state if not isinstance(state, dict) else None, | ||
| 'type': core.render(schema) | ||
| } | ||
| graph['state_nodes'].append(node_spec) | ||
| if len(path) > 1: | ||
| add_place_edge(graph, path[:-1], path) | ||
| if isinstance(state, dict): | ||
| for key, value in state.items(): | ||
| if not is_schema_key(key): | ||
| attr = Empty() | ||
| if hasattr(schema, key): | ||
| attr = getattr(schema, key) | ||
| graph = core.call_method('generate_graph_dict', | ||
| attr, | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_dict(core, schema, state, path, options, graph): | ||
| """Visualize any type (generic node).""" | ||
| schema = schema or {} | ||
| if path: | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': state if not isinstance(state, dict) else None, | ||
| 'type': core.render(schema) | ||
| } | ||
| graph['state_nodes'].append(node_spec) | ||
| if len(path) > 1: | ||
| add_place_edge(graph, path[:-1], path) | ||
| if isinstance(state, dict): | ||
| for key, value in state.items(): | ||
| if not is_schema_key(key): | ||
| graph = core.call_method('generate_graph_dict', | ||
| schema.get(key, {}), | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| # Dispatch graphviz method based on schema type | ||
| @dispatch | ||
| def graphviz(core, schema: Empty, state, path, options, graph): | ||
| """No-op visualizer for nodes with no visualization.""" | ||
| return graph | ||
| @dispatch | ||
| def graphviz(core, schema: Map, state, path, options, graph): | ||
| return graphviz_map(core, schema, state, path, options, graph) | ||
| @dispatch | ||
| def graphviz(core, schema: Link, state, path, options, graph): | ||
| return graphviz_link(core, schema, state, path, options, graph) | ||
| @dispatch | ||
| def graphviz(core, schema: CompositeLink, state, path, options, graph): | ||
| return graphviz_composite(core, schema, state, path, options, graph) | ||
| @dispatch | ||
| def graphviz(core, schema: Node, state, path, options, graph): | ||
| return graphviz_node(core, schema, state, path, options, graph) | ||
| @dispatch | ||
| def graphviz(core, schema: dict, state, path, options, graph): | ||
| return graphviz_dict(core, schema, state, path, options, graph) | ||
| # Main method to generate graph dictionary | ||
| def generate_graph_dict(core, schema, state, path=(), options=None, graph=None): | ||
| path = path or () | ||
| graph = graph or { | ||
| 'state_nodes': [], | ||
| 'process_nodes': [], | ||
| 'place_edges': [], | ||
| 'input_edges': [], | ||
| 'output_edges': [], | ||
| 'bidirectional_edges': [], | ||
| 'disconnected_input_edges': [], | ||
| 'disconnected_output_edges': []} | ||
| if schema is None: | ||
| schema = Empty() | ||
| return graphviz( | ||
| core, | ||
| schema, | ||
| state, | ||
| path, | ||
| options, | ||
| graph) | ||
| Metadata-Version: 2.4 | ||
| Name: bigraph-viz | ||
| Version: 1.0.0 | ||
| Version: 1.0.2 | ||
| Summary: A visualization method for displaying the structure of process bigraphs | ||
@@ -5,0 +5,0 @@ Author-email: Eran Agmon <agmon.eran@gmail.com> |
@@ -14,2 +14,4 @@ AUTHORS.md | ||
| bigraph_viz.egg-info/requires.txt | ||
| bigraph_viz.egg-info/top_level.txt | ||
| bigraph_viz.egg-info/top_level.txt | ||
| bigraph_viz/methods/__init__.py | ||
| bigraph_viz/methods/generate_graph_dict.py |
@@ -1142,8 +1142,9 @@ import os | ||
| spec = { | ||
| 'composite': { | ||
| '_type': 'process', | ||
| 'composite_process': { | ||
| '_type': 'composite', | ||
| '_inputs': {'port1': 'node'}, | ||
| '_outputs': {'port2': 'node'}, | ||
| 'inputs': {'port1': ['ext1']}, | ||
| 'outputs': {'port2': ['ext2']}, | ||
| 'address': 'local:Composite', | ||
| 'inputs': {'port1': ['external store']}, | ||
| 'config': { | ||
@@ -1158,6 +1159,12 @@ 'state': { | ||
| 'inputs': {'port3': ['store1']}, | ||
| 'outputs': {'port4': ['store2']}}}, | ||
| 'outputs': {'port4': ['store2']} | ||
| } | ||
| }, | ||
| 'bridge': { | ||
| 'inputs': {'port1': ['store1']}, | ||
| 'outputs': {'port2': ['store2']}}}}} | ||
| 'outputs': {'port2': ['store2']} | ||
| } | ||
| } | ||
| } | ||
| } | ||
@@ -1168,2 +1175,3 @@ plot_bigraph( | ||
| filename='composite_process', | ||
| file_format='pdf', | ||
| **plot_settings) | ||
@@ -1170,0 +1178,0 @@ |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: bigraph-viz | ||
| Version: 1.0.0 | ||
| Version: 1.0.2 | ||
| Summary: A visualization method for displaying the structure of process bigraphs | ||
@@ -5,0 +5,0 @@ Author-email: Eran Agmon <agmon.eran@gmail.com> |
+5
-3
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "bigraph-viz" | ||
| version = "1.0.0" | ||
| version = "1.0.2" | ||
| description = "A visualization method for displaying the structure of process bigraphs" | ||
@@ -43,4 +43,6 @@ readme = "README.md" | ||
| [tool.setuptools] | ||
| packages = ["bigraph_viz"] | ||
| # Include bigraph_viz and ALL subpackages (e.g., bigraph_viz.methods) | ||
| [tool.setuptools.packages.find] | ||
| where = ["."] | ||
| include = ["bigraph_viz*"] | ||
@@ -47,0 +49,0 @@ [tool.uv.sources] |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
131669
10.24%18
12.5%2139
17.66%