You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

bigraph-viz

Package Overview
Dependencies
Maintainers
2
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bigraph-viz - pypi Package Compare versions

Comparing version
0.1.15
to
1.0.0
+3
-2
bigraph_viz.egg-info/PKG-INFO
Metadata-Version: 2.4
Name: bigraph-viz
Version: 0.1.15
Version: 1.0.0
Summary: A visualization method for displaying the structure of process bigraphs

@@ -222,3 +222,3 @@ Author-email: Eran Agmon <agmon.eran@gmail.com>

Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.9
Requires-Python: >=3.11
Description-Content-Type: text/markdown

@@ -228,2 +228,3 @@ License-File: LICENSE

Requires-Dist: bigraph-schema
Requires-Dist: process-bigraph
Requires-Dist: graphviz

@@ -230,0 +231,0 @@ Dynamic: license-file

+1
-0
bigraph-schema
process-bigraph
graphviz

@@ -6,3 +6,2 @@ AUTHORS.md

bigraph_viz/__init__.py
bigraph_viz/convert.py
bigraph_viz/convert_vivarium_v1.py

@@ -9,0 +8,0 @@ bigraph_viz/dict_utils.py

import pprint
from bigraph_viz.visualize_types import VisualizeTypes, plot_bigraph, get_graphviz_fig
# from bigraph_viz.visualize_types import VisualizeTypes, plot_bigraph, get_graphviz_fig
from bigraph_viz.visualize_types import plot_bigraph, get_graphviz_fig
from bigraph_viz.dict_utils import replace_regex_recursive
from bigraph_viz.methods.generate_graph_dict import generate_graph_dict

@@ -11,1 +13,8 @@

return pretty.pformat(x)
def register_types(core):
core.register_method('generate_graph_dict', generate_graph_dict)
return core

@@ -14,3 +14,3 @@ """

required_schema_keys = {'_default', '_apply', '_check', '_serialize', '_deserialize', '_fold'}
required_schema_keys = {'_default', '_apply', '_check', '_serialize', '_realize', '_fold'}

@@ -17,0 +17,0 @@ optional_schema_keys = {'_type', '_value', '_description', '_type_parameters', '_inherit', '_divide'}

@@ -9,4 +9,5 @@ import os

from bigraph_schema import TypeSystem, is_schema_key, hierarchy_depth
from bigraph_schema import is_schema_key, hierarchy_depth, Edge
from bigraph_viz.dict_utils import absolute_path
from process_bigraph import allocate_core

@@ -24,88 +25,2 @@ # Constants

def get_graph_wires(ports_schema, wires, graph_dict, schema_key, edge_path, bridge_wires=None):
"""
Traverse the port wiring and append wire edges or disconnected ports to graph_dict.
Parameters:
ports_schema (dict): Schema for ports (inputs or outputs)
wires (dict): Wiring structure from the process
graph_dict (dict): Accumulated graph
schema_key (str): Either 'inputs' or 'outputs'
edge_path (tuple): Path of the process node
bridge_wires (dict, optional): Optional rewiring via 'bridge' dict
Returns:
graph_dict (dict): Updated graph dict
"""
wires = wires or {}
ports_schema = ports_schema or {}
inferred_ports = set(ports_schema.keys()) | set(wires.keys())
for port in inferred_ports:
wire = wires.get(port)
bridge = bridge_wires.get(port) if bridge_wires else None
if not wire:
# If not connected, mark as disconnected
edge_type = 'disconnected_input_edges' if schema_key == 'inputs' else 'disconnected_output_edges'
graph_dict[edge_type].append({
'edge_path': edge_path,
'port': port,
'type': schema_key
})
elif isinstance(wire, (list, tuple, str)):
graph_dict = get_single_wire(edge_path, graph_dict, port, schema_key, wire)
elif isinstance(wire, dict):
for subpath, subwire in hierarchy_depth(wires).items():
subport = '/'.join(subpath)
graph_dict = get_single_wire(edge_path, graph_dict, subport, schema_key, subwire)
else:
raise ValueError(f"Unexpected wire type: {wires}")
# Handle optional bridge wiring
if bridge:
target_path = absolute_path(edge_path, tuple(bridge))
edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges'
graph_dict[edge_key].append({
'edge_path': edge_path,
'target_path': target_path,
'port': f'bridge_{port}',
'type': f'bridge_{schema_key}'
})
return graph_dict
# Append a single port wire connection to graph_dict
def get_single_wire(edge_path, graph_dict, port, schema_key, wire):
"""
Add a connection from a port to its wire target.
Parameters:
edge_path (tuple): Path to the process
graph_dict (dict): Current graph dict
port (str): Name of the port
schema_key (str): Either 'inputs' or 'outputs'
wire (str|list): Wire connection(s)
Returns:
Updated graph_dict
"""
if isinstance(wire, str):
wire = [wire]
else:
wire = [item for item in wire if isinstance(item, str)]
target_path = absolute_path(edge_path[:-1], tuple(wire))
edge_key = 'input_edges' if schema_key == 'inputs' else 'output_edges'
graph_dict[edge_key].append({
'edge_path': edge_path,
'target_path': target_path,
'port': port,
'type': schema_key
})
return graph_dict
# Plot a labeled edge from a port to a process

@@ -121,6 +36,2 @@ def plot_edges(graph, edge, port_labels, port_label_size, state_node_spec, constraint='false'):

if target_name not in graph.body:
label_text = make_label(edge['target_path'][-1])
graph.node(target_name, label=label_text, **state_node_spec)
with graph.subgraph(name=process_name) as sub:

@@ -171,4 +82,5 @@ sub.edge(target_name, process_name, constraint=constraint, label=label,

# Type row
if show_types and (typ := node.get('type')):
# Type row (LEAF ONLY)
is_leaf = node.get('value') is not None
if show_types and is_leaf and (typ := node.get('type')):
typ_str = str(typ)

@@ -190,8 +102,2 @@ if len(typ_str) > type_char_limit:

# make the Graphviz figure
import os
from collections import defaultdict
import graphviz
def get_graphviz_fig(

@@ -220,2 +126,4 @@ graph_dict,

collapse_redundant_processes=False,
collapse_paths=None,
remove_paths=None,
):

@@ -229,2 +137,3 @@ """

Dictionary describing nodes and edges of a simulation bigraph.
collapse_redundant_processes : bool | str | Iterable | dict

@@ -243,2 +152,20 @@ Controls collapsing of processes that share identical port wiring:

collapse_paths : Iterable[Iterable[str]] | Iterable[str] | str | None
Hierarchical paths to collapse as subtrees. For each prefix path P,
all nodes whose path starts with P and is strictly deeper than P are
removed, while the node at P is kept.
Example:
collapse_paths=[['particles']]
keeps: ['particles']
removes: ['particles', 'abc'], ['particles', 'abc', 'def'], ...
remove_paths : Iterable[Iterable[str]] | Iterable[str] | str | None
Hierarchical paths to fully remove. For each prefix path P,
nodes whose path starts with P (including P itself) are removed.
Example:
remove_paths=[['particles']]
removes: ['particles'], ['particles', 'abc'], ...
Returns

@@ -294,3 +221,3 @@ -------

# -------- collapse configuration ----------------------------------------
# -------- collapse configuration: redundant processes -------------------

@@ -330,10 +257,41 @@ def normalize_collapse_arg(arg):

def process_matches_selector(entry, selector):
"""Check if a process entry matches a single selector."""
"""
Match a process entry against a selector.
Entry is (path, path_str, name) where:
- path is a list/tuple like ('particles', 'id123', 'glucose eater')
- path_str is str(path)
- name is leaf node name
Selector forms:
- tuple/list: exact path match OR (if len==1) match any path segment OR (if len>1) match prefix
- str: match leaf name OR full path string OR any path segment
"""
path, path_str, name = entry
path_t = tuple(path)
# tuple/list selectors: exact, prefix, or "segment mark" if length 1
if isinstance(selector, (tuple, list)):
return tuple(selector) == tuple(path)
sel_t = tuple(selector)
return selector == name or selector == path_str
# exact match
if sel_t == path_t:
return True
# single-element tuple means "mark": appears anywhere in the path
if len(sel_t) == 1:
return sel_t[0] in path_t
# multi-element tuple: treat as a prefix path selector
if len(sel_t) <= len(path_t) and path_t[:len(sel_t)] == sel_t:
return True
return False
# string selectors: leaf name, exact stringified path, or any segment match
if isinstance(selector, str):
return selector == name or selector == path_str or selector in path_t
return False
def process_is_selected(entry):

@@ -359,2 +317,126 @@ """Return True if this process is eligible to be collapsed."""

# -------- collapse/remove configuration: hierarchical subtrees ---------
def normalize_prefixes(arg):
"""
Normalize a 'paths' argument into a list of tuple prefixes.
Examples:
None -> []
'particles' -> [('particles',)]
['particles'] -> [('particles',)]
[['particles']] -> [('particles',)]
[['a'], ['b', 'c']] -> [('a',), ('b', 'c')]
"""
if not arg:
return []
# Strings are treated as single-segment paths
if isinstance(arg, (str, bytes)):
return [(arg,)]
# Try to interpret it as an iterable
try:
items = list(arg)
except TypeError:
return [(arg,)]
if not items:
return []
# If first element is not a list/tuple, treat whole thing as one path
if not isinstance(items[0], (list, tuple)):
return [tuple(items)]
# Otherwise each element is a path
prefixes = []
for p in items:
if isinstance(p, (list, tuple)):
prefixes.append(tuple(p))
else:
prefixes.append((p,))
return prefixes
collapsed_prefixes = normalize_prefixes(collapse_paths)
removed_prefixes = normalize_prefixes(remove_paths)
def path_is_hidden(path):
"""
Return True if this path should be removed from the graph_dict
based on collapse/remove prefixes.
- remove_paths: hide root AND descendants
- collapse_paths: hide ONLY descendants, keep root
"""
# If paths are always lists/tuples, you can assert here.
if isinstance(path, str):
# If you ever represent paths as strings, customize as needed.
# For now, treat them as non-hierarchical => never auto-hide.
return False
path_t = tuple(path)
# 1) Removal: root + subtree
for pref in removed_prefixes:
if path_t[:len(pref)] == pref:
return True
# 2) Collapse: only descendants
for pref in collapsed_prefixes:
if len(path_t) > len(pref) and path_t[:len(pref)] == pref:
return True
return False
def prune_subtrees():
"""
Remove all nodes and edges that should be hidden according to
collapse_paths and remove_paths.
- For removed_prefixes: delete root and descendants.
- For collapsed_prefixes: delete descendants only (root kept).
"""
if not collapsed_prefixes and not removed_prefixes:
return
# State and process nodes
graph_dict['state_nodes'] = [
n for n in graph_dict.get('state_nodes', [])
if not path_is_hidden(n['path'])
]
graph_dict['process_nodes'] = [
n for n in graph_dict.get('process_nodes', [])
if not path_is_hidden(n['path'])
]
def filter_edge_list(edges):
new_edges = []
for e in edges:
if path_is_hidden(e['edge_path']):
continue
tpath = e.get('target_path')
if isinstance(tpath, (list, tuple)) and path_is_hidden(tpath):
continue
new_edges.append(e)
return new_edges
for group in [
'input_edges',
'output_edges',
'bidirectional_edges',
'disconnected_input_edges',
'disconnected_output_edges',
]:
if group in graph_dict:
graph_dict[group] = filter_edge_list(graph_dict[group])
# Place edges
if 'place_edges' in graph_dict:
new_place_edges = []
for e in graph_dict['place_edges']:
if path_is_hidden(e['parent']) or path_is_hidden(e['child']):
continue
new_place_edges.append(e)
graph_dict['place_edges'] = new_place_edges
# -------- core helpers --------------------------------------------------

@@ -425,3 +507,3 @@

for path, path_str, _ in selected[1:]:
collapse_map[str(path)] = rep_str
collapse_map[str(path)] = rep_path

@@ -505,4 +587,4 @@ # Draw remaining (unselected) entries individually

visible = not (
(remove_process_place_edges and edge['child'] in process_paths)
or (edge in invisible_edges)
(remove_process_place_edges and edge['child'] in process_paths)
or (edge in invisible_edges)
)

@@ -564,2 +646,5 @@ graph.attr('edge', style='filled' if visible else 'invis')

# First, apply hierarchical collapse/remove rules
prune_subtrees()
add_state_nodes()

@@ -574,3 +659,4 @@ process_paths, collapse_map = add_process_nodes()

])
add_state_nodes()
# TODO: the second add_state_nodes() looks redundant
# add_state_nodes()
add_disconnected_edges()

@@ -583,2 +669,29 @@ rank_node_groups()

def plot_graph(graph_dict,
out_dir='out',
filename=None,
file_format='png',
print_source=False,
options=None
):
# make a figure
options = options or {}
graph = get_graphviz_fig(
graph_dict,
**options)
# display or save results
if print_source:
print(graph.source)
if filename is not None:
out_dir = out_dir or 'out'
os.makedirs(out_dir, exist_ok=True)
fig_path = os.path.join(out_dir, filename)
print(f"Writing {fig_path}")
graph.render(filename=fig_path, format=file_format)
return graph
def plot_bigraph(

@@ -591,2 +704,3 @@ state,

file_format='png',
show_compiled_state=True,
**kwargs

@@ -615,9 +729,14 @@ ):

# Defaults
core = core or VisualizeTypes()
core = core or allocate_core()
schema = schema or {}
schema, state = core.generate(schema, state)
compiled_schema, compiled_state = core.realize(schema, state)
graph_dict = core.generate_graph_dict(schema, state, (), options=traversal_kwargs)
graph_dict = core.call_method(
'generate_graph_dict',
compiled_schema,
compiled_state if show_compiled_state else state,
(), options=traversal_kwargs)
return core.plot_graph(
return plot_graph(
graph_dict,

@@ -631,232 +750,2 @@ filename=filename,

# Visualize Types
def graphviz_any(core, schema, state, path, options, graph):
"""Visualize any type (generic node)."""
schema = schema or {}
if path:
node_spec = {
'name': path[-1],
'path': path,
'value': state if not isinstance(state, dict) else None,
'type': core.representation(schema)
}
graph['state_nodes'].append(node_spec)
if len(path) > 1:
graph['place_edges'].append({'parent': path[:-1], 'child': path})
if isinstance(state, dict):
for key, value in state.items():
if not is_schema_key(key):
graph = core.get_graph_dict(
schema.get(key, {}),
value,
path + (key,),
options,
graph
)
return graph
def graphviz_edge(core, schema, state, path, options, graph):
"""Visualize a process node with input/output/bridge wiring."""
schema = schema or {}
node_spec = {
'name': path[-1],
'path': path,
'value': None,
'type': core.representation(schema)
}
if state.get('address') == 'local:composite' and node_spec not in graph['process_nodes']:
graph['process_nodes'].append(node_spec)
return graphviz_composite(core, schema, state, path, options, graph)
graph['process_nodes'].append(node_spec)
# Wiring
graph = get_graph_wires(schema.get('_inputs', {}), state.get('inputs', {}), graph, 'inputs', path,
state.get('bridge', {}).get('inputs', {}))
graph = get_graph_wires(schema.get('_outputs', {}), state.get('outputs', {}), graph, 'outputs', path,
state.get('bridge', {}).get('outputs', {}))
# Merge bidirectional edges
def key(edge):
return (tuple(edge['edge_path']), tuple(edge['target_path']), edge['port'])
input_set = {key(e): e for e in graph['input_edges']}
output_set = {key(e): e for e in graph['output_edges']}
shared_keys = input_set.keys() & output_set.keys()
for k in shared_keys:
graph['bidirectional_edges'].append({
'edge_path': k[0], 'target_path': k[1], 'port': k[2],
'type': (input_set[k]['type'], output_set[k]['type'])
})
graph['input_edges'] = [e for k, e in input_set.items() if k not in shared_keys]
graph['output_edges'] = [e for k, e in output_set.items() if k not in shared_keys]
if len(path) > 1:
graph['place_edges'].append({'parent': path[:-1], 'child': path})
return graph
def graphviz_none(core, schema, state, path, options, graph):
"""No-op visualizer for nodes with no visualization."""
return graph
def graphviz_map(core, schema, state, path, options, graph):
"""Visualize mappings by traversing key–value pairs."""
value_type = core._find_parameter(schema, 'value')
# Add node for the map container itself
if path:
node_spec = {
'name': path[-1],
'path': path,
'value': None,
'type': core.representation(schema)
}
graph['state_nodes'].append(node_spec)
# Add place edge to parent
if len(path) > 1:
graph['place_edges'].append({'parent': path[:-1], 'child': path})
if isinstance(state, dict):
for key, value in state.items():
if not is_schema_key(key):
graph = core.get_graph_dict(
value_type,
value,
path + (key,),
options,
graph
)
return graph
def graphviz_composite(core, schema, state, path, options, graph):
"""Visualize composite nodes by recursing into their internal structure."""
graph = graphviz_edge(core, schema, state, path, options, graph)
inner_state = state.get('config', {}).get('state') or state
inner_schema = state.get('config', {}).get('composition') or schema
inner_schema, inner_state = core.generate(inner_schema, inner_state)
if len(path) > 1:
graph['place_edges'].append({'parent': path[:-1], 'child': path})
for key, value in inner_state.items():
if not is_schema_key(key) and key not in PROCESS_SCHEMA_KEYS:
graph = core.get_graph_dict(
inner_schema.get(key),
value,
path + (key,),
options,
graph
)
return graph
# dict with different types and their graphviz functions
visualize_types = {
'any': {
'_graphviz': graphviz_any
},
'edge': {
'_graphviz': graphviz_edge
},
'quote': {
'_graphviz': graphviz_none,
},
'map': {
'_graphviz': graphviz_map,
},
'step': {
'_inherit': ['edge']
},
'process': {
'_inherit': ['edge']
},
'composite': {
'_inherit': ['process'],
'_graphviz': graphviz_composite,
},
}
# TODO: we want to visualize things that are not yet complete
class VisualizeTypes(TypeSystem):
def __init__(self):
super().__init__()
self.update_types(visualize_types)
def get_graph_dict(self, schema, state, path, options, graph=None):
path = path or ()
graph = graph or {
'state_nodes': [],
'process_nodes': [],
'place_edges': [],
'input_edges': [],
'output_edges': [],
'bidirectional_edges': [],
'disconnected_input_edges': [],
'disconnected_output_edges': []}
graphviz_function = self.choose_method(
schema,
state,
'graphviz')
if options.get('remove_nodes') and path in options['remove_nodes']:
return graph
return graphviz_function(
self,
schema,
state,
path,
options,
graph)
def generate_graph_dict(self, schema, state, path, options):
full_schema, full_state = self.generate(schema, state)
return self.get_graph_dict(full_schema, full_state, path, options)
def plot_graph(self,
graph_dict,
out_dir='out',
filename=None,
file_format='png',
print_source=False,
options=None
):
# make a figure
options = options or {}
graph = get_graphviz_fig(
graph_dict,
**options)
# display or save results
if print_source:
print(graph.source)
if filename is not None:
out_dir = out_dir or 'out'
os.makedirs(out_dir, exist_ok=True)
fig_path = os.path.join(out_dir, filename)
print(f"Writing {fig_path}")
graph.render(filename=fig_path, format=file_format)
return graph
# Begin Tests

@@ -871,3 +760,3 @@ ###############

def test_simple_store():
def run_simple_store(core):
simple_store_state = {

@@ -882,3 +771,3 @@ 'store1': 1.0,

def test_forest():
def run_forest(core):
forest = {

@@ -895,6 +784,7 @@ 'v0': {

}
plot_bigraph(forest, **plot_settings, filename='forest')
plot_bigraph(forest, **plot_settings, filename='forest', core=core)
def test_nested_composite():
def run_nested_composite(core):
state = {

@@ -912,6 +802,6 @@ 'environment': {

'interval': 1.0,
'address': 'local:composite',
'config': {'_type': 'quote',
'address': 'local:Composite',
'config': {'_type': 'node',
'state': {'grow': {'_type': 'process',
'address': 'local:grow',
'address': 'local:Grow',
'config': {'rate': 0.03},

@@ -921,3 +811,3 @@ 'inputs': {'mass': ['mass']},

'divide': {'_type': 'process',
'address': 'local:divide',
'address': 'local:Divide',
'config': {'agent_id': '0',

@@ -933,6 +823,6 @@ 'agent_schema': {'mass': 'float'},

'environment': ['environment']}},
'composition': {'global_time': 'float'},
'schema': {'global_time': 'float'},
'interface': {'inputs': {}, 'outputs': {}},
'emitter': {'path': ['emitter'],
'address': 'local:ram-emitter',
'address': 'local:RAMEmitter',
'config': {},

@@ -943,8 +833,11 @@ 'mode': 'none',

}}}}
plot_bigraph(state,
filename='nested_composite',
**plot_settings)
plot_bigraph(
state,
core=core,
filename='nested_composite',
**plot_settings)
def test_graphviz():
def run_graphviz(core):
cell = {

@@ -958,3 +851,3 @@ 'config': {

# 'config': {},
# 'address': 'local:cell', # TODO -- this is where the ports/inputs/outputs come from
# 'address': 'local:Cell', # TODO -- this is where the ports/inputs/outputs come from
'internal': 1.0,

@@ -978,4 +871,3 @@ '_inputs': {

core = VisualizeTypes()
graph_dict = core.generate_graph_dict(
graph_dict = core.call_method('generate_graph_dict',
{},

@@ -986,19 +878,33 @@ cell,

core.plot_graph(
plot_graph(
graph_dict,
out_dir='out',
filename='test_graphviz'
filename='run_graphviz'
)
def test_bigraph_cell():
class Cell(Edge):
def inputs(self):
return {
'nutrients': 'float',
}
def outputs(self):
return {
'secretions': 'float',
'biomass': 'float',
}
def run_bigraph_cell(core):
cell = {
'config': {
'_type': 'map[float]',
'a': 11.0, # {'_type': 'float', '_value': 11.0},
'a': 11.0,
'b': 3333.33},
'cell': {
'_type': 'process', # TODO -- this should also accept process, step, but how in bigraph-schema?
'config': {},
'address': 'local:cell', # TODO -- this is where the ports/inputs/outputs come from
'config': {'param1': 42},
'address': 'local:Cell', # TODO -- this is where the ports/inputs/outputs come from
'internal': 1.0,

@@ -1019,3 +925,3 @@ '_inputs': {

}
}
},
}

@@ -1025,4 +931,5 @@

filename='bigraph_cell',
core=core,
show_values=True,
# show_types=True,
show_types=True,
**plot_settings

@@ -1032,4 +939,3 @@ )

def test_bio_schema():
core = VisualizeTypes()
def run_bio_schema(core):
b = {

@@ -1044,4 +950,4 @@ 'environment': {

'_type': 'process',
'_inputs': {'DNA': 'any'},
'_outputs': {'RNA': 'any'},
'_inputs': {'DNA': 'node'},
'_outputs': {'RNA': 'node'},
'inputs': {

@@ -1059,7 +965,8 @@ 'DNA': ['chromosome']

'fields': {},
'barriers': {},
'barriers': {
'_type': 'node'},
'diffusion': {
'_type': 'process',
'_inputs': {'fields': 'any'},
'_outputs': {'fields': 'any'},
'_inputs': {'fields': 'node'},
'_outputs': {'fields': 'node'},
'inputs': {

@@ -1078,8 +985,12 @@ 'fields': ['fields', ]

def test_flat_composite():
def run_flat_composite(core):
flat_composite_spec = {
'store1.1': 'float',
'store1.2': 'int',
'store1.2': 'integer',
'process1': {
'_type': 'process',
'_outputs': {
'port1': 'node',
'port2': 'node',
},
'outputs': {

@@ -1093,4 +1004,4 @@ 'port1': ['store1.1'],

'_inputs': {
'port1': 'any',
'port2': 'any',
'port1': 'node',
'port2': 'node',
},

@@ -1103,3 +1014,5 @@ 'inputs': {

}
plot_bigraph(flat_composite_spec,
core=core,
rankdir='RL',

@@ -1110,10 +1023,10 @@ filename='flat_composite',

def test_multi_processes():
def run_multi_processes(core):
process_schema = {
'_type': 'process',
'_inputs': {
'port1': 'Any',
'port1': 'node',
},
'_outputs': {
'port2': 'Any'
'port2': 'node'
},

@@ -1127,3 +1040,5 @@ }

}
plot_bigraph(processes_spec,
core=core,
rankdir='BT',

@@ -1134,9 +1049,13 @@ filename='multiple_processes',

def test_nested_processes():
def run_nested_processes(core):
nested_process_spec = {
'store1': {
'store1.1': 'float',
'store1.2': 'int',
'store1.2': 'integer',
'process1': {
'_type': 'process',
'_inputs': {
'port1': 'node',
'port2': 'node',
},
'inputs': {

@@ -1149,2 +1068,6 @@ 'port1': ['store1.1'],

'_type': 'process',
'_outputs': {
'port1': 'node',
'port2': 'node',
},
'outputs': {

@@ -1158,2 +1081,5 @@ 'port1': ['store1.1'],

'_type': 'process',
'_inputs': {
'port1': 'node',
},
'inputs': {

@@ -1164,13 +1090,15 @@ 'port1': ['store1'],

}
plot_bigraph(nested_process_spec,
**plot_settings,
core=core,
filename='nested_processes')
def test_cell_hierarchy():
core = VisualizeTypes()
def run_cell_hierarchy(core):
core.register_type('concentrations', 'float')
core.access('concentrations')
core.register('concentrations', 'float')
core.register('sequences', 'float')
core.register('membrane', {
core.register_type('sequences', 'float')
core.register_type('membrane', {
'transporters': 'concentrations',

@@ -1189,3 +1117,3 @@ 'lipids': 'concentrations',

core.register('cytoplasm', {
core.register_type('cytoplasm', {
'metabolites': 'concentrations',

@@ -1201,7 +1129,7 @@ 'ribosomal complexes': 'concentrations',

core.register('nucleoid', {
core.register_type('nucleoid', {
'chromosome': {
'genes': 'sequences'}})
core.register('cell', {
core.register_type('cell', {
'membrane': 'membrane',

@@ -1230,8 +1158,7 @@ 'cytoplasm': 'cytoplasm',

filename='cell_hierarchy',
show_compiled_state=True,
**plot_settings)
def test_multiple_disconnected_ports():
core = VisualizeTypes()
def run_multiple_disconnected_ports(core):
spec = {

@@ -1241,8 +1168,8 @@ 'process': {

'_inputs': {
'port1': 'Any',
'port2': 'Any',
'port1': 'node',
'port2': 'node',
},
'_outputs': {
'port1': 'Any',
'port2': 'Any',
'port1': 'node',
'port2': 'node',
},

@@ -1260,22 +1187,23 @@ },

def test_composite_process():
core = VisualizeTypes()
def run_composite_process(core):
spec = {
'composite': {
'_type': 'composite',
'_inputs': {'port1': 'any'},
'_outputs': {'port2': 'any'},
'_type': 'process',
'_inputs': {'port1': 'node'},
'_outputs': {'port2': 'node'},
'address': 'local:Composite',
'inputs': {'port1': ['external store']},
'store1': 'any',
'store2': 'any',
'bridge': {
'inputs': {'port1': ['store1']},
'outputs': {'port2': ['store2']}},
'process1': {
'_type': 'process',
'_inputs': {'port3': 'any'},
'_outputs': {'port4': 'any', },
'inputs': {'port3': ['store1']},
'outputs': {'port4': ['store2']}}}}
'config': {
'state': {
'store1': 'node',
'store2': 'node',
'process1': {
'_type': 'process',
'_inputs': {'port3': 'node'},
'_outputs': {'port4': 'node'},
'inputs': {'port3': ['store1']},
'outputs': {'port4': ['store2']}}},
'bridge': {
'inputs': {'port1': ['store1']},
'outputs': {'port2': ['store2']}}}}}

@@ -1289,10 +1217,8 @@ plot_bigraph(

def test_bidirectional_edges():
core = VisualizeTypes()
def run_bidirectional_edges(core):
spec = {
'process1': {
'_type': 'process',
'_inputs': {'port1': 'any'},
'_outputs': {'port1': 'any'},
'_inputs': {'port1': 'node'},
'_outputs': {'port1': 'node'},
'inputs': {'port1': ['external store']},

@@ -1302,4 +1228,4 @@ 'outputs': {'port1': ['external store']}},

'_type': 'process',
'_inputs': {'port3': 'any'},
'_outputs': {'port4': 'any'},
'_inputs': {'port3': 'node'},
'_outputs': {'port4': 'node'},
'inputs': {'port3': ['external store']},

@@ -1317,2 +1243,22 @@ 'outputs': {'port4': ['external store']}

class DynamicFBA(Edge):
config_schema = {
'model_file': 'string',
'kinetic_params': 'map[tuple[float,float]]',
'substrate_update_reactions': 'map[string]',
'bounds': 'map[map[float]]',
}
def inputs(self):
return {
'substrates': 'map[float]',
}
def outputs(self):
return {
'biomass': 'float',
'substrates': 'map[float]',
}
def generate_spec_and_schema(n_rows, n_cols):

@@ -1335,5 +1281,5 @@ spec = {'cells': {}}

'acetate': ['..', 'fields', 'acetate', i, j],
'biomass': ['..', 'fields', 'biomass', i, j],
'glucose': ['..', 'fields', 'glucose', i, j],
}
},
'biomass': ['..', 'fields', 'biomass', i, j]
},

@@ -1343,5 +1289,5 @@ 'outputs': {

'acetate': ['..', 'fields', 'acetate', i, j],
'biomass': ['..', 'fields', 'biomass', i, j],
'glucose': ['..', 'fields', 'glucose', i, j],
}
},
'biomass': ['..', 'fields', 'biomass', i, j]
}

@@ -1368,5 +1314,3 @@ }

def test_array_paths():
core = VisualizeTypes()
def run_array_paths(core):
n_rows, n_cols = 2, 1 # or any desired shape

@@ -1383,5 +1327,3 @@ spec, schema = generate_spec_and_schema(n_rows, n_cols)

def test_complex_bigraph():
core = VisualizeTypes()
def run_complex_bigraph(core):
n_rows, n_cols = 6, 6 # or any desired shape

@@ -1401,5 +1343,27 @@ spec, schema = generate_spec_and_schema(n_rows, n_cols)

def test_nested_particle_process():
core = VisualizeTypes()
class Particles(Edge):
def inputs(self):
return {
'particles': 'map[particle]',
'fields': 'map[array]',
}
def outputs(self):
return {
'particles': 'map[particle]',
'fields': 'map[array]',
}
def run_nested_particle_process(core):
core.register_type('particle', {
'id': 'string',
'position': 'tuple[float,float]',
'size': 'float',
'mass': 'float',
'local': 'map[float]',
'exchange': 'map[float]'
})
state = {

@@ -1436,3 +1400,3 @@ "particles": {

"lower": "-2.0",
"upper": "!nil"},
"upper": None},
"ATPM": {

@@ -1599,3 +1563,3 @@ "lower": "1.0",

}
composition = {
schema = {
'particles': {

@@ -1606,3 +1570,3 @@ '_type': 'map',

'address': {'_type': 'string', '_default': 'local:DynamicFBA'},
'config': {'_type': 'quote', '_default': {
'config': {'_type': 'node', '_default': {
'model_file': 'textbook',

@@ -1622,3 +1586,3 @@ 'kinetic_params': {'glucose': (0.5, 1), 'acetate': (0.5, 2)},

plot_bigraph(state=state, schema=composition, core=core,
plot_bigraph(state=state, schema=schema, core=core,
filename='nested_particle_process',

@@ -1628,19 +1592,101 @@ **plot_settings,

def run_process_config(core):
state = {
'process1': {
'_type': 'process',
'config': {'param1': 10, 'param2': 20},
'_inputs': {
'input1': 'node',
},
'_outputs': {
'output1': 'node',
},
'inputs': {
'input1': ['store1'],
},
'interval': 1.0,
},
'process2': {
'_type': 'step',
'config': {'a': 1, 'b': 2},
'_inputs': {
'interval': 'node',
'input1': 'node',
},
'inputs': {
'interval': ['process1', 'interval'],
'input1': ['store1'],
},
}
}
schema = {}
plot_bigraph(state=state, schema=schema, core=core,
filename='show_process_config',
**plot_settings,
# show_process_config=True
)
def run_leaf_types(core):
state = {
'store1': 42,
'store2': 3.14,
'store3': 'hello',
'process1': {
'_type': 'process',
'_inputs': {
'input1': 'integer',
'input2': 'float',
'input3': 'string',
},
'_outputs': {
'output1': 'integer',
'output2': 'float',
'output3': 'string',
},
'inputs': {
'input1': ['store1'],
'input2': ['store2'],
'input3': ['store3'],
},
'outputs': {
'output1': ['store1'],
'output2': ['store2'],
'output3': ['store3'],
}
}
}
schema = {
'store1': 'integer',
'store2': 'float',
'store3': 'string',
# process1 can be left unspecified if you don't need its internal typing
}
plot_bigraph(state=state, schema=schema, core=core,
filename='leaf_types',
**plot_settings,
show_types=True,
show_values=True,
)
if __name__ == '__main__':
# test_simple_store()
# test_forest()
# test_nested_composite()
# test_graphviz()
# test_bigraph_cell()
# test_bio_schema()
# test_flat_composite()
# test_multi_processes()
# test_nested_processes()
# test_cell_hierarchy()
# test_multiple_disconnected_ports()
# test_composite_process()
# test_bidirectional_edges()
# test_array_paths()
# test_complex_bigraph()
test_nested_particle_process()
core = allocate_core()
run_simple_store(core)
run_forest(core)
run_nested_composite(core)
run_graphviz(core)
run_bigraph_cell(core)
run_bio_schema(core)
run_flat_composite(core)
run_multi_processes(core)
run_nested_processes(core)
run_cell_hierarchy(core)
run_multiple_disconnected_ports(core)
run_composite_process(core)
run_bidirectional_edges(core)
run_array_paths(core)
run_complex_bigraph(core)
run_nested_particle_process(core)
run_process_config(core)
run_leaf_types(core)
Metadata-Version: 2.4
Name: bigraph-viz
Version: 0.1.15
Version: 1.0.0
Summary: A visualization method for displaying the structure of process bigraphs

@@ -222,3 +222,3 @@ Author-email: Eran Agmon <agmon.eran@gmail.com>

Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.9
Requires-Python: >=3.11
Description-Content-Type: text/markdown

@@ -228,2 +228,3 @@ License-File: LICENSE

Requires-Dist: bigraph-schema
Requires-Dist: process-bigraph
Requires-Dist: graphviz

@@ -230,0 +231,0 @@ Dynamic: license-file

@@ -7,6 +7,6 @@ [build-system]

name = "bigraph-viz"
version = "0.1.15"
version = "1.0.0"
description = "A visualization method for displaying the structure of process bigraphs"
readme = "README.md"
requires-python = ">=3.9"
requires-python = ">=3.11"
license = { file = "LICENSE" }

@@ -39,2 +39,3 @@

"bigraph-schema",
"process-bigraph",
"graphviz"

@@ -47,2 +48,3 @@ ]

[tool.uv.sources]
bigraph-schema = { path = "../bigraph-schema", editable = true }
# bigraph-schema = { path = "../bigraph-schema", editable = true }
# process-bigraph = { path = "../process-bigraph", editable = true }
import os
from pdf2image import convert_from_path
from PIL import Image
# Print current working directory
cwd = os.getcwd()
print("Current working directory:", cwd)
# Go up one level in the directory
cwd = os.path.dirname(cwd)
# Input PDF file (single page)
pdf_path = os.path.join(cwd, "notebooks/out/ecoli.pdf")
# Output directory and files
output_dir = os.path.join(cwd, "notebooks/out/")
output_full_res = os.path.join(output_dir, "ecoli_full_res.png")
output_scaled = os.path.join(output_dir, "ecoli_google_slides.png")
# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)
# Convert PDF to an image at high DPI
images = convert_from_path(pdf_path, dpi=1200) # Use high DPI for sharpness
if images:
# Save the **full resolution** image first
images[0].save(output_full_res, "PNG")
print(f"Full resolution image saved at: {output_full_res}")
# Resize image to match Google Slides width (3000 px)
target_width = 3000
aspect_ratio = images[0].width / images[0].height
target_height = int(target_width / aspect_ratio)
resized_image = images[0].resize((target_width, target_height), Image.LANCZOS)
resized_image.save(output_scaled, "PNG")
print(f"Resized Google Slides image saved at: {output_scaled}")
else:
print("Error: No images found in PDF!")