Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
2
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 1.1.6 to 1.1.7

76

lib/definitions/viewBuilder.js

@@ -280,2 +280,39 @@ 'use strict';

/**
* Handles denormalization with a query instead of an id.
* @param {Object} evt The event object.
* @param {Object} query The query object.
* @param {Function} callback The function, that will be called when this action is completed.
* `function(err, notification){}`
*/
handleQuery: function (evt, query, callback) {
var self = this;
var notifications = [];
this.findViewModels(query, function (err, vms) {
if (err) {
debug(err);
return callback(err);
}
async.each(vms, function (vm, callback) {
self.handleOne(vm, evt, function (err, notification) {
if (err) {
debug(err);
return callback(err);
}
notifications.push(notification);
callback(null);
});
}, function (err) {
if (err) {
debug(err);
return callback(err);
}
callback(null, notifications);
});
});
},
/**
* Denormalizes an event.

@@ -289,10 +326,8 @@ * @param {Object} evt The passed event.

var query = this.query;
if (!query && this.getQueryForThisViewBuilder) {
query = this.getQueryForThisViewBuilder(evt);
if (this.query) {
return this.handleQuery(evt, this.query, callback);
}
if (query) {
var notifications = [];
this.findViewModels(query, function (err, vms) {
if (!this.query && this.getQueryForThisViewBuilder) {
this.getQueryForThisViewBuilder(evt, function (err, query) {
if (err) {

@@ -302,21 +337,3 @@ debug(err);

}
async.each(vms, function (vm, callback) {
self.handleOne(vm, evt, function (err, notification) {
if (err) {
debug(err);
return callback(err);
}
notifications.push(notification);
callback(null);
});
}, function (err) {
if (err) {
debug(err);
return callback(err);
}
callback(null, notifications);
});
self.handleQuery(evt, query, callback);
});

@@ -363,4 +380,9 @@ return;

this.getQueryForThisViewBuilder = function (evt) {
return fn(evt);
if (fn.length === 2) {
this.getQueryForThisViewBuilder = fn;
return this;
}
this.getQueryForThisViewBuilder = function (evt, callback) {
callback(null, fn(evt));
};

@@ -367,0 +389,0 @@

@@ -20,3 +20,3 @@ 'use strict';

this.store = store;
def = def || {};

@@ -73,6 +73,5 @@

var self = this;
var revisionMap = {};
var viewBuilderMap = {};
var groupedEvents = {};
var eventViewBuilderMap = []; // [{ event: evt, viewBuilders: [] }]
var collections = {};

@@ -91,7 +90,5 @@

eventViewBuilderMap.push({ event: evt, viewBuilders: viewBuilders });
_.each(viewBuilders, function (vb) {
viewBuilderMap[vb.workerId] = vb;
groupedEvents[vb.workerId] = groupedEvents[vb.workerId] || [];
groupedEvents[vb.workerId].push(evt);
if (!collections[vb.collection.workerId]) {

@@ -105,15 +102,10 @@ vb.collection.isReplaying = true;

async.series([
function (callback) {
async.each(_.values(viewBuilderMap), function (vb, callback) {
if (!groupedEvents[vb.workerId] || groupedEvents[vb.workerId].length === 0) {
return callback(null);
}
async.eachSeries(groupedEvents[vb.workerId], function (e, callback) {
vb.denormalize(e, callback);
async.eachSeries(eventViewBuilderMap, function (item, callback) {
async.each(item.viewBuilders, function (vb, callback) {
vb.denormalize(item.event, callback);
}, callback);
}, callback);
},
function (callback) {

@@ -124,7 +116,7 @@ async.each(_.values(collections), function (col, callback) {

},
function (callback) {
self.updateRevision(revisionMap, callback);
}
], function (err) {

@@ -147,3 +139,3 @@ if (err) {

var self = this;
var evtCount = 0;

@@ -155,5 +147,5 @@ var queues = {};

doneClb = null;
var isHandling = {};
var revisionMap = {};

@@ -179,3 +171,3 @@ var collections = {};

}
queues[vb.workerId] = queues[vb.workerId] || [];

@@ -190,3 +182,3 @@ queues[vb.workerId].push(evt);

}
if (queues[vb.workerId] && queues[vb.workerId].length > 0) {

@@ -227,3 +219,3 @@ var e = queues[vb.workerId].shift();

}
async.series([

@@ -243,3 +235,3 @@ function (callback) {

}
if (errs.length === 0) {

@@ -251,3 +243,3 @@ if (callback) {

}
if (callback) {

@@ -254,0 +246,0 @@ callback(errs);

{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "1.1.6",
"version": "1.1.7",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -20,7 +20,7 @@ # Introduction

denormalizerPath: '/path/to/my/files',
// optional, default is 'commandRejected'
// will be used to catch AggregateDestroyedError from cqrs-domain
commandRejectedEventName: 'rejectedCommand',
// optional, default is 800

@@ -30,3 +30,3 @@ // if using in scaled systems, this module tries to catch the concurrency issues and

retryOnConcurrencyTimeout: 1000,
// optional, default is in-memory

@@ -45,3 +45,3 @@ // currently supports: mongodb, redis, tingodb, couchdb, azuretable and inmemory

},
// optional, default is in-memory

@@ -53,3 +53,3 @@ // currently supports: mongodb, redis, tingodb and inmemory

queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue
type: 'redis',

@@ -72,7 +72,7 @@ host: 'localhost', // optional

});
denormalizer.repository.on('disconnect', function() {
console.log('repository disconnected');
});
// revisionGuardStore

@@ -82,8 +82,8 @@ denormalizer.revisionGuardStore.on('connect', function() {

});
denormalizer.revisionGuardStore.on('disconnect', function() {
console.log('revisionGuardStore disconnected');
});
// anything (repository or revisionGuardStore)

@@ -93,3 +93,3 @@ denormalizer.on('connect', function() {

});
denormalizer.on('disconnect', function() {

@@ -108,28 +108,28 @@ console.log('something disconnected');

correlationId: 'correlationId',
// optional, default is 'id'
id: 'id',
// optional, default is 'name'
name: 'name',
// optional, default is 'aggregate.id'
aggregateId: 'aggregate.id',
// optional
context: 'context.name',
// optional
aggregate: 'aggregate.name',
// optional, default is 'payload'
payload: 'payload',
// optional, default is 'revision'
// will represent the aggregate revision, can be used in next command
revision: 'revision',
// optional
version: 'version',
// optional, if defined the values of the command will be copied to the event (can be used to transport information like userId, etc..)

@@ -148,34 +148,34 @@ meta: 'meta'

correlationId: 'correlationId',
// optional, default is 'id'
id: 'id',
// optional, default is 'name'
action: 'name',
// optional, default is 'collection'
collection: 'collection',
// optional, default is 'payload'
payload: 'payload',
// optional, will be copied from event
aggregateId: 'meta.aggregate.id',
// optional, will be copied from event
context: 'meta.context.name',
// optional, will be copied from event
aggregate: 'meta.aggregate.name',
// optional, will be copied from event
// will represent the aggregate revision, can be used in next command
revision: 'meta.aggregate.revision',
// optional, will be copied from event
eventId: 'meta.event.id',
// optional, will be copied from event
event: 'meta.event.name',
// optional, if defined the values of the event will be copied to the notification (can be used to transport information like userId, etc..)

@@ -211,3 +211,3 @@ meta: 'meta'

});
### or you can define an asynchronous function

@@ -230,3 +230,3 @@

});
### or you can define an asynchronous function

@@ -258,3 +258,3 @@

});
### or you can define an asynchronous function

@@ -269,9 +269,9 @@

## Initialization
denormalizer.init(function (err) {
// this callback is called when all is ready...
});
// or
denormalizer.init(); // callback is optional

@@ -303,5 +303,5 @@

}); // callback is optional
### or
denormalizer.handle({

@@ -346,6 +346,6 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',

name: 'personDetail'
// optional, default ''
defaultPayload: 'payload',
// indexes: [ // for mongodb

@@ -359,3 +359,3 @@ // 'profileId',

},
// optionally, define some initialization data for new view models...

@@ -367,2 +367,9 @@ {

If you need an information from an other collection while denormalizing an event, you can require such a collection and make some lookups.
for example
col.find({ my: 'value' }, function (err, vms) {});
But be careful with this!
## ViewBuilder

@@ -374,15 +381,15 @@

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
version: 2,
// optional, if not defined or not found it will generate a new viewmodel with new id
id: 'aggregate.id',
// optional, if not defined it will pass the whole event...

@@ -394,3 +401,3 @@ payload: 'payload'

});
### ViewBuilder for multiple viewmodels in a collection

@@ -406,15 +413,15 @@

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
version: 2,
// optional, if not defined or not found it will generate a new viewmodel with new id
query: { group: 'admins' },
// optional, if not defined it will pass the whole event...

@@ -430,2 +437,6 @@ payload: 'payload'

//});
// or async
//.useAsQuery(function (evt, callback) {
// callback(null, { my: evt.payload.my });
//});

@@ -440,11 +451,11 @@ ## EventExtender

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
// if set to -1, it will ignore the version
// if set to -1, it will ignore the version
version: 2

@@ -456,5 +467,5 @@ }, function (evt, col, callback) {

});
// or
// or
module.exports = require('cqrs-eventdenormalizer').defineEventExtender({

@@ -464,13 +475,13 @@ // optional, default is file name without extension,

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
// if set to -1, it will ignore the version
// if set to -1, it will ignore the version
version: 2,
// if defined it will load the viewmodel

@@ -483,3 +494,3 @@ id: 'payload.id'

});
### not for a collection

@@ -491,11 +502,11 @@

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
// if set to -1, it will ignore the version
// if set to -1, it will ignore the version
version: 2

@@ -506,5 +517,5 @@ }, function (evt) {

});
// or
module.exports = require('cqrs-eventdenormalizer').defineEventExtender({

@@ -514,11 +525,11 @@ // optional, default is file name without extension,

name: 'enteredNewPerson',
// optional
aggregate: 'person',
// optional
context: 'hr',
// optional, default is 0
// if set to -1, it will ignore the version
// if set to -1, it will ignore the version
version: 2

@@ -538,7 +549,7 @@ }, function (evt, callback) {

});
or when catching some events:
denormalizer.onEventMissing(function (info, evt) {
// grab the missing events, depending from info values...

@@ -554,3 +565,3 @@ // info.aggregateId

});
});

@@ -561,11 +572,11 @@

denormalizer.replayStreamed(function (replay, done) {
replay(evt1);
replay(evt2);
replay(evt3);
done(function (err) {
if (err) { console.log(err); }
});
});

@@ -572,0 +583,0 @@

@@ -0,1 +1,4 @@

## [v1.1.7](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.1.7...v1.1.8)
- added possibility to denormalize multiple viewmodels in same collection with intelligent queries in an async way
## [v1.1.6](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.1.5...v1.1.6)

@@ -2,0 +5,0 @@ - added possibility to denormalize multiple viewmodels in same collection with intelligent queries

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc