cqrs-eventdenormalizer
Advanced tools
Comparing version 0.2.3 to 0.2.4
@@ -11,3 +11,3 @@ var _ = require('lodash') | ||
return this; | ||
}, | ||
}, | ||
@@ -22,2 +22,6 @@ use: function(module) { | ||
setOptions: function(options) { | ||
this.options = options; | ||
}, | ||
create: function(evt, aux, callback) { | ||
@@ -86,4 +90,8 @@ return this.defaultAction(evt, aux, 'create', callback); | ||
defaultQueuingStrategy: function(evt, vm, callback) { | ||
if(evt.head.revision < vm._revision) { | ||
callback(null); | ||
if (self.options.ignoreRevision) { | ||
return true; | ||
} | ||
if(evt.head.revision < vm._revision) { | ||
callback(null); | ||
return false; | ||
@@ -98,2 +106,6 @@ } | ||
defaultDequeuingStrategy: function(vm) { | ||
if (self.options.ignoreRevision) { | ||
return; | ||
} | ||
var pendingEvents = this.getQueuedEvents(vm.id); | ||
@@ -105,3 +117,3 @@ if(!pendingEvents) return; | ||
}); | ||
if(!nextEvent) return; | ||
if(!nextEvent) return; | ||
@@ -112,2 +124,6 @@ this.removeQueuedEvent(vm.id, nextEvent); // dequeue event | ||
defaultRevisionUpdateStrategy: function(vm, evt) { | ||
if (self.options.ignoreRevision) { | ||
return; | ||
} | ||
vm._revision = evt.head.revision + 1; | ||
@@ -164,2 +180,4 @@ } | ||
newObj.options = {}; | ||
return newObj; | ||
@@ -166,0 +184,0 @@ } |
@@ -26,3 +26,5 @@ var eventDenormalizerLoader = require('./loaders/eventDenormalizerLoader') | ||
eventQueue: { type: 'inMemory', collectionName: 'events' }, | ||
repository: { type: 'inMemory' } | ||
repository: { type: 'inMemory' }, | ||
ignoreRevision: false, | ||
disableQueuing: false | ||
}; | ||
@@ -65,12 +67,17 @@ | ||
function(callback) { | ||
eventDenormalizerLoader.load(options.denormalizersPath, callback); | ||
eventDenormalizerLoader.load(options.denormalizersPath, { ignoreRevision: options.ignoreRevision }, callback); | ||
} | ||
], function(err) { | ||
queue.connect(options.eventQueue, function(err, eventQueue) { | ||
eventDispatcher.configure(function() { | ||
this.use(eventQueue); | ||
if (options.disableQueuing) { | ||
eventDispatcher.initialize({}, callback); | ||
} else { | ||
queue.connect(options.eventQueue, function(err, eventQueue) { | ||
eventDispatcher.configure(function() { | ||
this.use(eventQueue); | ||
}); | ||
eventDispatcher.initialize({}, callback); | ||
}); | ||
eventDispatcher.initialize({}, callback); | ||
}); | ||
} | ||
}); | ||
@@ -81,3 +88,3 @@ | ||
denormalize: function(evt, callback) { | ||
eventDispatcher.queueEvent(evt, function(err) { | ||
eventDispatcher.dispatch(evt, function(err) { | ||
if (callback) callback(null); | ||
@@ -84,0 +91,0 @@ }); |
@@ -11,3 +11,3 @@ var async = require('async') | ||
return this; | ||
}, | ||
}, | ||
@@ -25,4 +25,9 @@ use: function(module) { | ||
if (!callback) callback = options; | ||
if (!callback) { | ||
callback = options; | ||
options = {}; | ||
} | ||
this.options = options || {}; | ||
eventEmitter.on('extend:*', selfExtendHandle = function(evt) { | ||
@@ -40,12 +45,20 @@ // var listeners = _.filter(eventEmitter.listeners('extend:' + evt.event), function(listener) { | ||
eventEmitter.on('denormalized:*', function(evt) { | ||
self.eventQueue.decrement(evt.id, function(err, removed) { | ||
if (removed) { | ||
eventEmitter.emit('extend:' + evt.event, evt); | ||
} | ||
}); | ||
if (self.eventQueue) { | ||
self.eventQueue.decrement(evt.id, function(err, removed) { | ||
if (removed) { | ||
eventEmitter.emit('extend:' + evt.event, evt); | ||
} | ||
}); | ||
} else { | ||
eventEmitter.emit('extend:' + evt.event, evt); | ||
} | ||
}); | ||
this.resetWorkers(function(err) { | ||
self.reEmitEvents(callback); | ||
}); | ||
if (!this.options.ignoreQueue) { | ||
this.resetWorkers(function(err) { | ||
self.reEmitEvents(callback); | ||
}); | ||
} else { | ||
callback(null); | ||
} | ||
}, | ||
@@ -56,21 +69,29 @@ | ||
this.eventQueue.getAll(function(err, items) { | ||
async.forEach(items, function(item, cb) { | ||
// item.data.workers = eventEmitter.listeners('denormalize:' + item.data.event.event).length; | ||
item.data.workers = eventEmitter.registerCount('denormalize:' + item.data.event.event); | ||
self.eventQueue.push(item.id, item.data, cb); | ||
}, callback); | ||
}); | ||
if (this.eventQueue) { | ||
this.eventQueue.getAll(function(err, items) { | ||
async.forEach(items, function(item, cb) { | ||
// item.data.workers = eventEmitter.listeners('denormalize:' + item.data.event.event).length; | ||
item.data.workers = eventEmitter.registerCount('denormalize:' + item.data.event.event); | ||
self.eventQueue.push(item.id, item.data, cb); | ||
}, callback); | ||
}); | ||
} else { | ||
callback(null); | ||
} | ||
}, | ||
reEmitEvents: function(callback) { | ||
this.eventQueue.getAll(function(err, items) { | ||
async.forEach(items, function(item, cb) { | ||
eventEmitter.emit('denormalize:' + item.data.event.event, item.data.event); | ||
cb(); | ||
}, callback); | ||
}); | ||
if (this.eventQueue) { | ||
this.eventQueue.getAll(function(err, items) { | ||
async.forEach(items, function(item, cb) { | ||
eventEmitter.emit('denormalize:' + item.data.event.event, item.data.event); | ||
cb(); | ||
}, callback); | ||
}); | ||
} else { | ||
callback(null); | ||
} | ||
}, | ||
queueEvent: function(evt, callback) { | ||
dispatch: function(evt, callback) { | ||
@@ -98,8 +119,14 @@ var entry = { | ||
this.eventQueue.push(evt.id, entry, function(err) { | ||
if (!this.eventQueue) { | ||
eventEmitter.emit('denormalize:' + entry.event.event, entry.event); | ||
callback(err); | ||
}); | ||
callback(null); | ||
} else { | ||
this.eventQueue.push(evt.id, entry, function(err) { | ||
eventEmitter.emit('denormalize:' + entry.event.event, entry.event); | ||
callback(err); | ||
}); | ||
} | ||
} | ||
}; |
@@ -11,3 +11,3 @@ var existsSync = require('fs').existsSync || require('path').existsSync | ||
return this; | ||
}, | ||
}, | ||
@@ -20,6 +20,11 @@ use: function(module) { | ||
} | ||
}, | ||
}, | ||
load: function(p, callback) { | ||
load: function(p, options, callback) { | ||
if (!callback) { | ||
callback = options; | ||
options = { ignoreRevision: false }; | ||
} | ||
var eventDenormalizers = []; | ||
@@ -33,2 +38,3 @@ | ||
var eventDenormalizer = require(file); | ||
eventDenormalizer.setOptions(options); | ||
eventDenormalizers.push(eventDenormalizer); | ||
@@ -61,3 +67,3 @@ | ||
function action(evt) { | ||
eventDenormalizer.handle(evt); | ||
eventDenormalizer.handle(evt); | ||
} | ||
@@ -64,0 +70,0 @@ |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "0.2.3", | ||
"version": "0.2.4", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -23,3 +23,5 @@ # Introduction | ||
denormalizersPath: __dirname + '/eventDenormalizers', | ||
extendersPath: __dirname + '/eventExtenders' | ||
extendersPath: __dirname + '/eventExtenders', | ||
ignoreRevision: false, | ||
disableQueuing: false | ||
}, function(err) { | ||
@@ -50,5 +52,13 @@ | ||
# Release Notes | ||
## v0.2.4 | ||
- added disableQueuing and ignoreRevision flag | ||
# License | ||
Copyright (c) 2012 Adriano Raiano | ||
Copyright (c) 2013 Adriano Raiano | ||
@@ -55,0 +65,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy |
@@ -73,3 +73,3 @@ var expect = require('expect.js') | ||
eventQueue.push('1', { | ||
eventQueue.push('1', { | ||
workers: 1, | ||
@@ -92,3 +92,3 @@ event: { id: '1', event: 'dummyChanged'} | ||
id: '1', | ||
data: { | ||
data: { | ||
workers: 2, | ||
@@ -117,3 +117,3 @@ event: { id: '1', event: 'dummyChanged'} | ||
describe('calling queueEvent', function() { | ||
describe('calling dispatch', function() { | ||
@@ -123,3 +123,3 @@ describe('having zero denormalizers', function() { | ||
it('it should callback with success', function(done) { | ||
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged'}, function(err) { | ||
eventDispatcher.dispatch({id: '1', event: 'dummyChanged'}, function(err) { | ||
expect(err).not.to.be.ok(); | ||
@@ -142,3 +142,3 @@ done(); | ||
it('it should callback with success', function(done) { | ||
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged2'}, function(err) { | ||
eventDispatcher.dispatch({id: '1', event: 'dummyChanged2'}, function(err) { | ||
expect(err).not.to.be.ok(); | ||
@@ -150,3 +150,3 @@ done(); | ||
it('it should call the extender', function(done) { | ||
eventDispatcher.queueEvent({id: '1', event: 'dummyChanged2'}, function(err) { | ||
eventDispatcher.dispatch({id: '1', event: 'dummyChanged2'}, function(err) { | ||
expect(called).to.be.ok(); | ||
@@ -172,3 +172,3 @@ done(); | ||
it('it should callback with success', function(done) { | ||
eventDispatcher.queueEvent({ id: '0', event: 'dummyChanged'}, function(err) { | ||
eventDispatcher.dispatch({ id: '0', event: 'dummyChanged'}, function(err) { | ||
expect(err).not.to.be.ok(); | ||
@@ -180,3 +180,3 @@ done(); | ||
it('the eventQueueStore should contain an entry', function(done) { | ||
eventDispatcher.queueEvent({ id: '1', event: 'dummyChanged'}, function(err) { | ||
eventDispatcher.dispatch({ id: '1', event: 'dummyChanged'}, function(err) { | ||
eventQueue.getAll(function(err, entries) { | ||
@@ -190,7 +190,7 @@ expect(entries).to.have.length(1); | ||
it('the eventQueueStore\'s entries\' workers number should match the number of denormalizers', function(done) { | ||
eventDispatcher.queueEvent({ id: '1', event: 'dummyChanged'}, function(err) { | ||
eventDispatcher.dispatch({ id: '1', event: 'dummyChanged'}, function(err) { | ||
eventQueue.getAll(function(err, entries) { | ||
expect(entries[0]).to.eql({ | ||
id: '1', | ||
data: { | ||
data: { | ||
workers: 2, | ||
@@ -219,3 +219,3 @@ event: { id: '1', event: 'dummyChanged'} | ||
eventQueue.push('1', { | ||
eventQueue.push('1', { | ||
workers: 2, | ||
@@ -235,3 +235,3 @@ event: { id: '1', event: 'dummyChanged'} | ||
id: '1', | ||
data: { | ||
data: { | ||
workers: 1, | ||
@@ -250,3 +250,3 @@ event: { id: '1', event: 'dummyChanged'} | ||
before(function(done) { | ||
eventQueue.push('1', { | ||
eventQueue.push('1', { | ||
workers: 1, | ||
@@ -332,7 +332,10 @@ event: { id: '1', event: 'dummyChanged'} | ||
var handle; | ||
dummyEmitter.on('done', function(evt) { | ||
eventEmitter.removeListener('extended:*', handle); | ||
done(); | ||
}); | ||
eventEmitter.once('extended:*', function(evt) { | ||
eventEmitter.once('extended:*', handle = function(evt) { | ||
expect(false).to.be.ok(); | ||
@@ -339,0 +342,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
83121
1721
79