
dataknobs-fsm
Finite State Machine framework with data modes, resource management, and streaming support.
pip install dataknobs-fsm
from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode
# Define configuration
config = {
    "name": "data_pipeline",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "end", "is_end": True}
    ],
    "arcs": [
        {
            "from": "start",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'processed': True}"
            }
        },
        {"from": "process", "to": "end"}
    ]
}
# Create and run FSM
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)
result = fsm.process({"input": "data"})
print(f"Result: {result['data']}")
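The inline transform in the configuration above is an ordinary Python lambda stored as a string. The following stand-alone sketch shows what that lambda computes when applied to the input record. It does not use dataknobs-fsm, and how the library actually compiles and invokes the string is not covered here; the use of `eval` and a `None` context are assumptions made only for illustration.

```python
# Stand-alone illustration; does not import dataknobs_fsm.
transform_code = "lambda data, ctx: {**data, 'processed': True}"

# Assumption: the string is turned into a callable. eval is used here
# purely for demonstration and should not be used on untrusted input.
transform = eval(transform_code)

original = {"input": "data"}
transformed = transform(original, None)  # ctx is unused by this lambda

print(transformed)  # new dict with the extra 'processed' key
print(original)     # the input dict is not mutated by the lambda
```

Because the lambda builds a new dictionary with `{**data, ...}`, the input record is left unchanged regardless of how the surrounding framework passes data in.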
from dataknobs_fsm import AdvancedFSM, ExecutionMode
import asyncio
async def debug_example():
    # Create FSM with debug mode
    fsm = AdvancedFSM(
        "config.yaml",
        execution_mode=ExecutionMode.DEBUG
    )
    # Add breakpoint
    fsm.add_breakpoint("process")
    # Create context and run
    context = fsm.create_context({"input": "data"})
    await fsm.run_until_breakpoint(context)
    print(f"Stopped at: {context.current_state}")
    # Continue execution
    await fsm.step(context)

asyncio.run(debug_example())
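To make the breakpoint workflow concrete, here is a toy, synchronous state machine that runs until it reaches a breakpoint state and then advances one step. `ToyFSM` and all of its methods are hypothetical names written for this sketch; it is not the `AdvancedFSM` implementation and only mirrors the idea of the calls shown above.

```python
# Toy illustration only; ToyFSM is a hypothetical class, not part of dataknobs_fsm.
class ToyFSM:
    def __init__(self, transitions, start, end):
        self.transitions = transitions  # maps each state to its next state
        self.current_state = start
        self.end = end
        self.breakpoints = set()

    def add_breakpoint(self, state):
        self.breakpoints.add(state)

    def step(self):
        """Advance one transition; return False if already at the end state."""
        if self.current_state == self.end:
            return False
        self.current_state = self.transitions[self.current_state]
        return True

    def run_until_breakpoint(self):
        """Step until a breakpoint state or the end state is reached."""
        while self.step():
            if self.current_state in self.breakpoints:
                break


toy = ToyFSM({"start": "process", "process": "end"}, start="start", end="end")
toy.add_breakpoint("process")

toy.run_until_breakpoint()
stopped_at = toy.current_state
print(f"Stopped at: {stopped_at}")

toy.step()  # continue past the breakpoint
final_state = toy.current_state
print(f"Now at: {final_state}")
```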
The examples/ directory contains comprehensive examples:
# Navigate to the FSM package
cd packages/fsm
# Run the database ETL example
uv run python examples/database_etl.py
# Run the data processing pipeline
uv run python examples/data_pipeline_example.py
# Run streaming example
uv run python examples/end_to_end_streaming.py
# Run with custom parameters
uv run python examples/database_etl.py --batch-size 500
The FSM framework provides three data handling modes:
from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode
# Use COPY mode for safety
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)
# Use REFERENCE mode for large datasets
fsm = SimpleFSM(config, data_mode=DataHandlingMode.REFERENCE)
# Use DIRECT mode for performance
fsm = SimpleFSM(config, data_mode=DataHandlingMode.DIRECT)
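The trade-off between working on a copy and working on the data directly can be illustrated in plain Python. This is a generic sketch of the safety-versus-performance idea suggested by the comments above, not a description of how dataknobs-fsm implements its modes. `mark_processed` is a hypothetical helper, and REFERENCE mode is not modeled here.

```python
import copy


def mark_processed(record):
    """Hypothetical transform that mutates the record it receives."""
    record["processed"] = True
    return record


source = {"input": "data", "tags": ["raw"]}

# Copy-style handling: the transform works on a deep copy, so the
# caller's data is untouched (safer, but costs time and memory).
copied_result = mark_processed(copy.deepcopy(source))
source_after_copy = dict(source)  # snapshot of the caller's data at this point

# Direct-style handling: the transform mutates the caller's object in
# place (no copy overhead, but side effects are visible to the caller).
direct_result = mark_processed(source)

print("processed" in source_after_copy)  # caller data unchanged after copy-style run
print(direct_result is source)           # direct-style run returned the same object
```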
For LLM-specific integrations, workflows, and examples, please see the dataknobs-llm package:
dataknobs_llm.fsm_integration
packages/llm/README.md for the FSM integration guide
The LLM package provides comprehensive LLM abstractions, providers, and FSM integration capabilities.
For detailed documentation, see:
Run the tests with:
cd packages/fsm
uv run pytest tests/ -v
This package is part of the DataKnobs ecosystem. For development setup and guidelines, see the main repository README.
Licensed under the same terms as the DataKnobs project.