Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
2
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 1.0.6 to 1.0.7

57

lib/definitions/collection.js

@@ -6,2 +6,4 @@ 'use strict';

_ = require('lodash'),
uuid = require('node-uuid').v4,
async = require('async'),
debug = require('debug')('denormalizer:collection');

@@ -17,2 +19,7 @@

Definition.call(this, meta);
// used for replay...
this.workerId = uuid().toString();
this.isReplaying = false;
this.replayingVms = {};

@@ -234,2 +241,7 @@ meta = meta || {};

saveViewModel: function (vm, callback) {
if (this.isReplaying) {
this.replayingVms[vm.id] = vm;
return callback(null);
}
this.repository.commit(vm, callback);

@@ -245,2 +257,6 @@ },

loadViewModel: function (id, callback) {
if (this.isReplaying && this.replayingVms[id]) {
return callback(null, this.replayingVms[id]);
}
var self = this;

@@ -272,6 +288,6 @@

* Loads a viewModel array by optional query and query options.
* @param {Object} query The query to find the viewModels. (mongodb style) [optional]
* @param {Object} queryOptions The query options. (mongodb style) [optional]
* @param {Function} callback The function, that will be called when the this action is completed.
* `function(err, vms){}` vms is of type Array.
* @param {Object} query The query to find the viewModels. (mongodb style) [optional]
* @param {Object} queryOptions The query options. (mongodb style) [optional]
* @param {Function} callback The function, that will be called when the this action is completed.
* `function(err, vms){}` vms is of type Array.
*/

@@ -288,2 +304,7 @@ findViewModels: function (query, queryOptions, callback) {

}
// if (this.isReplaying) {
// // TODO: lookup in this.replayingVms
// return callback(null);
// }

@@ -308,2 +329,30 @@ var self = this;

});
},
/**
* Saves all replaying viewmodels.
* @param {Function} callback The function, that will be called when the this action is completed.
* `function(err){}`
*/
saveReplayingVms: function (callback) {
if (!this.isReplaying) {
var err = new Error('Not in replay mode!');
debug(err);
return callback(err);
}
var replVms = _.values(this.replayingVms);
var self = this;
async.each(replVms, function (vm, callback) {
self.repository.commit(vm, callback);
}, function (err) {
if (err) {
debug(err);
}
self.replayingVms = {};
self.isReplaying = false;
callback(err);
});
}

@@ -310,0 +359,0 @@

127

lib/definitions/viewBuilder.js

@@ -24,4 +24,2 @@ 'use strict';

this.workerId = uuid().toString();
this.isReplaying = false;
this.replayingVms = {};

@@ -119,6 +117,2 @@ meta = meta || {};

loadViewModel: function (id, callback) {
if (this.isReplaying && this.replayingVms[id]) {
return callback(null, this.replayingVms[id]);
}
this.collection.loadViewModel(id, callback);

@@ -134,7 +128,2 @@ },

saveViewModel: function (vm, callback) {
if (this.isReplaying) {
this.replayingVms[vm.id] = vm;
return callback(null);
}
this.collection.saveViewModel(vm, callback);

@@ -261,118 +250,2 @@ },

});
},
/**
* Replays all passed events.
* @param {Array} evts The passed array of events.
* @param {Function} callback The function, that will be called when this action is completed.
* `function(err){}`
*/
replay: function (evts, callback) {
var self = this;
if (!evts || evts.length === 0) {
return callback(null);
}
this.isReplaying = true;
this.replayingVms = {};
async.eachSeries(evts, function (evt, callback) {
self.denormalize(evt, callback);
}, function (err) {
if (err) {
debug(err);
return callback(err);
}
var replVms = _.values(self.replayingVms);
async.each(replVms, function (vm, callback) {
self.collection.saveViewModel(vm, callback);
}, function (err) {
self.replayingVms = {};
self.isReplaying = false;
if (err) {
debug(err);
}
callback(err);
});
});
},
/**
* Replays in a streamed way.
* @param {Function} fn The function that will be called with the replay function and the done function.
* `function(replay, done){}`
*/
replayStreamed: function (fn) {
var self = this;
var queue = [];
this.isReplaying = true;
this.replayingVms = {};
var errs = [];
var isHandling = false,
doneCalled = false,
doneClb = null;
function replay (evt) {
queue.push(evt);
function handleNext () {
if (queue.length > 0) {
var e = queue.shift();
self.denormalize(e, function (err) {
if (err) {
debug(err);
errs.push(err);
}
handleNext();
});
} else {
isHandling = false;
if (doneCalled) {
doneLater();
}
}
}
if (!isHandling) {
isHandling = true;
process.nextTick(handleNext);
}
}
function done (callback) {
if (queue.length > 0 || isHandling) {
doneCalled = true;
doneClb = callback;
return;
}
var replVms = _.values(self.replayingVms);
async.each(replVms, function (vm, callback) {
self.collection.saveViewModel(vm, callback);
}, function (err) {
self.replayingVms = {};
self.isReplaying = false;
if (err) {
debug(err);
}
callback(err);
});
}
function doneLater() {
if (doneCalled) {
done(doneClb);
}
}
fn(replay, done);
}

@@ -379,0 +252,0 @@

@@ -76,2 +76,3 @@ 'use strict';

var groupedEvents = {};
var collections = {};

@@ -93,2 +94,7 @@ _.each(evts, function (evt) {

groupedEvents[vb.workerId].push(evt);
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
}
});

@@ -104,4 +110,6 @@ });

}
vb.replay(groupedEvents[vb.workerId], callback);
async.eachSeries(groupedEvents[vb.workerId], function (e, callback) {
vb.denormalize(e, callback);
}, callback);
}, callback);

@@ -111,2 +119,8 @@ },

function (callback) {
async.each(_.values(collections), function (col, callback) {
col.saveReplayingVms(callback);
}, callback);
},
function (callback) {
self.updateRevision(revisionMap, callback);

@@ -133,4 +147,13 @@ }

var evtCount = 0;
var queues = {};
var errs = [];
var doneCalled = true,
doneClb = null;
var isHandling = {};
var revisionMap = {};
var vbReplayStreams = {};
var collections = {};

@@ -150,20 +173,57 @@ var replay = function (evt) {

if (!vbReplayStreams[vb.workerId]) {
vb.replayStreamed(function (vbReplay, vbDone) {
vbReplayStreams[vb.workerId] = {
replay: vbReplay,
done: vbDone
};
});
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
}
queues[vb.workerId] = queues[vb.workerId] || [];
queues[vb.workerId].push(evt);
evtCount++;
vbReplayStreams[vb.workerId].replay(evt);
function handleNext () {
if (evtCount <= 0 && doneCalled) {
doneLater();
return;
}
if (queues[vb.workerId] && queues[vb.workerId].length > 0) {
var e = queues[vb.workerId].shift();
vb.denormalize(e, function (err) {
--evtCount;
if (err) {
debug(err);
errs.push(err);
}
handleNext();
});
} else if (isHandling[vb.workerId]) {
delete isHandling[vb.workerId];
}
}
if (!isHandling[vb.workerId]) {
isHandling[vb.workerId] = true;
process.nextTick(handleNext);
}
});
};
function doneLater() {
if (doneCalled) {
done(doneClb);
}
}
var done = function (callback) {
if (evtCount > 0) {
doneCalled = true;
doneClb = callback;
return;
}
async.series([
function (callback) {
async.each(_.values(vbReplayStreams), function (vbReplayStream, callback) {
vbReplayStream.done(callback);
async.each(_.values(collections), function (col, callback) {
col.saveReplayingVms(callback);
}, callback);

@@ -174,8 +234,17 @@ },

}
], function(err) {
], function (err) {
if (err) {
debug(err);
errs.push(err);
}
if (errs.length === 0) {
if (callback) {
callback(null);
}
return;
}
if (callback) {
callback(err);
callback(errs);
}

@@ -182,0 +251,0 @@ });

{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "1.0.6",
"version": "1.0.7",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -0,1 +1,4 @@

## v1.0.7
- fixed replay handling
## v1.0.6

@@ -2,0 +5,0 @@ - update viewmodel dependency

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc