cqrs-eventdenormalizer
Advanced tools
Comparing version 1.0.6 to 1.0.7
@@ -6,2 +6,4 @@ 'use strict'; | ||
_ = require('lodash'), | ||
uuid = require('node-uuid').v4, | ||
async = require('async'), | ||
debug = require('debug')('denormalizer:collection'); | ||
@@ -17,2 +19,7 @@ | ||
Definition.call(this, meta); | ||
// used for replay... | ||
this.workerId = uuid().toString(); | ||
this.isReplaying = false; | ||
this.replayingVms = {}; | ||
@@ -234,2 +241,7 @@ meta = meta || {}; | ||
saveViewModel: function (vm, callback) { | ||
if (this.isReplaying) { | ||
this.replayingVms[vm.id] = vm; | ||
return callback(null); | ||
} | ||
this.repository.commit(vm, callback); | ||
@@ -245,2 +257,6 @@ }, | ||
loadViewModel: function (id, callback) { | ||
if (this.isReplaying && this.replayingVms[id]) { | ||
return callback(null, this.replayingVms[id]); | ||
} | ||
var self = this; | ||
@@ -272,6 +288,6 @@ | ||
* Loads a viewModel array by optional query and query options. | ||
* @param {Object} query The query to find the viewModels. (mongodb style) [optional] | ||
* @param {Object} queryOptions The query options. (mongodb style) [optional] | ||
* @param {Function} callback The function, that will be called when the this action is completed. | ||
* `function(err, vms){}` vms is of type Array. | ||
* @param {Object} query The query to find the viewModels. (mongodb style) [optional] | ||
* @param {Object} queryOptions The query options. (mongodb style) [optional] | ||
* @param {Function} callback The function, that will be called when the this action is completed. | ||
* `function(err, vms){}` vms is of type Array. | ||
*/ | ||
@@ -288,2 +304,7 @@ findViewModels: function (query, queryOptions, callback) { | ||
} | ||
// if (this.isReplaying) { | ||
// // TODO: lookup in this.replayingVms | ||
// return callback(null); | ||
// } | ||
@@ -308,2 +329,30 @@ var self = this; | ||
}); | ||
}, | ||
/** | ||
* Saves all replaying viewmodels. | ||
* @param {Function} callback The function, that will be called when the this action is completed. | ||
* `function(err){}` | ||
*/ | ||
saveReplayingVms: function (callback) { | ||
if (!this.isReplaying) { | ||
var err = new Error('Not in replay mode!'); | ||
debug(err); | ||
return callback(err); | ||
} | ||
var replVms = _.values(this.replayingVms); | ||
var self = this; | ||
async.each(replVms, function (vm, callback) { | ||
self.repository.commit(vm, callback); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
} | ||
self.replayingVms = {}; | ||
self.isReplaying = false; | ||
callback(err); | ||
}); | ||
} | ||
@@ -310,0 +359,0 @@ |
@@ -24,4 +24,2 @@ 'use strict'; | ||
this.workerId = uuid().toString(); | ||
this.isReplaying = false; | ||
this.replayingVms = {}; | ||
@@ -119,6 +117,2 @@ meta = meta || {}; | ||
loadViewModel: function (id, callback) { | ||
if (this.isReplaying && this.replayingVms[id]) { | ||
return callback(null, this.replayingVms[id]); | ||
} | ||
this.collection.loadViewModel(id, callback); | ||
@@ -134,7 +128,2 @@ }, | ||
saveViewModel: function (vm, callback) { | ||
if (this.isReplaying) { | ||
this.replayingVms[vm.id] = vm; | ||
return callback(null); | ||
} | ||
this.collection.saveViewModel(vm, callback); | ||
@@ -261,118 +250,2 @@ }, | ||
}); | ||
}, | ||
/** | ||
* Replays all passed events. | ||
* @param {Array} evts The passed array of events. | ||
* @param {Function} callback The function, that will be called when this action is completed. | ||
* `function(err){}` | ||
*/ | ||
replay: function (evts, callback) { | ||
var self = this; | ||
if (!evts || evts.length === 0) { | ||
return callback(null); | ||
} | ||
this.isReplaying = true; | ||
this.replayingVms = {}; | ||
async.eachSeries(evts, function (evt, callback) { | ||
self.denormalize(evt, callback); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
var replVms = _.values(self.replayingVms); | ||
async.each(replVms, function (vm, callback) { | ||
self.collection.saveViewModel(vm, callback); | ||
}, function (err) { | ||
self.replayingVms = {}; | ||
self.isReplaying = false; | ||
if (err) { | ||
debug(err); | ||
} | ||
callback(err); | ||
}); | ||
}); | ||
}, | ||
/** | ||
* Replays in a streamed way. | ||
* @param {Function} fn The function that will be called with the replay function and the done function. | ||
* `function(replay, done){}` | ||
*/ | ||
replayStreamed: function (fn) { | ||
var self = this; | ||
var queue = []; | ||
this.isReplaying = true; | ||
this.replayingVms = {}; | ||
var errs = []; | ||
var isHandling = false, | ||
doneCalled = false, | ||
doneClb = null; | ||
function replay (evt) { | ||
queue.push(evt); | ||
function handleNext () { | ||
if (queue.length > 0) { | ||
var e = queue.shift(); | ||
self.denormalize(e, function (err) { | ||
if (err) { | ||
debug(err); | ||
errs.push(err); | ||
} | ||
handleNext(); | ||
}); | ||
} else { | ||
isHandling = false; | ||
if (doneCalled) { | ||
doneLater(); | ||
} | ||
} | ||
} | ||
if (!isHandling) { | ||
isHandling = true; | ||
process.nextTick(handleNext); | ||
} | ||
} | ||
function done (callback) { | ||
if (queue.length > 0 || isHandling) { | ||
doneCalled = true; | ||
doneClb = callback; | ||
return; | ||
} | ||
var replVms = _.values(self.replayingVms); | ||
async.each(replVms, function (vm, callback) { | ||
self.collection.saveViewModel(vm, callback); | ||
}, function (err) { | ||
self.replayingVms = {}; | ||
self.isReplaying = false; | ||
if (err) { | ||
debug(err); | ||
} | ||
callback(err); | ||
}); | ||
} | ||
function doneLater() { | ||
if (doneCalled) { | ||
done(doneClb); | ||
} | ||
} | ||
fn(replay, done); | ||
} | ||
@@ -379,0 +252,0 @@ |
@@ -76,2 +76,3 @@ 'use strict'; | ||
var groupedEvents = {}; | ||
var collections = {}; | ||
@@ -93,2 +94,7 @@ _.each(evts, function (evt) { | ||
groupedEvents[vb.workerId].push(evt); | ||
if (!collections[vb.collection.workerId]) { | ||
vb.collection.isReplaying = true; | ||
collections[vb.collection.workerId] = vb.collection; | ||
} | ||
}); | ||
@@ -104,4 +110,6 @@ }); | ||
} | ||
vb.replay(groupedEvents[vb.workerId], callback); | ||
async.eachSeries(groupedEvents[vb.workerId], function (e, callback) { | ||
vb.denormalize(e, callback); | ||
}, callback); | ||
}, callback); | ||
@@ -111,2 +119,8 @@ }, | ||
function (callback) { | ||
async.each(_.values(collections), function (col, callback) { | ||
col.saveReplayingVms(callback); | ||
}, callback); | ||
}, | ||
function (callback) { | ||
self.updateRevision(revisionMap, callback); | ||
@@ -133,4 +147,13 @@ } | ||
var evtCount = 0; | ||
var queues = {}; | ||
var errs = []; | ||
var doneCalled = true, | ||
doneClb = null; | ||
var isHandling = {}; | ||
var revisionMap = {}; | ||
var vbReplayStreams = {}; | ||
var collections = {}; | ||
@@ -150,20 +173,57 @@ var replay = function (evt) { | ||
if (!vbReplayStreams[vb.workerId]) { | ||
vb.replayStreamed(function (vbReplay, vbDone) { | ||
vbReplayStreams[vb.workerId] = { | ||
replay: vbReplay, | ||
done: vbDone | ||
}; | ||
}); | ||
if (!collections[vb.collection.workerId]) { | ||
vb.collection.isReplaying = true; | ||
collections[vb.collection.workerId] = vb.collection; | ||
} | ||
queues[vb.workerId] = queues[vb.workerId] || []; | ||
queues[vb.workerId].push(evt); | ||
evtCount++; | ||
vbReplayStreams[vb.workerId].replay(evt); | ||
function handleNext () { | ||
if (evtCount <= 0 && doneCalled) { | ||
doneLater(); | ||
return; | ||
} | ||
if (queues[vb.workerId] && queues[vb.workerId].length > 0) { | ||
var e = queues[vb.workerId].shift(); | ||
vb.denormalize(e, function (err) { | ||
--evtCount; | ||
if (err) { | ||
debug(err); | ||
errs.push(err); | ||
} | ||
handleNext(); | ||
}); | ||
} else if (isHandling[vb.workerId]) { | ||
delete isHandling[vb.workerId]; | ||
} | ||
} | ||
if (!isHandling[vb.workerId]) { | ||
isHandling[vb.workerId] = true; | ||
process.nextTick(handleNext); | ||
} | ||
}); | ||
}; | ||
function doneLater() { | ||
if (doneCalled) { | ||
done(doneClb); | ||
} | ||
} | ||
var done = function (callback) { | ||
if (evtCount > 0) { | ||
doneCalled = true; | ||
doneClb = callback; | ||
return; | ||
} | ||
async.series([ | ||
function (callback) { | ||
async.each(_.values(vbReplayStreams), function (vbReplayStream, callback) { | ||
vbReplayStream.done(callback); | ||
async.each(_.values(collections), function (col, callback) { | ||
col.saveReplayingVms(callback); | ||
}, callback); | ||
@@ -174,8 +234,17 @@ }, | ||
} | ||
], function(err) { | ||
], function (err) { | ||
if (err) { | ||
debug(err); | ||
errs.push(err); | ||
} | ||
if (errs.length === 0) { | ||
if (callback) { | ||
callback(null); | ||
} | ||
return; | ||
} | ||
if (callback) { | ||
callback(err); | ||
callback(errs); | ||
} | ||
@@ -182,0 +251,0 @@ }); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "1.0.6", | ||
"version": "1.0.7", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -0,1 +1,4 @@ | ||
## v1.0.7 | ||
- fixed replay handling | ||
## v1.0.6 | ||
@@ -2,0 +5,0 @@ - update viewmodel dependency |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
162870
2923