Python Event Sourcery
A library for event-based systems in Python. For event sourcing, CQRS, and event-driven architectures.
event = EventSourceryIsBorn()
event_store.append(event, stream_id=stream_id)
Under heavy development
Documentation: python-event-sourcery.github.io/python-event-sourcery/
Installation
pip install python-event-sourcery
Easy to setup
Just configure your EventStore instance (or a factory) and call configure_models once. Examples below.
Versatile
Event Sourcery makes no assumptions about your configuration or session management. It's designed to be plugged in into what you already have, without a need to adjust anything. It can be integrated smoothly with thread-scoped SQLAlchemy sessions as well as dependency injection.
NOT opinionated
Although one can easily start with a library, the latter is very customizable and DOES NOT enforce a particular style of using. Also, libraries integrated, i.e. SQLAlchemy and Pydantic are not hardcoded dependencies. They can be replaced with a finite amount of effort.
Use cases & features
Until full documentation is available, you can follow tests
- Event Sourcing tests
- Snapshots (for Event Sourcing) tests
- Event Store - storage for events tests
- Outbox pattern tests
- Concurrency control with optimistic locking tests
Quick start
The script is complete, it should run as-is provided FastAPI and library with dependencies are installed.
See examples/0-fastapi-integration
from typing import Iterator, Literal
from uuid import uuid4
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, as_declarative
from event_sourcery import EventStore
from event_sourcery import (
configure_models,
get_event_store,
Event,
)
engine = create_engine(
"postgresql://event_sourcery:event_sourcery@localhost:5432/event_sourcery"
)
def db_session() -> Iterator[Session]:
session = Session(bind=engine)
try:
yield session
finally:
session.close()
@as_declarative()
class Base:
pass
app = FastAPI(on_startup=[lambda: Base.metadata.create_all(bind=engine)])
configure_models(Base)
def event_store(session: Session = Depends(db_session)) -> EventStore:
return get_event_store(session)
class CustomerSubscribed(Event):
plan_id: int
model: Literal["monthly", "yearly"]
@app.post("/subscription")
def subscribe(
event_store: EventStore = Depends(event_store),
session: Session = Depends(db_session),
):
event = CustomerSubscribed(plan_id=1, model="yearly")
event_store.append(event, stream_id=uuid4())
session.commit()
Feature requests / feedback
Anything missing? Create an issue in this repository. Let others upvote
Inspirations