Comparing version
@@ -62,3 +62,7 @@ var Definition = require('../definitionBase'), | ||
return true; | ||
}); | ||
}); | ||
this.defineShouldHandleEvent(function (evt) { | ||
return true; | ||
}); | ||
} | ||
@@ -178,132 +182,121 @@ | ||
function retry (retryIn) { | ||
if (_.isNumber(retryIn)) { | ||
retryIn = randomBetween(0, retryIn); | ||
} | ||
var self = this; | ||
if (_.isObject(retryIn) && _.isNumber(retryIn.from) && _.isNumber(retryIn.to)) { | ||
retryIn = randomBetween(retryIn.from, retryIn.to); | ||
// self.shouldHandle(evt, sagaModel, function (err, doHandle) { | ||
this.shouldHandleEvent(evt, function (err, doHandle) { | ||
console.log('doHandle', doHandle); | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!_.isNumber(retryIn) || retryIn === 0) { | ||
retryIn = randomBetween(0, self.options ? self.options.retryOnConcurrencyTimeout : 800); | ||
if (!doHandle) { | ||
return callback(null, null); | ||
} | ||
debug('retry in ' + retryIn + 'ms'); | ||
setTimeout(function() { | ||
self.handle(evt, callback); | ||
}, retryIn); | ||
} | ||
function retry (retryIn) { | ||
if (_.isNumber(retryIn)) { | ||
retryIn = randomBetween(0, retryIn); | ||
} | ||
var self = this; | ||
if (_.isObject(retryIn) && _.isNumber(retryIn.from) && _.isNumber(retryIn.to)) { | ||
retryIn = randomBetween(retryIn.from, retryIn.to); | ||
} | ||
async.waterfall([ | ||
if (!_.isNumber(retryIn) || retryIn === 0) { | ||
retryIn = randomBetween(0, self.options ? self.options.retryOnConcurrencyTimeout : 800); | ||
} | ||
function (callb) { | ||
if (!self.id || !dotty.exists(evt, self.id)) { | ||
debug('has no id, generate new one'); | ||
debug('retry in ' + retryIn + 'ms'); | ||
setTimeout(function() { | ||
self.handle(evt, callback); | ||
}, retryIn); | ||
} | ||
if (!self.getNewIdForThisSaga) { | ||
return self.sagaStore.getNewId(callb); | ||
} | ||
async.waterfall([ | ||
self.getNewIdForThisSaga(evt, callb); | ||
} else { | ||
debug('already has an id'); | ||
callb(null, dotty.get(evt, self.id)); | ||
} | ||
}, | ||
function (callb) { | ||
if (!self.id || !dotty.exists(evt, self.id)) { | ||
debug('has no id, generate new one'); | ||
function (id, callb) { | ||
self.sagaStore.get(id, function (err, data) { | ||
if (err) { | ||
return callb(err); | ||
} | ||
if (!self.getNewIdForThisSaga) { | ||
return self.sagaStore.getNewId(callb); | ||
} | ||
if (!data && self.existing) { | ||
debug('this saga only wants to be executed, if already existing'); | ||
return callback(null, null); | ||
self.getNewIdForThisSaga(evt, callb); | ||
} else { | ||
debug('already has an id'); | ||
callb(null, dotty.get(evt, self.id)); | ||
} | ||
}, | ||
var sagaModel = new SagaModel(id); | ||
if (data) { | ||
sagaModel.set(data); | ||
sagaModel.actionOnCommit = 'update'; | ||
} | ||
callb(null, sagaModel); | ||
}); | ||
}, | ||
function (sagaModel, callb) { | ||
// attach commit function | ||
debug('attach commit function'); | ||
/** | ||
* Commits the saga data and its commands. | ||
* @param {Function} clb The function, that will be called when this action is completed. | ||
* `function(err){}` | ||
*/ | ||
sagaModel.commit = function (clb) { | ||
async.parallel([ | ||
function (callback) { | ||
self.checkForId(sagaModel.getUndispatchedCommands(), callback); | ||
}, | ||
function (callback) { | ||
self.checkForId(sagaModel.getTimeoutCommands(), callback); | ||
} | ||
], function (err) { | ||
function (id, callb) { | ||
self.sagaStore.get(id, function (err, data) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
return callb(err); | ||
} | ||
if (sagaModel.isDestroyed()) { | ||
self.sagaStore.remove(sagaModel.id, clb); | ||
} else { | ||
sagaModel.setCommitStamp(new Date()); | ||
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) { | ||
return { id: dotty.get(c, self.definitions.command.id), payload: c }; | ||
}); | ||
if (!data && self.existing) { | ||
debug('this saga only wants to be executed, if already existing'); | ||
return callback(null, null); | ||
} | ||
self.sagaStore.save(sagaModel.toJSON(), undispCmds, function (err) { | ||
if (err instanceof ConcurrencyError) { | ||
retry(clb); | ||
return; | ||
} | ||
clb(err); | ||
}); | ||
var sagaModel = new SagaModel(id); | ||
if (data) { | ||
sagaModel.set(data); | ||
sagaModel.actionOnCommit = 'update'; | ||
} | ||
callb(null, sagaModel); | ||
}); | ||
}; | ||
}, | ||
// attach addCommandToSend function | ||
debug('attach addCommandToSend function'); | ||
/** | ||
* Adds the passed command to this model. | ||
* @param {Object} cmd The command that should be sent. | ||
*/ | ||
sagaModel.addCommandToSend = function (cmd) { | ||
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) && | ||
!!self.definitions.command.meta && !!self.definitions.event.meta) { | ||
dotty.put(cmd, self.definitions.command.meta, dotty.get(evt, self.definitions.event.meta)); | ||
} | ||
function (sagaModel, callb) { | ||
sagaModel.addUnsentCommand(cmd); | ||
}; | ||
// attach commit function | ||
debug('attach commit function'); | ||
// attach defineTimeout function | ||
debug('attach defineTimeout function'); | ||
/** | ||
* Defines a timeout date and optional timeout commands, and adds them to this model. | ||
* @param {Date} date The timeout date. | ||
* @param {Array} cmds The array of commands. | ||
*/ | ||
sagaModel.defineTimeout = function (date, cmds) { | ||
cmds = cmds || []; | ||
if (!_.isArray(cmds)) { | ||
cmds = [cmds]; | ||
} | ||
/** | ||
* Commits the saga data and its commands. | ||
* @param {Function} clb The function, that will be called when this action is completed. | ||
* `function(err){}` | ||
*/ | ||
sagaModel.commit = function (clb) { | ||
async.parallel([ | ||
function (callback) { | ||
self.checkForId(sagaModel.getUndispatchedCommands(), callback); | ||
}, | ||
function (callback) { | ||
self.checkForId(sagaModel.getTimeoutCommands(), callback); | ||
} | ||
], function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
if (sagaModel.isDestroyed()) { | ||
self.sagaStore.remove(sagaModel.id, clb); | ||
} else { | ||
sagaModel.setCommitStamp(new Date()); | ||
cmds.forEach(function (cmd) { | ||
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) { | ||
return { id: dotty.get(c, self.definitions.command.id), payload: c }; | ||
}); | ||
self.sagaStore.save(sagaModel.toJSON(), undispCmds, function (err) { | ||
if (err instanceof ConcurrencyError) { | ||
retry(clb); | ||
return; | ||
} | ||
clb(err); | ||
}); | ||
} | ||
}); | ||
}; | ||
// attach addCommandToSend function | ||
debug('attach addCommandToSend function'); | ||
/** | ||
* Adds the passed command to this model. | ||
* @param {Object} cmd The command that should be sent. | ||
*/ | ||
sagaModel.addCommandToSend = function (cmd) { | ||
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) && | ||
@@ -313,61 +306,84 @@ !!self.definitions.command.meta && !!self.definitions.event.meta) { | ||
} | ||
}); | ||
sagaModel.addTimeout(date, cmds); | ||
}; | ||
sagaModel.addUnsentCommand(cmd); | ||
}; | ||
callb(null, sagaModel); | ||
}, | ||
function (sagaModel, callb) { | ||
var sagaThis = { | ||
retry: function () { | ||
if (arguments.length === 0) { | ||
return retry(); | ||
// attach defineTimeout function | ||
debug('attach defineTimeout function'); | ||
/** | ||
* Defines a timeout date and optional timeout commands, and adds them to this model. | ||
* @param {Date} date The timeout date. | ||
* @param {Array} cmds The array of commands. | ||
*/ | ||
sagaModel.defineTimeout = function (date, cmds) { | ||
cmds = cmds || []; | ||
if (!_.isArray(cmds)) { | ||
cmds = [cmds]; | ||
} | ||
return retry(arguments[0]); | ||
} | ||
}; | ||
cmds.forEach(function (cmd) { | ||
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) && | ||
!!self.definitions.command.meta && !!self.definitions.event.meta) { | ||
dotty.put(cmd, self.definitions.command.meta, dotty.get(evt, self.definitions.event.meta)); | ||
} | ||
}); | ||
self.shouldHandle(evt, sagaModel, function (err, doHandle) { | ||
if (err) { | ||
return callb(err); | ||
} | ||
sagaModel.addTimeout(date, cmds); | ||
}; | ||
if (!doHandle) { | ||
return callb(null, sagaModel); | ||
} | ||
callb(null, sagaModel); | ||
}, | ||
self.sagaFn.call(sagaThis, self.getPayload(evt), sagaModel, function (err) { | ||
function (sagaModel, callb) { | ||
var sagaThis = { | ||
retry: function () { | ||
if (arguments.length === 0) { | ||
return retry(); | ||
} | ||
return retry(arguments[0]); | ||
} | ||
}; | ||
self.shouldHandle(evt, sagaModel, function (err, doHandle) { | ||
if (err) { | ||
return callb(err); | ||
} | ||
callb(null, sagaModel); | ||
if (!doHandle) { | ||
return callb(null, sagaModel); | ||
} | ||
self.sagaFn.call(sagaThis, self.getPayload(evt), sagaModel, function (err) { | ||
if (err) { | ||
return callb(err); | ||
} | ||
callb(null, sagaModel); | ||
}); | ||
}); | ||
}); | ||
}, | ||
}, | ||
function (sagaModel, callb) { | ||
// detach commit function | ||
debug('detach commit function'); | ||
if (sagaModel.commit) { | ||
delete sagaModel.commit; | ||
} | ||
function (sagaModel, callb) { | ||
// detach commit function | ||
debug('detach commit function'); | ||
if (sagaModel.commit) { | ||
delete sagaModel.commit; | ||
} | ||
// detach addCommandToSend function | ||
debug('detach addCommandToSend function'); | ||
if (sagaModel.addCommandToSend) { | ||
delete sagaModel.addCommandToSend; | ||
} | ||
// detach addCommandToSend function | ||
debug('detach addCommandToSend function'); | ||
if (sagaModel.addCommandToSend) { | ||
delete sagaModel.addCommandToSend; | ||
} | ||
// detach defineTimeout function | ||
debug('detach defineTimeout function'); | ||
if (sagaModel.defineTimeout) { | ||
delete sagaModel.defineTimeout; | ||
// detach defineTimeout function | ||
debug('detach defineTimeout function'); | ||
if (sagaModel.defineTimeout) { | ||
delete sagaModel.defineTimeout; | ||
} | ||
callb(null, sagaModel); | ||
} | ||
callb(null, sagaModel); | ||
} | ||
], callback); | ||
], callback); | ||
}); | ||
}, | ||
@@ -441,2 +457,46 @@ | ||
return this; | ||
}, | ||
/** | ||
* Inject shouldHandleEvent function. | ||
* @param {Function} fn The function to be injected. | ||
* @returns {Saga} to be able to chain... | ||
*/ | ||
defineShouldHandleEvent: function (fn) { | ||
if (!fn || !_.isFunction(fn)) { | ||
var err = new Error('Please pass a valid function!'); | ||
debug(err); | ||
throw err; | ||
} | ||
if (fn.length === 2) { | ||
this.shouldHandleEvent = fn; | ||
return this; | ||
} | ||
this.shouldHandleEvent = function (evt, callback) { | ||
callback(null, fn(evt)); | ||
}; | ||
var unwrappedShouldHandleEvent = this.shouldHandleEvent; | ||
this.shouldHandleEvent = function (evt, clb) { | ||
var wrappedCallback = function () { | ||
try { | ||
clb.apply(this, _.toArray(arguments)); | ||
} catch (e) { | ||
debug(e); | ||
process.emit('uncaughtException', e); | ||
} | ||
}; | ||
try { | ||
unwrappedShouldHandleEvent.call(this, evt, wrappedCallback); | ||
} catch (e) { | ||
debug(e); | ||
process.emit('uncaughtException', e); | ||
} | ||
}; | ||
return this; | ||
} | ||
@@ -443,0 +503,0 @@ |
@@ -9,2 +9,3 @@ var util = require('util'), | ||
isNew = mongoVersion.indexOf('1.') !== 0, | ||
isNew = mongoVersion.indexOf('1.') !== 0, | ||
ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID; | ||
@@ -110,5 +111,6 @@ | ||
self.store = self.db.collection(options.collectionName); | ||
self.store.ensureIndex({ '_commands.id': 1}, function() {}); | ||
self.store.ensureIndex({ '_timeoutAt': 1}, function() {}); | ||
self.store.ensureIndex({ '_commitStamp': 1}, function() {}); | ||
self.store.createIndex({ '_commands.id': 1}, function() {}); | ||
self.store.createIndex({ '_timeoutAt': 1}, function() {}); | ||
self.store.createIndex({ '_commitStamp': 1}, function() {}); | ||
if (!err) { | ||
@@ -201,3 +203,3 @@ self.emit('connect'); | ||
saga._hash = new ObjectID().toString(); | ||
this.store.insert(saga, { safe: true }, function (err) { | ||
this.store.insertOne(saga, { safe: true, w: 1 }, function (err) { | ||
if (err && err.message && err.message.indexOf('duplicate key') >= 0) { | ||
@@ -211,3 +213,3 @@ return callback(new ConcurrencyError()); | ||
saga._hash = new ObjectID().toString(); | ||
this.store.update({ _id: saga._id, _hash: currentHash }, saga, { safe: true }, function(err, modifiedCount) { | ||
this.store.updateOne({ _id: saga._id, _hash: currentHash }, { $set: saga }, { safe: true }, function(err, modifiedCount) { | ||
if (isNew) { | ||
@@ -258,3 +260,3 @@ if (modifiedCount && modifiedCount.result && modifiedCount.result.n === 0) { | ||
this.store.remove({ _id: id }, { safe: true }, function (err) { | ||
this.store.deleteOne({ _id: id }, { safe: true, w: 1 }, function (err) { | ||
if (callback) callback(err); | ||
@@ -261,0 +263,0 @@ }); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-saga", | ||
"version": "1.11.0", | ||
"version": "1.11.1", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -482,3 +482,12 @@ # Introduction | ||
// | ||
// optional define a function that checks if an event should be handled | ||
// optional define a function that checks if an event should be handled ( before saga is loaded ) | ||
//.defineShouldHandleEvent(function (evt) { | ||
// return true; | ||
//}); | ||
// or | ||
//.defineShouldHandleEvent(function (evt, callback) { | ||
// callback(null, true'); | ||
//}); | ||
// | ||
// optional define a function that checks if an event should be handled ( after saga is loaded ) | ||
//.defineShouldHandle(function (evt, saga) { | ||
@@ -485,0 +494,0 @@ // return true; |
@@ -0,1 +1,5 @@ | ||
## [v1.11.1](https://github.com/adrai/node-cqrs-saga/compare/v1.11.0...v1.11.1) | ||
- replace deprecated mongo methods ( ensureIndex, insert, remove, update ) | ||
- add optional shouldHandleEvent function, to filter events before loading saga | ||
## [v1.11.0](https://github.com/adrai/node-cqrs-saga/compare/v1.10.2...v1.11.0) | ||
@@ -2,0 +6,0 @@ - add option to add custom structureLoader implementation |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
178934
1.34%4744
1.09%610
1.5%0
-100%