New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

cqrs-saga

Package Overview
Dependencies
Maintainers
1
Versions
64
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cqrs-saga - npm Package Compare versions

Comparing version 1.6.0 to 1.6.1

393

lib/store/databases/redis.js

@@ -220,2 +220,39 @@ 'use strict';

scan: function (key, cursor, handleKeys, callback) {
var self = this;
if (!callback) {
callback = handleKeys;
handleKeys = cursor;
cursor = 0;
}
(function scanRecursive (curs) {
self.client.scan(curs, 'match', key, function (err, res) {
if (err) {
return callback(err);
}
function next () {
if (res[0] === '0') {
callback(null);
} else {
scanRecursive(res[0]);
}
}
if (res[1].length === 0) {
return next();
}
handleKeys(res[1], function (err) {
if (err) {
return callback(err);
}
next();
});
});
})(cursor);
},
get: function (id, callback) {

@@ -230,37 +267,44 @@ if (!id || !_.isString(id)) {

this.client.keys(this.options.prefix + '_saga:*:*:' + id, function (err, keys) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}
var allKeys = [];
if (keys.length === 0) {
if (callback) callback(null, null);
return;
}
keys = _.sortBy(keys, function (s) {
return s;
});
self.client.get(keys[0], function (err, saga) {
this.scan(this.options.prefix + '_saga:*:*:' + id,
function (keys, fn) {
allKeys = allKeys.concat(keys);
fn();
}, function (err) {
if (err) {
return callback(err);
debug(err);
if (callback) callback(err);
return;
}
if (!saga) {
return callback(null, null);
}
try {
saga = jsondate.parse(saga.toString());
} catch (error) {
if (callback) callback(err);
if (allKeys.length === 0) {
if (callback) callback(null, null);
return;
}
callback(null, saga);
});
});
allKeys = _.sortBy(allKeys, function (s) {
return s;
});
self.client.get(allKeys[0], function (err, saga) {
if (err) {
return callback(err);
}
if (!saga) {
return callback(null, null);
}
try {
saga = jsondate.parse(saga.toString());
} catch (error) {
if (callback) callback(err);
return;
}
callback(null, saga);
});
}
);
},

@@ -279,23 +323,19 @@

function (callback) {
self.client.keys(self.options.prefix + '_saga:*:*:' + id, function (err, keys) {
if (err) {
return callback(err);
}
async.each(keys, function (key, callback) {
self.client.del(key, callback);
}, callback);
});
self.scan(self.options.prefix + '_saga:*:*:' + id,
function (keys, fn) {
async.each(keys, function (key, callback) {
self.client.del(key, callback);
}, fn);
}, callback
);
},
function (callback) {
self.client.keys(self.options.prefix + '_command:' + id + ':*', function (err, keys) {
if (err) {
return callback(err);
}
async.each(keys, function (key, callback) {
self.client.del(key, callback);
}, callback);
});
self.scan(self.options.prefix + '_command:' + id + ':*',
function (keys, fn) {
async.each(keys, function (key, callback) {
self.client.del(key, callback);
}, fn);
}, callback
);
}

@@ -314,57 +354,66 @@ ], function (err) {

this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) {
if (err) {
return callback(err);
}
var allKeys = [];
if (keys.length === 0) {
return callback(null, res);
}
this.scan(this.options.prefix + '_saga:*:*:*',
function (keys, fn) {
allKeys = allKeys.concat(keys);
fn();
}, function (err) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}
keys = _.sortBy(keys, function (s) {
return s;
});
if (allKeys.length === 0) {
return callback(null, res);
}
async.each(keys, function (key, callback) {
var parts = key.split(':');
var prefix = parts[0];
var commitStampMs = parts[1];
var timeoutAtMs = parts[2];
var sagaId = parts[3];
allKeys = _.sortBy(allKeys, function (s) {
return s;
});
if (commitStampMs === 'Infinity') {
commitStampMs = Infinity;
}
if (_.isString(commitStampMs)) {
commitStampMs = parseInt(commitStampMs, 10);
}
async.each(allKeys, function (key, callback) {
var parts = key.split(':');
var prefix = parts[0];
var commitStampMs = parts[1];
var timeoutAtMs = parts[2];
var sagaId = parts[3];
if (timeoutAtMs === 'Infinity') {
timeoutAtMs = Infinity;
}
if (_.isString(timeoutAtMs)) {
timeoutAtMs = parseInt(timeoutAtMs, 10);
}
if (commitStampMs === 'Infinity') {
commitStampMs = Infinity;
}
if (_.isString(commitStampMs)) {
commitStampMs = parseInt(commitStampMs, 10);
}
if (timeoutAtMs > (new Date()).getTime()) {
return callback(null);
}
if (timeoutAtMs === 'Infinity') {
timeoutAtMs = Infinity;
}
if (_.isString(timeoutAtMs)) {
timeoutAtMs = parseInt(timeoutAtMs, 10);
}
self.get(sagaId, function (err, saga) {
if (timeoutAtMs > (new Date()).getTime()) {
return callback(null);
}
self.get(sagaId, function (err, saga) {
if (err) {
return callback(err);
}
if (saga) {
res.push(saga);
}
callback(null);
});
}, function (err) {
if (err) {
return callback(err);
}
if (saga) {
res.push(saga);
}
callback(null);
callback(null, res);
});
}, function (err) {
if (err) {
return callback(err);
}
callback(null, res);
});
});
}
);
},

@@ -382,57 +431,66 @@

this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) {
if (err) {
return callback(err);
}
var allKeys = [];
if (keys.length === 0) {
return callback(null, res);
}
this.scan(this.options.prefix + '_saga:*:*:*',
function (keys, fn) {
allKeys = allKeys.concat(keys);
fn();
}, function (err) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}
keys = _.sortBy(keys, function (s) {
return s;
});
if (allKeys.length === 0) {
return callback(null, res);
}
async.each(keys, function (key, callback) {
var parts = key.split(':');
var prefix = parts[0];
var commitStampMs = parts[1];
var timeoutAtMs = parts[2];
var sagaId = parts[3];
allKeys = _.sortBy(allKeys, function (s) {
return s;
});
if (commitStampMs === 'Infinity') {
commitStampMs = Infinity;
}
if (_.isString(commitStampMs)) {
commitStampMs = parseInt(commitStampMs, 10);
}
async.each(allKeys, function (key, callback) {
var parts = key.split(':');
var prefix = parts[0];
var commitStampMs = parts[1];
var timeoutAtMs = parts[2];
var sagaId = parts[3];
if (timeoutAtMs === 'Infinity') {
timeoutAtMs = Infinity;
}
if (_.isString(timeoutAtMs)) {
timeoutAtMs = parseInt(timeoutAtMs, 10);
}
if (commitStampMs === 'Infinity') {
commitStampMs = Infinity;
}
if (_.isString(commitStampMs)) {
commitStampMs = parseInt(commitStampMs, 10);
}
if (commitStampMs > date.getTime()) {
return callback(null);
}
if (timeoutAtMs === 'Infinity') {
timeoutAtMs = Infinity;
}
if (_.isString(timeoutAtMs)) {
timeoutAtMs = parseInt(timeoutAtMs, 10);
}
self.get(sagaId, function (err, saga) {
if (commitStampMs > date.getTime()) {
return callback(null);
}
self.get(sagaId, function (err, saga) {
if (err) {
return callback(err);
}
if (saga) {
res.push(saga);
}
callback(null);
});
}, function (err) {
if (err) {
return callback(err);
}
if (saga) {
res.push(saga);
}
callback(null);
callback(null, res);
});
}, function (err) {
if (err) {
return callback(err);
}
callback(null, res);
});
});
}
);
},

@@ -444,37 +502,46 @@

this.client.keys(this.options.prefix + '_command:*:*', function (err, keys) {
if (err) {
return callback(err);
}
var allKeys = [];
keys = _.sortBy(keys, function (s) {
return s;
});
this.scan(this.options.prefix + '_command:*:*',
function (keys, fn) {
allKeys = allKeys.concat(keys);
fn();
}, function (err) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}
async.each(keys, function (key, callback) {
self.client.get(key, function (err, data) {
if (err) {
return callback(err);
}
allKeys = _.sortBy(allKeys, function (s) {
return s;
});
if (!data) {
return callback(null);
}
async.each(allKeys, function (key, callback) {
self.client.get(key, function (err, data) {
if (err) {
return callback(err);
}
try {
data = jsondate.parse(data.toString());
} catch (error) {
return callback(err);
if (!data) {
return callback(null);
}
try {
data = jsondate.parse(data.toString());
} catch (error) {
return callback(err);
}
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp });
callback(null);
});
}, function (err) {
if (err) {
debug(err);
}
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp });
callback(null);
if (callback) callback(err, res);
});
}, function (err) {
if (err) {
debug(err);
}
if (callback) callback(err, res);
});
});
}
);
},

@@ -481,0 +548,0 @@

{
"author": "adrai",
"name": "cqrs-saga",
"version": "1.6.0",
"version": "1.6.1",
"private": false,

@@ -6,0 +6,0 @@ "main": "index.js",

@@ -0,1 +1,4 @@

## [v1.6.1](https://github.com/adrai/node-cqrs-saga/compare/v1.6.0...v1.6.1)
- redis: replace .keys() calls with .scan() calls => scales better
## [v1.6.0](https://github.com/adrai/node-cqrs-saga/compare/v1.5.0...v1.6.0)

@@ -2,0 +5,0 @@ - introduce possibility to define a shouldHandle function

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc