Comparing version 1.1.2 to 1.1.3
@@ -28,3 +28,3 @@ 'use strict'; | ||
if (!options.sagaPath) { | ||
if (!options.sagaPath && options.sagaPath !== '') { | ||
var err = new Error('Please provide sagaPath in options'); | ||
@@ -204,3 +204,8 @@ debug(err); | ||
function (callback) { | ||
debug('load saga files..'); | ||
if (self.options.sagaPath === '') { | ||
self.sagas = {}; | ||
debug('empty sagaPath defined so no sagas will be loaded...'); | ||
return callback(null); | ||
} | ||
debug('load saga files...'); | ||
structureLoader(self.options.sagaPath, function (err, sagas) { | ||
@@ -261,11 +266,13 @@ if (err) { | ||
self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event); | ||
self.sagas.defineOptions({}) // options??? | ||
.defineCommand(self.definitions.command) | ||
.defineEvent(self.definitions.event) | ||
.idGenerator(self.getNewId) | ||
.useSagaStore(self.sagaStore); | ||
if (self.options.sagaPath !== '') { | ||
self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event); | ||
self.sagas.defineOptions({}) // options??? | ||
.defineCommand(self.definitions.command) | ||
.defineEvent(self.definitions.event) | ||
.idGenerator(self.getNewId) | ||
.useSagaStore(self.sagaStore); | ||
} | ||
self.revisionGuard.defineEvent(self.definitions.event); | ||
callback(null); | ||
@@ -289,3 +296,3 @@ } | ||
var self = this; | ||
this.eventDispatcher.dispatch(evt, function (errs, sagaModels) { | ||
@@ -333,3 +340,3 @@ var cmds = []; | ||
}, callback); | ||
}, function (err) { | ||
@@ -364,3 +371,3 @@ if (err) { | ||
} | ||
var self = this; | ||
@@ -411,3 +418,3 @@ | ||
}); | ||
}); | ||
@@ -424,3 +431,3 @@ | ||
var self = this; | ||
this.sagaStore.getTimeoutedSagas(function (err, sagas) { | ||
@@ -431,3 +438,3 @@ if (err) { | ||
} | ||
var sagaModels = []; | ||
@@ -437,7 +444,40 @@ sagas.forEach(function (s) { | ||
sagaModel.set(s); | ||
var calledAddCommandToSend = false; | ||
sagaModel.addCommandToSend = function (cmd) { | ||
calledAddCommandToSend = true; | ||
sagaModel.addUnsentCommand(cmd); | ||
}; | ||
sagaModel.commit = function (clb) { | ||
if (sagaModel.isDestroyed()) { | ||
self.removeSaga(sagaModel, clb); | ||
} else if (calledAddCommandToSend) { | ||
var cmds = sagaModel.getUndispatchedCommands(); | ||
async.each(cmds, function (cmd, fn) { | ||
if (dotty.exists(cmd, self.definitions.command.id)) { | ||
return fn(null); | ||
} | ||
self.getNewId(function (err, id) { | ||
if (err) { | ||
debug(err); | ||
return fn(err); | ||
} | ||
dotty.put(cmd, self.definitions.command.id, id); | ||
fn(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
return callback(err); | ||
} | ||
sagaModel.setCommitStamp(new Date()); | ||
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) { | ||
return { id: dotty.get(c, self.definitions.command.id), payload: c }; | ||
}); | ||
self.sagaStore.save(sagaModel.toJSON(), undispCmds, clb); | ||
}); | ||
} else { | ||
var err = new Error('Use commit only to remove a saga!'); | ||
var err = new Error('Use commit only to remove a saga or to addCommandToSend!'); | ||
debug(err); | ||
@@ -450,3 +490,3 @@ if (clb) { return clb(err); } | ||
}); | ||
callback(null, sagaModels); | ||
@@ -464,3 +504,3 @@ }); | ||
var self = this; | ||
this.sagaStore.getOlderSagas(date, function (err, sagas) { | ||
@@ -467,0 +507,0 @@ if (err) { |
@@ -45,11 +45,11 @@ 'use strict'; | ||
self.client.createTableIfNotExists(self.options.sagaTableName, callback); | ||
} | ||
}; | ||
var createCommandTable = function (callback) { | ||
self.client.createTableIfNotExists(self.options.commandTableName, callback); | ||
} | ||
}; | ||
var createUndispatchedCommandTable = function (callback) { | ||
self.client.createTableIfNotExists(self.options.undispatchedCommandtableName, callback); | ||
} | ||
}; | ||
@@ -131,3 +131,3 @@ async.parallel([ | ||
}); | ||
} | ||
}; | ||
@@ -148,3 +148,3 @@ var removeCommands = function (callback) { | ||
); | ||
} | ||
}; | ||
@@ -165,3 +165,3 @@ var removeUndispatchedCommands = function (callback) { | ||
); | ||
} | ||
}; | ||
@@ -207,3 +207,4 @@ async.parallel([ | ||
RowKey: eg.String(cmd.id), | ||
payload: eg.String(JSON.stringify(cmd)) | ||
payload: eg.String(JSON.stringify(cmd)), | ||
commitStamp: eg.DateTime(saga._commitStamp) | ||
}; | ||
@@ -397,3 +398,3 @@ | ||
return {sagaId: entity.PartitionKey._, commandId: entity.RowKey._, command: data.payload}; | ||
return {sagaId: entity.PartitionKey._, commandId: entity.RowKey._, command: data.payload, commitStamp: entity.commitStamp._}; | ||
}); | ||
@@ -435,4 +436,3 @@ | ||
} | ||
, | ||
}, | ||
@@ -458,3 +458,3 @@ clear: function (callback) { | ||
); | ||
} | ||
}; | ||
@@ -475,3 +475,3 @@ var clearCommandTable = function (callback) { | ||
); | ||
} | ||
}; | ||
@@ -500,4 +500,3 @@ var clearUndispatchedCommandTable = function (callback) { | ||
} | ||
}) | ||
; | ||
}); | ||
@@ -508,5 +507,5 @@ function sagaResolver(entity) { | ||
return res; | ||
}; | ||
} | ||
module.exports = AzureTable; |
@@ -29,3 +29,3 @@ 'use strict'; | ||
}, | ||
get: function (id, callback) { | ||
@@ -37,3 +37,3 @@ if (!id || !_.isString(id)) { | ||
} | ||
callback(null, this.store[id] || null); | ||
@@ -48,3 +48,3 @@ }, | ||
} | ||
if (this.store[id]) { | ||
@@ -85,3 +85,3 @@ delete this.store[id]; | ||
if ((this.store[saga.id] && saga._hash && saga._hash !== this.store[saga.id]._hash) || | ||
if ((this.store[saga.id] && saga._hash && saga._hash !== this.store[saga.id]._hash) || | ||
(!this.store[saga.id] && saga._hash) || | ||
@@ -96,6 +96,6 @@ (this.store[saga.id] && this.store[saga.id]._hash && !saga._hash)) { | ||
saga._hash = uuid().toString(); | ||
this.store[saga.id] = saga; | ||
this.cmds[saga.id] = this.cmds[saga.id] || {}; | ||
var self = this; | ||
@@ -105,3 +105,3 @@ cmds.forEach(function (cmd) { | ||
}); | ||
if (callback) { callback(null); } | ||
@@ -114,3 +114,3 @@ }, | ||
}); | ||
callback(null, res); | ||
@@ -125,3 +125,3 @@ }, | ||
} | ||
var res = _.filter(_.values(this.store), function (s) { | ||
@@ -138,6 +138,6 @@ return s._commitStamp.getTime() <= (date).getTime(); | ||
for (var cmdId in this.cmds[sagaId]) { | ||
res.push({ sagaId: sagaId, commandId: cmdId, command: this.cmds[sagaId][cmdId] }); | ||
res.push({ sagaId: sagaId, commandId: cmdId, command: this.cmds[sagaId][cmdId], commitStamp: this.store[sagaId]._commitStamp }); | ||
} | ||
} | ||
callback(null, res); | ||
@@ -158,3 +158,3 @@ }, | ||
} | ||
if (!this.cmds[sagaId] || !this.cmds[sagaId][cmdId]) { | ||
@@ -166,3 +166,3 @@ if (callback) { callback(null); } | ||
delete this.cmds[sagaId][cmdId]; | ||
callback(null); | ||
@@ -169,0 +169,0 @@ }, |
@@ -124,3 +124,3 @@ 'use strict'; | ||
} | ||
saga._id = saga.id; | ||
@@ -164,11 +164,11 @@ saga._commands = cmds; | ||
} | ||
if (saga._commands) { | ||
delete saga._commands; | ||
} | ||
callback(null, saga); | ||
}); | ||
}, | ||
remove: function (id, callback) { | ||
@@ -180,3 +180,3 @@ if (!id || !_.isString(id)) { | ||
} | ||
this.store.remove({ _id: id }, { safe: true }, function (err) { | ||
@@ -242,3 +242,3 @@ if (callback) callback(err); | ||
s._commands.forEach(function (c) { | ||
res.push({ sagaId: s._id, commandId: c.id, command: c.payload }); | ||
res.push({ sagaId: s._id, commandId: c.id, command: c.payload, commitStamp: s._commitStamp }); | ||
}); | ||
@@ -245,0 +245,0 @@ } |
@@ -152,3 +152,3 @@ 'use strict'; | ||
} | ||
var cmdMap = []; | ||
@@ -159,2 +159,3 @@ | ||
cmd.payload._commandId = cmd.id; | ||
cmd.payload._commitStamp = saga._commitStamp; | ||
cmdMap.push(self.options.prefix + '_command' + ':' + cmd.payload._sagaId+ ':' + cmd.payload._commandId); | ||
@@ -168,3 +169,3 @@ cmdMap.push(JSON.stringify(cmd.payload)); | ||
} | ||
self.get(saga.id, function (err, s) { | ||
@@ -228,3 +229,3 @@ if (err) { | ||
} | ||
var self = this; | ||
@@ -277,3 +278,3 @@ | ||
var self = this; | ||
async.parallel([ | ||
@@ -314,3 +315,3 @@ function (callback) { | ||
var self = this; | ||
this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) { | ||
@@ -328,3 +329,3 @@ if (err) { | ||
}); | ||
async.each(keys, function (key, callback) { | ||
@@ -350,7 +351,7 @@ var parts = key.split(':'); | ||
} | ||
if (timeoutAtMs > (new Date()).getTime()) { | ||
return callback(null); | ||
} | ||
self.get(sagaId, function (err, saga) { | ||
@@ -365,3 +366,3 @@ if (err) { | ||
}); | ||
}, function (err) { | ||
@@ -471,4 +472,4 @@ if (err) { | ||
} | ||
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data }); | ||
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp }); | ||
callback(null); | ||
@@ -475,0 +476,0 @@ }); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-saga", | ||
"version": "1.1.2", | ||
"version": "1.1.3", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
129
README.md
@@ -20,3 +20,3 @@ # Introduction | ||
sagaPath: '/path/to/my/files', | ||
// optional, default is 800 | ||
@@ -27,3 +27,3 @@ // if using in scaled systems and not guaranteeing that each event for a saga "instance" | ||
retryOnConcurrencyTimeout: 1000, | ||
// optional, default is in-memory | ||
@@ -53,3 +53,3 @@ // currently supports: mongodb, redis, azuretable and inmemory | ||
}, | ||
// optional, default is in-memory | ||
@@ -62,3 +62,3 @@ // the revisionguard only works if aggregateId and revision are defined in event definition | ||
queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue | ||
type: 'redis', | ||
@@ -81,7 +81,7 @@ host: 'localhost', // optional | ||
}); | ||
pm.sagaStore.on('disconnect', function() { | ||
console.log('sagaStore disconnected'); | ||
}); | ||
// revisionGuardStore | ||
@@ -91,8 +91,8 @@ pm.revisionGuardStore.on('connect', function() { | ||
}); | ||
pm.revisionGuardStore.on('disconnect', function() { | ||
console.log('revisionGuardStore disconnected'); | ||
}); | ||
// anything (sagaStore or revisionGuardStore) | ||
@@ -102,3 +102,3 @@ pm.on('connect', function() { | ||
}); | ||
pm.on('disconnect', function() { | ||
@@ -115,19 +115,19 @@ console.log('something disconnected'); | ||
name: 'name', | ||
// optional, only makes sense if contexts are defined in the 'domainPath' structure | ||
// optional, only makes sense if contexts are defined in the 'domainPath' structure | ||
context: 'context.name', | ||
// optional, only makes sense if aggregates with names are defined in the 'domainPath' structure | ||
aggregate: 'aggregate.name', | ||
// optional, default is 'aggregate.id' | ||
aggregateId: 'aggregate.id', | ||
// optional, default is 'revision' | ||
// will represent the aggregate revision, can be used in next command | ||
revision: 'revision', | ||
// optional | ||
version: 'version', | ||
// optional, if defined theses values will be copied to the command (can be used to transport information like userId, etc..) | ||
@@ -144,3 +144,3 @@ meta: 'meta' | ||
id: 'id', | ||
// optional, if defined the values of the event will be copied to the command (can be used to transport information like userId, etc..) | ||
@@ -176,3 +176,3 @@ meta: 'meta' | ||
}); | ||
### or you can define an asynchronous function | ||
@@ -192,3 +192,3 @@ | ||
pm.onEventMissing(function (info, evt) { | ||
// grab the missing events, depending from info values... | ||
@@ -204,3 +204,3 @@ // info.aggregateId | ||
}); | ||
}); | ||
@@ -210,9 +210,9 @@ | ||
## Initialization | ||
pm.init(function (err) { | ||
// this callback is called when all is ready... | ||
}); | ||
// or | ||
pm.init(); // callback is optional | ||
@@ -243,5 +243,5 @@ | ||
}); // callback is optional | ||
### or | ||
pm.handle({ | ||
@@ -274,5 +274,5 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f', | ||
}); | ||
### more infos, can be useful if testing | ||
pm.handle({ | ||
@@ -298,6 +298,6 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f', | ||
// errs: is the same as described before | ||
// cmds: same as passed in 'onCommand' function | ||
// cmds: in case of no error or in case of error here is the array of all commands that should be published | ||
// sagaModels: represents the saga data after have handled the event | ||
@@ -314,30 +314,30 @@ }); | ||
name: 'orderCreated', | ||
// optional | ||
aggregate: 'order', | ||
// optional | ||
context: 'sale', | ||
// optional, default 0 | ||
version: 1, | ||
// optional, default false | ||
// if true it will check if there is already a saga in the db and only if there is something it will continue... | ||
existing: false, | ||
// optional, will catch the event only if it contains the defined properties | ||
containingProperties: ['aggregate.id', 'payload.totalCosts', 'payload.seats'], | ||
// optional, if not defined it will pass the whole event... | ||
payload: 'payload', | ||
// optional, if not defined it will generate a new id | ||
// it will try to load the saga from the db by this id | ||
id: 'aggregate.id', | ||
// optional, default Infinity, all sagas will be sorted by this value | ||
priority: 1 | ||
}, function (evt, saga, callback) { | ||
saga.set('orderId', evt.aggregate.id); | ||
@@ -347,5 +347,5 @@ saga.set('totalCosts', evt.payload.totalCosts); | ||
// saga.set({ orderId: evt.aggregate.id, totalCosts: evt.payload.totalCosts }); | ||
var cmd = { | ||
// if you don't pass an id it will generate a new one | ||
@@ -364,3 +364,3 @@ id: 'my own command id', | ||
}, | ||
// to transport meta infos (like userId)... | ||
@@ -370,11 +370,11 @@ // if not defined, it will use the meta value of the event | ||
}; | ||
saga.addCommandToSend(cmd); | ||
// optionally define a timeout | ||
// this can be useful if you have an other process that will fetch timeouted sagas | ||
var tomorrow = new Date(); | ||
tomorrow.setDate((new Date()).getDate() + 1); | ||
tomorrow.setDate((new Date()).getDate() + 1); | ||
var timeoutCmd = { | ||
// if you don't pass an id it will generate a new one | ||
@@ -393,3 +393,3 @@ id: 'my own command id', | ||
}, | ||
// to transport meta infos (like userId)... | ||
@@ -404,3 +404,3 @@ // if not defined, it will use the meta value of the event | ||
// saga.defineTimeout(tomorrow); | ||
saga.commit(callback); | ||
@@ -425,10 +425,23 @@ }); | ||
if (err) { return console.log('ohh!'); } | ||
sagas.forEach(function (saga) { | ||
// saga.id... | ||
// saga.getTimeoutAt(); | ||
// saga.getTimeoutCommands(); | ||
// if saga does not clean itself after timouted and/or no commands are defined, then: | ||
pm.removeSaga(saga || saga.id, function (err) {}); | ||
var cmds = saga.getTimeoutCommands(); | ||
cmds.forEach(function (cmd) { | ||
saga.addCommandToSend(cmd); | ||
}); | ||
saga.commit(function (err) { | ||
cmds.forEach(function (cmd) { | ||
// publish cmd... | ||
// msgBus.send(cmd); | ||
// ... and set to dispatched... | ||
pm.setCommandToDispatched(cmd.id, saga.id, function (err) {}); | ||
}); | ||
}); | ||
// or if saga does not clean itself after timouted and/or no commands are defined, then: | ||
// pm.removeSaga(saga || saga.id, function (err) {}); | ||
// or | ||
@@ -445,3 +458,3 @@ // saga.destroy(); | ||
if (err) { return console.log('ohh!'); } | ||
sagas.forEach(function (saga) { | ||
@@ -451,3 +464,3 @@ // saga.id... | ||
// saga.getTimeoutCommands(); | ||
// if saga does not clean itself after timouted and/or no commands are defined, then: | ||
@@ -468,6 +481,6 @@ pm.removeSaga(saga || saga.id, function (err) {}); | ||
if (err) { return console.log('ohh!'); } | ||
cmds.forEach(function (cmd) { | ||
// cmd is: { sagaId: 'the id of the saga', commandId: 'the id of the command', command: { /* the command */ } } | ||
// cmd is: { sagaId: 'the id of the saga', commandId: 'the id of the command', commitStamp: 'a date', command: { /* the command */ } } | ||
pm.setCommandToDispatched(cmd.commandId, cmd.sagaId, function (err) {}); | ||
@@ -474,0 +487,0 @@ }); |
@@ -0,1 +1,5 @@ | ||
## [v1.1.3](https://github.com/adrai/node-cqrs-saga/compare/v1.1.2...v1.1.3) | ||
- added commitstamp to getUndispatchedcommands | ||
- added possibility to addCommandToSend for timeoutedSagas | ||
## [v1.1.2](https://github.com/adrai/node-cqrs-saga/compare/v1.1.1...v1.1.2) | ||
@@ -2,0 +6,0 @@ - handle case of same aggregateId in different contexts or aggregates |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
261166
3917
488