Socket
Socket
Sign inDemoInstall

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
2
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 1.14.7 to 1.14.8

171

lib/replayHandler.js

@@ -136,4 +136,4 @@ var debug = require('debug')('denormalizer:replayHandler'),

var errs = [];
var doneCalled = false,
doneClb = null;
var doneCalled = false;
var doneClb = null;

@@ -145,84 +145,129 @@ var revisionMap = {};

var previousRevisionMap = {};
function shouldProcess(concatenatedId, revInEvt, callback) {
if (!previousRevisionMap[concatenatedId]) {
self.store.get(concatenatedId, function (err, revInStore) {
if (err) {
debug(err);
return callback(err);
}
previousRevisionMap[concatenatedId] = revInStore;
if (revInEvt < previousRevisionMap[concatenatedId]) {
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt);
return callback(null, false);
}
previousRevisionMap[concatenatedId] = revInEvt + 1;
callback(null, true);
});
return;
}
if (revInEvt < previousRevisionMap[concatenatedId]) {
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt);
return callback(null, false);
}
previousRevisionMap[concatenatedId] = revInEvt + 1;
callback(null, true);
}
var replay = function (evt) {
lastEvent = evt;
var concatenatedId;
if (!!self.definition.revision && dotty.exists(evt, self.definition.revision) &&
!!self.definition.aggregateId && dotty.exists(evt, self.definition.aggregateId)) {
var aggId = self.getConcatenatedId(evt);
revisionMap[aggId] = dotty.get(evt, self.definition.revision);
concatenatedId = self.getConcatenatedId(evt);
revisionMap[concatenatedId] = dotty.get(evt, self.definition.revision);
}
var target = self.dispatcher.getTargetInformation(evt);
function proceed() {
var target = self.dispatcher.getTargetInformation(evt);
var viewBuilders = [], foundPrioSet = false;
var viewBuilders = [], foundPrioSet = false;
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) {
if (!vb.collection.noReplay) {
viewBuilders.push(vb);
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) {
if (!vb.collection.noReplay) {
viewBuilders.push(vb);
if (!foundPrioSet && vb.priority < Infinity) {
foundPrioSet = true;
}
if (!foundPrioSet && vb.priority < Infinity) {
foundPrioSet = true;
}
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
}
}
}
});
if (foundPrioSet) {
_.each(viewBuilders, function (vb) {
eventQueue.push({event: evt, viewBuilders: [vb]});
evtCount++;
});
} else {
eventQueue.push({event: evt, viewBuilders: viewBuilders});
evtCount += viewBuilders.length;
}
function handleNext () {
if (evtCount <= 0 && doneCalled) {
doneLater();
return;
if (foundPrioSet) {
_.each(viewBuilders, function (vb) {
eventQueue.push({event: evt, viewBuilders: [vb]});
evtCount++;
});
} else {
eventQueue.push({event: evt, viewBuilders: viewBuilders});
evtCount += viewBuilders.length;
}
if (eventQueue.length > 0) {
var task = eventQueue.shift();
var e = task.event, vbs = task.viewBuilders;
function handleNext () {
if (evtCount <= 0 && doneCalled) {
doneLater();
return;
}
async.series([
function (clb) {
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e));
if (!preEventExtender) return clb(null);
preEventExtender.extend(e, function (err, extEvt) {
if (err) return clb(err);
e = extEvt || e;
clb(null);
});
},
function (clb) {
async.each(vbs, function (vb, callback) {
vb.denormalize(e, function (err) {
--evtCount;
if (err) {
debug(err);
errs.push(err);
}
if (eventQueue.length > 0) {
var task = eventQueue.shift();
var e = task.event, vbs = task.viewBuilders;
callback();
async.series([
function (clb) {
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e));
if (!preEventExtender) return clb(null);
preEventExtender.extend(e, function (err, extEvt) {
if (err) return clb(err);
e = extEvt || e;
clb(null);
});
}, clb);
}
], function () {
handleNext();
});
} else if (eventQueueHandling) {
eventQueueHandling = false;
},
function (clb) {
async.each(vbs, function (vb, callback) {
vb.denormalize(e, function (err) {
--evtCount;
if (err) {
debug(err);
errs.push(err);
}
callback();
});
}, clb);
}
], function () {
handleNext();
});
} else if (eventQueueHandling) {
eventQueueHandling = false;
}
}
if (!eventQueueHandling) {
eventQueueHandling = true;
process.nextTick(handleNext);
}
}
if (!eventQueueHandling) {
eventQueueHandling = true;
process.nextTick(handleNext);
}
if (!concatenatedId) return proceed();
shouldProcess(concatenatedId, revisionMap[concatenatedId], function(err, should) {
if (err) return errs.push(err);
if (!should) return;
proceed();
});
};

@@ -229,0 +274,0 @@

{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "1.14.7",
"version": "1.14.8",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -0,1 +1,4 @@

## [v1.14.8](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.7...v1.14.8)
- replayHandler: make sure to not handle old events (duplicates)
## [v1.14.7](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.6...v1.14.7)

@@ -2,0 +5,0 @@ - update viewmodel

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc