bigraph-viz
Advanced tools
| Metadata-Version: 2.4 | ||
| Name: bigraph-viz | ||
| Version: 0.1.15 | ||
| Version: 1.0.0 | ||
| Summary: A visualization method for displaying the structure of process bigraphs | ||
@@ -222,3 +222,3 @@ Author-email: Eran Agmon <agmon.eran@gmail.com> | ||
| Classifier: Topic :: Software Development :: Libraries | ||
| Requires-Python: >=3.9 | ||
| Requires-Python: >=3.11 | ||
| Description-Content-Type: text/markdown | ||
@@ -228,2 +228,3 @@ License-File: LICENSE | ||
| Requires-Dist: bigraph-schema | ||
| Requires-Dist: process-bigraph | ||
| Requires-Dist: graphviz | ||
@@ -230,0 +231,0 @@ Dynamic: license-file |
| bigraph-schema | ||
| process-bigraph | ||
| graphviz |
@@ -6,3 +6,2 @@ AUTHORS.md | ||
| bigraph_viz/__init__.py | ||
| bigraph_viz/convert.py | ||
| bigraph_viz/convert_vivarium_v1.py | ||
@@ -9,0 +8,0 @@ bigraph_viz/dict_utils.py |
| import pprint | ||
| from bigraph_viz.visualize_types import VisualizeTypes, plot_bigraph, get_graphviz_fig | ||
| # from bigraph_viz.visualize_types import VisualizeTypes, plot_bigraph, get_graphviz_fig | ||
| from bigraph_viz.visualize_types import plot_bigraph, get_graphviz_fig | ||
| from bigraph_viz.dict_utils import replace_regex_recursive | ||
| from bigraph_viz.methods.generate_graph_dict import generate_graph_dict | ||
@@ -11,1 +13,8 @@ | ||
| return pretty.pformat(x) | ||
| def register_types(core): | ||
| core.register_method('generate_graph_dict', generate_graph_dict) | ||
| return core |
@@ -14,3 +14,3 @@ """ | ||
| required_schema_keys = {'_default', '_apply', '_check', '_serialize', '_deserialize', '_fold'} | ||
| required_schema_keys = {'_default', '_apply', '_check', '_serialize', '_realize', '_fold'} | ||
@@ -17,0 +17,0 @@ optional_schema_keys = {'_type', '_value', '_description', '_type_parameters', '_inherit', '_divide'} |
+499
-453
@@ -9,4 +9,5 @@ import os | ||
| from bigraph_schema import TypeSystem, is_schema_key, hierarchy_depth | ||
| from bigraph_schema import is_schema_key, hierarchy_depth, Edge | ||
| from bigraph_viz.dict_utils import absolute_path | ||
| from process_bigraph import allocate_core | ||
@@ -24,88 +25,2 @@ # Constants | ||
| def get_graph_wires(ports_schema, wires, graph_dict, schema_key, edge_path, bridge_wires=None): | ||
| """ | ||
| Traverse the port wiring and append wire edges or disconnected ports to graph_dict. | ||
| Parameters: | ||
| ports_schema (dict): Schema for ports (inputs or outputs) | ||
| wires (dict): Wiring structure from the process | ||
| graph_dict (dict): Accumulated graph | ||
| schema_key (str): Either 'inputs' or 'outputs' | ||
| edge_path (tuple): Path of the process node | ||
| bridge_wires (dict, optional): Optional rewiring via 'bridge' dict | ||
| Returns: | ||
| graph_dict (dict): Updated graph dict | ||
| """ | ||
| wires = wires or {} | ||
| ports_schema = ports_schema or {} | ||
| inferred_ports = set(ports_schema.keys()) | set(wires.keys()) | ||
| for port in inferred_ports: | ||
| wire = wires.get(port) | ||
| bridge = bridge_wires.get(port) if bridge_wires else None | ||
| if not wire: | ||
| # If not connected, mark as disconnected | ||
| edge_type = 'disconnected_input_edges' if schema_key == 'inputs' else 'disconnected_output_edges' | ||
| graph_dict[edge_type].append({ | ||
| 'edge_path': edge_path, | ||
| 'port': port, | ||
| 'type': schema_key | ||
| }) | ||
| elif isinstance(wire, (list, tuple, str)): | ||
| graph_dict = get_single_wire(edge_path, graph_dict, port, schema_key, wire) | ||
| elif isinstance(wire, dict): | ||
| for subpath, subwire in hierarchy_depth(wires).items(): | ||
| subport = '/'.join(subpath) | ||
| graph_dict = get_single_wire(edge_path, graph_dict, subport, schema_key, subwire) | ||
| else: | ||
| raise ValueError(f"Unexpected wire type: {wires}") | ||
| # Handle optional bridge wiring | ||
| if bridge: | ||
| target_path = absolute_path(edge_path, tuple(bridge)) | ||
| edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges' | ||
| graph_dict[edge_key].append({ | ||
| 'edge_path': edge_path, | ||
| 'target_path': target_path, | ||
| 'port': f'bridge_{port}', | ||
| 'type': f'bridge_{schema_key}' | ||
| }) | ||
| return graph_dict | ||
| # Append a single port wire connection to graph_dict | ||
| def get_single_wire(edge_path, graph_dict, port, schema_key, wire): | ||
| """ | ||
| Add a connection from a port to its wire target. | ||
| Parameters: | ||
| edge_path (tuple): Path to the process | ||
| graph_dict (dict): Current graph dict | ||
| port (str): Name of the port | ||
| schema_key (str): Either 'inputs' or 'outputs' | ||
| wire (str|list): Wire connection(s) | ||
| Returns: | ||
| Updated graph_dict | ||
| """ | ||
| if isinstance(wire, str): | ||
| wire = [wire] | ||
| else: | ||
| wire = [item for item in wire if isinstance(item, str)] | ||
| target_path = absolute_path(edge_path[:-1], tuple(wire)) | ||
| edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges' | ||
| graph_dict[edge_key].append({ | ||
| 'edge_path': edge_path, | ||
| 'target_path': target_path, | ||
| 'port': port, | ||
| 'type': schema_key | ||
| }) | ||
| return graph_dict | ||
| # Plot a labeled edge from a port to a process | ||
@@ -121,6 +36,2 @@ def plot_edges(graph, edge, port_labels, port_label_size, state_node_spec, constraint='false'): | ||
| if target_name not in graph.body: | ||
| label_text = make_label(edge['target_path'][-1]) | ||
| graph.node(target_name, label=label_text, **state_node_spec) | ||
| with graph.subgraph(name=process_name) as sub: | ||
@@ -171,4 +82,5 @@ sub.edge(target_name, process_name, constraint=constraint, label=label, | ||
| # Type row | ||
| if show_types and (typ := node.get('type')): | ||
| # Type row (LEAF ONLY) | ||
| is_leaf = node.get('value') is not None | ||
| if show_types and is_leaf and (typ := node.get('type')): | ||
| typ_str = str(typ) | ||
@@ -190,8 +102,2 @@ if len(typ_str) > type_char_limit: | ||
| # make the Graphviz figure | ||
| import os | ||
| from collections import defaultdict | ||
| import graphviz | ||
| def get_graphviz_fig( | ||
@@ -220,2 +126,4 @@ graph_dict, | ||
| collapse_redundant_processes=False, | ||
| collapse_paths=None, | ||
| remove_paths=None, | ||
| ): | ||
@@ -229,2 +137,3 @@ """ | ||
| Dictionary describing nodes and edges of a simulation bigraph. | ||
| collapse_redundant_processes : bool | str | Iterable | dict | ||
@@ -243,2 +152,20 @@ Controls collapsing of processes that share identical port wiring: | ||
| collapse_paths : Iterable[Iterable[str]] | Iterable[str] | str | None | ||
| Hierarchical paths to collapse as subtrees. For each prefix path P, | ||
| all nodes whose path starts with P and is strictly deeper than P are | ||
| removed, while the node at P is kept. | ||
| Example: | ||
| collapse_paths=[['particles']] | ||
| keeps: ['particles'] | ||
| removes: ['particles', 'abc'], ['particles', 'abc', 'def'], ... | ||
| remove_paths : Iterable[Iterable[str]] | Iterable[str] | str | None | ||
| Hierarchical paths to fully remove. For each prefix path P, | ||
| nodes whose path starts with P (including P itself) are removed. | ||
| Example: | ||
| remove_paths=[['particles']] | ||
| removes: ['particles'], ['particles', 'abc'], ... | ||
| Returns | ||
@@ -294,3 +221,3 @@ ------- | ||
| # -------- collapse configuration ---------------------------------------- | ||
| # -------- collapse configuration: redundant processes ------------------- | ||
@@ -330,10 +257,41 @@ def normalize_collapse_arg(arg): | ||
| def process_matches_selector(entry, selector): | ||
| """Check if a process entry matches a single selector.""" | ||
| """ | ||
| Match a process entry against a selector. | ||
| Entry is (path, path_str, name) where: | ||
| - path is a list/tuple like ('particles', 'id123', 'glucose eater') | ||
| - path_str is str(path) | ||
| - name is leaf node name | ||
| Selector forms: | ||
| - tuple/list: exact path match OR (if len==1) match any path segment OR (if len>1) match prefix | ||
| - str: match leaf name OR full path string OR any path segment | ||
| """ | ||
| path, path_str, name = entry | ||
| path_t = tuple(path) | ||
| # tuple/list selectors: exact, prefix, or "segment mark" if length 1 | ||
| if isinstance(selector, (tuple, list)): | ||
| return tuple(selector) == tuple(path) | ||
| sel_t = tuple(selector) | ||
| return selector == name or selector == path_str | ||
| # exact match | ||
| if sel_t == path_t: | ||
| return True | ||
| # single-element tuple means "mark": appears anywhere in the path | ||
| if len(sel_t) == 1: | ||
| return sel_t[0] in path_t | ||
| # multi-element tuple: treat as a prefix path selector | ||
| if len(sel_t) <= len(path_t) and path_t[:len(sel_t)] == sel_t: | ||
| return True | ||
| return False | ||
| # string selectors: leaf name, exact stringified path, or any segment match | ||
| if isinstance(selector, str): | ||
| return selector == name or selector == path_str or selector in path_t | ||
| return False | ||
| def process_is_selected(entry): | ||
@@ -359,2 +317,126 @@ """Return True if this process is eligible to be collapsed.""" | ||
| # -------- collapse/remove configuration: hierarchical subtrees --------- | ||
| def normalize_prefixes(arg): | ||
| """ | ||
| Normalize a 'paths' argument into a list of tuple prefixes. | ||
| Examples: | ||
| None -> [] | ||
| 'particles' -> [('particles',)] | ||
| ['particles'] -> [('particles',)] | ||
| [['particles']] -> [('particles',)] | ||
| [['a'], ['b', 'c']] -> [('a',), ('b', 'c')] | ||
| """ | ||
| if not arg: | ||
| return [] | ||
| # Strings are treated as single-segment paths | ||
| if isinstance(arg, (str, bytes)): | ||
| return [(arg,)] | ||
| # Try to interpret it as an iterable | ||
| try: | ||
| items = list(arg) | ||
| except TypeError: | ||
| return [(arg,)] | ||
| if not items: | ||
| return [] | ||
| # If first element is not a list/tuple, treat whole thing as one path | ||
| if not isinstance(items[0], (list, tuple)): | ||
| return [tuple(items)] | ||
| # Otherwise each element is a path | ||
| prefixes = [] | ||
| for p in items: | ||
| if isinstance(p, (list, tuple)): | ||
| prefixes.append(tuple(p)) | ||
| else: | ||
| prefixes.append((p,)) | ||
| return prefixes | ||
| collapsed_prefixes = normalize_prefixes(collapse_paths) | ||
| removed_prefixes = normalize_prefixes(remove_paths) | ||
| def path_is_hidden(path): | ||
| """ | ||
| Return True if this path should be removed from the graph_dict | ||
| based on collapse/remove prefixes. | ||
| - remove_paths: hide root AND descendants | ||
| - collapse_paths: hide ONLY descendants, keep root | ||
| """ | ||
| # If paths are always lists/tuples, you can assert here. | ||
| if isinstance(path, str): | ||
| # If you ever represent paths as strings, customize as needed. | ||
| # For now, treat them as non-hierarchical => never auto-hide. | ||
| return False | ||
| path_t = tuple(path) | ||
| # 1) Removal: root + subtree | ||
| for pref in removed_prefixes: | ||
| if path_t[:len(pref)] == pref: | ||
| return True | ||
| # 2) Collapse: only descendants | ||
| for pref in collapsed_prefixes: | ||
| if len(path_t) > len(pref) and path_t[:len(pref)] == pref: | ||
| return True | ||
| return False | ||
| def prune_subtrees(): | ||
| """ | ||
| Remove all nodes and edges that should be hidden according to | ||
| collapse_paths and remove_paths. | ||
| - For removed_prefixes: delete root and descendants. | ||
| - For collapsed_prefixes: delete descendants only (root kept). | ||
| """ | ||
| if not collapsed_prefixes and not removed_prefixes: | ||
| return | ||
| # State and process nodes | ||
| graph_dict['state_nodes'] = [ | ||
| n for n in graph_dict.get('state_nodes', []) | ||
| if not path_is_hidden(n['path']) | ||
| ] | ||
| graph_dict['process_nodes'] = [ | ||
| n for n in graph_dict.get('process_nodes', []) | ||
| if not path_is_hidden(n['path']) | ||
| ] | ||
| def filter_edge_list(edges): | ||
| new_edges = [] | ||
| for e in edges: | ||
| if path_is_hidden(e['edge_path']): | ||
| continue | ||
| tpath = e.get('target_path') | ||
| if isinstance(tpath, (list, tuple)) and path_is_hidden(tpath): | ||
| continue | ||
| new_edges.append(e) | ||
| return new_edges | ||
| for group in [ | ||
| 'input_edges', | ||
| 'output_edges', | ||
| 'bidirectional_edges', | ||
| 'disconnected_input_edges', | ||
| 'disconnected_output_edges', | ||
| ]: | ||
| if group in graph_dict: | ||
| graph_dict[group] = filter_edge_list(graph_dict[group]) | ||
| # Place edges | ||
| if 'place_edges' in graph_dict: | ||
| new_place_edges = [] | ||
| for e in graph_dict['place_edges']: | ||
| if path_is_hidden(e['parent']) or path_is_hidden(e['child']): | ||
| continue | ||
| new_place_edges.append(e) | ||
| graph_dict['place_edges'] = new_place_edges | ||
| # -------- core helpers -------------------------------------------------- | ||
@@ -425,3 +507,3 @@ | ||
| for path, path_str, _ in selected[1:]: | ||
| collapse_map[str(path)] = rep_str | ||
| collapse_map[str(path)] = rep_path | ||
@@ -505,4 +587,4 @@ # Draw remaining (unselected) entries individually | ||
| visible = not ( | ||
| (remove_process_place_edges and edge['child'] in process_paths) | ||
| or (edge in invisible_edges) | ||
| (remove_process_place_edges and edge['child'] in process_paths) | ||
| or (edge in invisible_edges) | ||
| ) | ||
@@ -564,2 +646,5 @@ graph.attr('edge', style='filled' if visible else 'invis') | ||
| # First, apply hierarchical collapse/remove rules | ||
| prune_subtrees() | ||
| add_state_nodes() | ||
@@ -574,3 +659,4 @@ process_paths, collapse_map = add_process_nodes() | ||
| ]) | ||
| add_state_nodes() | ||
| # TODO: the second add_state_nodes() looks redundant | ||
| # add_state_nodes() | ||
| add_disconnected_edges() | ||
@@ -583,2 +669,29 @@ rank_node_groups() | ||
| def plot_graph(graph_dict, | ||
| out_dir='out', | ||
| filename=None, | ||
| file_format='png', | ||
| print_source=False, | ||
| options=None | ||
| ): | ||
| # make a figure | ||
| options = options or {} | ||
| graph = get_graphviz_fig( | ||
| graph_dict, | ||
| **options) | ||
| # display or save results | ||
| if print_source: | ||
| print(graph.source) | ||
| if filename is not None: | ||
| out_dir = out_dir or 'out' | ||
| os.makedirs(out_dir, exist_ok=True) | ||
| fig_path = os.path.join(out_dir, filename) | ||
| print(f"Writing {fig_path}") | ||
| graph.render(filename=fig_path, format=file_format) | ||
| return graph | ||
| def plot_bigraph( | ||
@@ -591,2 +704,3 @@ state, | ||
| file_format='png', | ||
| show_compiled_state=True, | ||
| **kwargs | ||
@@ -615,9 +729,14 @@ ): | ||
| # Defaults | ||
| core = core or VisualizeTypes() | ||
| core = core or allocate_core() | ||
| schema = schema or {} | ||
| schema, state = core.generate(schema, state) | ||
| compiled_schema, compiled_state = core.realize(schema, state) | ||
| graph_dict = core.generate_graph_dict(schema, state, (), options=traversal_kwargs) | ||
| graph_dict = core.call_method( | ||
| 'generate_graph_dict', | ||
| compiled_schema, | ||
| compiled_state if show_compiled_state else state, | ||
| (), options=traversal_kwargs) | ||
| return core.plot_graph( | ||
| return plot_graph( | ||
| graph_dict, | ||
@@ -631,232 +750,2 @@ filename=filename, | ||
| # Visualize Types | ||
| def graphviz_any(core, schema, state, path, options, graph): | ||
| """Visualize any type (generic node).""" | ||
| schema = schema or {} | ||
| if path: | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': state if not isinstance(state, dict) else None, | ||
| 'type': core.representation(schema) | ||
| } | ||
| graph['state_nodes'].append(node_spec) | ||
| if len(path) > 1: | ||
| graph['place_edges'].append({'parent': path[:-1], 'child': path}) | ||
| if isinstance(state, dict): | ||
| for key, value in state.items(): | ||
| if not is_schema_key(key): | ||
| graph = core.get_graph_dict( | ||
| schema.get(key, {}), | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_edge(core, schema, state, path, options, graph): | ||
| """Visualize a process node with input/output/bridge wiring.""" | ||
| schema = schema or {} | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': None, | ||
| 'type': core.representation(schema) | ||
| } | ||
| if state.get('address') == 'local:composite' and node_spec not in graph['process_nodes']: | ||
| graph['process_nodes'].append(node_spec) | ||
| return graphviz_composite(core, schema, state, path, options, graph) | ||
| graph['process_nodes'].append(node_spec) | ||
| # Wiring | ||
| graph = get_graph_wires(schema.get('_inputs', {}), state.get('inputs', {}), graph, 'inputs', path, | ||
| state.get('bridge', {}).get('inputs', {})) | ||
| graph = get_graph_wires(schema.get('_outputs', {}), state.get('outputs', {}), graph, 'outputs', path, | ||
| state.get('bridge', {}).get('outputs', {})) | ||
| # Merge bidirectional edges | ||
| def key(edge): | ||
| return (tuple(edge['edge_path']), tuple(edge['target_path']), edge['port']) | ||
| input_set = {key(e): e for e in graph['input_edges']} | ||
| output_set = {key(e): e for e in graph['output_edges']} | ||
| shared_keys = input_set.keys() & output_set.keys() | ||
| for k in shared_keys: | ||
| graph['bidirectional_edges'].append({ | ||
| 'edge_path': k[0], 'target_path': k[1], 'port': k[2], | ||
| 'type': (input_set[k]['type'], output_set[k]['type']) | ||
| }) | ||
| graph['input_edges'] = [e for k, e in input_set.items() if k not in shared_keys] | ||
| graph['output_edges'] = [e for k, e in output_set.items() if k not in shared_keys] | ||
| if len(path) > 1: | ||
| graph['place_edges'].append({'parent': path[:-1], 'child': path}) | ||
| return graph | ||
| def graphviz_none(core, schema, state, path, options, graph): | ||
| """No-op visualizer for nodes with no visualization.""" | ||
| return graph | ||
| def graphviz_map(core, schema, state, path, options, graph): | ||
| """Visualize mappings by traversing key–value pairs.""" | ||
| value_type = core._find_parameter(schema, 'value') | ||
| # Add node for the map container itself | ||
| if path: | ||
| node_spec = { | ||
| 'name': path[-1], | ||
| 'path': path, | ||
| 'value': None, | ||
| 'type': core.representation(schema) | ||
| } | ||
| graph['state_nodes'].append(node_spec) | ||
| # Add place edge to parent | ||
| if len(path) > 1: | ||
| graph['place_edges'].append({'parent': path[:-1], 'child': path}) | ||
| if isinstance(state, dict): | ||
| for key, value in state.items(): | ||
| if not is_schema_key(key): | ||
| graph = core.get_graph_dict( | ||
| value_type, | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| def graphviz_composite(core, schema, state, path, options, graph): | ||
| """Visualize composite nodes by recursing into their internal structure.""" | ||
| graph = graphviz_edge(core, schema, state, path, options, graph) | ||
| inner_state = state.get('config', {}).get('state') or state | ||
| inner_schema = state.get('config', {}).get('composition') or schema | ||
| inner_schema, inner_state = core.generate(inner_schema, inner_state) | ||
| if len(path) > 1: | ||
| graph['place_edges'].append({'parent': path[:-1], 'child': path}) | ||
| for key, value in inner_state.items(): | ||
| if not is_schema_key(key) and key not in PROCESS_SCHEMA_KEYS: | ||
| graph = core.get_graph_dict( | ||
| inner_schema.get(key), | ||
| value, | ||
| path + (key,), | ||
| options, | ||
| graph | ||
| ) | ||
| return graph | ||
| # dict with different types and their graphviz functions | ||
| visualize_types = { | ||
| 'any': { | ||
| '_graphviz': graphviz_any | ||
| }, | ||
| 'edge': { | ||
| '_graphviz': graphviz_edge | ||
| }, | ||
| 'quote': { | ||
| '_graphviz': graphviz_none, | ||
| }, | ||
| 'map': { | ||
| '_graphviz': graphviz_map, | ||
| }, | ||
| 'step': { | ||
| '_inherit': ['edge'] | ||
| }, | ||
| 'process': { | ||
| '_inherit': ['edge'] | ||
| }, | ||
| 'composite': { | ||
| '_inherit': ['process'], | ||
| '_graphviz': graphviz_composite, | ||
| }, | ||
| } | ||
| # TODO: we want to visualize things that are not yet complete | ||
| class VisualizeTypes(TypeSystem): | ||
| def __init__(self): | ||
| super().__init__() | ||
| self.update_types(visualize_types) | ||
| def get_graph_dict(self, schema, state, path, options, graph=None): | ||
| path = path or () | ||
| graph = graph or { | ||
| 'state_nodes': [], | ||
| 'process_nodes': [], | ||
| 'place_edges': [], | ||
| 'input_edges': [], | ||
| 'output_edges': [], | ||
| 'bidirectional_edges': [], | ||
| 'disconnected_input_edges': [], | ||
| 'disconnected_output_edges': []} | ||
| graphviz_function = self.choose_method( | ||
| schema, | ||
| state, | ||
| 'graphviz') | ||
| if options.get('remove_nodes') and path in options['remove_nodes']: | ||
| return graph | ||
| return graphviz_function( | ||
| self, | ||
| schema, | ||
| state, | ||
| path, | ||
| options, | ||
| graph) | ||
| def generate_graph_dict(self, schema, state, path, options): | ||
| full_schema, full_state = self.generate(schema, state) | ||
| return self.get_graph_dict(full_schema, full_state, path, options) | ||
| def plot_graph(self, | ||
| graph_dict, | ||
| out_dir='out', | ||
| filename=None, | ||
| file_format='png', | ||
| print_source=False, | ||
| options=None | ||
| ): | ||
| # make a figure | ||
| options = options or {} | ||
| graph = get_graphviz_fig( | ||
| graph_dict, | ||
| **options) | ||
| # display or save results | ||
| if print_source: | ||
| print(graph.source) | ||
| if filename is not None: | ||
| out_dir = out_dir or 'out' | ||
| os.makedirs(out_dir, exist_ok=True) | ||
| fig_path = os.path.join(out_dir, filename) | ||
| print(f"Writing {fig_path}") | ||
| graph.render(filename=fig_path, format=file_format) | ||
| return graph | ||
| # Begin Tests | ||
@@ -871,3 +760,3 @@ ############### | ||
| def test_simple_store(): | ||
| def run_simple_store(core): | ||
| simple_store_state = { | ||
@@ -882,3 +771,3 @@ 'store1': 1.0, | ||
| def test_forest(): | ||
| def run_forest(core): | ||
| forest = { | ||
@@ -895,6 +784,7 @@ 'v0': { | ||
| } | ||
| plot_bigraph(forest, **plot_settings, filename='forest') | ||
| plot_bigraph(forest, **plot_settings, filename='forest', core=core) | ||
| def test_nested_composite(): | ||
| def run_nested_composite(core): | ||
| state = { | ||
@@ -912,6 +802,6 @@ 'environment': { | ||
| 'interval': 1.0, | ||
| 'address': 'local:composite', | ||
| 'config': {'_type': 'quote', | ||
| 'address': 'local:Composite', | ||
| 'config': {'_type': 'node', | ||
| 'state': {'grow': {'_type': 'process', | ||
| 'address': 'local:grow', | ||
| 'address': 'local:Grow', | ||
| 'config': {'rate': 0.03}, | ||
@@ -921,3 +811,3 @@ 'inputs': {'mass': ['mass']}, | ||
| 'divide': {'_type': 'process', | ||
| 'address': 'local:divide', | ||
| 'address': 'local:Divide', | ||
| 'config': {'agent_id': '0', | ||
@@ -933,6 +823,6 @@ 'agent_schema': {'mass': 'float'}, | ||
| 'environment': ['environment']}}, | ||
| 'composition': {'global_time': 'float'}, | ||
| 'schema': {'global_time': 'float'}, | ||
| 'interface': {'inputs': {}, 'outputs': {}}, | ||
| 'emitter': {'path': ['emitter'], | ||
| 'address': 'local:ram-emitter', | ||
| 'address': 'local:RAMEmitter', | ||
| 'config': {}, | ||
@@ -943,8 +833,11 @@ 'mode': 'none', | ||
| }}}} | ||
| plot_bigraph(state, | ||
| filename='nested_composite', | ||
| **plot_settings) | ||
| plot_bigraph( | ||
| state, | ||
| core=core, | ||
| filename='nested_composite', | ||
| **plot_settings) | ||
| def test_graphviz(): | ||
| def run_graphviz(core): | ||
| cell = { | ||
@@ -958,3 +851,3 @@ 'config': { | ||
| # 'config': {}, | ||
| # 'address': 'local:cell', # TODO -- this is where the ports/inputs/outputs come from | ||
| # 'address': 'local:Cell', # TODO -- this is where the ports/inputs/outputs come from | ||
| 'internal': 1.0, | ||
@@ -978,4 +871,3 @@ '_inputs': { | ||
| core = VisualizeTypes() | ||
| graph_dict = core.generate_graph_dict( | ||
| graph_dict = core.call_method('generate_graph_dict', | ||
| {}, | ||
@@ -986,19 +878,33 @@ cell, | ||
| core.plot_graph( | ||
| plot_graph( | ||
| graph_dict, | ||
| out_dir='out', | ||
| filename='test_graphviz' | ||
| filename='run_graphviz' | ||
| ) | ||
| def test_bigraph_cell(): | ||
| class Cell(Edge): | ||
| def inputs(self): | ||
| return { | ||
| 'nutrients': 'float', | ||
| } | ||
| def outputs(self): | ||
| return { | ||
| 'secretions': 'float', | ||
| 'biomass': 'float', | ||
| } | ||
| def run_bigraph_cell(core): | ||
| cell = { | ||
| 'config': { | ||
| '_type': 'map[float]', | ||
| 'a': 11.0, # {'_type': 'float', '_value': 11.0}, | ||
| 'a': 11.0, | ||
| 'b': 3333.33}, | ||
| 'cell': { | ||
| '_type': 'process', # TODO -- this should also accept process, step, but how in bigraph-schema? | ||
| 'config': {}, | ||
| 'address': 'local:cell', # TODO -- this is where the ports/inputs/outputs come from | ||
| 'config': {'param1': 42}, | ||
| 'address': 'local:Cell', # TODO -- this is where the ports/inputs/outputs come from | ||
| 'internal': 1.0, | ||
@@ -1019,3 +925,3 @@ '_inputs': { | ||
| } | ||
| } | ||
| }, | ||
| } | ||
@@ -1025,4 +931,5 @@ | ||
| filename='bigraph_cell', | ||
| core=core, | ||
| show_values=True, | ||
| # show_types=True, | ||
| show_types=True, | ||
| **plot_settings | ||
@@ -1032,4 +939,3 @@ ) | ||
| def test_bio_schema(): | ||
| core = VisualizeTypes() | ||
| def run_bio_schema(core): | ||
| b = { | ||
@@ -1044,4 +950,4 @@ 'environment': { | ||
| '_type': 'process', | ||
| '_inputs': {'DNA': 'any'}, | ||
| '_outputs': {'RNA': 'any'}, | ||
| '_inputs': {'DNA': 'node'}, | ||
| '_outputs': {'RNA': 'node'}, | ||
| 'inputs': { | ||
@@ -1059,7 +965,8 @@ 'DNA': ['chromosome'] | ||
| 'fields': {}, | ||
| 'barriers': {}, | ||
| 'barriers': { | ||
| '_type': 'node'}, | ||
| 'diffusion': { | ||
| '_type': 'process', | ||
| '_inputs': {'fields': 'any'}, | ||
| '_outputs': {'fields': 'any'}, | ||
| '_inputs': {'fields': 'node'}, | ||
| '_outputs': {'fields': 'node'}, | ||
| 'inputs': { | ||
@@ -1078,8 +985,12 @@ 'fields': ['fields', ] | ||
| def test_flat_composite(): | ||
| def run_flat_composite(core): | ||
| flat_composite_spec = { | ||
| 'store1.1': 'float', | ||
| 'store1.2': 'int', | ||
| 'store1.2': 'integer', | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_outputs': { | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
| 'outputs': { | ||
@@ -1093,4 +1004,4 @@ 'port1': ['store1.1'], | ||
| '_inputs': { | ||
| 'port1': 'any', | ||
| 'port2': 'any', | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
@@ -1103,3 +1014,5 @@ 'inputs': { | ||
| } | ||
| plot_bigraph(flat_composite_spec, | ||
| core=core, | ||
| rankdir='RL', | ||
@@ -1110,10 +1023,10 @@ filename='flat_composite', | ||
| def test_multi_processes(): | ||
| def run_multi_processes(core): | ||
| process_schema = { | ||
| '_type': 'process', | ||
| '_inputs': { | ||
| 'port1': 'Any', | ||
| 'port1': 'node', | ||
| }, | ||
| '_outputs': { | ||
| 'port2': 'Any' | ||
| 'port2': 'node' | ||
| }, | ||
@@ -1127,3 +1040,5 @@ } | ||
| } | ||
| plot_bigraph(processes_spec, | ||
| core=core, | ||
| rankdir='BT', | ||
@@ -1134,9 +1049,13 @@ filename='multiple_processes', | ||
| def test_nested_processes(): | ||
| def run_nested_processes(core): | ||
| nested_process_spec = { | ||
| 'store1': { | ||
| 'store1.1': 'float', | ||
| 'store1.2': 'int', | ||
| 'store1.2': 'integer', | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_inputs': { | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
| 'inputs': { | ||
@@ -1149,2 +1068,6 @@ 'port1': ['store1.1'], | ||
| '_type': 'process', | ||
| '_outputs': { | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
| 'outputs': { | ||
@@ -1158,2 +1081,5 @@ 'port1': ['store1.1'], | ||
| '_type': 'process', | ||
| '_inputs': { | ||
| 'port1': 'node', | ||
| }, | ||
| 'inputs': { | ||
@@ -1164,13 +1090,15 @@ 'port1': ['store1'], | ||
| } | ||
| plot_bigraph(nested_process_spec, | ||
| **plot_settings, | ||
| core=core, | ||
| filename='nested_processes') | ||
| def test_cell_hierarchy(): | ||
| core = VisualizeTypes() | ||
| def run_cell_hierarchy(core): | ||
| core.register_type('concentrations', 'float') | ||
| core.access('concentrations') | ||
| core.register('concentrations', 'float') | ||
| core.register('sequences', 'float') | ||
| core.register('membrane', { | ||
| core.register_type('sequences', 'float') | ||
| core.register_type('membrane', { | ||
| 'transporters': 'concentrations', | ||
@@ -1189,3 +1117,3 @@ 'lipids': 'concentrations', | ||
| core.register('cytoplasm', { | ||
| core.register_type('cytoplasm', { | ||
| 'metabolites': 'concentrations', | ||
@@ -1201,7 +1129,7 @@ 'ribosomal complexes': 'concentrations', | ||
| core.register('nucleoid', { | ||
| core.register_type('nucleoid', { | ||
| 'chromosome': { | ||
| 'genes': 'sequences'}}) | ||
| core.register('cell', { | ||
| core.register_type('cell', { | ||
| 'membrane': 'membrane', | ||
@@ -1230,8 +1158,7 @@ 'cytoplasm': 'cytoplasm', | ||
| filename='cell_hierarchy', | ||
| show_compiled_state=True, | ||
| **plot_settings) | ||
| def test_multiple_disconnected_ports(): | ||
| core = VisualizeTypes() | ||
| def run_multiple_disconnected_ports(core): | ||
| spec = { | ||
@@ -1241,8 +1168,8 @@ 'process': { | ||
| '_inputs': { | ||
| 'port1': 'Any', | ||
| 'port2': 'Any', | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
| '_outputs': { | ||
| 'port1': 'Any', | ||
| 'port2': 'Any', | ||
| 'port1': 'node', | ||
| 'port2': 'node', | ||
| }, | ||
@@ -1260,22 +1187,23 @@ }, | ||
| def test_composite_process(): | ||
| core = VisualizeTypes() | ||
| def run_composite_process(core): | ||
| spec = { | ||
| 'composite': { | ||
| '_type': 'composite', | ||
| '_inputs': {'port1': 'any'}, | ||
| '_outputs': {'port2': 'any'}, | ||
| '_type': 'process', | ||
| '_inputs': {'port1': 'node'}, | ||
| '_outputs': {'port2': 'node'}, | ||
| 'address': 'local:Composite', | ||
| 'inputs': {'port1': ['external store']}, | ||
| 'store1': 'any', | ||
| 'store2': 'any', | ||
| 'bridge': { | ||
| 'inputs': {'port1': ['store1']}, | ||
| 'outputs': {'port2': ['store2']}}, | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_inputs': {'port3': 'any'}, | ||
| '_outputs': {'port4': 'any', }, | ||
| 'inputs': {'port3': ['store1']}, | ||
| 'outputs': {'port4': ['store2']}}}} | ||
| 'config': { | ||
| 'state': { | ||
| 'store1': 'node', | ||
| 'store2': 'node', | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_inputs': {'port3': 'node'}, | ||
| '_outputs': {'port4': 'node'}, | ||
| 'inputs': {'port3': ['store1']}, | ||
| 'outputs': {'port4': ['store2']}}}, | ||
| 'bridge': { | ||
| 'inputs': {'port1': ['store1']}, | ||
| 'outputs': {'port2': ['store2']}}}}} | ||
@@ -1289,10 +1217,8 @@ plot_bigraph( | ||
| def test_bidirectional_edges(): | ||
| core = VisualizeTypes() | ||
| def run_bidirectional_edges(core): | ||
| spec = { | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_inputs': {'port1': 'any'}, | ||
| '_outputs': {'port1': 'any'}, | ||
| '_inputs': {'port1': 'node'}, | ||
| '_outputs': {'port1': 'node'}, | ||
| 'inputs': {'port1': ['external store']}, | ||
@@ -1302,4 +1228,4 @@ 'outputs': {'port1': ['external store']}}, | ||
| '_type': 'process', | ||
| '_inputs': {'port3': 'any'}, | ||
| '_outputs': {'port4': 'any'}, | ||
| '_inputs': {'port3': 'node'}, | ||
| '_outputs': {'port4': 'node'}, | ||
| 'inputs': {'port3': ['external store']}, | ||
@@ -1317,2 +1243,22 @@ 'outputs': {'port4': ['external store']} | ||
| class DynamicFBA(Edge): | ||
| config_schema = { | ||
| 'model_file': 'string', | ||
| 'kinetic_params': 'map[tuple[float,float]]', | ||
| 'substrate_update_reactions': 'map[string]', | ||
| 'bounds': 'map[map[float]]', | ||
| } | ||
| def inputs(self): | ||
| return { | ||
| 'substrates': 'map[float]', | ||
| } | ||
| def outputs(self): | ||
| return { | ||
| 'biomass': 'float', | ||
| 'substrates': 'map[float]', | ||
| } | ||
| def generate_spec_and_schema(n_rows, n_cols): | ||
@@ -1335,5 +1281,5 @@ spec = {'cells': {}} | ||
| 'acetate': ['..', 'fields', 'acetate', i, j], | ||
| 'biomass': ['..', 'fields', 'biomass', i, j], | ||
| 'glucose': ['..', 'fields', 'glucose', i, j], | ||
| } | ||
| }, | ||
| 'biomass': ['..', 'fields', 'biomass', i, j] | ||
| }, | ||
@@ -1343,5 +1289,5 @@ 'outputs': { | ||
| 'acetate': ['..', 'fields', 'acetate', i, j], | ||
| 'biomass': ['..', 'fields', 'biomass', i, j], | ||
| 'glucose': ['..', 'fields', 'glucose', i, j], | ||
| } | ||
| }, | ||
| 'biomass': ['..', 'fields', 'biomass', i, j] | ||
| } | ||
@@ -1368,5 +1314,3 @@ } | ||
| def test_array_paths(): | ||
| core = VisualizeTypes() | ||
| def run_array_paths(core): | ||
| n_rows, n_cols = 2, 1 # or any desired shape | ||
@@ -1383,5 +1327,3 @@ spec, schema = generate_spec_and_schema(n_rows, n_cols) | ||
| def test_complex_bigraph(): | ||
| core = VisualizeTypes() | ||
| def run_complex_bigraph(core): | ||
| n_rows, n_cols = 6, 6 # or any desired shape | ||
@@ -1401,5 +1343,27 @@ spec, schema = generate_spec_and_schema(n_rows, n_cols) | ||
| def test_nested_particle_process(): | ||
| core = VisualizeTypes() | ||
| class Particles(Edge): | ||
| def inputs(self): | ||
| return { | ||
| 'particles': 'map[particle]', | ||
| 'fields': 'map[array]', | ||
| } | ||
| def outputs(self): | ||
| return { | ||
| 'particles': 'map[particle]', | ||
| 'fields': 'map[array]', | ||
| } | ||
| def run_nested_particle_process(core): | ||
| core.register_type('particle', { | ||
| 'id': 'string', | ||
| 'position': 'tuple[float,float]', | ||
| 'size': 'float', | ||
| 'mass': 'float', | ||
| 'local': 'map[float]', | ||
| 'exchange': 'map[float]' | ||
| }) | ||
| state = { | ||
@@ -1436,3 +1400,3 @@ "particles": { | ||
| "lower": "-2.0", | ||
| "upper": "!nil"}, | ||
| "upper": None}, | ||
| "ATPM": { | ||
@@ -1599,3 +1563,3 @@ "lower": "1.0", | ||
| } | ||
| composition = { | ||
| schema = { | ||
| 'particles': { | ||
@@ -1606,3 +1570,3 @@ '_type': 'map', | ||
| 'address': {'_type': 'string', '_default': 'local:DynamicFBA'}, | ||
| 'config': {'_type': 'quote', '_default': { | ||
| 'config': {'_type': 'node', '_default': { | ||
| 'model_file': 'textbook', | ||
@@ -1622,3 +1586,3 @@ 'kinetic_params': {'glucose': (0.5, 1), 'acetate': (0.5, 2)}, | ||
| plot_bigraph(state=state, schema=composition, core=core, | ||
| plot_bigraph(state=state, schema=schema, core=core, | ||
| filename='nested_particle_process', | ||
@@ -1628,19 +1592,101 @@ **plot_settings, | ||
| def run_process_config(core): | ||
| state = { | ||
| 'process1': { | ||
| '_type': 'process', | ||
| 'config': {'param1': 10, 'param2': 20}, | ||
| '_inputs': { | ||
| 'input1': 'node', | ||
| }, | ||
| '_outputs': { | ||
| 'output1': 'node', | ||
| }, | ||
| 'inputs': { | ||
| 'input1': ['store1'], | ||
| }, | ||
| 'interval': 1.0, | ||
| }, | ||
| 'process2': { | ||
| '_type': 'step', | ||
| 'config': {'a': 1, 'b': 2}, | ||
| '_inputs': { | ||
| 'interval': 'node', | ||
| 'input1': 'node', | ||
| }, | ||
| 'inputs': { | ||
| 'interval': ['process1', 'interval'], | ||
| 'input1': ['store1'], | ||
| }, | ||
| } | ||
| } | ||
| schema = {} | ||
| plot_bigraph(state=state, schema=schema, core=core, | ||
| filename='show_process_config', | ||
| **plot_settings, | ||
| # show_process_config=True | ||
| ) | ||
| def run_leaf_types(core): | ||
| state = { | ||
| 'store1': 42, | ||
| 'store2': 3.14, | ||
| 'store3': 'hello', | ||
| 'process1': { | ||
| '_type': 'process', | ||
| '_inputs': { | ||
| 'input1': 'integer', | ||
| 'input2': 'float', | ||
| 'input3': 'string', | ||
| }, | ||
| '_outputs': { | ||
| 'output1': 'integer', | ||
| 'output2': 'float', | ||
| 'output3': 'string', | ||
| }, | ||
| 'inputs': { | ||
| 'input1': ['store1'], | ||
| 'input2': ['store2'], | ||
| 'input3': ['store3'], | ||
| }, | ||
| 'outputs': { | ||
| 'output1': ['store1'], | ||
| 'output2': ['store2'], | ||
| 'output3': ['store3'], | ||
| } | ||
| } | ||
| } | ||
| schema = { | ||
| 'store1': 'integer', | ||
| 'store2': 'float', | ||
| 'store3': 'string', | ||
| # process1 can be left unspecified if you don't need its internal typing | ||
| } | ||
| plot_bigraph(state=state, schema=schema, core=core, | ||
| filename='leaf_types', | ||
| **plot_settings, | ||
| show_types=True, | ||
| show_values=True, | ||
| ) | ||
| if __name__ == '__main__': | ||
| # test_simple_store() | ||
| # test_forest() | ||
| # test_nested_composite() | ||
| # test_graphviz() | ||
| # test_bigraph_cell() | ||
| # test_bio_schema() | ||
| # test_flat_composite() | ||
| # test_multi_processes() | ||
| # test_nested_processes() | ||
| # test_cell_hierarchy() | ||
| # test_multiple_disconnected_ports() | ||
| # test_composite_process() | ||
| # test_bidirectional_edges() | ||
| # test_array_paths() | ||
| # test_complex_bigraph() | ||
| test_nested_particle_process() | ||
| core = allocate_core() | ||
| run_simple_store(core) | ||
| run_forest(core) | ||
| run_nested_composite(core) | ||
| run_graphviz(core) | ||
| run_bigraph_cell(core) | ||
| run_bio_schema(core) | ||
| run_flat_composite(core) | ||
| run_multi_processes(core) | ||
| run_nested_processes(core) | ||
| run_cell_hierarchy(core) | ||
| run_multiple_disconnected_ports(core) | ||
| run_composite_process(core) | ||
| run_bidirectional_edges(core) | ||
| run_array_paths(core) | ||
| run_complex_bigraph(core) | ||
| run_nested_particle_process(core) | ||
| run_process_config(core) | ||
| run_leaf_types(core) |
+3
-2
| Metadata-Version: 2.4 | ||
| Name: bigraph-viz | ||
| Version: 0.1.15 | ||
| Version: 1.0.0 | ||
| Summary: A visualization method for displaying the structure of process bigraphs | ||
@@ -222,3 +222,3 @@ Author-email: Eran Agmon <agmon.eran@gmail.com> | ||
| Classifier: Topic :: Software Development :: Libraries | ||
| Requires-Python: >=3.9 | ||
| Requires-Python: >=3.11 | ||
| Description-Content-Type: text/markdown | ||
@@ -228,2 +228,3 @@ License-File: LICENSE | ||
| Requires-Dist: bigraph-schema | ||
| Requires-Dist: process-bigraph | ||
| Requires-Dist: graphviz | ||
@@ -230,0 +231,0 @@ Dynamic: license-file |
+5
-3
@@ -7,6 +7,6 @@ [build-system] | ||
| name = "bigraph-viz" | ||
| version = "0.1.15" | ||
| version = "1.0.0" | ||
| description = "A visualization method for displaying the structure of process bigraphs" | ||
| readme = "README.md" | ||
| requires-python = ">=3.9" | ||
| requires-python = ">=3.11" | ||
| license = { file = "LICENSE" } | ||
@@ -39,2 +39,3 @@ | ||
| "bigraph-schema", | ||
| "process-bigraph", | ||
| "graphviz" | ||
@@ -47,2 +48,3 @@ ] | ||
| [tool.uv.sources] | ||
| bigraph-schema = { path = "../bigraph-schema", editable = true } | ||
| # bigraph-schema = { path = "../bigraph-schema", editable = true } | ||
| # process-bigraph = { path = "../process-bigraph", editable = true } |
| import os | ||
| from pdf2image import convert_from_path | ||
| from PIL import Image | ||
| # Print current working directory | ||
| cwd = os.getcwd() | ||
| print("Current working directory:", cwd) | ||
| # Go up one level in the directory | ||
| cwd = os.path.dirname(cwd) | ||
| # Input PDF file (single page) | ||
| pdf_path = os.path.join(cwd, "notebooks/out/ecoli.pdf") | ||
| # Output directory and files | ||
| output_dir = os.path.join(cwd, "notebooks/out/") | ||
| output_full_res = os.path.join(output_dir, "ecoli_full_res.png") | ||
| output_scaled = os.path.join(output_dir, "ecoli_google_slides.png") | ||
| # Ensure the output directory exists | ||
| os.makedirs(output_dir, exist_ok=True) | ||
| # Convert PDF to an image at high DPI | ||
| images = convert_from_path(pdf_path, dpi=1200) # Use high DPI for sharpness | ||
| if images: | ||
| # Save the **full resolution** image first | ||
| images[0].save(output_full_res, "PNG") | ||
| print(f"Full resolution image saved at: {output_full_res}") | ||
| # Resize image to match Google Slides width (3000 px) | ||
| target_width = 3000 | ||
| aspect_ratio = images[0].width / images[0].height | ||
| target_height = int(target_width / aspect_ratio) | ||
| resized_image = images[0].resize((target_width, target_height), Image.LANCZOS) | ||
| resized_image.save(output_scaled, "PNG") | ||
| print(f"Resized Google Slides image saved at: {output_scaled}") | ||
| else: | ||
| print("Error: No images found in PDF!") |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
1818
1.45%119443
-0.09%16
-5.88%