
Security News
Attackers Are Hunting High-Impact Node.js Maintainers in a Coordinated Social Engineering Campaign
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.
@avanzu/eventstore
Advanced tools
This is a heavily modified version of the original node-eventstore by Adriano Raiano.
The project goal is to provide an eventstore implementation for node.js:
npm install @avanzu/eventstore
var eventstore = require('@avanzu/eventstore')
var es = eventstore()
By default the eventstore will use an inmemory Storage.
For logging and debugging you can use debug by TJ Holowaychuk
simply run your process with
DEBUG=@avanzu/eventstore/* node app.js
example with mongodb:
var es = require('@avanzu/eventstore')({
type: 'mongodb',
host: 'localhost', // optional
port: 27017, // optional
dbName: 'eventstore', // optional
eventsCollectionName: 'events', // optional
snapshotsCollectionName: 'snapshots', // optional
transactionsCollectionName: 'transactions', // optional
timeout: 10000, // optional
// emitStoreEvents: true // optional, by default no store events are emitted
// maxSnapshotsCount: 3 // optional, defaultly will keep all snapshots
// authSource: 'authedicationDatabase' // optional
// username: 'technicalDbUser' // optional
// password: 'secret' // optional
// url: 'mongodb://user:pass@host:port/db?opts // optional
// positionsCollectionName: 'positions' // optional, defaultly wont keep position
})
es.on('connect', function () {
console.log('storage connected')
})
es.on('disconnect', function () {
console.log('connection to storage is gone')
})
Define which values should be mapped/copied to the payload event.
es.defineEventMappings({
id: 'id',
commitId: 'commitId',
commitSequence: 'commitSequence',
commitStamp: 'commitStamp',
streamRevision: 'streamRevision',
})
await es.init()
const { events } = await es.getEventStream({ query: 'streamId' })
or
const { events } = await es.getEventStream({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
'streamId' and 'aggregateId' are the same... In ddd terms aggregate and context are just to be more precise in language. For example you can have a 'person' aggregate in the context 'human ressources' and a 'person' aggregate in the context of 'business contracts'... So you can have 2 complete different aggregate instances of 2 complete different aggregates (but perhaps with same name) in 2 complete different contexts
you can request an eventstream even by limit the query with a 'minimum revision number' and a 'maximum revision number'
const { events } = await es.getEventStream({
query:
'streamId' ||
{
/* query */
},
revMin: 5,
revMax: 8,
})
store a new event and commit it to store
const stream = await es.getEventStream({ query: 'streamId' })
stream.addEvent({ my: 'event' })
stream.addEvents([{ my: 'event2' }])
await stream.commit()
console.log(stream.eventsToDispatch)
if you defined an event publisher function the committed event will be dispatched to the provided publisher
if you just want to load the last event as stream you can call getLastEventAsStream instead of ´getEventStream´.
get snapshot and eventhistory from the snapshot point
const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
or
const [snapshot, stream] = await es.getFromSnapshot({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
you can request a snapshot and an eventstream even by limit the query with a 'maximum revision number'
const [snapshot, stream] = es.getFromSnapshot({
query:
'streamId' ||
{
/* query */
},
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
create a snapshot point
const [snapshot, stream] = await es.getFromSnapshot('streamId')
const snap = snapshot.data
const history = stream.events
// create a new snapshot depending on your rules
if (history.length > myLimit) {
await es.createSnapshot({
streamId: 'streamId',
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
// or
await es.createSnapshot({
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr' // optional
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
}
// go on: store new event and commit it
// stream.addEvents...
You can automatically clean older snapshots by configuring the number of snapshots to keep with maxSnapshotsCount in eventstore options.
const evts = await es.getUndispatchedEvents()
currently supported by:
You can delete an aggregate including the event history, snapshots and transactions by calling deleteStream.
const deletedStream = await es.deleteStream('myStreamId')
The return value is the EventStream that has just been deleted.
This stream will contain an undispatched TombstoneEvent ready to be processed.
The payload attribute of that event contains the complete event history.
const [tombstoneEvent] = deletedStream.eventsToDispatch
for replaying your events or for rebuilding a viewmodel or just for fun...
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
const events = await es.getEvents({ skip, limit })
// or
const events = await es.getEvents({ query: 'streamId', skip, limit })
// or
const events = await es.getEvents({
query: {
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
},
skip,
limit,
})
by revision
revMin, revMax always optional
const events = await es.getEventsByRevision({
query: 'streamId',
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
by commitStamp
skip, limit always optional
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
skip: 10,
limit: 100, // if you omit limit or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
limit: 50,
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
})
Some databases support streaming your events, the api is similar to the query one
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
var stream = es.streamEvents({ skip, limit })
// or
var stream = es.streamEvents({ query: 'streamId', skip, limit })
// or by commitstamp
var stream = es.streamEventsSince({ commitStamp: new Date(2015, 5, 23), skip, limit })
// or by revision
var stream = es.streamEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person',
context: 'hr',
},
})
stream.on('data', function (e) {
doSomethingWithEvent(e)
})
stream.on('end', function () {
console.log('no more evets')
})
// or even better
stream.pipe(myWritableStream)
currently supported by:
for example to obtain the last revision nr
const event = await es.getLastEvent('streamId')
// or
const event = await es.getLastEvent({
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
})
const id = await es.getNewId()
some db implementations support writing the position of the event in the whole store additional to the streamRevision.
currently those implementations support this:
positionsCollectionName option)Inserting multiple events (documents) in mongodb, is not atomic.
For the eventstore tries to repair itself when calling getEventsByRevision.
But if you want you can trigger this from outside:
const [firstTransaction] = await es.store.getPendingTransactions()
const lastEvent = await es.store.getLastEvent({
aggregateId: firstTransaction.aggregateId,
aggregate: firstTransaction.aggregate, // optional
context: firstTransaction.context, // optional
})
await es.store.repairFailedTransaction(lastEvent)
Starting from version 2.0.0 the eventstore does not longer support multiple positional arguments. Instead, you have to pass in a params object. The general idea, that you only have to specify the arguments that deviate from the defaults remains.
Please refer to the following table to see how the signatures have changed
| 1.x.x | 2.x.x |
|---|---|
streamEvents(query, skip, limit) | streamEvents({query, skip, limit}) |
streamEventsSince(commitStamp, skip, limit) | streamEvents({commitStamp, skip, limit}) |
streamEventsSince(commitStamp, skip, limit) | streamEventsSince({commitStamp, skip, limit}) |
streamEventsByRevision(query, revMin, revMax) | streamEventsByRevision({query, revMin, revMax}) |
getEvents(query, skip, limit) | getEvents({query, skip, limit}) |
getEventsSince(commitStamp, skip, limit) | getEventsSince({commitStamp, skip, limit}) |
getEventsByRevision(query, revMin, revMax) | getEventsByRevision({query, revMin, revMax}) |
getEventStream(query, revMin, revMax) | getEventStream({query, revMin, revMax}) |
getFromSnapshot(query, revMax) | getFromSnapshot({query, revMax}) |
Currently these databases are supported:
You can use your own db implementation by extending this...
var Store = require('@avanzu/eventstore').Store,
util = require('util'),
_ = require('lodash')
class MyDB extends Store {
constructor(options) {
super(options)
}
}
module.exports = MyDB
and you can use it in this way
var es = require('@avanzu/eventstore')({
type: MyDB,
})
// es.init...
FAQs
An eventsourcing backend supporting multiple storage engines.
We found that @avanzu/eventstore demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.

Security News
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.

Security News
Node.js has paused its bug bounty program after funding ended, removing payouts for vulnerability reports but keeping its security process unchanged.