Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-saga

Package Overview
Dependencies
Maintainers
1
Versions
64
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-saga - npm Package Compare versions

Comparing version 1.1.2 to 1.1.3

projectFilesBackup/.idea/workspace.xml

76

lib/pm.js

@@ -28,3 +28,3 @@ 'use strict';

if (!options.sagaPath) {
if (!options.sagaPath && options.sagaPath !== '') {
var err = new Error('Please provide sagaPath in options');

@@ -204,3 +204,8 @@ debug(err);

function (callback) {
debug('load saga files..');
if (self.options.sagaPath === '') {
self.sagas = {};
debug('empty sagaPath defined so no sagas will be loaded...');
return callback(null);
}
debug('load saga files...');
structureLoader(self.options.sagaPath, function (err, sagas) {

@@ -261,11 +266,13 @@ if (err) {

self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event);
self.sagas.defineOptions({}) // options???
.defineCommand(self.definitions.command)
.defineEvent(self.definitions.event)
.idGenerator(self.getNewId)
.useSagaStore(self.sagaStore);
if (self.options.sagaPath !== '') {
self.eventDispatcher = new EventDispatcher(self.sagas, self.definitions.event);
self.sagas.defineOptions({}) // options???
.defineCommand(self.definitions.command)
.defineEvent(self.definitions.event)
.idGenerator(self.getNewId)
.useSagaStore(self.sagaStore);
}
self.revisionGuard.defineEvent(self.definitions.event);
callback(null);

@@ -289,3 +296,3 @@ }

var self = this;
this.eventDispatcher.dispatch(evt, function (errs, sagaModels) {

@@ -333,3 +340,3 @@ var cmds = [];

}, callback);
}, function (err) {

@@ -364,3 +371,3 @@ if (err) {

}
var self = this;

@@ -411,3 +418,3 @@

});
});

@@ -424,3 +431,3 @@

var self = this;
this.sagaStore.getTimeoutedSagas(function (err, sagas) {

@@ -431,3 +438,3 @@ if (err) {

}
var sagaModels = [];

@@ -437,7 +444,40 @@ sagas.forEach(function (s) {

sagaModel.set(s);
var calledAddCommandToSend = false;
sagaModel.addCommandToSend = function (cmd) {
calledAddCommandToSend = true;
sagaModel.addUnsentCommand(cmd);
};
sagaModel.commit = function (clb) {
if (sagaModel.isDestroyed()) {
self.removeSaga(sagaModel, clb);
} else if (calledAddCommandToSend) {
var cmds = sagaModel.getUndispatchedCommands();
async.each(cmds, function (cmd, fn) {
if (dotty.exists(cmd, self.definitions.command.id)) {
return fn(null);
}
self.getNewId(function (err, id) {
if (err) {
debug(err);
return fn(err);
}
dotty.put(cmd, self.definitions.command.id, id);
fn(null);
});
}, function (err) {
if (err) {
debug(err);
return callback(err);
}
sagaModel.setCommitStamp(new Date());
var undispCmds = _.map(sagaModel.getUndispatchedCommands(), function (c) {
return { id: dotty.get(c, self.definitions.command.id), payload: c };
});
self.sagaStore.save(sagaModel.toJSON(), undispCmds, clb);
});
} else {
var err = new Error('Use commit only to remove a saga!');
var err = new Error('Use commit only to remove a saga or to addCommandToSend!');
debug(err);

@@ -450,3 +490,3 @@ if (clb) { return clb(err); }

});
callback(null, sagaModels);

@@ -464,3 +504,3 @@ });

var self = this;
this.sagaStore.getOlderSagas(date, function (err, sagas) {

@@ -467,0 +507,0 @@ if (err) {

@@ -45,11 +45,11 @@ 'use strict';

self.client.createTableIfNotExists(self.options.sagaTableName, callback);
}
};
var createCommandTable = function (callback) {
self.client.createTableIfNotExists(self.options.commandTableName, callback);
}
};
var createUndispatchedCommandTable = function (callback) {
self.client.createTableIfNotExists(self.options.undispatchedCommandtableName, callback);
}
};

@@ -131,3 +131,3 @@ async.parallel([

});
}
};

@@ -148,3 +148,3 @@ var removeCommands = function (callback) {

);
}
};

@@ -165,3 +165,3 @@ var removeUndispatchedCommands = function (callback) {

);
}
};

@@ -207,3 +207,4 @@ async.parallel([

RowKey: eg.String(cmd.id),
payload: eg.String(JSON.stringify(cmd))
payload: eg.String(JSON.stringify(cmd)),
commitStamp: eg.DateTime(saga._commitStamp)
};

@@ -397,3 +398,3 @@

return {sagaId: entity.PartitionKey._, commandId: entity.RowKey._, command: data.payload};
return {sagaId: entity.PartitionKey._, commandId: entity.RowKey._, command: data.payload, commitStamp: entity.commitStamp._};
});

@@ -435,4 +436,3 @@

}
,
},

@@ -458,3 +458,3 @@ clear: function (callback) {

);
}
};

@@ -475,3 +475,3 @@ var clearCommandTable = function (callback) {

);
}
};

@@ -500,4 +500,3 @@ var clearUndispatchedCommandTable = function (callback) {

}
})
;
});

@@ -508,5 +507,5 @@ function sagaResolver(entity) {

return res;
};
}
module.exports = AzureTable;

@@ -29,3 +29,3 @@ 'use strict';

},
get: function (id, callback) {

@@ -37,3 +37,3 @@ if (!id || !_.isString(id)) {

}
callback(null, this.store[id] || null);

@@ -48,3 +48,3 @@ },

}
if (this.store[id]) {

@@ -85,3 +85,3 @@ delete this.store[id];

if ((this.store[saga.id] && saga._hash && saga._hash !== this.store[saga.id]._hash) ||
if ((this.store[saga.id] && saga._hash && saga._hash !== this.store[saga.id]._hash) ||
(!this.store[saga.id] && saga._hash) ||

@@ -96,6 +96,6 @@ (this.store[saga.id] && this.store[saga.id]._hash && !saga._hash)) {

saga._hash = uuid().toString();
this.store[saga.id] = saga;
this.cmds[saga.id] = this.cmds[saga.id] || {};
var self = this;

@@ -105,3 +105,3 @@ cmds.forEach(function (cmd) {

});
if (callback) { callback(null); }

@@ -114,3 +114,3 @@ },

});
callback(null, res);

@@ -125,3 +125,3 @@ },

}
var res = _.filter(_.values(this.store), function (s) {

@@ -138,6 +138,6 @@ return s._commitStamp.getTime() <= (date).getTime();

for (var cmdId in this.cmds[sagaId]) {
res.push({ sagaId: sagaId, commandId: cmdId, command: this.cmds[sagaId][cmdId] });
res.push({ sagaId: sagaId, commandId: cmdId, command: this.cmds[sagaId][cmdId], commitStamp: this.store[sagaId]._commitStamp });
}
}
callback(null, res);

@@ -158,3 +158,3 @@ },

}
if (!this.cmds[sagaId] || !this.cmds[sagaId][cmdId]) {

@@ -166,3 +166,3 @@ if (callback) { callback(null); }

delete this.cmds[sagaId][cmdId];
callback(null);

@@ -169,0 +169,0 @@ },

@@ -124,3 +124,3 @@ 'use strict';

}
saga._id = saga.id;

@@ -164,11 +164,11 @@ saga._commands = cmds;

}
if (saga._commands) {
delete saga._commands;
}
callback(null, saga);
});
},
remove: function (id, callback) {

@@ -180,3 +180,3 @@ if (!id || !_.isString(id)) {

}
this.store.remove({ _id: id }, { safe: true }, function (err) {

@@ -242,3 +242,3 @@ if (callback) callback(err);

s._commands.forEach(function (c) {
res.push({ sagaId: s._id, commandId: c.id, command: c.payload });
res.push({ sagaId: s._id, commandId: c.id, command: c.payload, commitStamp: s._commitStamp });
});

@@ -245,0 +245,0 @@ }

@@ -152,3 +152,3 @@ 'use strict';

}
var cmdMap = [];

@@ -159,2 +159,3 @@

cmd.payload._commandId = cmd.id;
cmd.payload._commitStamp = saga._commitStamp;
cmdMap.push(self.options.prefix + '_command' + ':' + cmd.payload._sagaId+ ':' + cmd.payload._commandId);

@@ -168,3 +169,3 @@ cmdMap.push(JSON.stringify(cmd.payload));

}
self.get(saga.id, function (err, s) {

@@ -228,3 +229,3 @@ if (err) {

}
var self = this;

@@ -277,3 +278,3 @@

var self = this;
async.parallel([

@@ -314,3 +315,3 @@ function (callback) {

var self = this;
this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) {

@@ -328,3 +329,3 @@ if (err) {

});
async.each(keys, function (key, callback) {

@@ -350,7 +351,7 @@ var parts = key.split(':');

}
if (timeoutAtMs > (new Date()).getTime()) {
return callback(null);
}
self.get(sagaId, function (err, saga) {

@@ -365,3 +366,3 @@ if (err) {

});
}, function (err) {

@@ -471,4 +472,4 @@ if (err) {

}
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data });
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp });
callback(null);

@@ -475,0 +476,0 @@ });

{
"author": "adrai",
"name": "cqrs-saga",
"version": "1.1.2",
"version": "1.1.3",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -20,3 +20,3 @@ # Introduction

sagaPath: '/path/to/my/files',
// optional, default is 800

@@ -27,3 +27,3 @@ // if using in scaled systems and not guaranteeing that each event for a saga "instance"

retryOnConcurrencyTimeout: 1000,
// optional, default is in-memory

@@ -53,3 +53,3 @@ // currently supports: mongodb, redis, azuretable and inmemory

},
// optional, default is in-memory

@@ -62,3 +62,3 @@ // the revisionguard only works if aggregateId and revision are defined in event definition

queueTimeoutMaxLoops: 3 // optional, maximal loop count for non-handled event in the internal in-memory queue
type: 'redis',

@@ -81,7 +81,7 @@ host: 'localhost', // optional

});
pm.sagaStore.on('disconnect', function() {
console.log('sagaStore disconnected');
});
// revisionGuardStore

@@ -91,8 +91,8 @@ pm.revisionGuardStore.on('connect', function() {

});
pm.revisionGuardStore.on('disconnect', function() {
console.log('revisionGuardStore disconnected');
});
// anything (sagaStore or revisionGuardStore)

@@ -102,3 +102,3 @@ pm.on('connect', function() {

});
pm.on('disconnect', function() {

@@ -115,19 +115,19 @@ console.log('something disconnected');

name: 'name',
// optional, only makes sense if contexts are defined in the 'domainPath' structure
// optional, only makes sense if contexts are defined in the 'domainPath' structure
context: 'context.name',
// optional, only makes sense if aggregates with names are defined in the 'domainPath' structure
aggregate: 'aggregate.name',
// optional, default is 'aggregate.id'
aggregateId: 'aggregate.id',
// optional, default is 'revision'
// will represent the aggregate revision, can be used in next command
revision: 'revision',
// optional
version: 'version',
// optional, if defined theses values will be copied to the command (can be used to transport information like userId, etc..)

@@ -144,3 +144,3 @@ meta: 'meta'

id: 'id',
// optional, if defined the values of the event will be copied to the command (can be used to transport information like userId, etc..)

@@ -176,3 +176,3 @@ meta: 'meta'

});
### or you can define an asynchronous function

@@ -192,3 +192,3 @@

pm.onEventMissing(function (info, evt) {
// grab the missing events, depending from info values...

@@ -204,3 +204,3 @@ // info.aggregateId

});
});

@@ -210,9 +210,9 @@

## Initialization
pm.init(function (err) {
// this callback is called when all is ready...
});
// or
pm.init(); // callback is optional

@@ -243,5 +243,5 @@

}); // callback is optional
### or
pm.handle({

@@ -274,5 +274,5 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',

});
### more infos, can be useful if testing
pm.handle({

@@ -298,6 +298,6 @@ id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',

// errs: is the same as described before
// cmds: same as passed in 'onCommand' function
// cmds: in case of no error or in case of error here is the array of all commands that should be published
// sagaModels: represents the saga data after have handled the event

@@ -314,30 +314,30 @@ });

name: 'orderCreated',
// optional
aggregate: 'order',
// optional
context: 'sale',
// optional, default 0
version: 1,
// optional, default false
// if true it will check if there is already a saga in the db and only if there is something it will continue...
existing: false,
// optional, will catch the event only if it contains the defined properties
containingProperties: ['aggregate.id', 'payload.totalCosts', 'payload.seats'],
// optional, if not defined it will pass the whole event...
payload: 'payload',
// optional, if not defined it will generate a new id
// it will try to load the saga from the db by this id
id: 'aggregate.id',
// optional, default Infinity, all sagas will be sorted by this value
priority: 1
}, function (evt, saga, callback) {
saga.set('orderId', evt.aggregate.id);

@@ -347,5 +347,5 @@ saga.set('totalCosts', evt.payload.totalCosts);

// saga.set({ orderId: evt.aggregate.id, totalCosts: evt.payload.totalCosts });
var cmd = {
// if you don't pass an id it will generate a new one

@@ -364,3 +364,3 @@ id: 'my own command id',

},
// to transport meta infos (like userId)...

@@ -370,11 +370,11 @@ // if not defined, it will use the meta value of the event

};
saga.addCommandToSend(cmd);
// optionally define a timeout
// this can be useful if you have an other process that will fetch timeouted sagas
var tomorrow = new Date();
tomorrow.setDate((new Date()).getDate() + 1);
tomorrow.setDate((new Date()).getDate() + 1);
var timeoutCmd = {
// if you don't pass an id it will generate a new one

@@ -393,3 +393,3 @@ id: 'my own command id',

},
// to transport meta infos (like userId)...

@@ -404,3 +404,3 @@ // if not defined, it will use the meta value of the event

// saga.defineTimeout(tomorrow);
saga.commit(callback);

@@ -425,10 +425,23 @@ });

if (err) { return console.log('ohh!'); }
sagas.forEach(function (saga) {
// saga.id...
// saga.getTimeoutAt();
// saga.getTimeoutCommands();
// if saga does not clean itself after timouted and/or no commands are defined, then:
pm.removeSaga(saga || saga.id, function (err) {});
var cmds = saga.getTimeoutCommands();
cmds.forEach(function (cmd) {
saga.addCommandToSend(cmd);
});
saga.commit(function (err) {
cmds.forEach(function (cmd) {
// publish cmd...
// msgBus.send(cmd);
// ... and set to dispatched...
pm.setCommandToDispatched(cmd.id, saga.id, function (err) {});
});
});
// or if saga does not clean itself after timouted and/or no commands are defined, then:
// pm.removeSaga(saga || saga.id, function (err) {});
// or

@@ -445,3 +458,3 @@ // saga.destroy();

if (err) { return console.log('ohh!'); }
sagas.forEach(function (saga) {

@@ -451,3 +464,3 @@ // saga.id...

// saga.getTimeoutCommands();
// if saga does not clean itself after timouted and/or no commands are defined, then:

@@ -468,6 +481,6 @@ pm.removeSaga(saga || saga.id, function (err) {});

if (err) { return console.log('ohh!'); }
cmds.forEach(function (cmd) {
// cmd is: { sagaId: 'the id of the saga', commandId: 'the id of the command', command: { /* the command */ } }
// cmd is: { sagaId: 'the id of the saga', commandId: 'the id of the command', commitStamp: 'a date', command: { /* the command */ } }
pm.setCommandToDispatched(cmd.commandId, cmd.sagaId, function (err) {});

@@ -474,0 +487,0 @@ });

@@ -0,1 +1,5 @@

## [v1.1.3](https://github.com/adrai/node-cqrs-saga/compare/v1.1.2...v1.1.3)
- added commitstamp to getUndispatchedcommands
- added possibility to addCommandToSend for timeoutedSagas
## [v1.1.2](https://github.com/adrai/node-cqrs-saga/compare/v1.1.1...v1.1.2)

@@ -2,0 +6,0 @@ - handle case of same aggregateId in different contexts or aggregates

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc