You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP
Socket
Sign inDemoInstall
Socket

cqrs-saga

Package Overview
Dependencies
Maintainers
1
Versions
64
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-saga - npm Package Compare versions

Comparing version

to
1.1.0

.idea/eslintPlugin.xml

2

lib/definitions/saga.js

@@ -5,3 +5,3 @@ 'use strict';

SagaModel = require('../sagaModel'),
ConcurrencyError = require('../concurrencyError'),
ConcurrencyError = require('../errors/concurrencyError'),
util = require('util'),

@@ -8,0 +8,0 @@ _ = require('lodash'),

@@ -12,2 +12,4 @@ 'use strict';

dotty = require('dotty'),
RevisionGuard = require('./revisionGuard'),
revisionGuardStore = require('./revisionGuardStore'),
SagaModel = require('./sagaModel'),

@@ -37,2 +39,13 @@ structureLoader = require('./structure/structureLoader'),

var defaultRevOpt = {
queueTimeout: 1000,
queueTimeoutMaxLoops: 3
};
options.revisionGuard = options.revisionGuard || {};
_.defaults(options.revisionGuard, defaultRevOpt);
this.revisionGuardStore = revisionGuardStore.create(options.revisionGuard);
this.options = options;

@@ -73,2 +86,6 @@

});
this.onEventMissing(function (info, evt) {
debug('missing events: ', info, evt);
});
}

@@ -161,2 +178,19 @@

/**
* Inject function for event missing handle.
* @param {Function} fn the function to be injected
* @returns {ProcessManager} to be able to chain...
*/
onEventMissing: function (fn) {
if (!fn || !_.isFunction(fn)) {
var err = new Error('Please pass a valid function!');
debug(err);
throw err;
}
this.onEventMissingHandle = fn;
return this;
},
/**
* Call this function to initialize the saga.

@@ -183,15 +217,37 @@ * @param {Function} callback the function that will be called when this action has finished [optional]

// prepare sagaStore...
// prepare infrastructure...
function (callback) {
debug('prepare sagaStore...');
debug('prepare infrastructure...');
async.parallel([
self.sagaStore.on('connect', function () {
self.emit('connect');
});
// prepare sagaStore...
function (callback) {
debug('prepare sagaStore...');
self.sagaStore.on('disconnect', function () {
self.emit('disconnect');
});
self.sagaStore.on('connect', function () {
self.emit('connect');
});
self.sagaStore.connect(callback);
self.sagaStore.on('disconnect', function () {
self.emit('disconnect');
});
self.sagaStore.connect(callback);
},
// prepare revisionGuard...
function (callback) {
debug('prepare revisionGuard...');
self.revisionGuardStore.on('connect', function () {
self.emit('connect');
});
self.revisionGuardStore.on('disconnect', function () {
self.emit('disconnect');
});
self.revisionGuardStore.connect(callback);
}
], callback);
},

@@ -203,2 +259,7 @@

self.revisionGuard = new RevisionGuard(self.revisionGuardStore, self.options.revisionGuard);
self.revisionGuard.onEventMissing(function (info, evt) {
self.onEventMissingHandle(info, evt);
});
self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event);

@@ -211,2 +272,4 @@ self.sagas.defineOptions({}) // options???

self.revisionGuard.defineEvent(self.definitions.event);
callback(null);

@@ -223,19 +286,13 @@ }

/**
* Call this function to let the saga handle it.
* Call this function to forward it to the dispatcher.
* @param {Object} evt The event object
* @param {Function} callback The function that will be called when this action has finished [optional]
* `function(err, cmds, sagaModels){}` cmds and sagaModels are of type Array
* `function(errs, evt, notifications){}` notifications is of type Array
*/
handle: function (evt, callback) {
if (!evt || !_.isObject(evt)) {
var err = new Error('Please pass a valid event!');
debug(err);
throw err;
}
dispatch: function (evt, callback) {
var self = this;
var self = this;
this.eventDispatcher.dispatch(evt, function (errs, sagaModels) {
var cmds = [];
if (!sagaModels || sagaModels.length === 0) {

@@ -247,5 +304,5 @@ if (callback) {

}
async.each(sagaModels, function (sagaModel, callback) {
var cmdsToSend = sagaModel.getUndispatchedCommands();

@@ -264,3 +321,3 @@

}
async.each(cmdsToSend, function (cmd, callback) {

@@ -280,6 +337,5 @@

}
}, callback);
}, function (err) {

@@ -295,15 +351,70 @@ if (err) {

if (callback) {
// var sagaModelsData = _.map(sagaModels, function (s) {
// var json = s.toJSON();
// if (s.isDestroyed()) {
// json._destroyed = true;
// }
// return json;
// });
// callback(errs, cmds, sagaModelsData);
callback(errs, cmds, sagaModels);
}
});
});
},
/**
* Call this function to let the saga handle it.
* @param {Object} evt The event object
* @param {Function} callback The function that will be called when this action has finished [optional]
* `function(err, cmds, sagaModels){}` cmds and sagaModels are of type Array
*/
handle: function (evt, callback) {
if (!evt || !_.isObject(evt)) {
var err = new Error('Please pass a valid event!');
debug(err);
throw err;
}
var self = this;
var workWithRevisionGuard = false;
if (!!this.definitions.event.revision && dotty.exists(evt, this.definitions.event.revision) &&
!!this.definitions.event.aggregateId && dotty.exists(evt, this.definitions.event.aggregateId)) {
workWithRevisionGuard = true;
}
if (!workWithRevisionGuard) {
return this.dispatch(evt, callback);
}
this.revisionGuard.guard(evt, function (err, done) {
if (err) {
debug(err);
if (callback) {
callback([err]);
}
return;
}
self.dispatch(evt, function (errs, cmds, sagaModels) {
if (errs) {
debug(errs);
if (callback) {
callback(errs, cmds, sagaModels);
}
return;
}
done(function (err) {
if (err) {
if (!errs) {
errs = [err];
} else if (_.isArray(errs)) {
errs.unshift(err);
}
debug(err);
}
if (callback) {
callback(errs, cmds, sagaModels);
}
});
});
});
},

@@ -310,0 +421,0 @@

@@ -7,3 +7,3 @@ 'use strict';

uuid = require('node-uuid').v4,
ConcurrencyError = require('../../concurrencyError'),
ConcurrencyError = require('../../errors/concurrencyError'),
_ = require('lodash');

@@ -10,0 +10,0 @@

@@ -7,3 +7,3 @@ 'use strict';

debug = require('debug')('saga:mongodb'),
ConcurrencyError = require('../../concurrencyError'),
ConcurrencyError = require('../../errors/concurrencyError'),
mongo = require('mongodb'),

@@ -10,0 +10,0 @@ ObjectID = mongo.BSONPure.ObjectID;

@@ -8,3 +8,3 @@ 'use strict';

uuid = require('node-uuid').v4,
ConcurrencyError = require('../../concurrencyError'),
ConcurrencyError = require('../../errors/concurrencyError'),
jsondate = require('jsondate'),

@@ -11,0 +11,0 @@ async = require('async'),

{
"author": "adrai",
"name": "cqrs-saga",
"version": "1.0.2",
"version": "1.1.0",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -50,2 +50,19 @@ # Introduction

// password: 'secret' // optional
},
// optional, default is in-memory
// the revisionguard only works if aggregateId and revision are defined in event definition
// currently supports: mongodb, redis, tingodb and inmemory
// hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
revisionGuard: {
queueTimeout: 1000, // optional, timeout for non-handled events in the internal in-memory queue
queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue
type: 'redis',
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret' // optional
}

@@ -66,4 +83,13 @@ });

// revisionGuardStore
pm.revisionGuardStore.on('connect', function() {
console.log('revisionGuardStore connected');
});
// anything (at the moment only sagaStore)
pm.revisionGuardStore.on('disconnect', function() {
console.log('revisionGuardStore disconnected');
});
// anything (sagaStore or revisionGuardStore)
pm.on('connect', function() {

@@ -91,2 +117,9 @@ console.log('something connected');

// optional, default is 'aggregate.id'
aggregateId: 'aggregate.id',
// optional, default is 'revision'
// will represent the aggregate revision, can be used in next command
revision: 'revision',
// optional

@@ -148,2 +181,21 @@ version: 'version',

## Wire up event missing [optional]
### you can define a synchronous function
pm.onEventMissing(function (info, evt) {
// grab the missing events, depending from info values...
// info.aggregateId
// info.aggregateRevision
// info.aggregate
// info.context
// info.guardRevision
// and call handle...
pm.handle(missingEvent, function (err) {
if (err) { console.log(err); }
});
});
## Initialization

@@ -150,0 +202,0 @@

@@ -0,1 +1,4 @@

## [v1.1.0](https://github.com/adrai/node-cqrs-saga/compare/v1.0.2...v1.1.0)
- introduce revisionGuard
## v1.0.2

@@ -8,3 +11,2 @@ - saga: optional define a function to that returns an id that will be used as saga id

## v1.0.0
- first stable release

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet