cqrs-eventdenormalizer
Advanced tools
Comparing version 0.3.1 to 0.3.2
@@ -1,173 +0,193 @@ | ||
var viewBuilderLoader = require('./loaders/viewBuilderLoader') | ||
, eventExtenderLoader = require('./loaders/eventExtenderLoader') | ||
, eventDispatcher = require('./eventDispatcher') | ||
, eventEmitter = require('./eventEmitter') | ||
, EventEmitter2 = require('eventemitter2').EventEmitter2 | ||
, _ = require('lodash') | ||
, async = require('async') | ||
, queue = require('node-queue') | ||
, eventQueue | ||
, guardStore | ||
, viewBuilders | ||
, repository = require('viewmodel').write.create() | ||
, revisionGuardStore = require('./revisionGuardStore') | ||
, revisionGuard = require('./revisionGuard') | ||
, evtDen; | ||
var viewBuilderLoader = require('./loaders/viewBuilderLoader'), | ||
eventExtenderLoader = require('./loaders/eventExtenderLoader'), | ||
eventDispatcher = require('./eventDispatcher'), | ||
eventEmitter = require('./eventEmitter'), | ||
EventEmitter2 = require('eventemitter2').EventEmitter2, | ||
_ = require('lodash'), | ||
async = require('async'), | ||
queue = require('node-queue'), | ||
eventQueue, | ||
guardStore, | ||
viewBuilders, | ||
repository = require('viewmodel').write.create(), | ||
revisionGuardStore = require('./revisionGuardStore'), | ||
revisionGuard = require('./revisionGuard'), | ||
evtDen; | ||
module.exports = evtDen = _.extend(new EventEmitter2({ | ||
wildcard: true, | ||
delimiter: ':', | ||
maxListeners: 1000 // default would be 10! | ||
}), { | ||
wildcard: true, | ||
delimiter: ':', | ||
maxListeners: 1000 // default would be 10! | ||
}), { | ||
initialize: function(options, callback) { | ||
if (_.isFunction(options)) { | ||
callback = options; | ||
} | ||
initialize: function(options, callback) { | ||
if (_.isFunction(options)) { | ||
callback = options; | ||
} | ||
var defaults = { | ||
eventQueue: { type: 'inMemory', collectionName: 'events' }, | ||
repository: { type: 'inMemory' }, | ||
revisionGuardStore: { type: 'inMemory', collectionName: 'revisionguard' }, | ||
ignoreRevision: false, | ||
disableQueuing: false, | ||
revisionGuardQueueTimeout: 3000, | ||
revisionGuardQueueTimeoutMaxLoops: 3 | ||
}; | ||
var defaults = { | ||
eventQueue: { type: 'inMemory', collectionName: 'events' }, | ||
repository: { type: 'inMemory' }, | ||
revisionGuardStore: { type: 'inMemory', collectionName: 'revisionguard' }, | ||
ignoreRevision: false, | ||
disableQueuing: false, | ||
revisionGuardQueueTimeout: 3000, | ||
revisionGuardQueueTimeoutMaxLoops: 3 | ||
}; | ||
_.defaults(options, defaults); | ||
_.defaults(options, defaults); | ||
eventEmitter.on('extended:*', function(evt) { | ||
evtDen.emit('event', evt); | ||
}); | ||
if (options.revisionGuardStore.revisionStart === undefined) { | ||
options.revisionGuardStore.revisionStart = 1; | ||
} | ||
eventEmitter.on('eventMissing', function(id, aggregateRevision, eventRevision, evt) { | ||
evtDen.emit('eventMissing', id, aggregateRevision, eventRevision, evt); | ||
}); | ||
eventEmitter.on('extended:*', function(evt) { | ||
evtDen.emit('event', evt); | ||
}); | ||
async.series([ | ||
eventEmitter.on('eventMissing', function(id, aggregateRevision, eventRevision, evt) { | ||
evtDen.emit('eventMissing', id, aggregateRevision, eventRevision, evt); | ||
}); | ||
function(callback) { | ||
repository.init(options.repository, callback); | ||
}, | ||
async.series([ | ||
function(callback) { | ||
viewBuilderLoader.configure(function() { | ||
this.use(repository); | ||
}); | ||
eventExtenderLoader.configure(function() { | ||
this.use(repository); | ||
}); | ||
callback(null); | ||
}, | ||
function(callback) { | ||
repository.init(options.repository, callback); | ||
}, | ||
function(callback) { | ||
if (options.extendersPath) { | ||
eventExtenderLoader.load(options.extendersPath, callback); | ||
} else { | ||
callback(null); | ||
} | ||
}, | ||
function(callback) { | ||
viewBuilderLoader.load(options.viewBuildersPath, { ignoreRevision: options.ignoreRevision }, function(err, vBuilders) { | ||
viewBuilders = vBuilders; | ||
callback(err); | ||
}); | ||
}, | ||
function(callback) { | ||
viewBuilderLoader.configure(function() { | ||
this.use(repository); | ||
}); | ||
eventExtenderLoader.configure(function() { | ||
this.use(repository); | ||
}); | ||
callback(null); | ||
}, | ||
function(callback) { | ||
if (options.disableQueuing) { | ||
eventQueue = null; | ||
eventDispatcher.initialize({}, callback); | ||
} else { | ||
queue.connect(options.eventQueue, function(err, evtQueue) { | ||
eventQueue = evtQueue; | ||
eventDispatcher.configure(function() { | ||
this.use(evtQueue); | ||
}); | ||
eventDispatcher.initialize({}, callback); | ||
}); | ||
} | ||
} | ||
function(callback) { | ||
if (options.extendersPath) { | ||
eventExtenderLoader.load(options.extendersPath, callback); | ||
} else { | ||
callback(null); | ||
} | ||
}, | ||
function(callback) { | ||
viewBuilderLoader.load(options.viewBuildersPath, { ignoreRevision: options.ignoreRevision }, function(err, vBuilders) { | ||
viewBuilders = vBuilders; | ||
callback(err); | ||
}); | ||
}, | ||
], function(err) { | ||
revisionGuardStore.connect(options.revisionGuardStore, function(err, revGuardStore) { | ||
guardStore = revGuardStore; | ||
revisionGuard.configure(function() { | ||
this.use(revGuardStore); | ||
this.use(eventDispatcher); | ||
this.use(eventQueue); | ||
}); | ||
revisionGuard.initialize({ | ||
ignoreRevision: options.ignoreRevision, | ||
queueTimeout: options.revisionGuardQueueTimeout, | ||
queueTimeoutMaxLoops: options.revisionGuardQueueTimeoutMaxLoops | ||
}, callback); | ||
function(callback) { | ||
if (options.disableQueuing) { | ||
eventQueue = null; | ||
eventDispatcher.initialize({}, callback); | ||
} else { | ||
queue.connect(options.eventQueue, function(err, evtQueue) { | ||
eventQueue = evtQueue; | ||
eventDispatcher.configure(function() { | ||
this.use(evtQueue); | ||
}); | ||
eventDispatcher.initialize({}, callback); | ||
}); | ||
} | ||
} | ||
], function(err) { | ||
revisionGuardStore.connect(options.revisionGuardStore, function(err, revGuardStore) { | ||
guardStore = revGuardStore; | ||
revisionGuard.configure(function() { | ||
this.use(revGuardStore); | ||
this.use(eventDispatcher); | ||
this.use(eventQueue); | ||
}); | ||
}, | ||
revisionGuard.initialize({ | ||
ignoreRevision: options.ignoreRevision, | ||
queueTimeout: options.revisionGuardQueueTimeout, | ||
queueTimeoutMaxLoops: options.revisionGuardQueueTimeoutMaxLoops | ||
}, callback); | ||
}); | ||
}); | ||
}, | ||
denormalize: function(evt, callback) { | ||
var entry = { | ||
// workers: eventEmitter.listeners('denormalize:' + evt.event).length, | ||
workers: eventEmitter.registerCount('denormalize:' + evt.event), | ||
event: evt | ||
}; | ||
denormalize: function(evt, callback) { | ||
var entry = { | ||
// workers: eventEmitter.listeners('denormalize:' + evt.event).length, | ||
workers: eventEmitter.registerCount('denormalize:' + evt.event), | ||
event: evt | ||
}; | ||
var extendersCount = eventEmitter.registerCount('extend:' + evt.event); | ||
if (entry.workers === 0 && extendersCount === 0) { | ||
eventEmitter.emit('extended:' + evt.event, evt); | ||
if (callback) callback(null); | ||
return; | ||
} | ||
var extendersCount = eventEmitter.registerCount('extend:' + evt.event); | ||
if (entry.workers === 0 && extendersCount === 0) { | ||
eventEmitter.emit('extended:' + evt.event, evt); | ||
if (callback) callback(null); | ||
return; | ||
} | ||
if (entry.workers === 0 && extendersCount > 0) { | ||
eventEmitter.emit('extend:' + evt.event, evt); | ||
if (callback) callback(null); | ||
return; | ||
} | ||
if (entry.workers === 0 && extendersCount > 0) { | ||
eventEmitter.emit('extend:' + evt.event, evt); | ||
if (callback) callback(null); | ||
return; | ||
} | ||
if (!eventQueue) { | ||
if (callback) callback(null); | ||
revisionGuard.guard(evt); | ||
} else { | ||
eventQueue.push(evt.id, entry, function(err) { | ||
if (callback) callback(err); | ||
revisionGuard.guard(evt); | ||
}); | ||
} | ||
}, | ||
if (!eventQueue) { | ||
if (callback) callback(null); | ||
revisionGuard.guard(evt); | ||
} else { | ||
eventQueue.push(evt.id, entry, function(err) { | ||
if (callback) callback(err); | ||
revisionGuard.guard(evt); | ||
}); | ||
} | ||
}, | ||
replay: function(evts, callback) { | ||
replay: function(evts, callback) { | ||
var groupedEvents = {}; | ||
var revisionMap = {}, | ||
groupedEvents = {}; | ||
_.each(evts, function(evt) { | ||
_.each(evts, function(evt) { | ||
if (evt.head && evt.head.revision) { | ||
revisionMap[evt.payload.id] = evt.head.revision; | ||
} | ||
var interested = _.filter(viewBuilders, function(vB) { | ||
return _.contains(vB.registeredEventNames, evt.event); | ||
}); | ||
var interested = _.filter(viewBuilders, function(vB) { | ||
return _.contains(vB.registeredEventNames, evt.event); | ||
}); | ||
_.each(interested, function(inter) { | ||
groupedEvents[inter.id] = groupedEvents[inter.id] || []; | ||
groupedEvents[inter.id].push(evt); | ||
}); | ||
_.each(interested, function(inter) { | ||
groupedEvents[inter.id] = groupedEvents[inter.id] || []; | ||
groupedEvents[inter.id].push(evt); | ||
}); | ||
}); | ||
}); | ||
async.series([ | ||
function(callback) { | ||
async.each(viewBuilders, function(viewBuilder, callback) { | ||
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) { | ||
return callback(null); | ||
} | ||
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) { | ||
return callback(null); | ||
} | ||
viewBuilder.replay(groupedEvents[viewBuilder.id], callback); | ||
viewBuilder.replay(groupedEvents[viewBuilder.id], callback); | ||
}, callback); | ||
}, | ||
function(callback) { | ||
var ids = _.keys(revisionMap); | ||
async.each(ids, function(id, callback) { | ||
guardStore.getRevision(id, function(err, entry) { | ||
if (err) { return callback(err); } | ||
}, function(err) { | ||
if (callback) callback(err); | ||
}); | ||
entry.revision = revisionMap[id] + 1; | ||
guardStore.saveRevision(entry, callback); | ||
}); | ||
}, callback); | ||
} | ||
], function(err) { | ||
if (callback) callback(err); | ||
}); | ||
} | ||
} | ||
}); |
@@ -48,23 +48,10 @@ var _ = require('lodash'), | ||
retryGuard: function(evt, callback) { | ||
var aux = self._getAux(); | ||
self.store.getRevision(evt.payload.id, function(err, entry) { | ||
function proceed(entry) { | ||
var aux = self._getAux(); | ||
if (aux.wouldQueue(evt, entry)) { | ||
if (callback) callback(true, entry); | ||
return; | ||
} | ||
aux.finishGuard(evt, entry, callback); | ||
if (aux.wouldQueue(evt, entry)) { | ||
if (callback) callback(true, entry); | ||
return; | ||
} | ||
if (!entry.revision && evt.head.revision !== 1) { | ||
var max = (self.options.queueTimeout * self.options.queueTimeoutMaxLoops) / 3; | ||
setTimeout(function() { | ||
self.store.getRevision(evt.payload.id, function(err, entry) { | ||
proceed(entry); | ||
}); | ||
}, randomBetween(0, max)); | ||
} else { | ||
proceed(entry); | ||
} | ||
aux.finishGuard(evt, entry, callback); | ||
}); | ||
@@ -79,3 +66,3 @@ }, | ||
if (err) { | ||
if (loopCount <= self.options.queueTimeoutMaxLoops) { | ||
if (loopCount < self.options.queueTimeoutMaxLoops) { | ||
return waitAgain(); | ||
@@ -202,3 +189,3 @@ } | ||
this.queue = new Queue(options.queueTimeout); | ||
this.queue = new Queue({ queueTimeout: options.queueTimeout }); | ||
@@ -205,0 +192,0 @@ if (callback) callback(null); |
@@ -5,6 +5,13 @@ var repository = require('viewmodel').write.create(), | ||
function getStore(repo) { | ||
function getStore(repo, opt) { | ||
var store = { | ||
getRevision: function(id, callback) { | ||
repo.get(id, callback); | ||
repo.get(id, function(err, entry) { | ||
if (opt && opt.revisionStart !== undefined) { | ||
entry.revision = entry.revision || opt.revisionStart; | ||
callback(err, entry); | ||
} else { | ||
callback(err, entry); | ||
} | ||
}); | ||
}, | ||
@@ -61,2 +68,6 @@ saveRevision: function(toSave, callback) { | ||
if (options.revisionStart === undefined) { | ||
options.revisionStart = 1; | ||
} | ||
repository.init(options, function(err) { | ||
@@ -71,3 +82,3 @@ if(err) { | ||
callback(null, getStore(repo)); | ||
callback(null, getStore(repo, options)); | ||
}); | ||
@@ -74,0 +85,0 @@ } |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "0.3.1", | ||
"version": "0.3.2", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -97,3 +97,4 @@ # Introduction | ||
collectionName: 'revisionguard', | ||
timeout: 60 * 1000 | ||
timeout: 60 * 1000//, | ||
//revisionStart: 1 | ||
}, | ||
@@ -111,2 +112,6 @@ ignoreRevision: false, | ||
## v0.3.2 | ||
- introduced optional revisionStart (default = 1) | ||
## v0.3.1 | ||
@@ -113,0 +118,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
157
42601
19
1043