Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-eventdenormalizer

Package Overview
Dependencies
Maintainers
1
Versions
169
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-eventdenormalizer - npm Package Compare versions

Comparing version 0.2.3 to 0.2.4

26

lib/bases/eventDenormalizerBase.js

@@ -11,3 +11,3 @@ var _ = require('lodash')

return this;
},
},

@@ -22,2 +22,6 @@ use: function(module) {

setOptions: function(options) {
this.options = options;
},
create: function(evt, aux, callback) {

@@ -86,4 +90,8 @@ return this.defaultAction(evt, aux, 'create', callback);

defaultQueuingStrategy: function(evt, vm, callback) {
if(evt.head.revision < vm._revision) {
callback(null);
if (self.options.ignoreRevision) {
return true;
}
if(evt.head.revision < vm._revision) {
callback(null);
return false;

@@ -98,2 +106,6 @@ }

defaultDequeuingStrategy: function(vm) {
if (self.options.ignoreRevision) {
return;
}
var pendingEvents = this.getQueuedEvents(vm.id);

@@ -105,3 +117,3 @@ if(!pendingEvents) return;

});
if(!nextEvent) return;
if(!nextEvent) return;

@@ -112,2 +124,6 @@ this.removeQueuedEvent(vm.id, nextEvent); // dequeue event

defaultRevisionUpdateStrategy: function(vm, evt) {
if (self.options.ignoreRevision) {
return;
}
vm._revision = evt.head.revision + 1;

@@ -164,2 +180,4 @@ }

newObj.options = {};
return newObj;

@@ -166,0 +184,0 @@ }

23

lib/contextEventDenormalizer.js

@@ -26,3 +26,5 @@ var eventDenormalizerLoader = require('./loaders/eventDenormalizerLoader')

eventQueue: { type: 'inMemory', collectionName: 'events' },
repository: { type: 'inMemory' }
repository: { type: 'inMemory' },
ignoreRevision: false,
disableQueuing: false
};

@@ -65,12 +67,17 @@

function(callback) {
eventDenormalizerLoader.load(options.denormalizersPath, callback);
eventDenormalizerLoader.load(options.denormalizersPath, { ignoreRevision: options.ignoreRevision }, callback);
}
], function(err) {
queue.connect(options.eventQueue, function(err, eventQueue) {
eventDispatcher.configure(function() {
this.use(eventQueue);
if (options.disableQueuing) {
eventDispatcher.initialize({}, callback);
} else {
queue.connect(options.eventQueue, function(err, eventQueue) {
eventDispatcher.configure(function() {
this.use(eventQueue);
});
eventDispatcher.initialize({}, callback);
});
eventDispatcher.initialize({}, callback);
});
}
});

@@ -81,3 +88,3 @@

denormalize: function(evt, callback) {
eventDispatcher.queueEvent(evt, function(err) {
eventDispatcher.dispatch(evt, function(err) {
if (callback) callback(null);

@@ -84,0 +91,0 @@ });

@@ -11,3 +11,3 @@ var async = require('async')

return this;
},
},

@@ -25,4 +25,9 @@ use: function(module) {

if (!callback) callback = options;
if (!callback) {
callback = options;
options = {};
}
this.options = options || {};
eventEmitter.on('extend:*', selfExtendHandle = function(evt) {

@@ -40,12 +45,20 @@ // var listeners = _.filter(eventEmitter.listeners('extend:' + evt.event), function(listener) {

eventEmitter.on('denormalized:*', function(evt) {
self.eventQueue.decrement(evt.id, function(err, removed) {
if (removed) {
eventEmitter.emit('extend:' + evt.event, evt);
}
});
if (self.eventQueue) {
self.eventQueue.decrement(evt.id, function(err, removed) {
if (removed) {
eventEmitter.emit('extend:' + evt.event, evt);
}
});
} else {
eventEmitter.emit('extend:' + evt.event, evt);
}
});
this.resetWorkers(function(err) {
self.reEmitEvents(callback);
});
if (!this.options.ignoreQueue) {
this.resetWorkers(function(err) {
self.reEmitEvents(callback);
});
} else {
callback(null);
}
},

@@ -56,21 +69,29 @@

this.eventQueue.getAll(function(err, items) {
async.forEach(items, function(item, cb) {
// item.data.workers = eventEmitter.listeners('denormalize:' + item.data.event.event).length;
item.data.workers = eventEmitter.registerCount('denormalize:' + item.data.event.event);
self.eventQueue.push(item.id, item.data, cb);
}, callback);
});
if (this.eventQueue) {
this.eventQueue.getAll(function(err, items) {
async.forEach(items, function(item, cb) {
// item.data.workers = eventEmitter.listeners('denormalize:' + item.data.event.event).length;
item.data.workers = eventEmitter.registerCount('denormalize:' + item.data.event.event);
self.eventQueue.push(item.id, item.data, cb);
}, callback);
});
} else {
callback(null);
}
},
reEmitEvents: function(callback) {
this.eventQueue.getAll(function(err, items) {
async.forEach(items, function(item, cb) {
eventEmitter.emit('denormalize:' + item.data.event.event, item.data.event);
cb();
}, callback);
});
if (this.eventQueue) {
this.eventQueue.getAll(function(err, items) {
async.forEach(items, function(item, cb) {
eventEmitter.emit('denormalize:' + item.data.event.event, item.data.event);
cb();
}, callback);
});
} else {
callback(null);
}
},
queueEvent: function(evt, callback) {
dispatch: function(evt, callback) {

@@ -98,8 +119,14 @@ var entry = {

this.eventQueue.push(evt.id, entry, function(err) {
if (!this.eventQueue) {
eventEmitter.emit('denormalize:' + entry.event.event, entry.event);
callback(err);
});
callback(null);
} else {
this.eventQueue.push(evt.id, entry, function(err) {
eventEmitter.emit('denormalize:' + entry.event.event, entry.event);
callback(err);
});
}
}
};

@@ -11,3 +11,3 @@ var existsSync = require('fs').existsSync || require('path').existsSync

return this;
},
},

@@ -20,6 +20,11 @@ use: function(module) {

}
},
},
load: function(p, callback) {
load: function(p, options, callback) {
if (!callback) {
callback = options;
options = { ignoreRevision: false };
}
var eventDenormalizers = [];

@@ -33,2 +38,3 @@

var eventDenormalizer = require(file);
eventDenormalizer.setOptions(options);
eventDenormalizers.push(eventDenormalizer);

@@ -61,3 +67,3 @@

function action(evt) {
eventDenormalizer.handle(evt);
eventDenormalizer.handle(evt);
}

@@ -64,0 +70,0 @@

{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "0.2.3",
"version": "0.2.4",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -23,3 +23,5 @@ # Introduction

denormalizersPath: __dirname + '/eventDenormalizers',
extendersPath: __dirname + '/eventExtenders'
extendersPath: __dirname + '/eventExtenders',
ignoreRevision: false,
disableQueuing: false
}, function(err) {

@@ -50,5 +52,13 @@

# Release Notes
## v0.2.4
- added disableQueuing and ignoreRevision flag
# License
Copyright (c) 2012 Adriano Raiano
Copyright (c) 2013 Adriano Raiano

@@ -55,0 +65,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

@@ -73,3 +73,3 @@ var expect = require('expect.js')

eventQueue.push('1', {
eventQueue.push('1', {
workers: 1,

@@ -92,3 +92,3 @@ event: { id: '1', event: 'dummyChanged'}

id: '1',
data: {
data: {
workers: 2,

@@ -117,3 +117,3 @@ event: { id: '1', event: 'dummyChanged'}

describe('calling queueEvent', function() {
describe('calling dispatch', function() {

@@ -123,3 +123,3 @@ describe('having zero denormalizers', function() {

it('it should callback with success', function(done) {
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged'}, function(err) {
eventDispatcher.dispatch({id: '1', event: 'dummyChanged'}, function(err) {
expect(err).not.to.be.ok();

@@ -142,3 +142,3 @@ done();

it('it should callback with success', function(done) {
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged2'}, function(err) {
eventDispatcher.dispatch({id: '1', event: 'dummyChanged2'}, function(err) {
expect(err).not.to.be.ok();

@@ -150,3 +150,3 @@ done();

it('it should call the extender', function(done) {
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged2'}, function(err) {
eventDispatcher.dispatch({id: '1', event: 'dummyChanged2'}, function(err) {
expect(called).to.be.ok();

@@ -172,3 +172,3 @@ done();

it('it should callback with success', function(done) {
eventDispatcher.queueEvent({ id: '0', event: 'dummyChanged'}, function(err) {
eventDispatcher.dispatch({ id: '0', event: 'dummyChanged'}, function(err) {
expect(err).not.to.be.ok();

@@ -180,3 +180,3 @@ done();

it('the eventQueueStore should contain an entry', function(done) {
eventDispatcher.queueEvent({ id: '1', event: 'dummyChanged'}, function(err) {
eventDispatcher.dispatch({ id: '1', event: 'dummyChanged'}, function(err) {
eventQueue.getAll(function(err, entries) {

@@ -190,7 +190,7 @@ expect(entries).to.have.length(1);

it('the eventQueueStore\'s entries\' workers number should match the number of denormalizers', function(done) {
eventDispatcher.queueEvent({ id: '1', event: 'dummyChanged'}, function(err) {
eventDispatcher.dispatch({ id: '1', event: 'dummyChanged'}, function(err) {
eventQueue.getAll(function(err, entries) {
expect(entries[0]).to.eql({
id: '1',
data: {
data: {
workers: 2,

@@ -219,3 +219,3 @@ event: { id: '1', event: 'dummyChanged'}

eventQueue.push('1', {
eventQueue.push('1', {
workers: 2,

@@ -235,3 +235,3 @@ event: { id: '1', event: 'dummyChanged'}

id: '1',
data: {
data: {
workers: 1,

@@ -250,3 +250,3 @@ event: { id: '1', event: 'dummyChanged'}

before(function(done) {
eventQueue.push('1', {
eventQueue.push('1', {
workers: 1,

@@ -332,7 +332,10 @@ event: { id: '1', event: 'dummyChanged'}

var handle;
dummyEmitter.on('done', function(evt) {
eventEmitter.removeListener('extended:*', handle);
done();
});
eventEmitter.once('extended:*', function(evt) {
eventEmitter.once('extended:*', handle = function(evt) {
expect(false).to.be.ok();

@@ -339,0 +342,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc