restate
[!NOTE]
This module is in the pre-release state. No API changes are planned, but some things may still break.
This also means that currently there is a lack of tests and typechecks.
[!NOTE]
This README should make you familiar with the library, but it's not a 100%-full documentation. Somewhere in the future, full-fledged docs will be built (stay tuned).
A universal state manager for Python with a simple, consistent API.
Works with any storage implementation (or combination of) and makes handling application state effortless.
Features hierarchical organization, event bubbling, and derived values.
Install it with a package manager of your choice (pip, poetry, uv, pdm, etc.):
pip install restate
Quick Start
Let's store some users in memory.
from restate import ControllerSync, StateEvent
controller = ControllerSync()
def on_users_change(event: StateEvent[ControllerSync]):
new_users = event.get_state('/users')
print(f"Users changed! New value: {new_users}")
controller.subscribe("/users", on_users_change)
controller.set_state("/users/1", {"name": "Alice", "role": "admin"})
controller.set_state("/users/2", {"name": "Bob", "role": "user"})
print(controller.get_state("/users/1"))
controller.derive(
dest="/stats/admin_count",
source="/users",
transform=lambda users: sum(u["role"] == "admin" for u in users.values())
)
print("Admin count", controller.get_state("/stats/admin_count"))
Core Concepts
In restate, when you create a controller, you create a unified way to read and write data.
Obviously, Python has a tool for that, it's called a dictionary.
But restate strength is what is done with the state. Here's a basic set of features you get:
- State subscriptions: Run a callback on every state change on some path.
- Bubbling included: subscribe to parent paths to receive events about children.
- Wildcard paths: subscribe to partial paths with some sections omitted.
- Derived state: automatically calculate a path value out of one or more paths. Once per state change, supports all other path features.
- Early exit: by default, writes and notifications are not triggered if you write a state equal to previous. Easily configurable per-write via eq_func.
- Pings: Wanted to notify a path even though it hasn't changed? Sure you did.
- Auto-tracker: automatically calculate the callback dependencies on each run and re-run as needed, without additional configuration or mess.
- Storage-independent: Uses a simple unified interface to communicate with storage
- Batteries included: Several common backend implementations, like in-memory backend, cache layer and filesystem backend are included by default.
- Hybrid backend: Built-in way to mount different backends on different paths to build a complex storage structure.
Controllers
Controllers are the main interface for interacting with state. They manage state access, event dispatch, and synchronization.
Two types of controllers are available:
from restate import ControllerSync
sync_controller = ControllerSync(backend)
from restate import ControllerAsync
async_controller = ControllerAsync(backend)
State Hierarchy
State in restate is hierarchical. Think of it as a tree structure, similar to a filesystem:
controller.set_state("/config/theme", "dark")
controller.set_state("/config/colors/primary", "#FF0000")
controller.set_state("/config/colors/secondary", "#00FF00")
print(controller.get_state("/config"))
{
"theme": "dark",
"colors": {
"primary": "#FF0000",
"secondary": "#00FF00"
}
}
Paths
Both string and Path
objects are accepted:
from restate import ROOT_PATH
from pathlib import PurePosixPath as Path
controller.get_state("/users/1")
controller.get_state(ROOT_PATH / "users" / "1")
controller.get_state(Path("/") / "users" / "1")
controller.get_state("users/1")
controller.get_state(Path("users") / "1")
Storage Backends
In-Memory Backend (default)
Fast, but disappears on restart:
from restate import InMemoryBackend
backend = InMemoryBackend()
controller = ControllerSync(backend)
controller.set_state("/temp", "i will be gone after restart")
Filesystem Backend
Persistent storage using the filesystem. Supports both sync and async operations:
from restate import FileSystemSyncBackend, FileSystemAsyncBackend
backend = FileSystemSyncBackend(
"./state",
serializer=json_serializer
)
backend = FileSystemAsyncBackend("./state")
from restate import Serializer
yaml_serializer = Serializer(
extension="yaml",
raw_type=str,
serialize=yaml.dump,
deserialize=yaml.load
)
backend = FileSystemSyncBackend("./state", serializer=yaml_serializer)
The filesystem backend creates a directory structure that mirrors your state hierarchy:
./state/
└── config/
├── theme.json
└── colors/
├── primary.json
└── secondary.json
Hybrid Backend
Mount different backends at different paths:
from restate import HybridSyncBackend
hybrid = HybridSyncBackend(InMemoryBackend())
hybrid.mount("/persistent", FileSystemSyncBackend("./state"))
controller = ControllerSync(hybrid)
controller.set_state("/temporary", "volatile")
controller.set_state("/persistent/important", "saved to disk")
Caching Backend
The caching backend provides a performance-optimizing wrapper around any other backend, implementing an in-memory cache with configurable flush behaviors:
from restate import CachingSyncBackend, FileSystemSyncBackend
backend = CachingSyncBackend(
FileSystemSyncBackend("./state"),
flush_interval=5.0,
flush_on_read=False,
flush_on_write=True,
flush_on_delete=True
)
controller = ControllerSync(backend)
Custom Backends
Implement your own backend by subclassing Backend
or AsyncBackend
:
from restate import Backend
import redis
class RedisBackend(Backend):
def __init__(self):
self.redis = redis.Redis()
def read(self, path, default=None):
value = self.redis.get(str(path))
return json.loads(value) if value else default
def write(self, path, value):
self.redis.set(str(path), json.dumps(value))
def delete(self, path):
self.redis.delete(str(path))
Working with State
Basic Operations
controller.set_state("/users/1", {"name": "Alice"})
user = controller.get_state("/users/1")
controller.backend.delete("/users/1")
Default Values
volume = controller.get_state("/settings/volume", default=50)
volume = controller.get_state(
"/settings/volume",
default=50,
write_default=True
)
Equality Comparison
Control when state updates trigger events:
def compare_users(old, new):
if not (old and new):
return False
return old["id"] == new["id"]
controller.set_state(
"/current_user",
{"id": 1, "last_seen": "now"},
eq_func=compare_users
)
Async Support
restate has an async counterpart to the ControllerSync.
It has async for most methods and supports both sync and async backends:
from restate import ControllerAsync, InMemoryBackend
controller = ControllerAsync(InMemoryBackend())
async def main():
await controller.set_state("/counter", 0)
value = await controller.get_state("/counter")
async def on_counter_change(event):
value = event.new_value
await controller.set_state("/doubled", value * 2)
controller.subscribe("/counter", on_counter_change)
await controller.derive(
dest="/counter/squared",
source="/counter",
transform=lambda x: x ** 2
)
@app.get("/api/counter")
async def get_counter():
return await controller.get_state("/counter")
@app.post("/api/counter")
async def increment_counter():
current = await controller.get_state("/counter", 0)
await controller.set_state("/counter", current + 1)
Event System
Subscriptions
Subscribe to state changes:
def handler(event: StateEvent[ControllerSync]):
print(f"Config changed to: {event.get_state('/config')}")
controller.subscribe("/config", handler)
...and, if needed, unsubscribe:
controller.unsubscribe("/config", handler)
Wildcard (glob) paths are also available (wildcard must be the whole section, and cannot span multiple sections):
def any_user_name_change(event):
old_name = event.old_value
new_name = event.new_value
user_id = event.emitting_path.parent.name
print(f"user '{user_id}' changed name: {old_name} -> {new_name}")
controller.subscribe("/users/*/name")
Callback ID
restate internally gives IDs to the callbacks. You can use these IDs in place of callbacks after registration:
def basic_callback(event: StateEvent):
print(f"Basic callback: {event.new_value}")
callback_id = controller.register_callback(basic_callback)
controller.subscribe_by_id("/users", callback_id)
stats_callback_id = "stats_callback"
def stats_handler(event: StateEvent):
print(f"Stats changed: {event.new_value}")
controller.register_callback(stats_handler, force_id=stats_callback_id)
controller.subscribe_by_id("/stats", stats_callback_id)
def config_handler(event: StateEvent):
print(f"Config changed: {event.new_value}")
controller.subscribe("/config", config_handler)
controller.unsubscribe_by_id("/users", callback_id)
controller.unsubscribe_by_id("/stats", stats_callback)
controller.unsubscribe("/config", config_handler)
def new_stats_handler(event: StateEvent):
print("New stats handler")
controller.register_callback(new_stats_handler, force_id=stats_callback_id, replace=True)
Event Bubbling
Events bubble up the state tree by default:
def root_handler(event):
print(f"Something changed at {event.emitting_path}")
def nested_handler(event):
print("Handle nested change")
event.stop_bubbling()
controller.subscribe("/", root_handler)
controller.subscribe("/deep/nested/path", nested_handler)
Event Properties
Event objects provide rich context:
def handler(event):
print(f"Emitting path: {event.emitting_path}")
print(f"Current path: {event.current_path}")
print(f"Previous value: {event.prev_value}")
print(f"New value: {event.new_value}")
config = event.get_state("/config")
event.set_state("/call_counter", 10)
event.controller
controller.subscribe("/users", handler)
You can also pass an arbitrary payload
argument to .set_state
, .ping
, .track
and .derive*
methods.
This payload will be available to the callback via .payload
property.
Pings
Force notification of subscribers without changing state:
controller.ping("/users")
Derived State
Single-Source Derivation
controller.derive(
dest="/cart/total",
source="/cart/items",
transform=lambda items: sum(item["price"] for item in items)
)
controller.set_state("/cart/items", [
{"name": "Book", "price": 10},
{"name": "Pen", "price": 2}
])
print(controller.get_state("/cart/total"))
Multi-Source Derivation
from restate import DeriveData
def aggregate_stats(data: DeriveData):
users = data.get("users")
posts = data.get("posts")
comments = data.get("comments")
return {
"user_count": len(users),
"post_count": len(posts),
"comment_count": len(comments),
"avg_comments_per_post": (
len(comments) / len(posts)
if posts else 0
)
}
controller.derive_many(
dest="/stats",
sources=["/users", "/posts", "/comments"],
transform=aggregate_stats
)
State Tracking
State tracking automatically manages subscriptions based on what state paths are actually accessed during a callback execution.
This eliminates the need to manually specify dependencies and helps prevent common issues like stale subscriptions or missing updates.
Basic Usage
from restate import ControllerSync, InMemoryBackend
controller = ControllerSync(InMemoryBackend())
def compute_dashboard_stats(event):
users = event.get_state("/users")
active_posts = event.get_state("/posts/active")
settings = event.get_state("/settings")
return {
"total_users": len(users),
"active_posts": len(active_posts),
"is_public": settings.get("public", False)
}
controller.track(compute_dashboard_stats)
Advantages Over Manual Subscriptions
- Automatic Dependency Management
def stats_manual(event):
pass
controller.subscribe("/users", stats_manual)
controller.subscribe("/posts/active", stats_manual)
controller.subscribe("/settings", stats_manual)
def stats_tracked(event):
users = event.get_state("/users")
if event.get_state("/features/premium"):
premium = event.get_state("/users/premium")
controller.track(stats_tracked)
def dynamic_computation(event):
base_path = event.get_state("/config/data_path")
data = event.get_state(base_path)
for item in data:
details = event.get_state(f"{base_path}/{item}/details")
controller.track(dynamic_computation)
def feature_example(event):
if event.get_state("/features/legacy"):
legacy_data = event.get_state("/legacy/data")
else:
new_data = event.get_state("/new/data")
controller.track(feature_example)