Comparing version 1.6.0 to 1.6.1
@@ -220,2 +220,39 @@ 'use strict'; | ||
scan: function (key, cursor, handleKeys, callback) { | ||
var self = this; | ||
if (!callback) { | ||
callback = handleKeys; | ||
handleKeys = cursor; | ||
cursor = 0; | ||
} | ||
(function scanRecursive (curs) { | ||
self.client.scan(curs, 'match', key, function (err, res) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
function next () { | ||
if (res[0] === '0') { | ||
callback(null); | ||
} else { | ||
scanRecursive(res[0]); | ||
} | ||
} | ||
if (res[1].length === 0) { | ||
return next(); | ||
} | ||
handleKeys(res[1], function (err) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
next(); | ||
}); | ||
}); | ||
})(cursor); | ||
}, | ||
get: function (id, callback) { | ||
@@ -230,37 +267,44 @@ if (!id || !_.isString(id)) { | ||
this.client.keys(this.options.prefix + '_saga:*:*:' + id, function (err, keys) { | ||
if (err) { | ||
debug(err); | ||
if (callback) callback(err); | ||
return; | ||
} | ||
var allKeys = []; | ||
if (keys.length === 0) { | ||
if (callback) callback(null, null); | ||
return; | ||
} | ||
keys = _.sortBy(keys, function (s) { | ||
return s; | ||
}); | ||
self.client.get(keys[0], function (err, saga) { | ||
this.scan(this.options.prefix + '_saga:*:*:' + id, | ||
function (keys, fn) { | ||
allKeys = allKeys.concat(keys); | ||
fn(); | ||
}, function (err) { | ||
if (err) { | ||
return callback(err); | ||
debug(err); | ||
if (callback) callback(err); | ||
return; | ||
} | ||
if (!saga) { | ||
return callback(null, null); | ||
} | ||
try { | ||
saga = jsondate.parse(saga.toString()); | ||
} catch (error) { | ||
if (callback) callback(err); | ||
if (allKeys.length === 0) { | ||
if (callback) callback(null, null); | ||
return; | ||
} | ||
callback(null, saga); | ||
}); | ||
}); | ||
allKeys = _.sortBy(allKeys, function (s) { | ||
return s; | ||
}); | ||
self.client.get(allKeys[0], function (err, saga) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!saga) { | ||
return callback(null, null); | ||
} | ||
try { | ||
saga = jsondate.parse(saga.toString()); | ||
} catch (error) { | ||
if (callback) callback(err); | ||
return; | ||
} | ||
callback(null, saga); | ||
}); | ||
} | ||
); | ||
}, | ||
@@ -279,23 +323,19 @@ | ||
function (callback) { | ||
self.client.keys(self.options.prefix + '_saga:*:*:' + id, function (err, keys) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
async.each(keys, function (key, callback) { | ||
self.client.del(key, callback); | ||
}, callback); | ||
}); | ||
self.scan(self.options.prefix + '_saga:*:*:' + id, | ||
function (keys, fn) { | ||
async.each(keys, function (key, callback) { | ||
self.client.del(key, callback); | ||
}, fn); | ||
}, callback | ||
); | ||
}, | ||
function (callback) { | ||
self.client.keys(self.options.prefix + '_command:' + id + ':*', function (err, keys) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
async.each(keys, function (key, callback) { | ||
self.client.del(key, callback); | ||
}, callback); | ||
}); | ||
self.scan(self.options.prefix + '_command:' + id + ':*', | ||
function (keys, fn) { | ||
async.each(keys, function (key, callback) { | ||
self.client.del(key, callback); | ||
}, fn); | ||
}, callback | ||
); | ||
} | ||
@@ -314,57 +354,66 @@ ], function (err) { | ||
this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
var allKeys = []; | ||
if (keys.length === 0) { | ||
return callback(null, res); | ||
} | ||
this.scan(this.options.prefix + '_saga:*:*:*', | ||
function (keys, fn) { | ||
allKeys = allKeys.concat(keys); | ||
fn(); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
if (callback) callback(err); | ||
return; | ||
} | ||
keys = _.sortBy(keys, function (s) { | ||
return s; | ||
}); | ||
if (allKeys.length === 0) { | ||
return callback(null, res); | ||
} | ||
async.each(keys, function (key, callback) { | ||
var parts = key.split(':'); | ||
var prefix = parts[0]; | ||
var commitStampMs = parts[1]; | ||
var timeoutAtMs = parts[2]; | ||
var sagaId = parts[3]; | ||
allKeys = _.sortBy(allKeys, function (s) { | ||
return s; | ||
}); | ||
if (commitStampMs === 'Infinity') { | ||
commitStampMs = Infinity; | ||
} | ||
if (_.isString(commitStampMs)) { | ||
commitStampMs = parseInt(commitStampMs, 10); | ||
} | ||
async.each(allKeys, function (key, callback) { | ||
var parts = key.split(':'); | ||
var prefix = parts[0]; | ||
var commitStampMs = parts[1]; | ||
var timeoutAtMs = parts[2]; | ||
var sagaId = parts[3]; | ||
if (timeoutAtMs === 'Infinity') { | ||
timeoutAtMs = Infinity; | ||
} | ||
if (_.isString(timeoutAtMs)) { | ||
timeoutAtMs = parseInt(timeoutAtMs, 10); | ||
} | ||
if (commitStampMs === 'Infinity') { | ||
commitStampMs = Infinity; | ||
} | ||
if (_.isString(commitStampMs)) { | ||
commitStampMs = parseInt(commitStampMs, 10); | ||
} | ||
if (timeoutAtMs > (new Date()).getTime()) { | ||
return callback(null); | ||
} | ||
if (timeoutAtMs === 'Infinity') { | ||
timeoutAtMs = Infinity; | ||
} | ||
if (_.isString(timeoutAtMs)) { | ||
timeoutAtMs = parseInt(timeoutAtMs, 10); | ||
} | ||
self.get(sagaId, function (err, saga) { | ||
if (timeoutAtMs > (new Date()).getTime()) { | ||
return callback(null); | ||
} | ||
self.get(sagaId, function (err, saga) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (saga) { | ||
res.push(saga); | ||
} | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (saga) { | ||
res.push(saga); | ||
} | ||
callback(null); | ||
callback(null, res); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
callback(null, res); | ||
}); | ||
}); | ||
} | ||
); | ||
}, | ||
@@ -382,57 +431,66 @@ | ||
this.client.keys(this.options.prefix + '_saga:*:*:*', function (err, keys) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
var allKeys = []; | ||
if (keys.length === 0) { | ||
return callback(null, res); | ||
} | ||
this.scan(this.options.prefix + '_saga:*:*:*', | ||
function (keys, fn) { | ||
allKeys = allKeys.concat(keys); | ||
fn(); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
if (callback) callback(err); | ||
return; | ||
} | ||
keys = _.sortBy(keys, function (s) { | ||
return s; | ||
}); | ||
if (allKeys.length === 0) { | ||
return callback(null, res); | ||
} | ||
async.each(keys, function (key, callback) { | ||
var parts = key.split(':'); | ||
var prefix = parts[0]; | ||
var commitStampMs = parts[1]; | ||
var timeoutAtMs = parts[2]; | ||
var sagaId = parts[3]; | ||
allKeys = _.sortBy(allKeys, function (s) { | ||
return s; | ||
}); | ||
if (commitStampMs === 'Infinity') { | ||
commitStampMs = Infinity; | ||
} | ||
if (_.isString(commitStampMs)) { | ||
commitStampMs = parseInt(commitStampMs, 10); | ||
} | ||
async.each(allKeys, function (key, callback) { | ||
var parts = key.split(':'); | ||
var prefix = parts[0]; | ||
var commitStampMs = parts[1]; | ||
var timeoutAtMs = parts[2]; | ||
var sagaId = parts[3]; | ||
if (timeoutAtMs === 'Infinity') { | ||
timeoutAtMs = Infinity; | ||
} | ||
if (_.isString(timeoutAtMs)) { | ||
timeoutAtMs = parseInt(timeoutAtMs, 10); | ||
} | ||
if (commitStampMs === 'Infinity') { | ||
commitStampMs = Infinity; | ||
} | ||
if (_.isString(commitStampMs)) { | ||
commitStampMs = parseInt(commitStampMs, 10); | ||
} | ||
if (commitStampMs > date.getTime()) { | ||
return callback(null); | ||
} | ||
if (timeoutAtMs === 'Infinity') { | ||
timeoutAtMs = Infinity; | ||
} | ||
if (_.isString(timeoutAtMs)) { | ||
timeoutAtMs = parseInt(timeoutAtMs, 10); | ||
} | ||
self.get(sagaId, function (err, saga) { | ||
if (commitStampMs > date.getTime()) { | ||
return callback(null); | ||
} | ||
self.get(sagaId, function (err, saga) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (saga) { | ||
res.push(saga); | ||
} | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (saga) { | ||
res.push(saga); | ||
} | ||
callback(null); | ||
callback(null, res); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
callback(null, res); | ||
}); | ||
}); | ||
} | ||
); | ||
}, | ||
@@ -444,37 +502,46 @@ | ||
this.client.keys(this.options.prefix + '_command:*:*', function (err, keys) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
var allKeys = []; | ||
keys = _.sortBy(keys, function (s) { | ||
return s; | ||
}); | ||
this.scan(this.options.prefix + '_command:*:*', | ||
function (keys, fn) { | ||
allKeys = allKeys.concat(keys); | ||
fn(); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
if (callback) callback(err); | ||
return; | ||
} | ||
async.each(keys, function (key, callback) { | ||
self.client.get(key, function (err, data) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
allKeys = _.sortBy(allKeys, function (s) { | ||
return s; | ||
}); | ||
if (!data) { | ||
return callback(null); | ||
} | ||
async.each(allKeys, function (key, callback) { | ||
self.client.get(key, function (err, data) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
try { | ||
data = jsondate.parse(data.toString()); | ||
} catch (error) { | ||
return callback(err); | ||
if (!data) { | ||
return callback(null); | ||
} | ||
try { | ||
data = jsondate.parse(data.toString()); | ||
} catch (error) { | ||
return callback(err); | ||
} | ||
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp }); | ||
callback(null); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
} | ||
res.push({ sagaId: data._sagaId, commandId: data._commandId, command: data, commitStamp: data._commitStamp }); | ||
callback(null); | ||
if (callback) callback(err, res); | ||
}); | ||
}, function (err) { | ||
if (err) { | ||
debug(err); | ||
} | ||
if (callback) callback(err, res); | ||
}); | ||
}); | ||
} | ||
); | ||
}, | ||
@@ -481,0 +548,0 @@ |
{ | ||
"author": "adrai", | ||
"name": "cqrs-saga", | ||
"version": "1.6.0", | ||
"version": "1.6.1", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -0,1 +1,4 @@ | ||
## [v1.6.1](https://github.com/adrai/node-cqrs-saga/compare/v1.6.0...v1.6.1) | ||
- redis: replace .keys() calls with .scan() calls => scales better | ||
## [v1.6.0](https://github.com/adrai/node-cqrs-saga/compare/v1.5.0...v1.6.0) | ||
@@ -2,0 +5,0 @@ - introduce possibility to define a shouldHandle function |
Sorry, the diff of this file is not supported yet
4264
269062