
nestjs-geteventstore
Event store driven NestJS and CQRS
The example is based on the official NestJS example.
docker run -p 22113:2113 -p 11113:1113 -d --name eventstore eventstore/eventstore --dev --enable-external-tcp --disable-external-tcp-tls --ext-ip=0.0.0.0 --int-ip=0.0.0.0
yarn
cd examples
yarn
yarn start
@Module({
  imports: [
    EventStoreModule.registerAsync(
      {
        credentials: {
          username: process.env.EVENTSTORE_CREDENTIALS_USERNAME || 'admin',
          password: process.env.EVENTSTORE_CREDENTIALS_PASSWORD || 'changeit',
        },
        /*
          To connect to a single host use tcp object and specify host and port.
          To connect to a cluster via dns discovery use clusterDns eg. "discover://my.host:2113".
        */
        tcp: {
          host: process.env.EVENTSTORE_TCP_HOST || 'localhost',
          port: +process.env.EVENTSTORE_TCP_PORT || 1113,
        },
        clusterDns: "discover://my.host:2113",
        http: {
          host: process.env.EVENTSTORE_HTTP_HOST || 'http://localhost',
          port: +process.env.EVENTSTORE_HTTP_PORT || 2113,
        },
        tcpConnectionName: 'connection-hero-event-handler-and-saga',
        onTcpConnected: () => {
        },
        onTcpDisconnected: () => {
        },
      },
    ),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
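The `+process.env.X || 1113` idiom in the configuration works because unary plus turns a missing variable into `NaN`, which is falsy, so the default is used. A slightly stricter variant could look like the sketch below. `parsePort` is a hypothetical helper, not part of nestjs-geteventstore, and the `env` object stands in for `process.env`.

```typescript
// Hypothetical helper (not part of nestjs-geteventstore): parse a port from
// an environment-style string and fall back to a default when it is unusable.
function parsePort(raw: string | undefined, fallback: number): number {
  const port = Number(raw);
  // Number(undefined) is NaN and Number('') is 0; both fail the range check.
  const valid = Number.isInteger(port) && port > 0 && port <= 65535;
  return valid ? port : fallback;
}

// Example environment; in an application this would be process.env.
const env: Record<string, string | undefined> = {
  EVENTSTORE_TCP_PORT: '11113',
  EVENTSTORE_HTTP_PORT: 'not-a-number',
};

const tcpPort = parsePort(env['EVENTSTORE_TCP_PORT'], 1113);
const httpPort = parsePort(env['EVENTSTORE_HTTP_PORT'], 2113);
console.log(tcpPort, httpPort);
```

Unlike the one-liner, this rejects out-of-range values such as `70000` instead of passing them on to the connection.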
With this syntax, all output from your services is sent to Event Store.
By default, only the last event is sent back in the HTTP response.
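@UseInterceptors(EventStoreInterceptor)
@Controller()
export class MyController {
  // MyService is a placeholder name for the injected service, which the original snippet does not show
  constructor(private readonly myService: MyService) {}

  @Post('/test')
  postMyRoute(
    @Body() body: MyDTO,
  ): Observable<any> {
    return this.myService.doThisAction(body);
  }
}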
Each event must define its target stream by implementing the getStream() method.
export class HeroKilledDragonEvent implements IAggregateEvent {
  constructor(
    public readonly data: {
      heroId: string,
      dragonId: string
    }) {
  }
  getStream() {
    return `hero-${this.data.heroId}`;
  }
}
You can also extend EventStoreEvent to get all options.
export class HeroKilledDragonEvent extends EventStoreEvent {
  constructor(
    public readonly data: {
      heroId: string,
      dragonId: string
    }, options?) {
    super(data, options);
  }
  getStream() {
    return `hero-${this.data.heroId}`;
  }
}
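The `category-id` naming returned by getStream() also decides which category stream an event ends up in. The sketch below illustrates that relationship with plain TypeScript. `StreamTargetedEvent` and `categoryOf` are hypothetical names, not library APIs, and the helper assumes Event Store's by-category system projection is left at its default of splitting on the first `-`.

```typescript
// Minimal stand-in for the event shape used above; the real interfaces
// come from nestjs-geteventstore and are not reproduced here.
interface StreamTargetedEvent {
  getStream(): string;
}

class HeroKilledDragonEvent implements StreamTargetedEvent {
  constructor(
    public readonly data: { heroId: string; dragonId: string },
  ) {}

  getStream(): string {
    return `hero-${this.data.heroId}`;
  }
}

// Hypothetical helper: the category of a stream, assuming the by-category
// projection splits on the first '-'. Returns null when there is no '-'.
function categoryOf(streamName: string): string | null {
  const dash = streamName.indexOf('-');
  return dash === -1 ? null : streamName.slice(0, dash);
}

const event = new HeroKilledDragonEvent({ heroId: '42', dragonId: '7' });
const stream = event.getStream();
const category = categoryOf(stream);
console.log(stream, category === null ? '(no category)' : `$ce-${category}`);
```

Under that assumption, a stream named `hero_fight-42` belongs to the `hero_fight` category, not to `hero`.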
Basic one
export class HeroKilledDragonEvent implements IEvent {
  constructor(
    public readonly data: {
      heroId: string,
      dragonId: string
    }) {
  }
}
Basic one with options (event id, ...)
export class HeroKilledDragonEvent extends EventStoreEvent {
  constructor(
    public readonly data: {
      heroId: string,
      dragonId: string
    }, options?) {
    super(data, options);
  }
}
export class Hero
  // Change from base cqrs
  extends EventStoreAggregateRoot {
  constructor(private id) {
    super();
    // Where your events are gonna be stored by default
    this.streamConfig = {
      streamName: `hero-${id}`
    } as IStreamConfig;
  }
}
Here you should extend EventStoreAggregateRoot from the nestjs-geteventstore lib, not the one from @nestjs/cqrs!
@CommandHandler(DropAncientItemCommand)
export class DropAncientItemHandler
  implements ICommandHandler<DropAncientItemCommand> {
  constructor(
    private readonly repository: HeroRepository,
    // Only change from base CQRS
    private readonly publisher: EventStorePublisher,
  ) {}
  async execute(command: DropAncientItemCommand) {
    console.log(clc.yellowBright('Async DropAncientItemCommand...'));
    const { heroId, itemId } = command;
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    );
    hero.addItem(itemId);
    hero.dropItem(itemId);
    await hero.commit();
  }
}
Event Store keeps a few million event IDs in memory and deduplicates against them,
which means idempotency is lost after a reboot.
Add a custom eventId to your event
Guarantees idempotency even after a restart
Guarantees event order
Bonus: define the Event Store retention rules and stream access rules in code
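One way to picture why a stable event ID combined with an expected stream version survives a restart: the duplicate check is made against what is already stored in the stream, not against an in-memory cache. The toy model below is an illustration only. `ToyStream`, its `append` method and the zero-based version numbering are assumptions made for this sketch and do not reproduce Event Store's actual implementation.

```typescript
interface StoredEvent {
  eventId: string;
  type: string;
}

// Toy stream: versions are zero-based positions; -1 means "no events yet".
class ToyStream {
  private readonly events: StoredEvent[] = [];

  get version(): number {
    return this.events.length - 1;
  }

  // Appends `event` right after position `expectedVersion`.
  // Returns 'written' for a new event and 'duplicate' when the same
  // eventId already sits at the target position (a retried write).
  append(event: StoredEvent, expectedVersion: number): 'written' | 'duplicate' {
    const target = expectedVersion + 1;
    const existing = this.events[target];
    if (existing !== undefined) {
      if (existing.eventId === event.eventId) {
        return 'duplicate';
      }
      throw new Error(`Position ${target} already holds another event`);
    }
    if (target !== this.events.length) {
      throw new Error(`Expected version ${expectedVersion}, stream is at ${this.version}`);
    }
    this.events.push(event);
    return 'written';
  }
}

const stream = new ToyStream();
const killed: StoredEvent = { eventId: 'kill-dragon-7-by-hero-42', type: 'HeroKilledDragonEvent' };

console.log(stream.append(killed, -1)); // first attempt: 'written'
console.log(stream.append(killed, -1)); // retry with same id and version: 'duplicate'
```

Because the retry is recognised from the stored event itself, the outcome does not depend on any cache that a reboot would clear, and the expected version also pins the order in which events are accepted.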
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(
    private readonly repository: HeroRepository,
    // Needed
    private readonly publisher: EventStorePublisher,
  ) {
  }
  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    );
    // Use custom stream only for this process
    await hero.setStreamConfig({
      // all next events will have this stream
      streamName: `hero_fight-${heroId}`,
      // Error if the stream is not new when writing
      // You can set your custom order by using this attribute in event
      expectedVersion: ExpectedVersion.NoStream,
      // Set retention rules for this new stream
      metadata: {
        // stream is deleted (needs scavenge to be run)
        $maxAge: 2 * DAY,
        // store only the last x events in the stream
        $maxCount: 5,
      },
    });
    hero.damageEnemy(dragonId, 2);
    hero.damageEnemy(dragonId, -8);
    // Write and dispatch events
    await hero.commit();
    // Change stream for next events
    await hero.setStreamConfig({
      streamName: `hero-${heroId}`,
      // It must be a new stream
      expectedVersion: ExpectedVersion.NoStream,
    });
    hero.killEnemy(dragonId);
    await hero.commit();
  }
}
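To make the effect of the `$maxAge` and `$maxCount` metadata more concrete, here is a toy model of which events stay readable under both rules. It is a sketch, not Event Store's implementation: `readableEvents` and `TimedEvent` are hypothetical names, `$maxAge` is taken to be in seconds, and `DAY` is assumed to be the number of seconds in a day since the snippet above does not define it.

```typescript
interface TimedEvent {
  type: string;
  createdAt: number; // epoch milliseconds
}

interface RetentionMetadata {
  $maxAge?: number;   // seconds
  $maxCount?: number; // number of most recent events to keep
}

// Assumed value: one day expressed in seconds.
const DAY = 24 * 60 * 60;

// Returns the events that remain readable at time `now` (epoch milliseconds).
// `events` must be ordered from oldest to newest.
function readableEvents(
  events: TimedEvent[],
  metadata: RetentionMetadata,
  now: number,
): TimedEvent[] {
  let visible = events;
  if (metadata.$maxAge !== undefined) {
    const oldestAllowed = now - metadata.$maxAge * 1000;
    visible = visible.filter((e) => e.createdAt >= oldestAllowed);
  }
  if (metadata.$maxCount !== undefined) {
    visible = visible.slice(Math.max(0, visible.length - metadata.$maxCount));
  }
  return visible;
}

const now = 10 * DAY * 1000;
const history: TimedEvent[] = [
  { type: 'HeroDamagedEnemy', createdAt: now - 3 * DAY * 1000 }, // older than 2 days
  ...[6, 5, 4, 3, 2, 1].map((hoursAgo) => ({
    type: 'HeroDamagedEnemy',
    createdAt: now - hoursAgo * 3600 * 1000,
  })),
];

const kept = readableEvents(history, { $maxAge: 2 * DAY, $maxCount: 5 }, now);
console.log(kept.length);
```

With seven events in the history, the three-day-old one falls outside `$maxAge` and the oldest of the remaining six falls outside `$maxCount`.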
Identical to default implementation
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
  return events$
    .pipe(
      filter(ev => ev instanceof HeroKilledDragonEvent),
      delay(400),
      map(event => {
        console.log(clc.redBright('Inside [HeroesGameSagas] Saga after a little sleep'));
        // itemId is not defined in the original snippet; a placeholder value is assumed here
        const itemId = '0';
        return new DropAncientItemCommand(event.data.heroId, itemId);
      }),
    );
}
Identical to @nestjs/cqrs if you want.
You gain ack() and nack() if your event extends AcknowledgeableEventStoreEvent (only for persistent subscriptions).
Nack strategies are available.
Acknowledgeable event:
export class HeroKilledDragonEvent
  extends AcknowledgeableEventStoreEvent {
  constructor(
    public readonly data: {
      heroId: string,
      dragonId: string
    }, options?) {
    super(data, options);
  }
}
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler
  implements IEventHandler<HeroKilledDragonEvent> {
  async handle(event: HeroKilledDragonEvent) {
    console.log(clc.greenBright('HeroKilledDragonEventHandler...'));
    await event.ack();
  }
}
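The handler above only shows the success path. A common pattern with manual acknowledgement is to ack once the work has succeeded and nack when it throws. The sketch below shows that pattern in isolation. The `Acknowledgeable` interface is a hypothetical shape written for this example, so the real ack() and nack() signatures on AcknowledgeableEventStoreEvent may differ, and `handleWithAck` is not a library function.

```typescript
// Hypothetical shape for illustration; check the library for the real signatures.
interface Acknowledgeable {
  ack(): Promise<void>;
  nack(reason: string): Promise<void>;
}

// Runs `work` and acknowledges the event according to the outcome.
// A failure inside ack() itself is also reported through nack().
async function handleWithAck<E extends Acknowledgeable>(
  event: E,
  work: (event: E) => Promise<void>,
): Promise<'acked' | 'nacked'> {
  try {
    await work(event);
    await event.ack();
    return 'acked';
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    await event.nack(reason);
    return 'nacked';
  }
}

// Usage with a fake event that records what was called.
const calls: string[] = [];
const fakeEvent: Acknowledgeable = {
  ack: async () => {
    calls.push('ack');
  },
  nack: async (reason: string) => {
    calls.push(`nack:${reason}`);
  },
};

handleWithAck(fakeEvent, async () => {
  throw new Error('projection update failed');
}).then((outcome) => console.log(outcome, calls));
```

What happens to a nacked event afterwards (retry, park, skip) depends on the nack strategy configured for the subscription.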
Sends Event Store events to sagas and event handlers.
It is configured from your module config. You can run multiple TCP subscriptions, persistent or catch-up, in parallel on the same bus.
Persistent :
Catchup :
@Module({
  imports: [
    TerminusModule,
    EventStoreCqrsModule.registerAsync(
      {
        credentials: {
          username: process.env.EVENTSTORE_CREDENTIALS_USERNAME || 'admin',
          password: process.env.EVENTSTORE_CREDENTIALS_PASSWORD || 'changeit',
        },
        tcp: {
          host: process.env.EVENTSTORE_TCP_HOST || 'localhost',
          port: +process.env.EVENTSTORE_TCP_PORT || 11113,
        },
        http: {
          host: process.env.EVENTSTORE_HTTP_HOST || 'http://localhost',
          port: +process.env.EVENTSTORE_HTTP_PORT || 22113,
        },
        tcpConnectionName: 'connection-hero-event-handler-and-saga',
        onTcpConnected: () => {
        },
        onTcpDisconnected: () => {
        },
      },
      {
        // Where you map event store incoming event to your format
        eventMapper: (data, options: IEventStoreEventOptions) => {
          let className = `${options.eventType}`;
          Logger.debug(
            `Build ${className} received from stream ${options.eventStreamId} with id ${options.eventId}`,
          );
          if (!heroesEvents[className]) {
            return false;
          }
          return new heroesEvents[className](data, options);
        },
        subscriptions: {
          persistent: [
            {
              // Event stream category (before the -)
              stream: '$ce-hero',
              group: 'data',
              autoAck: false,
              bufferSize: 1,
              // Subscription is created with this options
              options: {
                resolveLinkTos: true,
                minCheckPointCount: 1,
              },
              onSubscriptionStart: (subscription) => {
              },
              onSubscriptionDropped: (subscription) => {
              },
            },
          ],
        },
      },
    ),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {
}
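The eventMapper in the configuration above looks up a class by the incoming event type and returns false when nothing matches. The standalone sketch below shows the same idea with a `Map` as the registry. `IncomingOptions` is a reduced, hypothetical stand-in for IEventStoreEventOptions and `eventRegistry` is an assumed name. The `Map` is a design choice for the sketch: if the registry were a plain object, a lookup like `heroesEvents[className]` would also match inherited keys such as `constructor`.

```typescript
// Reduced stand-in for IEventStoreEventOptions (only the fields used above).
interface IncomingOptions {
  eventType: string;
  eventStreamId: string;
  eventId: string;
}

class HeroKilledDragonEvent {
  constructor(
    public readonly data: { heroId: string; dragonId: string },
    public readonly options: IncomingOptions,
  ) {}
}

// `any` is used for the payload because each event class declares its own data shape.
type EventConstructor = new (data: any, options: IncomingOptions) => object;

// Assumed registry name; one entry per event class the application understands.
const eventRegistry = new Map<string, EventConstructor>([
  ['HeroKilledDragonEvent', HeroKilledDragonEvent],
]);

// Returns an event instance, or false when the type is unknown.
function eventMapper(data: unknown, options: IncomingOptions): object | false {
  const EventClass = eventRegistry.get(options.eventType);
  if (EventClass === undefined) {
    return false;
  }
  return new EventClass(data, options);
}

const mapped = eventMapper(
  { heroId: '42', dragonId: '7' },
  { eventType: 'HeroKilledDragonEvent', eventStreamId: 'hero-42', eventId: 'id-1' },
);
console.log(mapped instanceof HeroKilledDragonEvent);
```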
With a projection you can route events and emit new events to another stream. You can also use linkTo to create symlink-like references to existing events.
https://eventstore.com/docs/getting-started/projections/index.html
https://eventstore.com/docs/projections/user-defined-projections/index.html
A projection example:
fromCategory('hero')
  // One state per id (hero-541)
  .foreachStream()
  .when({
    // Set default state when start
    $init: function() {
      return {
        count: 0,
      };
    },
    // When event is received
    ItemAddedEvent: function(s, e) {
      s.count += 1;
    },
  });
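To see what state this projection builds up, the same fold can be written as plain TypeScript over an in-memory list of events. This is a toy model of the logic only: `runCountProjection` and `RecordedEvent` are hypothetical names, and details such as when Event Store actually invokes `$init` are simplified here.

```typescript
interface RecordedEvent {
  streamId: string;
  eventType: string;
}

interface CountState {
  count: number;
}

// Mimics fromCategory(category).foreachStream().when({ $init, ItemAddedEvent }):
// one state per stream in the category, incremented for each ItemAddedEvent.
function runCountProjection(
  events: RecordedEvent[],
  category: string,
): Map<string, CountState> {
  const states = new Map<string, CountState>();
  for (const event of events) {
    // Streams of the category are named `<category>-<id>`.
    if (!event.streamId.startsWith(`${category}-`)) continue;
    if (event.eventType !== 'ItemAddedEvent') continue;
    let state = states.get(event.streamId);
    if (state === undefined) {
      state = { count: 0 }; // plays the role of $init
      states.set(event.streamId, state);
    }
    state.count += 1;
  }
  return states;
}

const log: RecordedEvent[] = [
  { streamId: 'hero-541', eventType: 'ItemAddedEvent' },
  { streamId: 'hero-541', eventType: 'HeroKilledDragonEvent' },
  { streamId: 'hero-7', eventType: 'ItemAddedEvent' },
  { streamId: 'hero-541', eventType: 'ItemAddedEvent' },
  { streamId: 'dragon-1', eventType: 'ItemAddedEvent' },
];

const states = runCountProjection(log, 'hero');
console.log(states.get('hero-541'), states.get('hero-7'));
```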
You can write your Event Store projections in JavaScript inside your project and include them in your module:
EventStoreCqrsModule.registerAsync(
  {
    useFactory: async (config: ConfigService): Promise<any> =>
      config.get('eventstore'),
    inject: [ConfigService],
  },
  {
    projections: [
      {
        name: 'first',
        file: '../projections/first.projection.js',
        enabled: true,
        emitEnabled: true,
        mode: 'continuous',
      },
    ],
  },
);
This way, the module asserts that your projections exist and are running while your application boots.
The health indicators report status through your HealthController, which responds with 503 when a check fails.
@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private eventStoreHealthIndicator: EventStoreHealthIndicator,
    private eventStoreBusHealthIndicator: EventStoreSubscriptionHealthIndicator,
  ) {
  }
  @Get()
  @HealthCheck()
  healthCheck() {
    return this.health.check([
      async () => this.eventStoreHealthIndicator.check(),
      async () => this.eventStoreBusHealthIndicator.check(),
    ]);
  }
}
FAQs
Event Store connector for NestJS-Cqrs
The npm package nestjs-geteventstore receives a total of 60 weekly downloads. As such, nestjs-geteventstore popularity was classified as not popular.
We found that nestjs-geteventstore demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 3 open source maintainers collaborating on the project.