Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
2
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 1.14.8 to 1.14.9

170

lib/replayHandler.js

@@ -144,37 +144,6 @@ var debug = require('debug')('denormalizer:replayHandler'),

var previousRevisionMap = {};
var seenEvents = {};
function shouldProcess(concatenatedId, revInEvt, callback) {
if (!previousRevisionMap[concatenatedId]) {
self.store.get(concatenatedId, function (err, revInStore) {
if (err) {
debug(err);
return callback(err);
}
previousRevisionMap[concatenatedId] = revInStore;
if (revInEvt < previousRevisionMap[concatenatedId]) {
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt);
return callback(null, false);
}
previousRevisionMap[concatenatedId] = revInEvt + 1;
callback(null, true);
});
return;
}
if (revInEvt < previousRevisionMap[concatenatedId]) {
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt);
return callback(null, false);
}
previousRevisionMap[concatenatedId] = revInEvt + 1;
callback(null, true);
}
var replay = function (evt) {
var evtId = dotty.get(evt, self.definition.id);
lastEvent = evt;

@@ -188,86 +157,81 @@ var concatenatedId;

function proceed() {
var target = self.dispatcher.getTargetInformation(evt);
var concatenatedWithEventId = evtId;
if (concatenatedId) concatenatedWithEventId = concatenatedId + ':' + evtId;
if (seenEvents[concatenatedWithEventId]) return;
seenEvents[concatenatedWithEventId] = true;
var viewBuilders = [], foundPrioSet = false;
var target = self.dispatcher.getTargetInformation(evt);
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) {
if (!vb.collection.noReplay) {
viewBuilders.push(vb);
var viewBuilders = [], foundPrioSet = false;
if (!foundPrioSet && vb.priority < Infinity) {
foundPrioSet = true;
}
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) {
if (!vb.collection.noReplay) {
viewBuilders.push(vb);
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
}
if (!foundPrioSet && vb.priority < Infinity) {
foundPrioSet = true;
}
if (!collections[vb.collection.workerId]) {
vb.collection.isReplaying = true;
collections[vb.collection.workerId] = vb.collection;
}
}
});
if (foundPrioSet) {
_.each(viewBuilders, function (vb) {
eventQueue.push({event: evt, viewBuilders: [vb]});
evtCount++;
});
} else {
eventQueue.push({event: evt, viewBuilders: viewBuilders});
evtCount += viewBuilders.length;
}
if (foundPrioSet) {
_.each(viewBuilders, function (vb) {
eventQueue.push({event: evt, viewBuilders: [vb]});
evtCount++;
});
} else {
eventQueue.push({event: evt, viewBuilders: viewBuilders});
evtCount += viewBuilders.length;
function handleNext () {
if (evtCount <= 0 && doneCalled) {
doneLater();
return;
}
function handleNext () {
if (evtCount <= 0 && doneCalled) {
doneLater();
return;
}
if (eventQueue.length > 0) {
var task = eventQueue.shift();
var e = task.event, vbs = task.viewBuilders;
if (eventQueue.length > 0) {
var task = eventQueue.shift();
var e = task.event, vbs = task.viewBuilders;
async.series([
function (clb) {
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e));
if (!preEventExtender) return clb(null);
preEventExtender.extend(e, function (err, extEvt) {
if (err) return clb(err);
e = extEvt || e;
clb(null);
});
},
function (clb) {
async.each(vbs, function (vb, callback) {
vb.denormalize(e, function (err) {
--evtCount;
if (err) {
debug(err);
errs.push(err);
}
async.series([
function (clb) {
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e));
if (!preEventExtender) return clb(null);
preEventExtender.extend(e, function (err, extEvt) {
if (err) return clb(err);
e = extEvt || e;
clb(null);
callback();
});
},
function (clb) {
async.each(vbs, function (vb, callback) {
vb.denormalize(e, function (err) {
--evtCount;
if (err) {
debug(err);
errs.push(err);
}
callback();
});
}, clb);
}
], function () {
handleNext();
});
} else if (eventQueueHandling) {
eventQueueHandling = false;
}
}, clb);
}
], function () {
handleNext();
});
} else if (eventQueueHandling) {
eventQueueHandling = false;
}
}
if (!eventQueueHandling) {
eventQueueHandling = true;
process.nextTick(handleNext);
}
if (!eventQueueHandling) {
eventQueueHandling = true;
process.nextTick(handleNext);
}
if (!concatenatedId) return proceed();
shouldProcess(concatenatedId, revisionMap[concatenatedId], function(err, should) {
if (err) return errs.push(err);
if (!should) return;
proceed();
});
};

@@ -301,2 +265,4 @@

], function (err) {
seenEvents = {};
if (err) {

@@ -303,0 +269,0 @@ debug(err);

2

package.json
{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "1.14.8",
"version": "1.14.9",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -0,1 +1,4 @@

## [v1.14.9](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.8...v1.14.9)
- replayHandler: make sure to not handle old events (duplicates) by checking the eventId
## [v1.14.8](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.7...v1.14.8)

@@ -2,0 +5,0 @@ - replayHandler: make sure to not handle old events (duplicates)

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc