restate

Storage-agnostic, reactive state management for Python

0.9.9
PyPI
Maintainers
1

restate

[!NOTE] This module is in a pre-release state. No API changes are planned, but some things may still break. It also means that tests and type checks are currently lacking.

[!NOTE] This README should make you familiar with the library, but it is not complete documentation. Full-fledged docs will be built sometime in the future (stay tuned).

A universal state manager for Python with a simple, consistent API. Works with any storage implementation (or a combination of them) and makes handling application state effortless. Features hierarchical organization, event bubbling, and derived values.

Install it with a package manager of your choice (pip, poetry, uv, pdm, etc.):

pip install restate

Quick Start

Let's store some users in memory.

from restate import ControllerSync, StateEvent

# Start with a sync controller and the basic in-memory storage.
controller = ControllerSync()


# let's get an update when we change users
def on_users_change(event: StateEvent[ControllerSync]): # the typing is optional
    new_users = event.get_state('/users')
    print(f"Users changed! New value: {new_users}")


controller.subscribe("/users", on_users_change)

# add some users
controller.set_state("/users/1", {"name": "Alice", "role": "admin"})
controller.set_state("/users/2", {"name": "Bob", "role": "user"})

# or read them
print(controller.get_state("/users/1"))  # {"name": "Alice", "role": "admin"}

# while we're at it, let's derive some state
controller.derive(
    dest="/stats/admin_count",
    source="/users",
    transform=lambda users: sum(u["role"] == "admin" for u in users.values())
)

# reading it now always returns the up-to-date count

print("Admin count", controller.get_state("/stats/admin_count")) # 1

Core Concepts

In restate, creating a controller gives you a unified way to read and write data. Of course, Python already has a tool for that: the dictionary.

restate's strength lies in what it does with the state. Here's the basic set of features you get:

  • State subscriptions: Run a callback on every state change on some path.
    • Bubbling included: subscribe to parent paths to receive events about children.
    • Wildcard paths: subscribe to partial paths with some sections omitted.
  • Derived state: automatically calculate a path's value from one or more other paths. The value is recalculated once per state change, and derived paths support all other path features.
  • Early exit: by default, writes and notifications are skipped if the new state equals the previous one. Configurable per write via eq_func.
  • Pings: Wanted to notify a path even though it hasn't changed? Sure you did.
  • Auto-tracker: automatically calculate the callback dependencies on each run and re-run as needed, without additional configuration or mess.
  • Storage-independent: uses a simple unified interface to communicate with storage.
    • Batteries included: Several common backend implementations, like in-memory backend, cache layer and filesystem backend are included by default.
    • Hybrid backend: Built-in way to mount different backends on different paths to build a complex storage structure.

Controllers

Controllers are the main interface for interacting with state. They manage state access, event dispatch, and synchronization.

Two types of controllers are available:

# Synchronous controller
from restate import ControllerSync
sync_controller = ControllerSync(backend)

# Asynchronous controller
from restate import ControllerAsync
async_controller = ControllerAsync(backend)

State Hierarchy

State in restate is hierarchical. Think of it as a tree structure, similar to a filesystem:

controller.set_state("/config/theme", "dark")
controller.set_state("/config/colors/primary", "#FF0000")
controller.set_state("/config/colors/secondary", "#00FF00")

# Reading parent paths returns nested structure
print(controller.get_state("/config"))
# Output:
# {
#     "theme": "dark",
#     "colors": {
#         "primary": "#FF0000",
#         "secondary": "#00FF00"
#     }
# }
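To illustrate how a read on a parent path can assemble a nested structure, here is a minimal standalone sketch built on nested dictionaries. It does not use restate: `TreeStore` and its `read`/`write` methods are hypothetical names chosen for this example, and the library's in-memory backend may work differently.

```python
from pathlib import PurePosixPath


class TreeStore:
    """Toy hierarchical store: each path section is a key in a nested dict."""

    def __init__(self):
        self.root = {}

    def write(self, path, value):
        # Anchor the path at "/" and drop the root part itself.
        parts = PurePosixPath("/", path).parts[1:]
        node = self.root
        for part in parts[:-1]:
            node = node.setdefault(part, {})  # create intermediate levels
        node[parts[-1]] = value

    def read(self, path, default=None):
        node = self.root
        for part in PurePosixPath("/", path).parts[1:]:
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node


store = TreeStore()
store.write("/config/theme", "dark")
store.write("/config/colors/primary", "#FF0000")
print(store.read("/config"))
```

Reading `/config` returns the whole subtree because the children live inside the parent's dictionary. The sketch does not handle writing to the root path or writing a child under a non-dictionary value.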

Paths

Both string and Path objects are accepted:

from restate import ROOT_PATH # pathlib.PurePosixPath("/")
from pathlib import PurePosixPath as Path

# These are equivalent:
controller.get_state("/users/1")
controller.get_state(ROOT_PATH / "users" / "1")
controller.get_state(Path("/") / "users" / "1")

# leading slash is also optional
controller.get_state("users/1")
controller.get_state(Path("users") / "1")
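One way such equivalence can be achieved is by joining every incoming path onto the root before using it. The sketch below relies only on the standard library; `normalize` is a hypothetical helper written for this example, not a restate function.

```python
from pathlib import PurePosixPath

ROOT = PurePosixPath("/")


def normalize(path):
    """Return an absolute PurePosixPath for a str or PurePosixPath input."""
    # Joining onto "/" anchors relative paths and leaves absolute ones as-is.
    return ROOT / path


print(normalize("users/1"))
print(normalize(PurePosixPath("users") / "1"))
```

Because `pathlib` discards the left operand when the right one is absolute, the same expression covers both spellings.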

Storage Backends

In-Memory Backend (default)

Fast, but disappears on restart:

from restate import InMemoryBackend

backend = InMemoryBackend()
controller = ControllerSync(backend)

controller.set_state("/temp", "i will be gone after restart")

Filesystem Backend

Persistent storage using the filesystem. Supports both sync and async operations:

from restate import FileSystemSyncBackend, FileSystemAsyncBackend

# Sync version
backend = FileSystemSyncBackend(
    "./state",  # Base directory
    # serializer=json_serializer  # Optional, JSON is the default
)

# Async version
backend = FileSystemAsyncBackend("./state")

# Custom serialization (requires PyYAML)
import yaml
from restate import Serializer

yaml_serializer = Serializer(
    extension="yaml",
    raw_type=str,
    serialize=yaml.dump,
    deserialize=yaml.safe_load,  # yaml.load needs an explicit Loader in PyYAML 6+
)
backend = FileSystemSyncBackend("./state", serializer=yaml_serializer)

The filesystem backend creates a directory structure that mirrors your state hierarchy:

./state/
  └── config/
      ├── theme.json
      └── colors/
          ├── primary.json
          └── secondary.json
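The mapping from a state path to a file in that layout can be sketched as follows. This is an illustration of the directory tree shown above, not the backend's actual code: `state_file` is a hypothetical helper, and how the real backend stores a path that has both its own value and children is not covered here.

```python
from pathlib import Path, PurePosixPath


def state_file(base_dir, state_path, extension="json"):
    """Map a state path to a file under base_dir (leaf values only)."""
    relative = PurePosixPath("/", state_path).relative_to("/")
    # Parent sections become directories; the last section becomes the file name.
    return Path(base_dir, *relative.parts[:-1], f"{relative.name}.{extension}")


print(state_file("./state", "/config/colors/primary"))
```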

Hybrid Backend

Mount different backends at different paths:

from restate import ControllerSync, FileSystemSyncBackend, HybridSyncBackend, InMemoryBackend

# Create hybrid with in-memory root
hybrid = HybridSyncBackend(InMemoryBackend())

# Mount filesystem backend for persistent data
hybrid.mount("/persistent", FileSystemSyncBackend("./state"))

controller = ControllerSync(hybrid)

# Uses in-memory storage
controller.set_state("/temporary", "volatile")

# Uses filesystem storage
controller.set_state("/persistent/important", "saved to disk")
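Routing by mount point can be pictured as a longest-prefix lookup. The following standalone sketch uses plain dictionaries in place of real backends; `MountTable` and `resolve` are hypothetical names, and whether restate passes the mounted backend a relative or an absolute path is an assumption made here for illustration.

```python
from pathlib import PurePosixPath


class MountTable:
    """Toy mount table: the deepest mounted prefix handles a path."""

    def __init__(self, root_backend):
        self.mounts = {PurePosixPath("/"): root_backend}

    def mount(self, prefix, backend):
        self.mounts[PurePosixPath("/", prefix)] = backend

    def resolve(self, path):
        """Return (backend, path relative to its mount point)."""
        path = PurePosixPath("/", path)
        # Walk from the path itself up to "/", so the deepest mount wins.
        for candidate in (path, *path.parents):
            if candidate in self.mounts:
                return self.mounts[candidate], path.relative_to(candidate)
        raise KeyError(path)


memory, disk = {}, {}  # stand-ins for an in-memory and a filesystem backend
table = MountTable(memory)
table.mount("/persistent", disk)

backend, relative = table.resolve("/persistent/important")
print(backend is disk, relative)
```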

Caching Backend

The caching backend provides a performance-optimizing wrapper around any other backend, implementing an in-memory cache with configurable flush behaviors:

from restate import CachingSyncBackend, ControllerSync, FileSystemSyncBackend

# Create a filesystem backend with caching
backend = CachingSyncBackend(
    FileSystemSyncBackend("./state"),
    flush_interval=5.0,        # Flush every 5 seconds when triggered
    flush_on_read=False,       # Don't flush on reads
    flush_on_write=True,       # Flush on writes
    flush_on_delete=True       # Flush on deletes
)

controller = ControllerSync(backend)
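The general idea of a cache that batches writes until a flush can be sketched without restate. `WriteBackCache` below is a hypothetical toy class wrapping a plain dictionary; it does not model `flush_interval` or the `flush_on_*` options, whose exact semantics are not described here.

```python
class WriteBackCache:
    """Toy write-back cache: reads hit memory, writes are batched until flush()."""

    def __init__(self, inner):
        self.inner = inner  # dict standing in for a slow backend
        self.cache = {}
        self.dirty = set()  # paths written since the last flush

    def read(self, path, default=None):
        if path not in self.cache:
            if path not in self.inner:
                return default
            self.cache[path] = self.inner[path]  # populate the cache on a miss
        return self.cache[path]

    def write(self, path, value):
        self.cache[path] = value
        self.dirty.add(path)

    def flush(self):
        for path in self.dirty:
            self.inner[path] = self.cache[path]
        self.dirty.clear()


slow = {}
cache = WriteBackCache(slow)
cache.write("/counter", 1)
value_before_flush = cache.read("/counter")
pending = dict(slow)  # snapshot: nothing has reached the inner store yet
cache.flush()
```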

Custom Backends

Implement your own backend by subclassing Backend or AsyncBackend:

import json

import redis
from restate import Backend

# Nested state is intentionally not implemented here: this backend treats paths as unrelated keys.
# For a reference implementation, see restate/backends/memory.py.
class RedisBackend(Backend):
    def __init__(self):
        self.redis = redis.Redis()

    def read(self, path, default=None):
        value = self.redis.get(str(path))
        # Redis returns None for a missing key
        return json.loads(value) if value is not None else default

    def write(self, path, value):
        self.redis.set(str(path), json.dumps(value))

    def delete(self, path):
        self.redis.delete(str(path))

Working with State

Basic Operations

# Write state
controller.set_state("/users/1", {"name": "Alice"})

# Read state
user = controller.get_state("/users/1")

# Delete state
controller.backend.delete("/users/1")

Default Values

# Return default if path doesn't exist
volume = controller.get_state("/settings/volume", default=50)

# Write default if path doesn't exist
volume = controller.get_state(
    "/settings/volume",
    default=50,
    write_default=True  # Creates the state if missing
)

Equality Comparison

Control when state updates trigger events:

# Custom equality function
def compare_users(old, new):
    if not (old and new):
        return False
    return old["id"] == new["id"]  # Only compare IDs

controller.set_state(
    "/current_user",
    {"id": 1, "last_seen": "now"},
    eq_func=compare_users  # Won't trigger if IDs match
)
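The early-exit check can be sketched as a comparison made before the write. This standalone example uses a plain dictionary as the store; `write_if_changed` and `same_id` are hypothetical names, and the real controller may order its checks differently.

```python
import operator


def write_if_changed(store, path, value, eq_func=operator.eq):
    """Write value and return True, unless eq_func reports no change."""
    if path in store and eq_func(store[path], value):
        return False  # early exit: no write, no notification
    store[path] = value
    return True


def same_id(old, new):
    return old["id"] == new["id"]  # only compare IDs


store = {}
first = write_if_changed(store, "/current_user", {"id": 1, "last_seen": "9:00"}, same_id)
second = write_if_changed(store, "/current_user", {"id": 1, "last_seen": "9:05"}, same_id)
third = write_if_changed(store, "/current_user", {"id": 2, "last_seen": "9:10"}, same_id)
```

Note the consequence in this sketch: when the equality function treats two values as equal, the newer value is not stored at all, so a coarse `eq_func` also drops the fields it ignores.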

Async Support

restate also provides ControllerAsync, an async counterpart to ControllerSync.

Most of its methods are coroutines, and it supports both sync and async backends:

from restate import ControllerAsync, InMemoryBackend

controller = ControllerAsync(InMemoryBackend())

async def main():
    # Basic operations
    await controller.set_state("/counter", 0)
    value = await controller.get_state("/counter")

    async def on_counter_change(event):
        value = event.new_value
        await controller.set_state("/doubled", value * 2)

    controller.subscribe("/counter", on_counter_change)

    # Derived state
    await controller.derive(
        dest="/counter/squared",
        source="/counter",
        transform=lambda x: x ** 2
    )

# With FastAPI (app setup shown for completeness)
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/counter")
async def get_counter():
    return await controller.get_state("/counter")

@app.post("/api/counter")
async def increment_counter():
    current = await controller.get_state("/counter", 0)
    await controller.set_state("/counter", current + 1)

Event System

Subscriptions

Subscribe to state changes:

def handler(event: StateEvent[ControllerSync]):
    print(f"Config changed to: {event.get_state('/config')}")

controller.subscribe("/config", handler)

...and, if needed, unsubscribe:

controller.unsubscribe("/config", handler)

Wildcard (glob) paths are also available. A wildcard must occupy a whole path section and cannot span multiple sections:


def any_user_name_change(event):
    old_name = event.prev_value
    new_name = event.new_value
    user_id = event.emitting_path.parent.name

    print(f"user '{user_id}' changed name: {old_name} -> {new_name}")


controller.subscribe("/users/*/name", any_user_name_change)
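The rule that a wildcard stands for exactly one section can be expressed as a section-by-section comparison. The function below is a standalone sketch of that rule; `matches` is a hypothetical helper, not part of restate.

```python
from pathlib import PurePosixPath


def matches(pattern, path):
    """True if path matches pattern, where "*" stands for exactly one section."""
    pattern_parts = PurePosixPath("/", pattern).parts
    path_parts = PurePosixPath("/", path).parts
    if len(pattern_parts) != len(path_parts):
        return False  # a wildcard never spans several sections
    return all(p == "*" or p == s for p, s in zip(pattern_parts, path_parts))


print(matches("/users/*/name", "/users/42/name"))
print(matches("/users/*/name", "/users/42/profile/name"))
```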

Callback ID

restate internally gives IDs to the callbacks. You can use these IDs in place of callbacks after registration:


def basic_callback(event: StateEvent):
    print(f"Basic callback: {event.new_value}")

# 1. Basic callback registration - returns an auto-generated ID
callback_id = controller.register_callback(basic_callback)

# Subscribe using the returned ID
controller.subscribe_by_id("/users", callback_id)

# 2. Named callback using Sentinel - predictable ID
stats_callback_id = "stats_callback"

def stats_handler(event: StateEvent):
    print(f"Stats changed: {event.new_value}")

# Register with custom ID
controller.register_callback(stats_handler, force_id=stats_callback_id)
controller.subscribe_by_id("/stats", stats_callback_id)

# 3. Direct subscription - auto registers and subscribes
def config_handler(event: StateEvent):
    print(f"Config changed: {event.new_value}")

# this also returns ID
controller.subscribe("/config", config_handler)

# 4. Unsubscribing examples
# Unsubscribe by ID
controller.unsubscribe_by_id("/users", callback_id)
controller.unsubscribe_by_id("/stats", stats_callback_id)

# Unsubscribe by function reference
controller.unsubscribe("/config", config_handler)

# 5. Re-using IDs
def new_stats_handler(event: StateEvent):
    print("New stats handler")

# Replace existing callback with same ID
controller.register_callback(new_stats_handler, force_id=stats_callback_id, replace=True)

Event Bubbling

Events bubble up the state tree by default:

def root_handler(event):
    print(f"Something changed at {event.emitting_path}")


def nested_handler(event):
    print("Handle nested change")
    event.stop_bubbling()  # Prevent bubbling to parent


controller.subscribe("/", root_handler)
controller.subscribe("/deep/nested/path", nested_handler)
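Bubbling can be pictured as walking from the emitting path up to the root and calling the handlers registered at each level. The sketch below is standalone and simplified: `Event` and `dispatch` are hypothetical stand-ins written for this example, and the assumption that handlers on the same level still run after `stop_bubbling()` is mine, not something the library documents here.

```python
from pathlib import PurePosixPath


class Event:
    def __init__(self, emitting_path):
        self.emitting_path = emitting_path
        self.current_path = emitting_path
        self.stopped = False

    def stop_bubbling(self):
        self.stopped = True


def dispatch(subscribers, path):
    """Call handlers from the emitting path up to "/", unless bubbling stops."""
    path = PurePosixPath("/", path)
    event = Event(path)
    for current in (path, *path.parents):
        event.current_path = current
        for handler in subscribers.get(current, []):
            handler(event)
        if event.stopped:
            break  # parents above this level are not notified


calls = []


def root_handler(event):
    calls.append(("root", str(event.emitting_path)))


def nested_handler(event):
    calls.append(("nested", str(event.emitting_path)))
    event.stop_bubbling()


subscribers = {
    PurePosixPath("/"): [root_handler],
    PurePosixPath("/deep/nested/path"): [nested_handler],
}
dispatch(subscribers, "/deep/nested/path/leaf")
dispatch(subscribers, "/other")
```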

Event Properties

Event objects provide rich context:

def handler(event):
    print(f"Emitting path: {event.emitting_path}")  # Emitting path
    print(f"Current path: {event.current_path}")    # Current path
    print(f"Previous value: {event.prev_value}")    # Previous value of *emitting* path (not the current path)
    print(f"New value: {event.new_value}")          # Current value of *emitting* path (not the current path)

    # Access other state during handling
    config = event.get_state("/config")

    # Write some state
    # (be aware that writing triggers subscriptions: for example,
    # writing to /users from this handler would run in circles)
    event.set_state("/call_counter", 10)

    # access the controller directly (not recommended for get_state/set_state, otherwise okay)
    event.controller


controller.subscribe("/users", handler)

You can also pass an arbitrary payload argument to the .set_state, .ping, .track and .derive* methods. The payload is then available to the callback via the event's .payload property.

Pings

Force notification of subscribers without changing state:

controller.ping("/users")

Derived State

Single-Source Derivation

# Compute total from items
controller.derive(
    dest="/cart/total",
    source="/cart/items",
    transform=lambda items: sum(item["price"] for item in items)
)

# Source changes automatically update destination
controller.set_state("/cart/items", [
    {"name": "Book", "price": 10},
    {"name": "Pen", "price": 2}
])

print(controller.get_state("/cart/total"))  # 12
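Conceptually, a derived value is a subscription on the source that writes the transformed result to the destination. The following standalone sketch shows that idea with a flat toy store; `TinyState` is a hypothetical class, and restate's own derive machinery (bubbling, once-per-change scheduling) is not reproduced.

```python
class TinyState:
    """Toy flat store with per-path subscribers and derived values."""

    def __init__(self):
        self.values = {}
        self.subscribers = {}

    def subscribe(self, path, callback):
        self.subscribers.setdefault(path, []).append(callback)

    def set_state(self, path, value):
        if path in self.values and self.values[path] == value:
            return  # unchanged: skip the write and the notifications
        self.values[path] = value
        for callback in self.subscribers.get(path, []):
            callback(value)

    def derive(self, dest, source, transform):
        def recompute(value):
            self.set_state(dest, transform(value))

        self.subscribe(source, recompute)
        if source in self.values:
            recompute(self.values[source])  # seed dest from existing state


state = TinyState()
state.derive(
    dest="/cart/total",
    source="/cart/items",
    transform=lambda items: sum(item["price"] for item in items),
)
state.set_state("/cart/items", [{"name": "Book", "price": 10}, {"name": "Pen", "price": 2}])
print(state.values["/cart/total"])
```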

Multi-Source Derivation

from restate import DeriveData

def aggregate_stats(data: DeriveData):
    users = data.get("users")
    posts = data.get("posts")
    comments = data.get("comments")

    return {
        "user_count": len(users),
        "post_count": len(posts),
        "comment_count": len(comments),
        "avg_comments_per_post": (
            len(comments) / len(posts)
            if posts else 0
        )
    }

controller.derive_many(
    dest="/stats",
    sources=["/users", "/posts", "/comments"],
    transform=aggregate_stats
)

State Tracking

State tracking automatically manages subscriptions based on what state paths are actually accessed during a callback execution. This eliminates the need to manually specify dependencies and helps prevent common issues like stale subscriptions or missing updates.

Basic Usage

from restate import ControllerSync, InMemoryBackend

controller = ControllerSync(InMemoryBackend())

def compute_dashboard_stats(event):
    # Tracker automatically records which paths are accessed
    users = event.get_state("/users")
    active_posts = event.get_state("/posts/active")
    settings = event.get_state("/settings")

    # This computation will re-run whenever any of the accessed paths change
    return {
        "total_users": len(users),
        "active_posts": len(active_posts),
        "is_public": settings.get("public", False)
    }

controller.track(compute_dashboard_stats)

Advantages Over Manual Subscriptions

  • Automatic Dependency Management

# Without tracking - manual subscriptions

def stats_manual(event):
    # Must manually maintain subscription list
    pass

controller.subscribe("/users", stats_manual)
controller.subscribe("/posts/active", stats_manual)
controller.subscribe("/settings", stats_manual)

# With tracking - automatic subscriptions

def stats_tracked(event):
    # Dependencies are automatically detected
    users = event.get_state("/users")
    # Adding new dependencies requires no subscription changes
    if event.get_state("/features/premium"):
        premium = event.get_state("/users/premium")

controller.track(stats_tracked)

  • Dynamic Dependencies

def dynamic_computation(event):
    base_path = event.get_state("/config/data_path")
    # Dependencies can change between runs
    data = event.get_state(base_path)

    for item in data:
        # Nested paths are automatically tracked
        details = event.get_state(f"{base_path}/{item}/details")

controller.track(dynamic_computation)

def feature_example(event):
    if event.get_state("/features/legacy"):
        # Old path accessed only when feature is on
        legacy_data = event.get_state("/legacy/data")
    else:
        # When feature is off, /legacy/data subscription
        # is automatically removed
        new_data = event.get_state("/new/data")

controller.track(feature_example)
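The mechanism behind this kind of tracking can be sketched in a few lines: record every path read during a run, then treat that set as the current dependency list. The class below is a standalone illustration; `Tracker`, `notify`, and `deps` are hypothetical names, and the real tracker works through the controller's subscriptions rather than a manual `notify` call.

```python
class Tracker:
    """Toy auto-tracker: depends on whatever paths the last run read."""

    def __init__(self, values, func):
        self.values = values  # plain dict standing in for the state
        self.func = func
        self.deps = set()
        self.run()

    def get_state(self, path):
        self.reads.add(path)  # record the dependency
        return self.values.get(path)

    def run(self):
        self.reads = set()
        self.func(self)  # the tracker plays the role of the event
        self.deps = self.reads  # stale dependencies drop out here

    def notify(self, path):
        """Re-run only if the changed path was read during the last run."""
        if path in self.deps:
            self.run()
            return True
        return False


def feature_example(event):
    if event.get_state("/features/legacy"):
        event.get_state("/legacy/data")
    else:
        event.get_state("/new/data")


values = {"/features/legacy": True, "/legacy/data": 1, "/new/data": 2}
tracker = Tracker(values, feature_example)
deps_with_legacy = set(tracker.deps)

values["/features/legacy"] = False
reran = tracker.notify("/features/legacy")
```

After the flag flips, `/legacy/data` is no longer in the dependency set, so changes to it no longer trigger a re-run.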
