cqrs-eventdenormalizer
Advanced tools
Comparing version 1.0.10 to 1.1.0
@@ -9,2 +9,3 @@ 'use strict'; | ||
viewmodel = require('viewmodel'), | ||
sift = require('sift'), | ||
debug = require('debug')('denormalizer:collection'); | ||
@@ -327,9 +328,11 @@ | ||
// if (this.isReplaying) { | ||
// // TODO: lookup in this.replayingVms (and this.replayingVmsToDelete) | ||
// return callback(null); | ||
// } | ||
var self = this; | ||
var localFoundVms = {}; | ||
if (this.isReplaying) { | ||
_.each(sift(query, _.values(this.replayingVms)), function (vm) { | ||
localFoundVms[vm.id] = vm; | ||
}); | ||
} | ||
this.repository.find(query, queryOptions, function (err, vms) { | ||
@@ -349,2 +352,12 @@ if (err) { | ||
if (self.isReplaying) { | ||
var mergedVms = _.map(vms, function (vm) { | ||
return localFoundVms[vm.id] || vm; | ||
}); | ||
mergedVms = _.reject(mergedVms, function (vm) { | ||
return !!self.replayingVmsToDelete[vm.id]; | ||
}); | ||
return callback(null, mergedVms); | ||
} | ||
callback(null, vms); | ||
@@ -351,0 +364,0 @@ }); |
@@ -64,2 +64,23 @@ 'use strict'; | ||
/** | ||
* Loads a viewModel array by optional query and query options. | ||
* @param {Object} query The query to find the viewModels. (mongodb style) [optional] | ||
* @param {Object} queryOptions The query options. (mongodb style) [optional] | ||
* @param {Function} callback The function, that will be called when the this action is completed. | ||
* `function(err, vms){}` vms is of type Array. | ||
*/ | ||
findViewModels: function (query, queryOptions, callback) { | ||
if (typeof query === 'function') { | ||
callback = query; | ||
query = {}; | ||
queryOptions = {}; | ||
} | ||
if (typeof queryOptions === 'function') { | ||
callback = queryOptions; | ||
queryOptions = {}; | ||
} | ||
this.collection.findViewModels(query, queryOptions, callback); | ||
}, | ||
/** | ||
* Extracts the id from the event or generates a new one. | ||
@@ -88,7 +109,7 @@ * @param {Object} evt The event object. | ||
var self = this; | ||
if (this.evtExtFn.length === 3) { | ||
return this.evtExtFn(evt, this.collection, callback); | ||
} | ||
if (this.evtExtFn.length === 1) { | ||
@@ -102,3 +123,3 @@ return callback(null, this.evtExtFn(evt)); | ||
} | ||
this.extractId(evt, function (err, id) { | ||
@@ -109,3 +130,3 @@ if (err) { | ||
} | ||
self.loadViewModel(id, function (err, vm) { | ||
@@ -112,0 +133,0 @@ if (err) { |
@@ -21,3 +21,3 @@ 'use strict'; | ||
Definition.call(this, meta); | ||
// used for replay... | ||
@@ -45,2 +45,3 @@ this.workerId = uuid().toString(); | ||
this.id = meta.id || null; | ||
this.query = meta.query || null; | ||
@@ -110,3 +111,3 @@ if (_.isString(denormFn)) { | ||
}, | ||
/** | ||
@@ -133,2 +134,23 @@ * Loads a viewModel object by id. | ||
/** | ||
* Loads a viewModel array by optional query and query options. | ||
* @param {Object} query The query to find the viewModels. (mongodb style) [optional] | ||
* @param {Object} queryOptions The query options. (mongodb style) [optional] | ||
* @param {Function} callback The function, that will be called when the this action is completed. | ||
* `function(err, vms){}` vms is of type Array. | ||
*/ | ||
findViewModels: function (query, queryOptions, callback) { | ||
if (typeof query === 'function') { | ||
callback = query; | ||
query = {}; | ||
queryOptions = {}; | ||
} | ||
if (typeof queryOptions === 'function') { | ||
callback = queryOptions; | ||
queryOptions = {}; | ||
} | ||
this.collection.findViewModels(query, queryOptions, callback); | ||
}, | ||
/** | ||
* Extracts the id from the passed event or generates a new one from read model. | ||
@@ -186,3 +208,3 @@ * @param {Object} evt The event object. | ||
dotty.put(notification, this.definitions.notification.action, vm.actionOnCommit); | ||
return notification; | ||
@@ -192,3 +214,4 @@ }, | ||
/** | ||
* Denormalizes an event. | ||
* Handles denormalization for 1 viewmodel. | ||
* @param {Object} vm The viewModel. | ||
* @param {Object} evt The passed event. | ||
@@ -198,5 +221,18 @@ * @param {Function} callback The function, that will be called when this action is completed. | ||
*/ | ||
denormalize: function (evt, callback) { | ||
handleOne: function (vm, evt, callback) { | ||
var self = this; | ||
this.extractId(evt, function (err, id) { | ||
var payload = evt; | ||
if (this.payload && this.payload !== '') { | ||
payload = dotty.get(evt, this.payload); | ||
} | ||
debug('call denormalizer function'); | ||
this.denormFn(_.cloneDeep(payload), vm); | ||
var notification = this.generateNotification(evt, vm); | ||
debug('generate new id for notification'); | ||
this.getNewId(function (err, newId) { | ||
if (err) { | ||
@@ -206,54 +242,99 @@ debug(err); | ||
} | ||
self.loadViewModel(id, function (err, vm) { | ||
dotty.put(notification, self.definitions.notification.id, newId); | ||
self.saveViewModel(vm, function (err) { | ||
if (err) { | ||
debug(err); | ||
if (err instanceof ConcurrencyError) { | ||
var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800); | ||
debug('retry in ' + retryIn + 'ms'); | ||
setTimeout(function() { | ||
self.loadViewModel(vm.id, function (err, vm) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
self.handleOne(vm, evt, callback); | ||
}); | ||
}, retryIn); | ||
return; | ||
} | ||
return callback(err); | ||
} | ||
var payload = evt; | ||
callback(null, notification); | ||
}); | ||
}); | ||
}, | ||
if (self.payload && self.payload !== '') { | ||
payload = dotty.get(evt, self.payload); | ||
/** | ||
* Denormalizes an event. | ||
* @param {Object} evt The passed event. | ||
* @param {Function} callback The function, that will be called when this action is completed. | ||
* `function(err, notification){}` | ||
*/ | ||
denormalize: function (evt, callback) { | ||
var self = this; | ||
if (this.query) { | ||
var notifications = []; | ||
this.findViewModels(this.query, function (err, vms) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
debug('call denormalizer function'); | ||
self.denormFn(_.cloneDeep(payload), vm); | ||
var notification = self.generateNotification(evt, vm); | ||
debug('generate new id for notification'); | ||
self.getNewId(function (err, newId) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
dotty.put(notification, self.definitions.notification.id, newId); | ||
self.saveViewModel(vm, function (err) { | ||
async.each(vms, function (vm, callback) { | ||
self.handleOne(vm, evt, function (err, notification) { | ||
if (err) { | ||
debug(err); | ||
if (err instanceof ConcurrencyError) { | ||
var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800); | ||
debug('retry in ' + retryIn + 'ms'); | ||
setTimeout(function() { | ||
self.denormalize(evt, callback); | ||
}, retryIn); | ||
return; | ||
} | ||
return callback(err); | ||
} | ||
callback(null, notification); | ||
notifications.push(notification); | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
callback(null, notifications); | ||
}); | ||
}); | ||
return; | ||
} | ||
this.extractId(evt, function (err, id) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
self.loadViewModel(id, function (err, vm) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
self.handleOne(vm, evt, function (err, notification) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
callback(null, [notification]); | ||
}); | ||
}); | ||
}); | ||
} | ||
}); | ||
module.exports = ViewBuilder; |
@@ -110,3 +110,3 @@ 'use strict'; | ||
var target = this.getTargetInformation(evt); | ||
var viewBuilders = this.tree.getViewBuilders(target); | ||
@@ -117,3 +117,3 @@ | ||
async.each(viewBuilders, function (viewBuilder, callback) { | ||
viewBuilder.denormalize(evt, function (err, notificaiton) { | ||
viewBuilder.denormalize(evt, function (err, notis) { | ||
if (err) { | ||
@@ -124,4 +124,4 @@ debug(err); | ||
if (notificaiton) { | ||
notifications.push(notificaiton); | ||
if (notis && notis.length > 0) { | ||
notifications = notifications.concat(notis); | ||
} | ||
@@ -128,0 +128,0 @@ callback(null); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "1.0.10", | ||
"version": "1.1.0", | ||
"private": false, | ||
@@ -15,9 +15,10 @@ "main": "index.js", | ||
"async": "0.9.0", | ||
"debug": "2.0.0", | ||
"debug": "2.1.0", | ||
"dotty": "0.0.2", | ||
"viewmodel": "1.2.0", | ||
"jsondate": "0.0.1", | ||
"lodash": "2.4.1", | ||
"node-uuid": "1.4.1", | ||
"tolerance": "1.0.0" | ||
"sift": "0.1.0", | ||
"tolerance": "1.0.0", | ||
"viewmodel": "1.2.0" | ||
}, | ||
@@ -24,0 +25,0 @@ "devDependencies": { |
@@ -376,3 +376,33 @@ # Introduction | ||
}); | ||
### ViewBuilder for multiple viewmodels in a collection | ||
Be careful with the query! | ||
A lot of viewmodels can slow down the denormalization process! | ||
module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({ | ||
// optional, default is file name without extension, | ||
// if name is '' it will handle all events that matches | ||
name: 'enteredNewPerson', | ||
// optional | ||
aggregate: 'person', | ||
// optional | ||
context: 'hr', | ||
// optional, default is 0 | ||
version: 2, | ||
// optional, if not defined or not found it will generate a new viewmodel with new id | ||
query: '{ group: 'admins' }', | ||
// optional, if not defined it will pass the whole event... | ||
payload: 'payload' | ||
}, function (data, vm) { // instead of function you can define a string with default handling ('create', 'update', 'delete') | ||
vm.set('firstname', data.firstname); | ||
vm.set('lastname', data.lastname); | ||
}); | ||
## EventExtender | ||
@@ -379,0 +409,0 @@ |
@@ -1,5 +0,8 @@ | ||
## v1.0.10 | ||
## [v1.1.0](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.0.10...v1.1.0) | ||
- added possibility to denormalize multiple viewmodels in same collection | ||
## [v1.0.10](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.0.6...v1.0.10) | ||
- fixed replay handling | ||
## v1.0.6 | ||
## [v1.0.6](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.0.4...v1.0.6) | ||
- update viewmodel dependency | ||
@@ -6,0 +9,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
182569
3058
563
9
+ Addedsift@0.1.0
+ Addeddebug@2.1.0(transitive)
+ Addedsift@0.1.0(transitive)
- Removeddebug@2.0.0(transitive)
Updateddebug@2.1.0