cqrs-eventdenormalizer
Advanced tools
Comparing version 1.1.6 to 1.1.7
@@ -280,2 +280,39 @@ 'use strict'; | ||
/** | ||
* Handles denormalization with a query instead of an id. | ||
* @param {Object} evt The event object. | ||
* @param {Object} query The query object. | ||
* @param {Function} callback The function, that will be called when this action is completed. | ||
* `function(err, notification){}` | ||
*/ | ||
handleQuery: function (evt, query, callback) { | ||
var self = this; | ||
var notifications = []; | ||
this.findViewModels(query, function (err, vms) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
async.each(vms, function (vm, callback) { | ||
self.handleOne(vm, evt, function (err, notification) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
notifications.push(notification); | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
callback(null, notifications); | ||
}); | ||
}); | ||
}, | ||
/** | ||
* Denormalizes an event. | ||
@@ -289,10 +326,8 @@ * @param {Object} evt The passed event. | ||
var query = this.query; | ||
if (!query && this.getQueryForThisViewBuilder) { | ||
query = this.getQueryForThisViewBuilder(evt); | ||
if (this.query) { | ||
return this.handleQuery(evt, this.query, callback); | ||
} | ||
if (query) { | ||
var notifications = []; | ||
this.findViewModels(query, function (err, vms) { | ||
if (!this.query && this.getQueryForThisViewBuilder) { | ||
this.getQueryForThisViewBuilder(evt, function (err, query) { | ||
if (err) { | ||
@@ -302,21 +337,3 @@ debug(err); | ||
} | ||
async.each(vms, function (vm, callback) { | ||
self.handleOne(vm, evt, function (err, notification) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
notifications.push(notification); | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
callback(null, notifications); | ||
}); | ||
self.handleQuery(evt, query, callback); | ||
}); | ||
@@ -363,4 +380,9 @@ return; | ||
this.getQueryForThisViewBuilder = function (evt) { | ||
return fn(evt); | ||
if (fn.length === 2) { | ||
this.getQueryForThisViewBuilder = fn; | ||
return this; | ||
} | ||
this.getQueryForThisViewBuilder = function (evt, callback) { | ||
callback(null, fn(evt)); | ||
}; | ||
@@ -367,0 +389,0 @@ |
@@ -20,3 +20,3 @@ 'use strict'; | ||
this.store = store; | ||
def = def || {}; | ||
@@ -73,6 +73,5 @@ | ||
var self = this; | ||
var revisionMap = {}; | ||
var viewBuilderMap = {}; | ||
var groupedEvents = {}; | ||
var eventViewBuilderMap = []; // [{ event: evt, viewBuilders: [] }] | ||
var collections = {}; | ||
@@ -91,7 +90,5 @@ | ||
eventViewBuilderMap.push({ event: evt, viewBuilders: viewBuilders }); | ||
_.each(viewBuilders, function (vb) { | ||
viewBuilderMap[vb.workerId] = vb; | ||
groupedEvents[vb.workerId] = groupedEvents[vb.workerId] || []; | ||
groupedEvents[vb.workerId].push(evt); | ||
if (!collections[vb.collection.workerId]) { | ||
@@ -105,15 +102,10 @@ vb.collection.isReplaying = true; | ||
async.series([ | ||
function (callback) { | ||
async.each(_.values(viewBuilderMap), function (vb, callback) { | ||
if (!groupedEvents[vb.workerId] || groupedEvents[vb.workerId].length === 0) { | ||
return callback(null); | ||
} | ||
async.eachSeries(groupedEvents[vb.workerId], function (e, callback) { | ||
vb.denormalize(e, callback); | ||
async.eachSeries(eventViewBuilderMap, function (item, callback) { | ||
async.each(item.viewBuilders, function (vb, callback) { | ||
vb.denormalize(item.event, callback); | ||
}, callback); | ||
}, callback); | ||
}, | ||
function (callback) { | ||
@@ -124,7 +116,7 @@ async.each(_.values(collections), function (col, callback) { | ||
}, | ||
function (callback) { | ||
self.updateRevision(revisionMap, callback); | ||
} | ||
], function (err) { | ||
@@ -147,3 +139,3 @@ if (err) { | ||
var self = this; | ||
var evtCount = 0; | ||
@@ -155,5 +147,5 @@ var queues = {}; | ||
doneClb = null; | ||
var isHandling = {}; | ||
var revisionMap = {}; | ||
@@ -179,3 +171,3 @@ var collections = {}; | ||
} | ||
queues[vb.workerId] = queues[vb.workerId] || []; | ||
@@ -190,3 +182,3 @@ queues[vb.workerId].push(evt); | ||
} | ||
if (queues[vb.workerId] && queues[vb.workerId].length > 0) { | ||
@@ -227,3 +219,3 @@ var e = queues[vb.workerId].shift(); | ||
} | ||
async.series([ | ||
@@ -243,3 +235,3 @@ function (callback) { | ||
} | ||
if (errs.length === 0) { | ||
@@ -251,3 +243,3 @@ if (callback) { | ||
} | ||
if (callback) { | ||
@@ -254,0 +246,0 @@ callback(errs); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "1.1.6", | ||
"version": "1.1.7", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
175
README.md
@@ -20,7 +20,7 @@ # Introduction | ||
denormalizerPath: '/path/to/my/files', | ||
// optional, default is 'commandRejected' | ||
// will be used to catch AggregateDestroyedError from cqrs-domain | ||
commandRejectedEventName: 'rejectedCommand', | ||
// optional, default is 800 | ||
@@ -30,3 +30,3 @@ // if using in scaled systems, this module tries to catch the concurrency issues and | ||
retryOnConcurrencyTimeout: 1000, | ||
// optional, default is in-memory | ||
@@ -45,3 +45,3 @@ // currently supports: mongodb, redis, tingodb, couchdb, azuretable and inmemory | ||
}, | ||
// optional, default is in-memory | ||
@@ -53,3 +53,3 @@ // currently supports: mongodb, redis, tingodb and inmemory | ||
queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue | ||
type: 'redis', | ||
@@ -72,7 +72,7 @@ host: 'localhost', // optional | ||
}); | ||
denormalizer.repository.on('disconnect', function() { | ||
console.log('repository disconnected'); | ||
}); | ||
// revisionGuardStore | ||
@@ -82,8 +82,8 @@ denormalizer.revisionGuardStore.on('connect', function() { | ||
}); | ||
denormalizer.revisionGuardStore.on('disconnect', function() { | ||
console.log('revisionGuardStore disconnected'); | ||
}); | ||
// anything (repository or revisionGuardStore) | ||
@@ -93,3 +93,3 @@ denormalizer.on('connect', function() { | ||
}); | ||
denormalizer.on('disconnect', function() { | ||
@@ -108,28 +108,28 @@ console.log('something disconnected'); | ||
correlationId: 'correlationId', | ||
// optional, default is 'id' | ||
id: 'id', | ||
// optional, default is 'name' | ||
name: 'name', | ||
// optional, default is 'aggregate.id' | ||
aggregateId: 'aggregate.id', | ||
// optional | ||
context: 'context.name', | ||
// optional | ||
aggregate: 'aggregate.name', | ||
// optional, default is 'payload' | ||
payload: 'payload', | ||
// optional, default is 'revision' | ||
// will represent the aggregate revision, can be used in next command | ||
revision: 'revision', | ||
// optional | ||
version: 'version', | ||
// optional, if defined the values of the command will be copied to the event (can be used to transport information like userId, etc..) | ||
@@ -148,34 +148,34 @@ meta: 'meta' | ||
correlationId: 'correlationId', | ||
// optional, default is 'id' | ||
id: 'id', | ||
// optional, default is 'name' | ||
action: 'name', | ||
// optional, default is 'collection' | ||
collection: 'collection', | ||
// optional, default is 'payload' | ||
payload: 'payload', | ||
// optional, will be copied from event | ||
aggregateId: 'meta.aggregate.id', | ||
// optional, will be copied from event | ||
context: 'meta.context.name', | ||
// optional, will be copied from event | ||
aggregate: 'meta.aggregate.name', | ||
// optional, will be copied from event | ||
// will represent the aggregate revision, can be used in next command | ||
revision: 'meta.aggregate.revision', | ||
// optional, will be copied from event | ||
eventId: 'meta.event.id', | ||
// optional, will be copied from event | ||
event: 'meta.event.name', | ||
// optional, if defined the values of the event will be copied to the notification (can be used to transport information like userId, etc..) | ||
@@ -211,3 +211,3 @@ meta: 'meta' | ||
}); | ||
### or you can define an asynchronous function | ||
@@ -230,3 +230,3 @@ | ||
}); | ||
### or you can define an asynchronous function | ||
@@ -258,3 +258,3 @@ | ||
}); | ||
### or you can define an asynchronous function | ||
@@ -269,9 +269,9 @@ | ||
## Initialization | ||
denormalizer.init(function (err) { | ||
// this callback is called when all is ready... | ||
}); | ||
// or | ||
denormalizer.init(); // callback is optional | ||
@@ -303,5 +303,5 @@ | ||
}); // callback is optional | ||
### or | ||
denormalizer.handle({ | ||
@@ -346,6 +346,6 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f', | ||
name: 'personDetail' | ||
// optional, default '' | ||
defaultPayload: 'payload', | ||
// indexes: [ // for mongodb | ||
@@ -359,3 +359,3 @@ // 'profileId', | ||
}, | ||
// optionally, define some initialization data for new view models... | ||
@@ -367,2 +367,9 @@ { | ||
If you need an information from an other collection while denormalizing an event, you can require such a collection and make some lookups. | ||
for example | ||
col.find({ my: 'value' }, function (err, vms) {}); | ||
But be careful with this! | ||
## ViewBuilder | ||
@@ -374,15 +381,15 @@ | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
version: 2, | ||
// optional, if not defined or not found it will generate a new viewmodel with new id | ||
id: 'aggregate.id', | ||
// optional, if not defined it will pass the whole event... | ||
@@ -394,3 +401,3 @@ payload: 'payload' | ||
}); | ||
### ViewBuilder for multiple viewmodels in a collection | ||
@@ -406,15 +413,15 @@ | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
version: 2, | ||
// optional, if not defined or not found it will generate a new viewmodel with new id | ||
query: { group: 'admins' }, | ||
// optional, if not defined it will pass the whole event... | ||
@@ -430,2 +437,6 @@ payload: 'payload' | ||
//}); | ||
// or async | ||
//.useAsQuery(function (evt, callback) { | ||
// callback(null, { my: evt.payload.my }); | ||
//}); | ||
@@ -440,11 +451,11 @@ ## EventExtender | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
// if set to -1, it will ignore the version | ||
// if set to -1, it will ignore the version | ||
version: 2 | ||
@@ -456,5 +467,5 @@ }, function (evt, col, callback) { | ||
}); | ||
// or | ||
// or | ||
module.exports = require('cqrs-eventdenormalizer').defineEventExtender({ | ||
@@ -464,13 +475,13 @@ // optional, default is file name without extension, | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
// if set to -1, it will ignore the version | ||
// if set to -1, it will ignore the version | ||
version: 2, | ||
// if defined it will load the viewmodel | ||
@@ -483,3 +494,3 @@ id: 'payload.id' | ||
}); | ||
### not for a collection | ||
@@ -491,11 +502,11 @@ | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
// if set to -1, it will ignore the version | ||
// if set to -1, it will ignore the version | ||
version: 2 | ||
@@ -506,5 +517,5 @@ }, function (evt) { | ||
}); | ||
// or | ||
module.exports = require('cqrs-eventdenormalizer').defineEventExtender({ | ||
@@ -514,11 +525,11 @@ // optional, default is file name without extension, | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
// if set to -1, it will ignore the version | ||
// if set to -1, it will ignore the version | ||
version: 2 | ||
@@ -538,7 +549,7 @@ }, function (evt, callback) { | ||
}); | ||
or when catching some events: | ||
denormalizer.onEventMissing(function (info, evt) { | ||
// grab the missing events, depending from info values... | ||
@@ -554,3 +565,3 @@ // info.aggregateId | ||
}); | ||
}); | ||
@@ -561,11 +572,11 @@ | ||
denormalizer.replayStreamed(function (replay, done) { | ||
replay(evt1); | ||
replay(evt2); | ||
replay(evt3); | ||
done(function (err) { | ||
if (err) { console.log(err); } | ||
}); | ||
}); | ||
@@ -572,0 +583,0 @@ |
@@ -0,1 +1,4 @@ | ||
## [v1.1.7](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.1.7...v1.1.8) | ||
- added possibility to denormalize multiple viewmodels in same collection with intelligent queries in an async way | ||
## [v1.1.6](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.1.5...v1.1.6) | ||
@@ -2,0 +5,0 @@ - added possibility to denormalize multiple viewmodels in same collection with intelligent queries |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
180242
3131
578