Castore
Making Event Sourcing easy π
Event Sourcing is a data storage paradigm that saves changes in your application state rather than the state itself.
It is powerful as it enables rewinding to a previous state and exploring audit trails for debugging or business/legal purposes. It also integrates very well with event-driven architectures.
However, it is tricky to implement π
After years of using it at Kumo, we have grown to love it, but also experienced first-hand the lack of consensus and tooling around it. That's where Castore comes from!
Castore is a TypeScript library that makes Event Sourcing easy π
With Castore, you'll be able to:
All that with first-class developer experience and minimal boilerplate β¨
π« Core Design
Some important decisions that we've made early on:
π Abstractions first
Castore has been designed with flexibility in mind. It gives you abstractions that are meant to be used anywhere: React apps, containers, Lambdas... you name it!
For instance, EventStore
classes are stack agnostic: They need an EventStorageAdapter
class to interact with actual data. You can code your own EventStorageAdapter
(simply implement the interface), but it's much simpler to use an off-the-shelf adapter like DynamoDBEventStorageAdapter
.
π
ββοΈ We do NOT deploy resources
While some packages like DynamoDBEventStorageAdapter
require compatible infrastructure, Castore is not responsible for deploying it.
Though that is not something we exclude in the future, we are a small team and decided to focus on DevX first.
β Full type safety
Speaking of DevX, we absolutely love TypeScript! If you do too, you're in the right place: We push type-safety to the limit in everything we do!
If you don't, that's fine π Castore is still available in Node/JS. And you can still profit from some nice JSDocs!
π Best practices
The Event Sourcing journey has many hidden pitfalls. We ran into them for you!
Castore is opiniated. It comes with a collection of best practices and documented anti-patterns that we hope will help you out!
Table of content
π¬ Getting Started
- Installation
npm install @castore/core
yarn add @castore/core
- Packages structure
Castore is not a single package, but a collection of packages revolving around a core
package. This is made so every line of code added to your project is opt-in, wether you use tree-shaking or not.
Castore packages are released together. Though different versions may be compatible, you are guaranteed to have working code as long as you use matching versions.
Here is an example of working package.json
:
{
...
"dependencies": {
"@castore/core": "1.3.1",
"@castore/dynamodb-event-storage-adapter": "1.3.1"
...
},
"devDependencies": {
"@castore/test-tools": "1.3.1"
...
}
}
π The Basics
- Events
Event Sourcing is all about saving changes in your application state. Such changes are represented by events, and needless to say, they are quite important π
Events that concern the same entity (like a Pokemon
) are aggregated through a common id called aggregateId
(and vice versa, events that have the same aggregateId
represent changes of the same business entity). The index of an event in such a serie of events is called its version
.

In Castore, stored events (also called event details) always have exactly the following properties:
aggregateId (string)
version (integer β₯ 1)
timestamp (string)
: A date in ISO 8601 formattype (string)
: A string identifying the business meaning of the eventpayload (?any = never)
: A payload of any typemetadata (?any = never)
: Some metadata of any type
import type { EventDetail } from '@castore/core';
type PokemonAppearedEventDetail = EventDetail<
'POKEMON_APPEARED',
{ name: string; level: number },
{ trigger?: 'random' | 'scripted' }
>;
type PokemonAppearedEventDetail = {
aggregateId: string;
version: number;
timestamp: string;
type: 'POKEMON_APPEARED';
payload: { name: string; level: number };
metadata: { trigger?: 'random' | 'scripted' };
};
- EventType
Events are generally classified in events types (not to confuse with TS types). Castore lets you declare them via the EventType
class:
import { EventType } from '@castore/core';
const pokemonAppearedEventType = new EventType<
'POKEMON_APPEARED',
{ name: string; level: number },
{ trigger?: 'random' | 'scripted' }
>({ type: 'POKEMON_APPEARED' });
Note that we only provided TS types for payload
and metadata
properties. That is because, as stated in the core design, Castore is meant to be as flexible as possible, and that includes the validation library you want to use (if any): The EventType
class can be used directly if no validation is required, or implemented by other classes which will add run-time validation methods to it π
See the following packages for examples:
π§ Technical description
Constructor:
type (string)
: The event type
import { EventType } from '@castore/core';
const pokemonAppearedEventType = new EventType({ type: 'POKEMON_APPEARED' });
Properties:
type (string)
: The event type
const eventType = pokemonAppearedEventType.type;
Type Helpers:
EventTypeDetail
: Returns the event detail TS type of an EventType
import type { EventTypeDetail } from '@castore/core';
type PokemonAppearedEventTypeDetail = EventTypeDetail<
typeof pokemonAppearedEventType
>;
type PokemonCaughtEventTypeDetail = {
aggregateId: string;
version: number;
timestamp: string;
type: 'POKEMON_APPEARED';
payload: { name: string; level: number };
metadata: { trigger?: 'random' | 'scripted' };
};
EventTypesDetails
: Return the events details of a list of EventType
import type { EventTypesDetails } from '@castore/core';
type PokemonEventTypeDetails = EventTypesDetails<
[typeof pokemonAppearedEventType, typeof pokemonCaughtEventType]
>;
- Aggregate
Eventhough entities are stored as series of events, we still want to use a simpler and stable interface to represent their states at a point in time rather than directly using events. In Castore, it is implemented by a TS type called Aggregate
.
βοΈ Think of aggregates as "what the data would look like in CRUD"
In Castore, aggregates necessarily contain an aggregateId
and version
properties (the version
of the latest event
). But for the rest, it's up to you π€·ββοΈ
For instance, we can include a name
, level
and status
properties to our PokemonAggregate
:
import type { Aggregate } from '@castore/core';
interface PokemonAggregate extends Aggregate {
name: string;
level: number;
status: 'wild' | 'caught';
}
interface PokemonAggregate {
aggregateId: string;
version: number;
name: string;
level: number;
status: 'wild' | 'caught';
}
- Reducer
Aggregates are derived from their events by reducing them through a reducer
function. It defines how to update the aggregate when a new event is pushed:

import type { Reducer } from '@castore/core';
const pokemonsReducer: Reducer<PokemonAggregate, PokemonEventDetails> = (
pokemonAggregate,
newEvent,
) => {
const { version, aggregateId } = newEvent;
switch (newEvent.type) {
case 'POKEMON_APPEARED': {
const { name, level } = newEvent.payload;
return {
aggregateId,
version,
name,
level,
status: 'wild',
};
}
case 'POKEMON_CAUGHT':
return { ...pokemonAggregate, version, status: 'caught' };
case 'POKEMON_LEVELED_UP':
return {
...pokemonAggregate,
version,
level: pokemonAggregate.level + 1,
};
}
};
const myPikachuAggregate: PokemonAggregate =
myPikachuEvents.reduce(pokemonsReducer);
βοΈ Aggregates are always computed on the fly, and NOT stored. Changing them does not require any data migration whatsoever.
- EventStore
Once you've defined your event types and how to aggregate them, you can bundle them together in an EventStore
class.
Each event store in your application represents a business entity. Think of event stores as "what tables would be in CRUD", except that instead of directly updating data, you just append new events to it!

In Castore, EventStore
classes are NOT responsible for actually storing data (this will come with event storage adapters). But rather to provide a boilerplate-free and type-safe interface to perform many actions such as:
- Listing aggregate ids
- Accessing events of an aggregate
- Building an aggregate with the reducer
- Pushing new events etc...
import { EventStore } from '@castore/core';
const pokemonsEventStore = new EventStore({
eventStoreId: 'POKEMONS',
eventStoreEvents: [
pokemonAppearedEventType,
pokemonCaughtEventType,
pokemonLeveledUpEventType,
...
],
reduce: pokemonsReducer,
});
βοΈ The EventStore
class is the heart of Castore, it even gave it its name!
π§ Technical description
Constructor:
eventStoreId (string)
: A string identifying the event storeeventStoreEvents (EventType[])
: The list of event types in the event storereduce (EventType[])
: A reducer function that can be applied to the store event typesonEventPushed (?(pushEventResponse: PushEventResponse => Promise<void>))
: To run a callback after events are pushed (input is exactly the return value of the pushEvent
method)storageAdapter (?EventStorageAdapter)
: See EventStorageAdapter
βοΈ The return type of the reducer
is used to infer the Aggregate
type of the EventStore
, so it is important to type it explicitely.
Properties:
const pokemonsEventStoreId = pokemonsEventStore.eventStoreId;
eventStoreEvents (EventType[])
const pokemonsEventStoreEvents = pokemonsEventStore.eventStoreEvents;
reduce ((Aggregate, EventType) => Aggregate)
const reducer = pokemonsEventStore.reduce;
onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>)
: Callback to run after events are pushed
const onEventPushed = pokemonsEventStore.onEventPushed;
const storageAdapter = pokemonsEventStore.storageAdapter;
βοΈ The storageAdapter
is not read-only so you do not have to provide it right away.
Sync Methods:
getStorageAdapter (() => EventStorageAdapter)
: Returns the event store event storage adapter if it exists. Throws an UndefinedStorageAdapterError
if it doesn't.
import { UndefinedStorageAdapterError } from '@castore/core';
expect(() => pokemonsEventStore.getStorageAdapter()).toThrow(
new UndefinedStorageAdapterError({ eventStoreId: 'POKEMONS' }),
);
buildAggregate ((eventDetails: EventDetail[], initialAggregate?: Aggregate) => Aggregate | undefined)
: Applies the event store reducer to a serie of events.
const myPikachuAggregate = pokemonsEventStore.buildAggregate(myPikachuEvents);
groupEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => GroupedEvent)
: See Event Groups.
Async Methods:
The following methods interact with the data layer of your event store through its EventStorageAdapter
. They will throw an UndefinedStorageAdapterError
if you did not provide one.
-
getEvents ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the events of an aggregate, ordered by version
. Returns an empty array if no event is found for this aggregateId
.
OptionsObj
contains the following properties:
minVersion (?number)
: To retrieve events above a certain versionmaxVersion (?number)
: To retrieve events below a certain versionlimit (?number)
: Maximum number of events to retrievereverse (?boolean = false)
: To retrieve events in reverse order (does not require to swap minVersion
and maxVersion
)
ResponseObj
contains the following properties:
events (EventDetail[])
: The aggregate events (possibly empty)
const { events: allEvents } = await pokemonsEventStore.getEvents(myPikachuId);
const { events: rangedEvents } = await pokemonsEventStore.getEvents(
myPikachuId,
{
minVersion: 2,
maxVersion: 5,
},
);
const { events: onlyLastEvent } = await pokemonsEventStore.getEvents(
myPikachuId,
{
reverse: true,
limit: 1,
},
);
-
getAggregate ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the events of an aggregate and build it.
OptionsObj
contains the following properties:
maxVersion (?number)
: To retrieve aggregate below a certain version
ResponseObj
contains the following properties:
aggregate (?Aggregate)
: The aggregate (possibly undefined
)events (EventDetail[])
: The aggregate events (possibly empty)lastEvent (?EventDetail)
: The last event (possibly undefined
)
const { aggregate: myPikachu } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
const { aggregate: pikachuBelowVersion5 } =
await pokemonsEventStore.getAggregate(myPikachuId, { maxVersion: 5 });
const { aggregate, events } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
getExistingAggregate ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Same as getAggregate
method, but ensures that the aggregate exists. Throws an AggregateNotFoundError
if no event is found for this aggregateId
.
import { AggregateNotFoundError } from '@castore/core';
expect(async () =>
pokemonsEventStore.getExistingAggregate(unexistingId),
).resolves.toThrow(
new AggregateNotFoundError({
eventStoreId: 'POKEMONS',
aggregateId: unexistingId,
}),
);
const { aggregate } = await pokemonsEventStore.getAggregate(aggregateId);
-
pushEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Pushes a new event to the event store. The timestamp
is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as new Date().toISOString()
. Throws an EventAlreadyExistsError
if an event already exists for the corresponding aggregateId
and version
(see section below on race conditions).
OptionsObj
contains the following properties:
prevAggregate (?Aggregate)
: The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the ConnectedEventStore
classforce (?boolean)
: To force push the event even if one already exists for the corresponding aggregateId
and version
. Any existing event will be overridden, so use with extra care, mainly in data migrations.
ResponseObj
contains the following properties:
event (EventDetail)
: The complete event (includes the timestamp
)nextAggregate (?Aggregate)
: The aggregate at the new version, i.e. after having pushed the event. Returned only if the event is an initial event, if the prevAggregate
option was provided, or when using a ConnectedEventStore
class connected to a state-carrying message bus or queue
const { event: completeEvent, nextAggregate } =
await pokemonsEventStore.pushEvent(
{
aggregateId: myPikachuId,
version: lastVersion + 1,
type: 'POKEMON_LEVELED_UP',
payload,
metadata,
},
{ prevAggregate },
);
-
listAggregateIds ((opt?: OptionsObj = {}) => Promise<ResponseObj>)
: Retrieves the list of aggregateId
of an event store, ordered by the timestamp
of their initial event. Returns an empty array if no aggregate is found.
OptionsObj
contains the following properties:
limit (?number)
: Maximum number of aggregate ids to retrieveinitialEventAfter (?string)
: To retrieve aggregate ids that appeared after a certain timestampinitialEventBefore (?string)
: To retrieve aggregate ids that appeared before a certain timestampreverse (?boolean)
: To retrieve the aggregate ids in reverse orderpageToken (?string)
: To retrieve a paginated result of aggregate ids
ResponseObj
contains the following properties:
aggregateIds (string[])
: The list of aggregate idsnextPageToken (?string)
: A token for the next page of aggregate ids if one exists. The nextPageToken carries the previously used options, so you do not have to provide them again (though you can still do it to override them).
const accAggregateIds: string = [];
const { aggregateIds: firstPage, nextPageToken } =
await pokemonsEventStore.listAggregateIds({ limit: 20 });
accAggregateIds.push(...firstPage);
if (nextPageToken) {
const { aggregateIds: secondPage } =
await pokemonsEventStore.listAggregateIds({
pageToken: nextPageToken,
});
accAggregateIds.push(...secondPage);
}
Type Helpers:
EventStoreId
: Returns the EventStore
id
import type { EventStoreId } from '@castore/core';
type PokemonsEventStoreId = EventStoreId<typeof pokemonsEventStore>;
EventStoreEventsTypes
: Returns the EventStore
list of events types
import type { EventStoreEventsTypes } from '@castore/core';
type PokemonEventTypes = EventStoreEventsTypes<typeof pokemonsEventStore>;
EventStoreEventsDetails
: Returns the union of all the EventStore
possible events details
import type { EventStoreEventsDetails } from '@castore/core';
type PokemonEventDetails = EventStoreEventsDetails<typeof pokemonsEventStore>;
EventStoreReducer
: Returns the EventStore
reducer
import type { EventStoreReducer } from '@castore/core';
type PokemonsReducer = EventStoreReducer<typeof pokemonsEventStore>;
EventStoreAggregate
: Returns the EventStore
aggregate
import type { EventStoreAggregate } from '@castore/core';
type SomeAggregate = EventStoreAggregate<typeof pokemonsEventStore>;
- EventStorageAdapter
For the moment, we didn't provide any actual way to store our events data. This is the responsibility of the EventStorageAdapter
class.
import { EventStore } from '@castore/core';
const pokemonsEventStore = new EventStore({
eventStoreId: 'POKEMONS',
eventTypes: pokemonEventTypes,
reduce: pokemonsReducer,
storageAdapter: mySuperStorageAdapter,
});
pokemonsEventStore.storageAdapter = mySuperStorageAdapter;
You can choose to build an event storage adapter that suits your usage. However, we highly recommend using an off-the-shelf adapter:
If the storage solution that you use is missing, feel free to create/upvote an issue, or contribute π€
- Command
Modifying the state of your application (i.e. pushing new events to your event stores) is done by executing commands. They typically consist in:
- Fetching the required aggregates (if not the initial event of a new aggregate)
- Validating that the modification is acceptable
- Pushing new events with incremented versions

import { Command, tuple } from '@castore/core';
type Input = { name: string; level: number };
type Output = { pokemonId: string };
type Context = { generateUuid: () => string };
const catchPokemonCommand = new Command({
commandId: 'CATCH_POKEMON',
requiredEventStores: tuple(pokemonsEventStore, trainersEventStore),
handler: async (
commandInput: Input,
[pokemonsEventStore, trainersEventStore],
{ generateUuid }: Context,
): Promise<Output> => {
const { name, level } = commandInput;
const pokemonId = generateUuid();
await pokemonsEventStore.pushEvent({
aggregateId: pokemonId,
version: 1,
type: 'POKEMON_CAUGHT',
payload: { name, level },
});
return { pokemonId };
},
});
Note that we only provided TS types for Input
and Output
properties. That is because, as stated in the core design, Castore is meant to be as flexible as possible, and that includes the validation library you want to use (if any): The Command
class can be used directly if no validation is required, or implemented by other classes which will add run-time validation methods to it π
See the following packages for examples:
π§ Technical description
Constructor:
-
commandId (string)
: A string identifying the command
-
handler ((input: Input, requiredEventsStores: EventStore[]) => Promise<Output>)
: The code to execute
-
requiredEventStores (EventStore[])
: A tuple of EventStores
that are required by the command for read/write purposes. In TS, you should use the tuple
util to preserve tuple ordering in the handler (tuple
doesn't mute its input, it simply returns them)
-
eventAlreadyExistsRetries (?number = 2)
: Number of handler execution retries before breaking out of the retry loop (See section below on race conditions)
-
onEventAlreadyExists (?(error: EventAlreadyExistsError, context: ContextObj) => Promise<void>)
: Optional callback to execute when an EventAlreadyExistsError
is raised.
The EventAlreadyExistsError
class contains the following properties:
eventStoreId (?string)
: The eventStoreId
of the aggregate on which the pushEvent
attempt failedaggregateId (string)
: The aggregateId
of the aggregateversion (number)
: The version
of the aggregate
The ContextObj
contains the following properties:
attemptNumber (?number)
: The number of handler execution attempts in the retry loopretriesLeft (?number)
: The number of retries left before breaking out of the retry loop
import { Command, tuple } from '@castore/core';
const doSomethingCommand = new Command({
commandId: 'DO_SOMETHING',
requiredEventStores: tuple(eventStore1, eventStore2),
handler: async (commandInput, [eventStore1, eventStore2]) => {
},
});
Properties:
commandId (string)
: The command id
const commandId = doSomethingCommand.commandId;
requiredEventStores (EventStore[])
: The required event stores
const requiredEventStores = doSomethingCommand.requiredEventStores;
handler ((input: Input, requiredEventsStores: EventStore[]) => Promise<Output>)
: Function to invoke the command
const output = await doSomethingCommand.handler(input, [
eventStore1,
eventStore2,
]);
A few notes on commands handlers:
-
Commands
handlers should NOT use read models when validating that a modification is acceptable. Read models are like cache: They are not the source of truth, and may not represent the freshest state.
-
Fetching and pushing events non-simultaneously exposes your application to race conditions. To counter that, commands are designed to be retried when an EventAlreadyExistsError
is triggered (which is part of the EventStorageAdapter
interface).

- Finally, Command handlers should be, as much as possible, pure functions. If it depends on impure functions like functions with unpredictable outputs (like id generation), mutating effects, side effects or state dependency (like external data fetching), you should pass them through the additional context arguments rather than directly importing and using them. This will make them easier to test and to re-use in different contexts, such as in the React Visualizer.
Event Groups
Some commands can have an effect on several event stores, or on several aggregates of the same event store. For instance, the CATCH_POKEMON
command could write both a CAUGHT_BY_TRAINER
event on a pokemon aggregate (changing its status
to 'caught'
) and a POKEMON_CAUGHT
event on a trainer aggregate (appending the pokemonId
to its pokedex
).

To not have your application in a corrupt state, it's important to make sure that all those events are pushed or none. In Castore, this can be done through the event groups API:
- You can use the
groupEvent
method to build an array of events that are to be pushed together. It has the same input interface as pushEvent
but synchronously returns a GroupedEvent
class. - The
EventStore
class exposes a static pushEventGroup
method that can be used to effectively push this event group.
await EventStore.pushEventGroup(
pokemonsEventStore.groupEvent({
aggregateId: 'pikachu1',
type: 'CAUGHT_BY_TRAINER',
payload: { trainerId: 'ashKetchum' },
...
}),
trainersEventStore.groupEvent({
aggregateId: 'ashKetchum',
type: 'POKEMON_CAUGHT',
payload: { pokemonId: 'pikachu1' },
...
}),
);
Like the pushEvent
API, event groups are designed to throw an EventAlreadyExistsError
if the transaction has failed, making sure that commands are retried as expected when race conditions arise.
βοΈ When pushing event groups on several event stores, they must use the same type of event storage adapters.
βοΈ Also, be aware of technical constraints of your event storage solution. For instance, the DynamoDBEventStorageAdapter
's implementation is based on DynamoDB transactions, which means that the event stores tables must be in the same region, and that a groups cannot contain more than 100 events.
πͺ Advanced usage
- Event-driven architecture
Event Sourcing integrates very well with event-driven architectures. In a traditional architecture, you would need to design your system events (or messages for clarity) separately from your data. With Event Sourcing, they can simply broadcast the business events you already designed.
In Castore, we distinguish three types of message:
- AggregateExists messages which only carry aggregate ids (mainly for maintenance purposes)
- Notification messages which also carry event details
- State-carrying messages which also carry their corresponding aggregates

In Castore, they are implemented by the AggregateExistsMessage
, NotificationMessage
and StateCarryingMessage
TS types:
import type {
AggregateExistsMessage,
EventStoreAggregateExistsMessage,
} from '@castore/core';
type PokemonAggregateExistsMessage = AggregateExistsMessage<'POKEMONS'>;
type PokemonAggregateExistsMessage = {
eventStoreId: 'POKEMONS';
aggregateId: string;
};
type PokemonAggregateExistsMessage = EventStoreAggregateExistsMessage<
typeof pokemonsEventStore
>;
import type {
NotificationMessage,
EventStoreNotificationMessage,
} from '@castore/core';
type PokemonEventNotificationMessage = NotificationMessage<
'POKEMONS',
PokemonEventDetails
>;
type PokemonEventNotificationMessage = {
eventStoreId: 'POKEMONS';
event: PokemonEventDetails;
};
type PokemonEventNotificationMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore
>;
import type {
StateCarryingMessage,
EventStoreStateCarryingMessage,
} from '@castore/core';
type PokemonEventStateCarryingMessage = StateCarryingMessage<
'POKEMONS',
PokemonEventDetails,
PokemonAggregate
>;
type PokemonEventStateCarryingMessage = {
eventStoreId: 'POKEMONS';
event: PokemonEventDetails;
aggregate: PokemonAggregate
};
type PokemonEventStateCarryingMessage = EventStoreStateCarryingMessage<
typeof pokemonsEventStore
>;
All types of message can be published through message channels, i.e. Message Queues or Message Buses.
- MessageQueue
Message Queues store the published messages until they are handled by a worker. The worker is unique and predictible. It consumes all messages indifferently of their content.

You can use the AggregateExistsMessageQueue
, NotificationMessageQueue
or StateCarryingMessageQueue
classes to implement message queues:
import { NotificationMessageQueue } from '@castore/core';
const appMessageQueue = new NotificationMessageQueue({
messageQueueId: 'APP_MESSAGE_QUEUE',
sourceEventStores: [pokemonsEventStore, trainersEventStore],
});
await appMessageQueue.publishMessage({
eventStoreId: 'POKEMONS',
event: {
type: 'POKEMON_LEVELED_UP',
...
},
});
π§ Technical description
Constructor:
messageQueueId (string)
: A string identifying the message queuesourceEventStores (EventStore[])
: List of event stores that the message queue will broadcast events frommessageQueueAdapter (?MessageChannelAdapter)
: See section on MessageQueueAdapters
Properties:
messageChannelId (string)
const appMessageQueueId = appMessageQueue.messageChannelId;
sourceEventStores (EventStore[])
const appMessageQueueSourceEventStores = appMessageQueue.sourceEventStores;
const appMessageQueueAdapter = appMessageQueue.messageChannelAdapter;
βοΈ The messageChannelAdapter
is not read-only so you do not have to provide it right away.
Async Methods:
The following methods interact with the messaging solution of your application through a MessageQueueAdapter
. They will throw an UndefinedMessageChannelAdapterError
if you did not provide one.
-
publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise<void>)
: Publish a Message
(of the appropriate type) to the message queue.
OptionsObj
contains the following properties:
replay (?boolean)
: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
-
publishMessages (messages: Message[], opt?: OptionsObj) => Promise<void>)
: Publish several Messages
(of the appropriate type) to the message queue. Options are similar to the publishMessage
options.
-
getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>)
: (StateCarryingMessageQueues only) Append the matching aggregate (with correct version) to a NotificationMessage
and turn it into a StateCarryingMessage
before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up.
Type Helpers:
MessageChannelMessage
: Given a MessageQueue
, returns the TS type of its messages
import type { MessageChannelMessage } from '@castore/core';
type AppMessage = MessageChannelMessage<typeof appMessageQueue>;
type AppMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore | typeof trainersEventStore...
>;
- MessageQueueAdapter
Similarly to event stores, MessageQueue
classes provide a boilerplate-free and type-safe interface to publish messages, but are NOT responsible for actually doing so. This is the responsibility of the MessageQueueAdapter
, that will connect it to your actual messaging solution:
import { EventStore } from '@castore/core';
const messageQueue = new NotificationMessageQueue({
...
messageQueueAdapter: mySuperMessageQueueAdapter,
});
messageQueue.messageChannelAdapter = mySuperMessageQueueAdapter;
You can code your own MessageQueueAdapter
(simply implement the MessageChannelAdapter
interface), but we highly recommend using an off-the-shelf adapter:
If the messaging solution that you use is missing, feel free to create/upvote an issue, or contribute π€
The adapter packages will also expose useful generics to type the arguments of your queue worker. For instance:
import type {
SQSMessageQueueMessage,
SQSMessageQueueMessageBody,
} from '@castore/sqs-message-queue-adapter';
const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
Records.forEach(({ body }) => {
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
});
};
- MessageBus
Message Buses are used to spread messages to multiple listeners. Contrary to message queues, they do not store the message or wait for the listeners to respond. Often, filter patterns can also be used to trigger listeners or not based on the message content.

You can use the AggregateExistsMessageBus
, NotificationMessageBus
or StateCarryingMessageBus
classes to implement message buses:
import { NotificationMessageBus } from '@castore/core';
const appMessageBus = new NotificationMessageBus({
messageBusId: 'APP_MESSAGE_BUSES',
sourceEventStores: [pokemonsEventStore, trainersEventStore...],
});
await appMessageBus.publishMessage({
eventStoreId: 'POKEMONS',
event: {
type: 'POKEMON_LEVELED_UP',
...
}
})
π§ Technical description
Constructor:
messageBusId (string)
: A string identifying the message bussourceEventStores (EventStore[])
: List of event stores that the message bus will broadcast events frommessageBusAdapter (?MessageChannelAdapter)
: See section on MessageBusAdapters
Properties:
const appMessageBusId = appMessageBus.messageBusId;
sourceEventStores (EventStore[])
const appMessageBusSourceEventStores = appMessageBus.sourceEventStores;
const appMessageBusAdapter = appMessageBus.messageChannelAdapter;
βοΈ The messageChannelAdapter
is not read-only so you do not have to provide it right away.
Async Methods:
The following methods interact with the messaging solution of your application through a MessageBusAdapter
. They will throw an UndefinedMessageChannelAdapterError
if you did not provide one.
-
publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise<void>)
: Publish a Message
(of the appropriate type) to the message bus.
OptionsObj
contains the following properties:
replay (?boolean)
: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
-
publishMessages (messages: Message[], opt?: OptionsObj) => Promise<void>)
: Publish several Messages
(of the appropriate type) to the message bus. Options are similar to the publishMessage
options.
-
getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>)
: (StateCarryingMessageBuses only) Append the matching aggregate (with correct version) to a NotificationMessage
and turn it into a StateCarryingMessage
before publishing it to the message bus. Uses the message bus event stores: Make sure that they have correct adapters set up.
Type Helpers:
MessageChannelMessage
: Given a MessageBus
, returns the TS type of its messages
import type { MessageChannelMessage } from '@castore/core';
type AppMessage = MessageChannelMessage<typeof appMessageBus>;
type AppMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore | typeof trainersEventStore...
>;
- MessageBusAdapter
Similarly to event stores, MessageBus
classes provide a boilerplate-free and type-safe interface to publish messages, but are NOT responsible for actually doing so. This is the responsibility of the MessageBusAdapter
, that will connect it to your actual messaging solution:
import { EventStore } from '@castore/core';
const messageBus = new NotificationMessageBus({
...
messageBusAdapter: mySuperMessageBusAdapter,
});
messageBus.messageChannelAdapter = mySuperMessageBusAdapter;
You can code your own MessageBusAdapter
(simply implement the MessageChannelAdapter
interface), but we highly recommend using an off-the-shelf adapter:
If the messaging solution that you use is missing, feel free to create/upvote an issue, or contribute π€
The adapter packages will also expose useful generics to type the arguments of your bus listeners. For instance:
import type { EventBridgeMessageBusMessage } from '@castore/event-bridge-message-bus-adapter';
const pokemonMessagesListener = async (
eventBridgeMessage: EventBridgeMessageBusMessage<
typeof appMessageQueue,
'POKEMONS'
>,
) => {
const message = eventBridgeMessage.detail;
};
- ConnectedEventStore
If your storage solution exposes data streaming capabilities (such as DynamoDB streams), you can leverage them to push your freshly written events to a message bus or queue.
You can also use the ConnectedEventStore
class. Its interface matches the EventStore
one, but successfully pushing a new event will automatically forward it to a message queue/bus, and pushing a event group will also automatically forward the events to their respective message queues/buses:
import { ConnectedEventStore } from '@castore/core';
const connectedPokemonsEventStore = new ConnectedEventStore(
pokemonsEventStore,
appMessageBus,
);
await connectedPokemonsEventStore.pushEvent({
aggregateId: pokemonId,
version: 2,
type: 'POKEMON_LEVELED_UP',
...
});
Note that setting a connected event store storageAdapter
and onEventPushed
properties will override those of the original event store instead.
If the message bus or queue is a state-carrying one, the pushEvent
method will re-fetch the aggregate to append it to the message before publishing it. You can reduce this overhead by providing the previous aggregate as an option:
await connectedPokemonsEventStore.pushEvent(
{
aggregateId: pokemonId,
version: 2,
...
},
{ prevAggregate: pokemonAggregate },
);

Compared to data streams, connected event stores have the advantage of simplicity, performances and costs. However, they strongly decouple your storage and messaging solutions: Make sure to anticipate any issue that might arise (consistency, non-caught errors etc.).
π§ Technical description
Constructor:
eventStore (EventStore)
: The event store to connectmessageChannel (MessageBus | MessageQueue)
: A message bus or queue to forward events to
Properties:
A ConnectedEventStore
will implement the interface of its original EventStore
, and extend it with two additional properties:
eventStore (EventStore)
: The original event store
const eventStore = connectedPokemonsEventStore.eventStore;
messageChannel (MessageBus | MessageQueue)
: The provided message bus or queue
const messageChannel = connectedPokemonsEventStore.messageChannel;
Note that the storageAdapter
property will act as a pointer toward the original event store storageAdapter
:
originalEventStore.storageAdapter = myStorageAdapter;
connectedEventStore.storageAdapter;
connectedEventStore.storageAdapter = anotherStorageAdapter;
originalEventStore.storageAdapter;
- Snapshotting
As events pile up in your event stores, the performances and costs of your commands can become an issue.
One solution is to periodially persist snapshots of your aggregates (e.g. through a message bus listener), and only fetch them plus the subsequent events instead of all the events.
Snapshots are not implemented in Castore yet, but we have big plans for them, so stay tuned π
- Read Models
Even with snapshots, using the event store for querying needs (like displaying data in a web page) would be slow and inefficient, if not impossible depending on the access pattern.
In Event Sourcing, it is common to use a special type of message bus listener called projections, responsible for maintaining data specifically designed for querying needs, called read models.
Read models allow for faster read operations, as well as re-indexing. Keep in mind that they are eventually consistent by design, which can be annoying in some use cases (like opening a resource page directly after its creation).
Read models are not implemented in Castore yet, but we have big plans for them, so stay tuned π
π Resources
- Test Tools
Castore comes with a handy Test Tool package that facilitates the writing of unit tests: It allows mocking event stores, populating them with an initial state and resetting them to it in a boilerplate-free and type-safe way.
- Dam
Dam is a suite of utils that facilitates data migration and maintenance operations with Castore (for instance, dispatching all the events of an event store - ordered by their timestamps - to a message queue).
- React Visualizer
Castore also comes with a handy React Visualizer library: It exposes a React component to visualize, design and manually test Castore event stores and commands.
- Packages List
Event Types
Event Storage Adapters
Commands
Message Queue Adapters
Message Buses Adapters
- Common Patterns
- Simulating a future/past aggregate state: ...coming soon
- Snapshotting: ...coming soon
- Projecting on read models: ...coming soon
- Replaying events: ...coming soon
- Migrating events: ...coming soon