Castore
Making Event Sourcing easy π
Event Sourcing is a data storage paradigm that saves changes in your application state rather than the state itself. It is powerful but tricky to implement.
After years of using it at Kumo, we have grown to love it, but also experienced first-hand the lack of consensus and tooling around it. That's where Castore comes from!
Castore is a TypeScript library that makes Event Sourcing easy π
With Castore, you'll be able to:
All that with first-class developer experience and minimal boilerplate β¨
π« Core Design
Some important decisions that we've made early on:
π Abstractions first
Castore has been designed with flexibility in mind. It gives you abstractions that are meant to be used anywhere: React apps, containers, Lambdas... you name it!
For instance, EventStore
classes are stack agnostic: They need an EventStorageAdapter
class to interact with actual data. You can code your own EventStorageAdapter
(simply implement the interface), but it's much simpler to use an off-the-shelf adapter like DynamoDBEventStorageAdapter
.
π
ββοΈ We do NOT deploy resources
While some packages like DynamoDBEventStorageAdapter
require compatible infrastructure, Castore is not responsible for deploying it.
Though that is not something we exclude in the future, we are a small team and decided to focus on DevX first.
β Full type safety
Speaking of DevX, we absolutely love TypeScript! If you do too, you're in the right place: We push type-safety to the limit in everything we do!
If you don't, that's fine π Castore is still available in Node/JS. And you can still profit from some nice JSDocs!
π Best practices
The Event Sourcing journey has many hidden pitfalls. We ran into them for you!
Castore is opiniated. It comes with a collection of best practices and documented anti-patterns that we hope will help you out!
Table of content
Getting Started
π₯ Installation
npm install @castore/core
yarn add @castore/core
π¦ Packages structure
Castore is not a single package, but a collection of packages revolving around a core
package. This is made so every line of code added to your project is opt-in, wether you use tree-shaking or not.
Castore packages are released together. Though different versions may be compatible, you are guaranteed to have working code as long as you use matching versions.
Here is an example of working package.json
:
{
...
"dependencies": {
"@castore/core": "1.3.1",
"@castore/dynamodb-event-storage-adapter": "1.3.1"
...
},
"devDependencies": {
"@castore/test-tools": "1.3.1"
...
}
}
The Basics
π Events
Event Sourcing is all about saving changes in your application state. Such changes are represented by events, and needless to say, they are quite important π
Events that concern the same business entity (like a User
) are aggregated through a common id called aggregateId
(and vice versa, events that have the same aggregateId
represent changes of the same business entity). The index of an event in such a serie of events is called its version
.
In Castore, stored events (also called event details) always have exactly the following attributes:
aggregateId (string)
version (integer β₯ 1)
timestamp (string)
: A date in ISO 8601 formattype (string)
: A string identifying the business meaning of the eventpayload (?any = never)
: A payload of any typemetadata (?any = never)
: Some metadata of any type
import type { EventDetail } from '@castore/core';
type UserCreatedEventDetail = EventDetail<
'USER_CREATED',
{ name: string; age: number },
{ invitedBy?: string }
>;
type UserCreatedEventDetail = {
aggregateId: string;
version: number;
timestamp: string;
type: 'USER_CREATED';
payload: { name: string; age: number };
metadata: { invitedBy?: string };
};
π· EventType
Events are generally classified in events types (not to confuse with TS types). Castore lets you declare them via the EventType
class:
import { EventType } from '@castore/core';
export const userCreatedEventType = new EventType<
'USER_CREATED',
{ name: string; age: number },
{ invitedBy?: string }
>({ type: 'USER_CREATED' });
Note that we only provide TS types for payload
and metadata
properties. That is because, as stated in the core design, Castore is meant to be as flexible as possible, and that includes the validation library you want to use: The EventType
class is not meant to be used directly, but rather extended by other classes which will add run-time validation methods to it π
See the following packages for examples:
Constructor:
type (string)
: The event type
import { EventType } from '@castore/core';
export const userCreatedEventType = new EventType({ type: 'USER_CREATED' });
Properties:
type (string)
: The event type
const eventType = userCreatedEventType.type;
Type Helpers:
EventTypeDetail
: Returns the event detail TS type of an EventType
import type { EventTypeDetail } from '@castore/core';
type UserCreatedEventTypeDetail = EventTypeDetail<typeof userCreatedEventType>;
type UserCreatedEventTypeDetail = {
aggregateId: string;
version: number;
timestamp: string;
type: 'USER_CREATED';
payload: { name: string; age: number };
metadata: { invitedBy?: string };
};
EventTypesDetails
: Return the events details of a list of EventType
import type { EventTypesDetails } from '@castore/core';
type UserEventTypesDetails = EventTypesDetails<
[typeof userCreatedEventType, typeof userRemovedEventType]
>;
π Aggregate
Eventhough entities are stored as series of events, we still want to use a stable interface to represent their states at a point in time rather than directly using events. In Castore, it is implemented by a TS type called Aggregate
.
βοΈ Think of aggregates as "what the data would look like in CRUD"
In Castore, aggregates necessarily contain an aggregateId
and version
attributes (the version
of the latest event
). But for the rest, it's up to you π€·ββοΈ
For instance, we can include a name
, age
and status
properties to our UserAggregate
:
import type { Aggregate } from '@castore/core';
interface UserAggregate extends Aggregate {
name: string;
age: number;
status: 'CREATED' | 'REMOVED';
}
interface UserAggregate {
aggregateId: string;
version: number;
name: string;
age: number;
status: 'CREATED' | 'REMOVED';
}
βοΈ Reducer
Aggregates are derived from their events by reducing them through a reducer
function. It defines how to update the aggregate when a new event is pushed:
import type { Reducer } from '@castore/core';
export const usersReducer: Reducer<UserAggregate, UserEventsDetails> = (
userAggregate,
newEvent,
) => {
const { version, aggregateId } = newEvent;
switch (newEvent.type) {
case 'USER_CREATED': {
const { name, age } = newEvent.payload;
return {
aggregateId,
version,
name,
age,
status: 'CREATED',
};
}
case 'USER_REMOVED':
return { ...userAggregate, version, status: 'REMOVED' };
}
};
const johnDowAggregate: UserAggregate = johnDowEvents.reduce(usersReducer);
βοΈ Note that aggregates are always computed on the fly, and NOT stored. Changing them does not require any data migration whatsoever.
π EventStore
Once you've defined your event types and how to aggregate them, you can bundle them together in an EventStore
class.
Each event store in your application represents a business entity. Think of event stores as "what tables would be in CRUD", except that instead of directly updating data, you just append new events to it!
In Castore, EventStore
classes are NOT responsible for actually storing data (this will come with event storage adapters). But rather to provide a boilerplate-free and type-safe interface to perform many actions such as:
- Listing aggregate ids
- Accessing events of an aggregate
- Building an aggregate with the reducer
- Pushing new events etc...
import { EventStore } from '@castore/core';
const userEventStore = new EventStore({
eventStoreId: 'USERS',
eventTypes: [
userCreatedEventType,
userRemovedEventType,
...
],
reducer: usersReducer,
});
βοΈ The EventStore
class is the heart of Castore, it even gave it its name!
Constructor:
eventStoreId (string)
: A string identifying the event storeeventStoreEvents (EventType[])
: The list of event types in the event storereduce (EventType[])
: A reducer function that can be applied to the store event typesstorageAdapter (?EventStorageAdapter)
: See EventStorageAdapter
βοΈ Note that it's the return type of the reducer
that is used to infer the Aggregate
type of the EventStore. It is important to type it explicitely.
Properties:
const userEventStoreId = userEventStore.eventStoreId;
eventStoreEvents (EventType[])
const userEventStoreEvents = userEventStore.eventStoreEvents;
reduce ((Aggregate, EventType) => Aggregate)
const reducer = userEventStore.reduce;
const storageAdapter = userEventStore.storageAdapter;
βοΈ The storageAdapter
is not read-only so you do not have to provide it right away.
Sync Methods:
getStorageAdapter (() => EventStorageAdapter)
: Returns the event store event storage adapter if it exists. Throws an UndefinedStorageAdapterError
if it doesn't.
import { UndefinedStorageAdapterError } from '@castore/core';
expect(() => userEventStore.getStorageAdapter()).toThrow(
new UndefinedStorageAdapterError({ eventStoreId: 'USERS' }),
);
buildAggregate ((eventDetails: EventDetail[], initialAggregate?: Aggregate) => Aggregate | undefined)
: Applies the event store reducer to a serie of events.
const johnDowAggregate = userEventStore.buildAggregate(johnDowEvents);
Async Methods:
The following methods interact with the data layer of your event store through its EventStorageAdapter
. They will throw an UndefinedStorageAdapterError
if you did not provide one.
-
getEvents ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the events of an aggregate, ordered by version
. Returns an empty array if no event is found for this aggregateId
.
OptionsObj
contains the following attributes:
minVersion (?number)
: To retrieve events above a certain versionmaxVersion (?number)
: To retrieve events below a certain versionlimit (?number)
: Maximum number of events to retrievereverse (?boolean = false)
: To retrieve events in reverse order (does not require to swap minVersion
and maxVersion
)
ResponseObj
contains the following attributes:
events (EventDetail[])
: The aggregate events (possibly empty)
const { events: allEvents } = await userEventStore.getEvents(aggregateId);
const { events: rangedEvents } = await userEventStore.getEvents(aggregateId, {
minVersion: 2,
maxVersion: 5,
});
const { events: onlyLastEvent } = await userEventStore.getEvents(aggregateId, {
reverse: true,
limit: 1,
});
-
getAggregate ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the events of an aggregate and build it.
OptionsObj
contains the following attributes:
maxVersion (?number)
: To retrieve aggregate below a certain version
ResponseObj
contains the following attributes:
aggregate (?Aggregate)
: The aggregate (possibly undefined
)events (EventDetail[])
: The aggregate events (possibly empty)lastEvent (?EventDetail)
: The last event (possibly undefined
)
const { aggregate: johnDow } = await userEventStore.getAggregate(aggregateId);
const { aggregate: aggregateBelowVersion } = await userEventStore.getAggregate(
aggregateId,
{ maxVersion: 5 },
);
const { aggregate, events } = await userEventStore.getAggregate(aggregateId);
getExistingAggregate ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Same as getAggregate
method, but ensures that the aggregate exists. Throws an AggregateNotFoundError
if no event is found for this aggregateId
.
import { AggregateNotFoundError } from '@castore/core';
expect(async () =>
userEventStore.getExistingAggregate(unexistingId),
).resolves.toThrow(
new AggregateNotFoundError({
eventStoreId: 'USERS',
aggregateId: unexistingId,
}),
);
const { aggregate } = await userEventStore.getAggregate(aggregateId);
pushEvent ((eventDetail: EventDetail) => Promise<void>)
: Pushes a new event to the event store. Throws an EventAlreadyExistsError
if an event already exists for the corresponding aggregateId
and version
.
await userEventStore.pushEvent({
aggregateId,
version: lastVersion + 1,
timestamp: new Date().toISOString(),
type: 'USER_CREATED',
payload,
metadata,
});
-
listAggregateIds ((opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the list of aggregateId
of an event store, ordered by timestamp
of their first event. Returns an empty array if no aggregate is found.
OptionsObj
contains the following attributes:
limit (?number)
: Maximum number of aggregate ids to retrievepageToken (?string)
: To retrieve a paginated result of aggregate ids
ResponseObj
contains the following attributes:
aggregateIds (string[])
: The list of aggregate idsnextPageToken (?string)
: A token for the next page of aggregate ids if one exists
const accAggregateIds: string = [];
const { aggregateIds: firstPage, nextPageToken } =
await userEventStore.getAggregate({ limit: 20 });
accAggregateIds.push(...firstPage);
if (nextPageToken) {
const { aggregateIds: secondPage } = await userEventStore.getAggregate({
limit: 20,
pageToken: nextPageToken,
});
accAggregateIds.push(...secondPage);
}
Type Helpers:
EventStoreId
: Returns the EventStore
id
import type { EventStoreId } from '@castore/core';
type UserEventStoreId = EventStoreId<typeof userEventStore>;
EventStoreEventsTypes
: Returns the EventStore
list of events types
import type { EventStoreEventsTypes } from '@castore/core';
type UserEventsTypes = EventStoreEventsTypes<typeof userEventStore>;
EventStoreEventsDetails
: Returns the union of all the EventStore
possible events details
import type { EventStoreEventsDetails } from '@castore/core';
type UserEventsDetails = EventStoreEventsDetails<typeof userEventStore>;
EventStoreReducer
: Returns the EventStore
reducer
import type { EventStoreReducer } from '@castore/core';
type UserReducer = EventStoreReducer<typeof userEventStore>;
EventStoreAggregate
: Returns the EventStore
aggregate
import type { EventStoreAggregate } from '@castore/core';
type UserReducer = EventStoreAggregate<typeof userEventStore>;
πΎ EventStorageAdapter
...coming soon
π¨ Command
Commands represent an intent to modify the state of your application. They usually result in pushing one or several events to your event stores.
They typically consist in:
- Fetching the required aggregates (if not the first event of a new aggregate)
- Validating that the intent is acceptable in regards to the state of the application*
- Pushing new events with incremented versions
* βοΈ Note that commands should NOT use read models for the validation step. Read models are not the source of truth, and may not contain the freshest state.
Fetching and pushing events at separate non-simultaneously exposes your application to race conditions. To counter that, commands executions are designed to be retried when an EventAlreadyExistsError
is triggered.
...technical description coming soon
When writing on several event stores at once, it is important to use transactions, i.e. to make sure that all events are written or none. This ensures that the application is not in a corrupt state.
Transactions accross event stores cannot be easily abstracted, so check you adapter library on how to achieve this. For instance, the DynamoDBEventStorageAdapter
exposes a pushEventsTransaction
util.
Resources
π― Test Tools
Castore comes with a handy Test Tool package that facilitates the writing of unit tests: It allows mocking event stores, populating them with an initial state and resetting them to it in a boilerplate-free and type-safe way.
π Packages List
π· Event Types
πΎ Event Storage Adapters
π¨ Commands
π Common Patterns
- Simulating a future/past aggregate state: ...coming soon
- Projecting on read models: ...coming soon
- Replaying events: ...coming soon
- Snapshotting: ...coming soon