Comparing version
@@ -5,3 +5,3 @@ 'use strict'; | ||
SagaModel = require('../sagaModel'), | ||
ConcurrencyError = require('../concurrencyError'), | ||
ConcurrencyError = require('../errors/concurrencyError'), | ||
util = require('util'), | ||
@@ -8,0 +8,0 @@ _ = require('lodash'), |
177
lib/pm.js
@@ -12,2 +12,4 @@ 'use strict'; | ||
dotty = require('dotty'), | ||
RevisionGuard = require('./revisionGuard'), | ||
revisionGuardStore = require('./revisionGuardStore'), | ||
SagaModel = require('./sagaModel'), | ||
@@ -37,2 +39,13 @@ structureLoader = require('./structure/structureLoader'), | ||
var defaultRevOpt = { | ||
queueTimeout: 1000, | ||
queueTimeoutMaxLoops: 3 | ||
}; | ||
options.revisionGuard = options.revisionGuard || {}; | ||
_.defaults(options.revisionGuard, defaultRevOpt); | ||
this.revisionGuardStore = revisionGuardStore.create(options.revisionGuard); | ||
this.options = options; | ||
@@ -73,2 +86,6 @@ | ||
}); | ||
this.onEventMissing(function (info, evt) { | ||
debug('missing events: ', info, evt); | ||
}); | ||
} | ||
@@ -161,2 +178,19 @@ | ||
/** | ||
* Inject function for event missing handle. | ||
* @param {Function} fn the function to be injected | ||
* @returns {ProcessManager} to be able to chain... | ||
*/ | ||
onEventMissing: function (fn) { | ||
if (!fn || !_.isFunction(fn)) { | ||
var err = new Error('Please pass a valid function!'); | ||
debug(err); | ||
throw err; | ||
} | ||
this.onEventMissingHandle = fn; | ||
return this; | ||
}, | ||
/** | ||
* Call this function to initialize the saga. | ||
@@ -183,15 +217,37 @@ * @param {Function} callback the function that will be called when this action has finished [optional] | ||
// prepare sagaStore... | ||
// prepare infrastructure... | ||
function (callback) { | ||
debug('prepare sagaStore...'); | ||
debug('prepare infrastructure...'); | ||
async.parallel([ | ||
self.sagaStore.on('connect', function () { | ||
self.emit('connect'); | ||
}); | ||
// prepare sagaStore... | ||
function (callback) { | ||
debug('prepare sagaStore...'); | ||
self.sagaStore.on('disconnect', function () { | ||
self.emit('disconnect'); | ||
}); | ||
self.sagaStore.on('connect', function () { | ||
self.emit('connect'); | ||
}); | ||
self.sagaStore.connect(callback); | ||
self.sagaStore.on('disconnect', function () { | ||
self.emit('disconnect'); | ||
}); | ||
self.sagaStore.connect(callback); | ||
}, | ||
// prepare revisionGuard... | ||
function (callback) { | ||
debug('prepare revisionGuard...'); | ||
self.revisionGuardStore.on('connect', function () { | ||
self.emit('connect'); | ||
}); | ||
self.revisionGuardStore.on('disconnect', function () { | ||
self.emit('disconnect'); | ||
}); | ||
self.revisionGuardStore.connect(callback); | ||
} | ||
], callback); | ||
}, | ||
@@ -203,2 +259,7 @@ | ||
self.revisionGuard = new RevisionGuard(self.revisionGuardStore, self.options.revisionGuard); | ||
self.revisionGuard.onEventMissing(function (info, evt) { | ||
self.onEventMissingHandle(info, evt); | ||
}); | ||
self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event); | ||
@@ -211,2 +272,4 @@ self.sagas.defineOptions({}) // options??? | ||
self.revisionGuard.defineEvent(self.definitions.event); | ||
callback(null); | ||
@@ -223,19 +286,13 @@ } | ||
/** | ||
* Call this function to let the saga handle it. | ||
* Call this function to forward it to the dispatcher. | ||
* @param {Object} evt The event object | ||
* @param {Function} callback The function that will be called when this action has finished [optional] | ||
* `function(err, cmds, sagaModels){}` cmds and sagaModels are of type Array | ||
* `function(errs, evt, notifications){}` notifications is of type Array | ||
*/ | ||
handle: function (evt, callback) { | ||
if (!evt || !_.isObject(evt)) { | ||
var err = new Error('Please pass a valid event!'); | ||
debug(err); | ||
throw err; | ||
} | ||
dispatch: function (evt, callback) { | ||
var self = this; | ||
var self = this; | ||
this.eventDispatcher.dispatch(evt, function (errs, sagaModels) { | ||
var cmds = []; | ||
if (!sagaModels || sagaModels.length === 0) { | ||
@@ -247,5 +304,5 @@ if (callback) { | ||
} | ||
async.each(sagaModels, function (sagaModel, callback) { | ||
var cmdsToSend = sagaModel.getUndispatchedCommands(); | ||
@@ -264,3 +321,3 @@ | ||
} | ||
async.each(cmdsToSend, function (cmd, callback) { | ||
@@ -280,6 +337,5 @@ | ||
} | ||
}, callback); | ||
}, function (err) { | ||
@@ -295,15 +351,70 @@ if (err) { | ||
if (callback) { | ||
// var sagaModelsData = _.map(sagaModels, function (s) { | ||
// var json = s.toJSON(); | ||
// if (s.isDestroyed()) { | ||
// json._destroyed = true; | ||
// } | ||
// return json; | ||
// }); | ||
// callback(errs, cmds, sagaModelsData); | ||
callback(errs, cmds, sagaModels); | ||
} | ||
}); | ||
}); | ||
}, | ||
/** | ||
* Call this function to let the saga handle it. | ||
* @param {Object} evt The event object | ||
* @param {Function} callback The function that will be called when this action has finished [optional] | ||
* `function(err, cmds, sagaModels){}` cmds and sagaModels are of type Array | ||
*/ | ||
handle: function (evt, callback) { | ||
if (!evt || !_.isObject(evt)) { | ||
var err = new Error('Please pass a valid event!'); | ||
debug(err); | ||
throw err; | ||
} | ||
var self = this; | ||
var workWithRevisionGuard = false; | ||
if (!!this.definitions.event.revision && dotty.exists(evt, this.definitions.event.revision) && | ||
!!this.definitions.event.aggregateId && dotty.exists(evt, this.definitions.event.aggregateId)) { | ||
workWithRevisionGuard = true; | ||
} | ||
if (!workWithRevisionGuard) { | ||
return this.dispatch(evt, callback); | ||
} | ||
this.revisionGuard.guard(evt, function (err, done) { | ||
if (err) { | ||
debug(err); | ||
if (callback) { | ||
callback([err]); | ||
} | ||
return; | ||
} | ||
self.dispatch(evt, function (errs, cmds, sagaModels) { | ||
if (errs) { | ||
debug(errs); | ||
if (callback) { | ||
callback(errs, cmds, sagaModels); | ||
} | ||
return; | ||
} | ||
done(function (err) { | ||
if (err) { | ||
if (!errs) { | ||
errs = [err]; | ||
} else if (_.isArray(errs)) { | ||
errs.unshift(err); | ||
} | ||
debug(err); | ||
} | ||
if (callback) { | ||
callback(errs, cmds, sagaModels); | ||
} | ||
}); | ||
}); | ||
}); | ||
}, | ||
@@ -310,0 +421,0 @@ |
@@ -7,3 +7,3 @@ 'use strict'; | ||
uuid = require('node-uuid').v4, | ||
ConcurrencyError = require('../../concurrencyError'), | ||
ConcurrencyError = require('../../errors/concurrencyError'), | ||
_ = require('lodash'); | ||
@@ -10,0 +10,0 @@ |
@@ -7,3 +7,3 @@ 'use strict'; | ||
debug = require('debug')('saga:mongodb'), | ||
ConcurrencyError = require('../../concurrencyError'), | ||
ConcurrencyError = require('../../errors/concurrencyError'), | ||
mongo = require('mongodb'), | ||
@@ -10,0 +10,0 @@ ObjectID = mongo.BSONPure.ObjectID; |
@@ -8,3 +8,3 @@ 'use strict'; | ||
uuid = require('node-uuid').v4, | ||
ConcurrencyError = require('../../concurrencyError'), | ||
ConcurrencyError = require('../../errors/concurrencyError'), | ||
jsondate = require('jsondate'), | ||
@@ -11,0 +11,0 @@ async = require('async'), |
{ | ||
"author": "adrai", | ||
"name": "cqrs-saga", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -50,2 +50,19 @@ # Introduction | ||
// password: 'secret' // optional | ||
}, | ||
// optional, default is in-memory | ||
// the revisionguard only works if aggregateId and revision are defined in event definition | ||
// currently supports: mongodb, redis, tingodb and inmemory | ||
// hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage) | ||
revisionGuard: { | ||
queueTimeout: 1000, // optional, timeout for non-handled events in the internal in-memory queue | ||
queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue | ||
type: 'redis', | ||
host: 'localhost', // optional | ||
port: 6379, // optional | ||
db: 0, // optional | ||
prefix: 'readmodel_revision', // optional | ||
timeout: 10000 // optional | ||
// password: 'secret' // optional | ||
} | ||
@@ -66,4 +83,13 @@ }); | ||
// revisionGuardStore | ||
pm.revisionGuardStore.on('connect', function() { | ||
console.log('revisionGuardStore connected'); | ||
}); | ||
// anything (at the moment only sagaStore) | ||
pm.revisionGuardStore.on('disconnect', function() { | ||
console.log('revisionGuardStore disconnected'); | ||
}); | ||
// anything (sagaStore or revisionGuardStore) | ||
pm.on('connect', function() { | ||
@@ -91,2 +117,9 @@ console.log('something connected'); | ||
// optional, default is 'aggregate.id' | ||
aggregateId: 'aggregate.id', | ||
// optional, default is 'revision' | ||
// will represent the aggregate revision, can be used in next command | ||
revision: 'revision', | ||
// optional | ||
@@ -148,2 +181,21 @@ version: 'version', | ||
## Wire up event missing [optional] | ||
### you can define a synchronous function | ||
pm.onEventMissing(function (info, evt) { | ||
// grab the missing events, depending from info values... | ||
// info.aggregateId | ||
// info.aggregateRevision | ||
// info.aggregate | ||
// info.context | ||
// info.guardRevision | ||
// and call handle... | ||
pm.handle(missingEvent, function (err) { | ||
if (err) { console.log(err); } | ||
}); | ||
}); | ||
## Initialization | ||
@@ -150,0 +202,0 @@ |
@@ -0,1 +1,4 @@ | ||
## [v1.1.0](https://github.com/adrai/node-cqrs-saga/compare/v1.0.2...v1.1.0) | ||
- introduce revisionGuard | ||
## v1.0.2 | ||
@@ -8,3 +11,2 @@ - saga: optional define a function to that returns an id that will be used as saga id | ||
## v1.0.0 | ||
- first stable release |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
165982
13.38%42
31.25%3323
45.49%475
12.29%8
33.33%