
Security News
OWASP 2025 Top 10 Adds Software Supply Chain Failures, Ranked Top Community Concern
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.
Enoki is a finite state machine library for asynchronous event based systems.
import enoki
from enoki import State
class Ping(State):
def on_state(self, shared_state):
print("Ping!")
return Pong
class Pong(State):
def on_state(self, shared_state):
print("Pong!")
return Ping
class ErrorState(State):
def on_state(self, shared_state):
print("Error occurred")
# Create and run the state machine
fsm = enoki.StateMachine(
initial_state=Ping,
error_state=ErrorState
)
# Single step execution
fsm.tick() # Prints "Ping!" and transitions to Pong
States are the fundamental building blocks of your FSM. Each state inherits from the State base class and implements lifecycle methods:
on_enter(shared_state): Called when entering the stateon_state(shared_state): Main state logic (required)on_leave(shared_state): Called when leaving the stateon_fail(shared_state): Called when retry limit is exceededon_timeout(shared_state): Called when state timeout occursStates can also define optional parameters:
TIMEOUT: Timeout in seconds; if set, the FSM will call on_timeout if the state waits too long for a message.RETRIES: Number of retries before failure; if set, the FSM will call on_fail when the retry limit is exceeded.TERMINAL: If set to True, marks the state as terminal and causes the FSM to exit when reached.CAN_DWELL: If set to True, allows the state to wait indefinitely for messages, even without a timeout.class ExampleState(State):
TIMEOUT = 30 # Optional: timeout in seconds
RETRIES = 3 # Optional: number of retries before failure
TERMINAL = False # Optional: mark as terminal state
CAN_DWELL = False # Optional: allow indefinite waiting
def on_enter(self, shared_state):
print("Entering state")
def on_state(self, shared_state):
# Your state logic here
if some_condition:
return NextState # Transition to NextState
elif should_retry:
return ExampleState # Retry current state (decrements retry counter)
else:
return # Stay in current state (wait)
def on_leave(self, shared_state):
print("Leaving state")
States can return different values to control transitions:
NextState: Transition to a different stateAgain: Immediate continuation; re-enters the current state from on_state (does not reset retries/timer)Repeat: Renewal; restarts the current state from on_enter (resets retries/timer)Restart: Resets the retry counter, re-enters the state from on_enter, and waits for the next messageRetry: Decrements the retry counter, re-enters the state from on_enter immediatelyPush(State1, State2, ...): Push states onto stack and transition to the firstPop: Pop and transition to the top state from stackNone or Unhandled: Stay in current state (wait for next message)The SharedState object is passed to all state methods and contains:
fsm: Reference to the state machinecommon: Shared data object for passing information between statesmsg: Current message being processedclass DataProcessor(State):
def on_state(self, shared_state):
# Access shared data
shared_state.common.processed_count += 1
# Check current message
if shared_state.msg and shared_state.msg['type'] == 'data':
# Process the message
return ProcessingComplete
return None # Wait for more messages
Message-driven state machines handle external events and messages using an internal queue. Use send_message() to enqueue messages for processing:
fsm = enoki.StateMachine(
initial_state=WaitingState,
error_state=ErrorState,
trap_fn=handle_unprocessed_messages # Optional message handler
)
# Send messages to the state machine
fsm.send_message({'type': 'start', 'data': 'hello'})
# Process messages
while not fsm.is_finished:
fsm.tick() # Processes the next message in the queue
Note: The state machine manages its own internal message queue. Use
send_message()to add messages andtick()to process them.
Use Push and Pop for hierarchical state management:
class MainMenu(State):
def on_state(self, shared_state):
if shared_state.msg['action'] == 'enter_submenu':
# Push current state and transition to submenu
return Push(SubMenu, SubMenuOption1, SubMenuOption2)
class SubMenu(State):
def on_state(self, shared_state):
if shared_state.msg['action'] == 'back':
# Return to previous state
return Pop
Enoki is designed for asynchronous systems where operations are initiated in on_enter and responses are handled in on_state:
class NetworkRequest(State):
TIMEOUT = 10 # 10 second timeout
RETRIES = 3 # Retry up to 3 times
def on_enter(self, shared_state):
# Initiate the async operation when entering the state
self.request_id = send_network_request_async()
print(f"Sent network request {self.request_id}")
def on_state(self, shared_state):
# Check for response messages
if shared_state.msg and shared_state.msg.get('request_id') == self.request_id:
if shared_state.msg['status'] == 'success':
return ProcessResponse
elif shared_state.msg['status'] == 'error':
return type(self) # Retry the request
# No matching response yet, keep waiting
return None
def on_timeout(self, shared_state):
print("Network request timed out")
return type(self) # Retry on timeout
def on_fail(self, shared_state):
print("Network request failed after all retries")
return ErrorState
class DataContainer:
def __init__(self):
self.user_data = {}
self.session_id = None
fsm = enoki.StateMachine(
initial_state=LoginState,
error_state=ErrorState,
common_data=DataContainer()
)
Enoki provides two mechanisms for handling messages that don't need to reach individual states:
Filter Function: Pre-processes messages before they reach states. If the filter returns True, the message is consumed and won't be passed to the current state. Useful for handling global messages like heartbeats or status updates.
Trap Function: Handles messages that states don't process (when on_state returns None). This catches "unhandled" messages and can be used for logging, error reporting, or default processing.
def message_filter(shared_state):
# Handle global messages that don't need state-specific processing
if shared_state.msg and shared_state.msg.get('type') == 'heartbeat':
shared_state.common.last_heartbeat = time.time()
return True # Message consumed, don't pass to state
return False # Let state handle the message
def message_trap(shared_state):
# Handle messages that states didn't process
msg = shared_state.msg
if msg:
print(f"Unhandled message in state {shared_state.fsm._current.name}: {msg}")
# Could log, raise exception, or take other action
fsm = enoki.StateMachine(
initial_state=StartState,
error_state=ErrorState,
filter_fn=message_filter,
trap_fn=message_trap
)
The StateMachine constructor accepts several configuration options:
fsm = enoki.StateMachine(
initial_state=StartState, # Required: Starting state
error_state=ErrorState, # Required: Default error handler
filter_fn=message_filter, # Optional: Pre-filter messages
trap_fn=handle_unprocessed, # Optional: Handle unprocessed messages
on_error_fn=error_handler, # Optional: Global error handler
log_fn=print, # Optional: Logging function
transition_fn=log_transitions, # Optional: Transition callback
common_data=SharedData() # Optional: Shared state object
)
Generate visual representations of your state machine:
# Generate Mermaid flowchart
fsm.save_mermaid_flowchart('state_diagram.mmd')
# Generate Graphviz digraph
fsm.save_graphviz_digraph('state_diagram.dot')
The library includes several complete examples:
freerun.py)Demonstrates a simple ping-pong state machine with retry logic that runs indefinitely.
blocking.py)Shows how to create a state machine that waits for specific messages, with shared state management and message trapping.
event_driven.py)Illustrates timeout handling and dwell states in an event-driven architecture.
Enoki provides comprehensive error handling:
StateRetryLimitError: Raised when a state exceeds its retry limitStateTimedOut: Raised when a state exceeds its timeout durationMissingOnStateHandler: Raised when a state lacks the required on_state methodEmptyStateStackError: Raised when attempting to pop from an empty state stackBlockedInUntimedState: Raised when FSM is blocked in a state without timeouton_state: This is the only required method for statesTIMEOUTon_fail for retry limit scenarioscommonPush/Pop for hierarchical state management# Copy enoki.py to your project directory
# No external dependencies required - uses only Python standard library
Enoki is released under the MIT License. See the LICENSE file for details.
Enoki was originally developed at Keyme under the name Mortise by Jeff Ciesielski and Lianne Lairmore for robotics control.
FAQs
A framework for creating and managing finite state machines (FSMs).
We found that enoki demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.

Research
/Security News
Socket researchers discovered nine malicious NuGet packages that use time-delayed payloads to crash applications and corrupt industrial control systems.

Security News
Socket CTO Ahmad Nassri discusses why supply chain attacks now target developer machines and what AI means for the future of enterprise security.