nestjs-eventstore
Event store driven NestJS and CQRS
example is from official Nest JS example
docker run -p 22113:2113 -p 11113:1113 -d --name eventstore eventstore/eventstore --dev --enable-external-tcp --disable-external-tcp-tls --ext-ip=0.0.0.0 --int-ip=0.0.0.0
yarn
cd examples
yarn
yarn start
Config
@Module({
imports: [
EventStoreModule.registerAsync(
{
credentials: {
username: process.env.EVENTSTORE_CREDENTIALS_USERNAME || 'admin',
password: process.env.EVENTSTORE_CREDENTIALS_PASSWORD || 'changeit',
},
tcp: {
host: process.env.EVENTSTORE_TCP_HOST || 'localhost',
port: +process.env.EVENTSTORE_TCP_PORT || 1113,
},
http: {
host: process.env.EVENTSTORE_HTTP_HOST || 'http://localhost',
port: +process.env.EVENTSTORE_HTTP_PORT || 2113,
},
tcpConnectionName: 'connection-hero-event-handler-and-saga',
onTcpConnected: () => {
},
onTcpDisconnected: () => {
},
},
),
],
controllers: [],
providers: [],
})
export class AppModule {}
Controller Interceptor
With this syntax all the output of your services are sent to eventstore
By default only last event will be sent back to http.
@UseInterceptors(EventStoreInterceptor)
@Controller()
export class MyController {
@Post('/test')
postMyRoute(
@Body() body: MyDTO,
): Observable {
return this.myService.doThisAction(body);
}
}
Stream target must be defined and implement method getStream()
export class HeroKilledDragonEvent implements IAggregateEvent {
constructor(
public readonly data: {
heroId: string,
dragonId: string
}) {
}
getStream() {
return `hero-${this.data.heroId}`;
}
}
You can also extends EventStoreEvent
to get all options
export class HeroKilledDragonEvent extends EventStoreEvent {
constructor(
public readonly data: {
heroId: string,
dragonId: string
}, options?) {
super(data, options);
}
getStream() {
return `hero-${this.data.heroId}`;
}
}
CQRS
Events
Basic one
export class HeroKilledDragonEvent implements IEvent{
constructor(
public readonly data: {
heroId: string,
dragonId: string
}) {
}
}
Basic one with options (event id, ...)
export class HeroKilledDragonEvent extends EventStoreEvent {
constructor(
public readonly data: {
heroId: string,
dragonId: string
}, options?) {
super(data, options);
}
}
Aggregate root
export class Hero
extends EventStoreAggregateRoot {
constructor(private id) {
super();
this.streamConfig = {
streamName: `hero-${id}`
} as IStreamConfig;
}
}
Here you should extends EventStoreAggregateRoot from nestjs-geteventstore lib, not the @nestjs/cqrs one!
Command handling
@CommandHandler(DropAncientItemCommand)
export class DropAncientItemHandler
implements ICommandHandler<DropAncientItemCommand> {
constructor(
private readonly repository: HeroRepository,
private readonly publisher: EventStorePublisher,
) {}
async execute(command: DropAncientItemCommand) {
console.log(clc.yellowBright('Async DropAncientItemCommand...'));
const { heroId, itemId } = command;
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
);
hero.addItem(itemId);
hero.dropItem(itemId);
await hero.commit();
}
}
Idempotency
Using event id
Eventstore keep in memory a few million id and deduplicate on this
means a reboot you don't have idempotency
Add a custom eventId in your event ``
Using expectedVersion
Guaranty idempotency even after restart
Guaranty events order
Bonus in code define in eventStore the retention rules and stream access rules
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(
private readonly repository: HeroRepository,
private readonly publisher: EventStorePublisher,
) {
}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command;
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
);
await hero.setStreamConfig({
streamName: `hero_fight-${heroId}`,
expectedVersion: ExpectedVersion.NoStream,
metadata: {
$maxAge: 2 * DAY,
$maxCount: 5,
},
});
hero.damageEnemy(dragonId, 2);
hero.damageEnemy(dragonId, -8);
await hero.commit();
await hero.setStreamConfig({
streamName: `hero-${heroId}`,
expectedVersion: ExpectedVersion.NoStream,
});
hero.killEnemy(dragonId);
await hero.commit();
}
}
Saga
Identical to default implementation
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$
.pipe(
filter(ev => ev instanceof HeroKilledDragonEvent),
delay(400),
map(event => {
console.log(clc.redBright('Inside [HeroesGameSagas] Saga after a little sleep'));
return new DropAncientItemCommand(event.data.heroId, itemId);
}),
);
}
EventHandler
Identical with nest cqrs if your want.
You win ack()
and nack()
if your event extends AcknowledgeableEventStoreEvent
(only for persistent subscriptions)
Nack strategies are available
Acknowledgeable
export class HeroKilledDragonEvent
extends AcknowledgeableEventStoreEvent {
constructor(
public readonly data: {
heroId: string,
dragonId: string
}, options?) {
super(data, options);
}
}
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler
implements IEventHandler<HeroKilledDragonEvent> {
async handle(event: HeroKilledDragonEvent) {
console.log(clc.greenBright('HeroKilledDragonEventHandler...'));
await event.ack();
}
}
Subscription
Sends eventstore events to saga and event handler
Configured from your module config, you can manage multiple tcp subscriptions or catchup in parrallel
in the same bus
Persistent :
- realtime
- you can ack, nack and
- you have a pointer in your event stack.
- dedicated interface in eventstore is also available
Catchup :
- to start you must tell where you are in the event stack
- continue to wait for realtime
@Module({
imports: [
TerminusModule,
EventStoreCqrsModule.registerAsync(
{
credentials: {
username: process.env.EVENTSTORE_CREDENTIALS_USERNAME || 'admin',
password: process.env.EVENTSTORE_CREDENTIALS_PASSWORD || 'changeit',
},
tcp: {
host: process.env.EVENTSTORE_TCP_HOST || 'localhost',
port: +process.env.EVENTSTORE_TCP_PORT || 11113,
},
http: {
host: process.env.EVENTSTORE_HTTP_HOST || 'http://localhost',
port: +process.env.EVENTSTORE_HTTP_PORT || 22113,
},
tcpConnectionName: 'connection-hero-event-handler-and-saga',
onTcpConnected: () => {
},
onTcpDisconnected: () => {
},
},
{
eventMapper: (data, options: IEventStoreEventOptions) => {
let className = `${options.eventType}`;
Logger.debug(
`Build ${className} received from stream ${options.eventStreamId} with id ${options.eventId}`,
);
if (!heroesEvents[className]) {
return false;
}
return new heroesEvents[className](data, options);
},
subscriptions: {
persistent: [
{
stream: '$ce-hero',
group: 'data',
autoAck: false,
bufferSize: 1,
options: {
resolveLinktos: true,
minCheckPointCount: 1,
},
onSubscriptionStart: (subscription) => {
},
onSubscriptionDropped: (subscription) => {
},
},
],
},
},
),
],
controllers: [],
providers: [],
})
export class AppModule {
}
Projections
You can code your eventstore projection's in javascript in the project
Using projection config and the js file
it asserts that projection exist and run during your app's boot.
With a projection you can route event to emit new events to another stream.
you can also send linkTo to do symlink like
https://eventstore.com/docs/getting-started/projections/index.html
https://eventstore.com/docs/projections/user-defined-projections/index.html
fromCategory('hero')
.foreachStream()
.when({
$init: function(){
return {
count: 0
}
},
ItemAddedEvent: function(s,e){
s.count += 1;
}
})
Terminus health
Give status send 503 on your HealthController
@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private eventStoreHealthIndicator: EventStoreHealthIndicator,
private eventStoreBusHealthIndicator: EventStoreSubscriptionHealthIndicator,
) {
}
@Get()
@HealthCheck()
healthCheck() {
return this.health.check([
async () => this.eventStoreHealthIndicator.check(),
async () => this.eventStoreBusHealthIndicator.check(),
]);
}
}