cqrs-eventdenormalizer
Advanced tools
Comparing version 1.14.8 to 1.14.9
@@ -144,37 +144,6 @@ var debug = require('debug')('denormalizer:replayHandler'), | ||
var previousRevisionMap = {}; | ||
var seenEvents = {}; | ||
function shouldProcess(concatenatedId, revInEvt, callback) { | ||
if (!previousRevisionMap[concatenatedId]) { | ||
self.store.get(concatenatedId, function (err, revInStore) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
previousRevisionMap[concatenatedId] = revInStore; | ||
if (revInEvt < previousRevisionMap[concatenatedId]) { | ||
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt); | ||
return callback(null, false); | ||
} | ||
previousRevisionMap[concatenatedId] = revInEvt + 1; | ||
callback(null, true); | ||
}); | ||
return; | ||
} | ||
if (revInEvt < previousRevisionMap[concatenatedId]) { | ||
debug('event already seen [concatenatedId]=' + concatenatedId + ', [revInStore]=' + previousRevisionMap[concatenatedId] + ', [revInEvt]=' + revInEvt); | ||
return callback(null, false); | ||
} | ||
previousRevisionMap[concatenatedId] = revInEvt + 1; | ||
callback(null, true); | ||
} | ||
var replay = function (evt) { | ||
var evtId = dotty.get(evt, self.definition.id); | ||
lastEvent = evt; | ||
@@ -188,86 +157,81 @@ var concatenatedId; | ||
function proceed() { | ||
var target = self.dispatcher.getTargetInformation(evt); | ||
var concatenatedWithEventId = evtId; | ||
if (concatenatedId) concatenatedWithEventId = concatenatedId + ':' + evtId; | ||
if (seenEvents[concatenatedWithEventId]) return; | ||
seenEvents[concatenatedWithEventId] = true; | ||
var viewBuilders = [], foundPrioSet = false; | ||
var target = self.dispatcher.getTargetInformation(evt); | ||
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) { | ||
if (!vb.collection.noReplay) { | ||
viewBuilders.push(vb); | ||
var viewBuilders = [], foundPrioSet = false; | ||
if (!foundPrioSet && vb.priority < Infinity) { | ||
foundPrioSet = true; | ||
} | ||
_.each(self.dispatcher.tree.getViewBuilders(target), function (vb) { | ||
if (!vb.collection.noReplay) { | ||
viewBuilders.push(vb); | ||
if (!collections[vb.collection.workerId]) { | ||
vb.collection.isReplaying = true; | ||
collections[vb.collection.workerId] = vb.collection; | ||
} | ||
if (!foundPrioSet && vb.priority < Infinity) { | ||
foundPrioSet = true; | ||
} | ||
if (!collections[vb.collection.workerId]) { | ||
vb.collection.isReplaying = true; | ||
collections[vb.collection.workerId] = vb.collection; | ||
} | ||
} | ||
}); | ||
if (foundPrioSet) { | ||
_.each(viewBuilders, function (vb) { | ||
eventQueue.push({event: evt, viewBuilders: [vb]}); | ||
evtCount++; | ||
}); | ||
} else { | ||
eventQueue.push({event: evt, viewBuilders: viewBuilders}); | ||
evtCount += viewBuilders.length; | ||
} | ||
if (foundPrioSet) { | ||
_.each(viewBuilders, function (vb) { | ||
eventQueue.push({event: evt, viewBuilders: [vb]}); | ||
evtCount++; | ||
}); | ||
} else { | ||
eventQueue.push({event: evt, viewBuilders: viewBuilders}); | ||
evtCount += viewBuilders.length; | ||
function handleNext () { | ||
if (evtCount <= 0 && doneCalled) { | ||
doneLater(); | ||
return; | ||
} | ||
function handleNext () { | ||
if (evtCount <= 0 && doneCalled) { | ||
doneLater(); | ||
return; | ||
} | ||
if (eventQueue.length > 0) { | ||
var task = eventQueue.shift(); | ||
var e = task.event, vbs = task.viewBuilders; | ||
if (eventQueue.length > 0) { | ||
var task = eventQueue.shift(); | ||
var e = task.event, vbs = task.viewBuilders; | ||
async.series([ | ||
function (clb) { | ||
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e)); | ||
if (!preEventExtender) return clb(null); | ||
preEventExtender.extend(e, function (err, extEvt) { | ||
if (err) return clb(err); | ||
e = extEvt || e; | ||
clb(null); | ||
}); | ||
}, | ||
function (clb) { | ||
async.each(vbs, function (vb, callback) { | ||
vb.denormalize(e, function (err) { | ||
--evtCount; | ||
if (err) { | ||
debug(err); | ||
errs.push(err); | ||
} | ||
async.series([ | ||
function (clb) { | ||
var preEventExtender = self.dispatcher.tree.getPreEventExtender(self.dispatcher.getTargetInformation(e)); | ||
if (!preEventExtender) return clb(null); | ||
preEventExtender.extend(e, function (err, extEvt) { | ||
if (err) return clb(err); | ||
e = extEvt || e; | ||
clb(null); | ||
callback(); | ||
}); | ||
}, | ||
function (clb) { | ||
async.each(vbs, function (vb, callback) { | ||
vb.denormalize(e, function (err) { | ||
--evtCount; | ||
if (err) { | ||
debug(err); | ||
errs.push(err); | ||
} | ||
callback(); | ||
}); | ||
}, clb); | ||
} | ||
], function () { | ||
handleNext(); | ||
}); | ||
} else if (eventQueueHandling) { | ||
eventQueueHandling = false; | ||
} | ||
}, clb); | ||
} | ||
], function () { | ||
handleNext(); | ||
}); | ||
} else if (eventQueueHandling) { | ||
eventQueueHandling = false; | ||
} | ||
} | ||
if (!eventQueueHandling) { | ||
eventQueueHandling = true; | ||
process.nextTick(handleNext); | ||
} | ||
if (!eventQueueHandling) { | ||
eventQueueHandling = true; | ||
process.nextTick(handleNext); | ||
} | ||
if (!concatenatedId) return proceed(); | ||
shouldProcess(concatenatedId, revisionMap[concatenatedId], function(err, should) { | ||
if (err) return errs.push(err); | ||
if (!should) return; | ||
proceed(); | ||
}); | ||
}; | ||
@@ -301,2 +265,4 @@ | ||
], function (err) { | ||
seenEvents = {}; | ||
if (err) { | ||
@@ -303,0 +269,0 @@ debug(err); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "1.14.8", | ||
"version": "1.14.9", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -0,1 +1,4 @@ | ||
## [v1.14.9](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.8...v1.14.9) | ||
- replayHandler: make sure to not handle old events (duplicates) by checking the eventId | ||
## [v1.14.8](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.14.7...v1.14.8) | ||
@@ -2,0 +5,0 @@ - replayHandler: make sure to not handle old events (duplicates) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
199288
4578