Socket
Socket
Sign inDemoInstall

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
1
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 0.3.1 to 0.3.2

304

lib/eventDenormalizer.js

@@ -1,173 +0,193 @@

var viewBuilderLoader = require('./loaders/viewBuilderLoader')
, eventExtenderLoader = require('./loaders/eventExtenderLoader')
, eventDispatcher = require('./eventDispatcher')
, eventEmitter = require('./eventEmitter')
, EventEmitter2 = require('eventemitter2').EventEmitter2
, _ = require('lodash')
, async = require('async')
, queue = require('node-queue')
, eventQueue
, guardStore
, viewBuilders
, repository = require('viewmodel').write.create()
, revisionGuardStore = require('./revisionGuardStore')
, revisionGuard = require('./revisionGuard')
, evtDen;
var viewBuilderLoader = require('./loaders/viewBuilderLoader'),
eventExtenderLoader = require('./loaders/eventExtenderLoader'),
eventDispatcher = require('./eventDispatcher'),
eventEmitter = require('./eventEmitter'),
EventEmitter2 = require('eventemitter2').EventEmitter2,
_ = require('lodash'),
async = require('async'),
queue = require('node-queue'),
eventQueue,
guardStore,
viewBuilders,
repository = require('viewmodel').write.create(),
revisionGuardStore = require('./revisionGuardStore'),
revisionGuard = require('./revisionGuard'),
evtDen;
module.exports = evtDen = _.extend(new EventEmitter2({
wildcard: true,
delimiter: ':',
maxListeners: 1000 // default would be 10!
}), {
wildcard: true,
delimiter: ':',
maxListeners: 1000 // default would be 10!
}), {
initialize: function(options, callback) {
if (_.isFunction(options)) {
callback = options;
}
initialize: function(options, callback) {
if (_.isFunction(options)) {
callback = options;
}
var defaults = {
eventQueue: { type: 'inMemory', collectionName: 'events' },
repository: { type: 'inMemory' },
revisionGuardStore: { type: 'inMemory', collectionName: 'revisionguard' },
ignoreRevision: false,
disableQueuing: false,
revisionGuardQueueTimeout: 3000,
revisionGuardQueueTimeoutMaxLoops: 3
};
var defaults = {
eventQueue: { type: 'inMemory', collectionName: 'events' },
repository: { type: 'inMemory' },
revisionGuardStore: { type: 'inMemory', collectionName: 'revisionguard' },
ignoreRevision: false,
disableQueuing: false,
revisionGuardQueueTimeout: 3000,
revisionGuardQueueTimeoutMaxLoops: 3
};
_.defaults(options, defaults);
_.defaults(options, defaults);
eventEmitter.on('extended:*', function(evt) {
evtDen.emit('event', evt);
});
if (options.revisionGuardStore.revisionStart === undefined) {
options.revisionGuardStore.revisionStart = 1;
}
eventEmitter.on('eventMissing', function(id, aggregateRevision, eventRevision, evt) {
evtDen.emit('eventMissing', id, aggregateRevision, eventRevision, evt);
});
eventEmitter.on('extended:*', function(evt) {
evtDen.emit('event', evt);
});
async.series([
eventEmitter.on('eventMissing', function(id, aggregateRevision, eventRevision, evt) {
evtDen.emit('eventMissing', id, aggregateRevision, eventRevision, evt);
});
function(callback) {
repository.init(options.repository, callback);
},
async.series([
function(callback) {
viewBuilderLoader.configure(function() {
this.use(repository);
});
eventExtenderLoader.configure(function() {
this.use(repository);
});
callback(null);
},
function(callback) {
repository.init(options.repository, callback);
},
function(callback) {
if (options.extendersPath) {
eventExtenderLoader.load(options.extendersPath, callback);
} else {
callback(null);
}
},
function(callback) {
viewBuilderLoader.load(options.viewBuildersPath, { ignoreRevision: options.ignoreRevision }, function(err, vBuilders) {
viewBuilders = vBuilders;
callback(err);
});
},
function(callback) {
viewBuilderLoader.configure(function() {
this.use(repository);
});
eventExtenderLoader.configure(function() {
this.use(repository);
});
callback(null);
},
function(callback) {
if (options.disableQueuing) {
eventQueue = null;
eventDispatcher.initialize({}, callback);
} else {
queue.connect(options.eventQueue, function(err, evtQueue) {
eventQueue = evtQueue;
eventDispatcher.configure(function() {
this.use(evtQueue);
});
eventDispatcher.initialize({}, callback);
});
}
}
function(callback) {
if (options.extendersPath) {
eventExtenderLoader.load(options.extendersPath, callback);
} else {
callback(null);
}
},
function(callback) {
viewBuilderLoader.load(options.viewBuildersPath, { ignoreRevision: options.ignoreRevision }, function(err, vBuilders) {
viewBuilders = vBuilders;
callback(err);
});
},
], function(err) {
revisionGuardStore.connect(options.revisionGuardStore, function(err, revGuardStore) {
guardStore = revGuardStore;
revisionGuard.configure(function() {
this.use(revGuardStore);
this.use(eventDispatcher);
this.use(eventQueue);
});
revisionGuard.initialize({
ignoreRevision: options.ignoreRevision,
queueTimeout: options.revisionGuardQueueTimeout,
queueTimeoutMaxLoops: options.revisionGuardQueueTimeoutMaxLoops
}, callback);
function(callback) {
if (options.disableQueuing) {
eventQueue = null;
eventDispatcher.initialize({}, callback);
} else {
queue.connect(options.eventQueue, function(err, evtQueue) {
eventQueue = evtQueue;
eventDispatcher.configure(function() {
this.use(evtQueue);
});
eventDispatcher.initialize({}, callback);
});
}
}
], function(err) {
revisionGuardStore.connect(options.revisionGuardStore, function(err, revGuardStore) {
guardStore = revGuardStore;
revisionGuard.configure(function() {
this.use(revGuardStore);
this.use(eventDispatcher);
this.use(eventQueue);
});
},
revisionGuard.initialize({
ignoreRevision: options.ignoreRevision,
queueTimeout: options.revisionGuardQueueTimeout,
queueTimeoutMaxLoops: options.revisionGuardQueueTimeoutMaxLoops
}, callback);
});
});
},
denormalize: function(evt, callback) {
var entry = {
// workers: eventEmitter.listeners('denormalize:' + evt.event).length,
workers: eventEmitter.registerCount('denormalize:' + evt.event),
event: evt
};
denormalize: function(evt, callback) {
var entry = {
// workers: eventEmitter.listeners('denormalize:' + evt.event).length,
workers: eventEmitter.registerCount('denormalize:' + evt.event),
event: evt
};
var extendersCount = eventEmitter.registerCount('extend:' + evt.event);
if (entry.workers === 0 && extendersCount === 0) {
eventEmitter.emit('extended:' + evt.event, evt);
if (callback) callback(null);
return;
}
var extendersCount = eventEmitter.registerCount('extend:' + evt.event);
if (entry.workers === 0 && extendersCount === 0) {
eventEmitter.emit('extended:' + evt.event, evt);
if (callback) callback(null);
return;
}
if (entry.workers === 0 && extendersCount > 0) {
eventEmitter.emit('extend:' + evt.event, evt);
if (callback) callback(null);
return;
}
if (entry.workers === 0 && extendersCount > 0) {
eventEmitter.emit('extend:' + evt.event, evt);
if (callback) callback(null);
return;
}
if (!eventQueue) {
if (callback) callback(null);
revisionGuard.guard(evt);
} else {
eventQueue.push(evt.id, entry, function(err) {
if (callback) callback(err);
revisionGuard.guard(evt);
});
}
},
if (!eventQueue) {
if (callback) callback(null);
revisionGuard.guard(evt);
} else {
eventQueue.push(evt.id, entry, function(err) {
if (callback) callback(err);
revisionGuard.guard(evt);
});
}
},
replay: function(evts, callback) {
replay: function(evts, callback) {
var groupedEvents = {};
var revisionMap = {},
groupedEvents = {};
_.each(evts, function(evt) {
_.each(evts, function(evt) {
if (evt.head && evt.head.revision) {
revisionMap[evt.payload.id] = evt.head.revision;
}
var interested = _.filter(viewBuilders, function(vB) {
return _.contains(vB.registeredEventNames, evt.event);
});
var interested = _.filter(viewBuilders, function(vB) {
return _.contains(vB.registeredEventNames, evt.event);
});
_.each(interested, function(inter) {
groupedEvents[inter.id] = groupedEvents[inter.id] || [];
groupedEvents[inter.id].push(evt);
});
_.each(interested, function(inter) {
groupedEvents[inter.id] = groupedEvents[inter.id] || [];
groupedEvents[inter.id].push(evt);
});
});
});
async.series([
function(callback) {
async.each(viewBuilders, function(viewBuilder, callback) {
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) {
return callback(null);
}
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) {
return callback(null);
}
viewBuilder.replay(groupedEvents[viewBuilder.id], callback);
viewBuilder.replay(groupedEvents[viewBuilder.id], callback);
}, callback);
},
function(callback) {
var ids = _.keys(revisionMap);
async.each(ids, function(id, callback) {
guardStore.getRevision(id, function(err, entry) {
if (err) { return callback(err); }
}, function(err) {
if (callback) callback(err);
});
entry.revision = revisionMap[id] + 1;
guardStore.saveRevision(entry, callback);
});
}, callback);
}
], function(err) {
if (callback) callback(err);
});
}
}
});

@@ -48,23 +48,10 @@ var _ = require('lodash'),

retryGuard: function(evt, callback) {
var aux = self._getAux();
self.store.getRevision(evt.payload.id, function(err, entry) {
function proceed(entry) {
var aux = self._getAux();
if (aux.wouldQueue(evt, entry)) {
if (callback) callback(true, entry);
return;
}
aux.finishGuard(evt, entry, callback);
if (aux.wouldQueue(evt, entry)) {
if (callback) callback(true, entry);
return;
}
if (!entry.revision && evt.head.revision !== 1) {
var max = (self.options.queueTimeout * self.options.queueTimeoutMaxLoops) / 3;
setTimeout(function() {
self.store.getRevision(evt.payload.id, function(err, entry) {
proceed(entry);
});
}, randomBetween(0, max));
} else {
proceed(entry);
}
aux.finishGuard(evt, entry, callback);
});

@@ -79,3 +66,3 @@ },

if (err) {
if (loopCount <= self.options.queueTimeoutMaxLoops) {
if (loopCount < self.options.queueTimeoutMaxLoops) {
return waitAgain();

@@ -202,3 +189,3 @@ }

this.queue = new Queue(options.queueTimeout);
this.queue = new Queue({ queueTimeout: options.queueTimeout });

@@ -205,0 +192,0 @@ if (callback) callback(null);

@@ -5,6 +5,13 @@ var repository = require('viewmodel').write.create(),

function getStore(repo) {
function getStore(repo, opt) {
var store = {
getRevision: function(id, callback) {
repo.get(id, callback);
repo.get(id, function(err, entry) {
if (opt && opt.revisionStart !== undefined) {
entry.revision = entry.revision || opt.revisionStart;
callback(err, entry);
} else {
callback(err, entry);
}
});
},

@@ -61,2 +68,6 @@ saveRevision: function(toSave, callback) {

if (options.revisionStart === undefined) {
options.revisionStart = 1;
}
repository.init(options, function(err) {

@@ -71,3 +82,3 @@ if(err) {

callback(null, getStore(repo));
callback(null, getStore(repo, options));
});

@@ -74,0 +85,0 @@ }

{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "0.3.1",
"version": "0.3.2",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -97,3 +97,4 @@ # Introduction

collectionName: 'revisionguard',
timeout: 60 * 1000
timeout: 60 * 1000//,
//revisionStart: 1
},

@@ -111,2 +112,6 @@ ignoreRevision: false,

## v0.3.2
- introduced optional revisionStart (default = 1)
## v0.3.1

@@ -113,0 +118,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc