🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more →

cqrs-saga

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-saga - npm Package Compare versions

Comparing version

to
1.11.1

@@ -62,3 +62,7 @@ var Definition = require('../definitionBase'),

return true;
});
});
this.defineShouldHandleEvent(function (evt) {
return true;
});
}

@@ -178,132 +182,121 @@

function retry (retryIn) {
if (_.isNumber(retryIn)) {
retryIn = randomBetween(0, retryIn);
}
var self = this;
if (_.isObject(retryIn) && _.isNumber(retryIn.from) && _.isNumber(retryIn.to)) {
retryIn = randomBetween(retryIn.from, retryIn.to);
// self.shouldHandle(evt, sagaModel, function (err, doHandle) {
this.shouldHandleEvent(evt, function (err, doHandle) {
console.log('doHandle', doHandle);
if (err) {
return callback(err);
}
if (!_.isNumber(retryIn) || retryIn === 0) {
retryIn = randomBetween(0, self.options ? self.options.retryOnConcurrencyTimeout : 800);
if (!doHandle) {
return callback(null, null);
}
debug('retry in ' + retryIn + 'ms');
setTimeout(function() {
self.handle(evt, callback);
}, retryIn);
}
function retry (retryIn) {
if (_.isNumber(retryIn)) {
retryIn = randomBetween(0, retryIn);
}
var self = this;
if (_.isObject(retryIn) && _.isNumber(retryIn.from) && _.isNumber(retryIn.to)) {
retryIn = randomBetween(retryIn.from, retryIn.to);
}
async.waterfall([
if (!_.isNumber(retryIn) || retryIn === 0) {
retryIn = randomBetween(0, self.options ? self.options.retryOnConcurrencyTimeout : 800);
}
function (callb) {
if (!self.id || !dotty.exists(evt, self.id)) {
debug('has no id, generate new one');
debug('retry in ' + retryIn + 'ms');
setTimeout(function() {
self.handle(evt, callback);
}, retryIn);
}
if (!self.getNewIdForThisSaga) {
return self.sagaStore.getNewId(callb);
}
async.waterfall([
self.getNewIdForThisSaga(evt, callb);
} else {
debug('already has an id');
callb(null, dotty.get(evt, self.id));
}
},
function (callb) {
if (!self.id || !dotty.exists(evt, self.id)) {
debug('has no id, generate new one');
function (id, callb) {
self.sagaStore.get(id, function (err, data) {
if (err) {
return callb(err);
}
if (!self.getNewIdForThisSaga) {
return self.sagaStore.getNewId(callb);
}
if (!data && self.existing) {
debug('this saga only wants to be executed, if already existing');
return callback(null, null);
self.getNewIdForThisSaga(evt, callb);
} else {
debug('already has an id');
callb(null, dotty.get(evt, self.id));
}
},
var sagaModel = new SagaModel(id);
if (data) {
sagaModel.set(data);
sagaModel.actionOnCommit = 'update';
}
callb(null, sagaModel);
});
},
function (sagaModel, callb) {
// attach commit function
debug('attach commit function');
/**
* Commits the saga data and its commands.
* @param {Function} clb The function, that will be called when this action is completed.
* `function(err){}`
*/
sagaModel.commit = function (clb) {
async.parallel([
function (callback) {
self.checkForId(sagaModel.getUndispatchedCommands(), callback);
},
function (callback) {
self.checkForId(sagaModel.getTimeoutCommands(), callback);
}
], function (err) {
function (id, callb) {
self.sagaStore.get(id, function (err, data) {
if (err) {
debug(err);
return callback(err);
return callb(err);
}
if (sagaModel.isDestroyed()) {
self.sagaStore.remove(sagaModel.id, clb);
} else {
sagaModel.setCommitStamp(new Date());
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) {
return { id: dotty.get(c, self.definitions.command.id), payload: c };
});
if (!data && self.existing) {
debug('this saga only wants to be executed, if already existing');
return callback(null, null);
}
self.sagaStore.save(sagaModel.toJSON(), undispCmds, function (err) {
if (err instanceof ConcurrencyError) {
retry(clb);
return;
}
clb(err);
});
var sagaModel = new SagaModel(id);
if (data) {
sagaModel.set(data);
sagaModel.actionOnCommit = 'update';
}
callb(null, sagaModel);
});
};
},
// attach addCommandToSend function
debug('attach addCommandToSend function');
/**
* Adds the passed command to this model.
* @param {Object} cmd The command that should be sent.
*/
sagaModel.addCommandToSend = function (cmd) {
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) &&
!!self.definitions.command.meta && !!self.definitions.event.meta) {
dotty.put(cmd, self.definitions.command.meta, dotty.get(evt, self.definitions.event.meta));
}
function (sagaModel, callb) {
sagaModel.addUnsentCommand(cmd);
};
// attach commit function
debug('attach commit function');
// attach defineTimeout function
debug('attach defineTimeout function');
/**
* Defines a timeout date and optional timeout commands, and adds them to this model.
* @param {Date} date The timeout date.
* @param {Array} cmds The array of commands.
*/
sagaModel.defineTimeout = function (date, cmds) {
cmds = cmds || [];
if (!_.isArray(cmds)) {
cmds = [cmds];
}
/**
* Commits the saga data and its commands.
* @param {Function} clb The function, that will be called when this action is completed.
* `function(err){}`
*/
sagaModel.commit = function (clb) {
async.parallel([
function (callback) {
self.checkForId(sagaModel.getUndispatchedCommands(), callback);
},
function (callback) {
self.checkForId(sagaModel.getTimeoutCommands(), callback);
}
], function (err) {
if (err) {
debug(err);
return callback(err);
}
if (sagaModel.isDestroyed()) {
self.sagaStore.remove(sagaModel.id, clb);
} else {
sagaModel.setCommitStamp(new Date());
cmds.forEach(function (cmd) {
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) {
return { id: dotty.get(c, self.definitions.command.id), payload: c };
});
self.sagaStore.save(sagaModel.toJSON(), undispCmds, function (err) {
if (err instanceof ConcurrencyError) {
retry(clb);
return;
}
clb(err);
});
}
});
};
// attach addCommandToSend function
debug('attach addCommandToSend function');
/**
* Adds the passed command to this model.
* @param {Object} cmd The command that should be sent.
*/
sagaModel.addCommandToSend = function (cmd) {
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) &&

@@ -313,61 +306,84 @@ !!self.definitions.command.meta && !!self.definitions.event.meta) {

}
});
sagaModel.addTimeout(date, cmds);
};
sagaModel.addUnsentCommand(cmd);
};
callb(null, sagaModel);
},
function (sagaModel, callb) {
var sagaThis = {
retry: function () {
if (arguments.length === 0) {
return retry();
// attach defineTimeout function
debug('attach defineTimeout function');
/**
* Defines a timeout date and optional timeout commands, and adds them to this model.
* @param {Date} date The timeout date.
* @param {Array} cmds The array of commands.
*/
sagaModel.defineTimeout = function (date, cmds) {
cmds = cmds || [];
if (!_.isArray(cmds)) {
cmds = [cmds];
}
return retry(arguments[0]);
}
};
cmds.forEach(function (cmd) {
if (!dotty.exists(cmd, self.definitions.command.meta) && dotty.exists(evt, self.definitions.event.meta) &&
!!self.definitions.command.meta && !!self.definitions.event.meta) {
dotty.put(cmd, self.definitions.command.meta, dotty.get(evt, self.definitions.event.meta));
}
});
self.shouldHandle(evt, sagaModel, function (err, doHandle) {
if (err) {
return callb(err);
}
sagaModel.addTimeout(date, cmds);
};
if (!doHandle) {
return callb(null, sagaModel);
}
callb(null, sagaModel);
},
self.sagaFn.call(sagaThis, self.getPayload(evt), sagaModel, function (err) {
function (sagaModel, callb) {
var sagaThis = {
retry: function () {
if (arguments.length === 0) {
return retry();
}
return retry(arguments[0]);
}
};
self.shouldHandle(evt, sagaModel, function (err, doHandle) {
if (err) {
return callb(err);
}
callb(null, sagaModel);
if (!doHandle) {
return callb(null, sagaModel);
}
self.sagaFn.call(sagaThis, self.getPayload(evt), sagaModel, function (err) {
if (err) {
return callb(err);
}
callb(null, sagaModel);
});
});
});
},
},
function (sagaModel, callb) {
// detach commit function
debug('detach commit function');
if (sagaModel.commit) {
delete sagaModel.commit;
}
function (sagaModel, callb) {
// detach commit function
debug('detach commit function');
if (sagaModel.commit) {
delete sagaModel.commit;
}
// detach addCommandToSend function
debug('detach addCommandToSend function');
if (sagaModel.addCommandToSend) {
delete sagaModel.addCommandToSend;
}
// detach addCommandToSend function
debug('detach addCommandToSend function');
if (sagaModel.addCommandToSend) {
delete sagaModel.addCommandToSend;
}
// detach defineTimeout function
debug('detach defineTimeout function');
if (sagaModel.defineTimeout) {
delete sagaModel.defineTimeout;
// detach defineTimeout function
debug('detach defineTimeout function');
if (sagaModel.defineTimeout) {
delete sagaModel.defineTimeout;
}
callb(null, sagaModel);
}
callb(null, sagaModel);
}
], callback);
], callback);
});
},

@@ -441,2 +457,46 @@

return this;
},
/**
* Inject shouldHandleEvent function.
* @param {Function} fn The function to be injected.
* @returns {Saga} to be able to chain...
*/
defineShouldHandleEvent: function (fn) {
if (!fn || !_.isFunction(fn)) {
var err = new Error('Please pass a valid function!');
debug(err);
throw err;
}
if (fn.length === 2) {
this.shouldHandleEvent = fn;
return this;
}
this.shouldHandleEvent = function (evt, callback) {
callback(null, fn(evt));
};
var unwrappedShouldHandleEvent = this.shouldHandleEvent;
this.shouldHandleEvent = function (evt, clb) {
var wrappedCallback = function () {
try {
clb.apply(this, _.toArray(arguments));
} catch (e) {
debug(e);
process.emit('uncaughtException', e);
}
};
try {
unwrappedShouldHandleEvent.call(this, evt, wrappedCallback);
} catch (e) {
debug(e);
process.emit('uncaughtException', e);
}
};
return this;
}

@@ -443,0 +503,0 @@

@@ -9,2 +9,3 @@ var util = require('util'),

isNew = mongoVersion.indexOf('1.') !== 0,
isNew = mongoVersion.indexOf('1.') !== 0,
ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID;

@@ -110,5 +111,6 @@

self.store = self.db.collection(options.collectionName);
self.store.ensureIndex({ '_commands.id': 1}, function() {});
self.store.ensureIndex({ '_timeoutAt': 1}, function() {});
self.store.ensureIndex({ '_commitStamp': 1}, function() {});
self.store.createIndex({ '_commands.id': 1}, function() {});
self.store.createIndex({ '_timeoutAt': 1}, function() {});
self.store.createIndex({ '_commitStamp': 1}, function() {});
if (!err) {

@@ -201,3 +203,3 @@ self.emit('connect');

saga._hash = new ObjectID().toString();
this.store.insert(saga, { safe: true }, function (err) {
this.store.insertOne(saga, { safe: true, w: 1 }, function (err) {
if (err && err.message && err.message.indexOf('duplicate key') >= 0) {

@@ -211,3 +213,3 @@ return callback(new ConcurrencyError());

saga._hash = new ObjectID().toString();
this.store.update({ _id: saga._id, _hash: currentHash }, saga, { safe: true }, function(err, modifiedCount) {
this.store.updateOne({ _id: saga._id, _hash: currentHash }, { $set: saga }, { safe: true }, function(err, modifiedCount) {
if (isNew) {

@@ -258,3 +260,3 @@ if (modifiedCount && modifiedCount.result && modifiedCount.result.n === 0) {

this.store.remove({ _id: id }, { safe: true }, function (err) {
this.store.deleteOne({ _id: id }, { safe: true, w: 1 }, function (err) {
if (callback) callback(err);

@@ -261,0 +263,0 @@ });

{
"author": "adrai",
"name": "cqrs-saga",
"version": "1.11.0",
"version": "1.11.1",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -482,3 +482,12 @@ # Introduction

//
// optional define a function that checks if an event should be handled
// optional define a function that checks if an event should be handled ( before saga is loaded )
//.defineShouldHandleEvent(function (evt) {
// return true;
//});
// or
//.defineShouldHandleEvent(function (evt, callback) {
// callback(null, true');
//});
//
// optional define a function that checks if an event should be handled ( after saga is loaded )
//.defineShouldHandle(function (evt, saga) {

@@ -485,0 +494,0 @@ // return true;

@@ -0,1 +1,5 @@

## [v1.11.1](https://github.com/adrai/node-cqrs-saga/compare/v1.11.0...v1.11.1)
- replace deprecated mongo methods ( ensureIndex, insert, remove, update )
- add optional shouldHandleEvent function, to filter events before loading saga
## [v1.11.0](https://github.com/adrai/node-cqrs-saga/compare/v1.10.2...v1.11.0)

@@ -2,0 +6,0 @@ - add option to add custom structureLoader implementation