Comparing version 1.2.7 to 1.3.0
@@ -5,2 +5,6 @@ ## Changelog | ||
### v1.3.0 - May 15, 2015 | ||
* Support pipeline redirection in Cluster mode. | ||
### v1.2.7 - May 15, 2015 | ||
@@ -7,0 +11,0 @@ |
@@ -225,3 +225,3 @@ 'use strict'; | ||
var item = offlineQueue.shift(); | ||
this.sendCommand(item.command, item.stream, item.slot); | ||
this.sendCommand(item.command, item.stream, item.node); | ||
} | ||
@@ -231,3 +231,3 @@ } | ||
Cluster.prototype.sendCommand = function (command, stream, slot) { | ||
Cluster.prototype.sendCommand = function (command, stream, node) { | ||
if (this.status === 'end') { | ||
@@ -238,47 +238,30 @@ command.reject(new Error('Connection is closed.')); | ||
var targetSlot = (typeof slot === 'number') ? slot : command.getSlot(); | ||
var ttl = this.options.maxRedirections; | ||
var targetSlot = node ? node.slot : command.getSlot(); | ||
var ttl = {}; | ||
var reject = command.reject; | ||
var _this = this; | ||
command.reject = function (err) { | ||
if (err instanceof Redis.ReplyError) { | ||
var errv = err.message.split(' '); | ||
if (!slot && (errv[0] === 'MOVED' || errv[0] === 'ASK')) { | ||
ttl -= 1; | ||
if (ttl <= 0) { | ||
return reject.call(command, new Error('Too many Cluster redirections. Last error: ' + err)); | ||
} | ||
var hostPort = errv[2].split(':'); | ||
var node = _this.createNode(hostPort[1], hostPort[0]); | ||
if (errv[0] === 'MOVED') { | ||
if (!node) { | ||
command.reject = function (err) { | ||
_this.handleError(err, ttl, { | ||
moved: function (node, slot, hostPort) { | ||
debug('command %s is moved to %s:%s', command.name, hostPort[0], hostPort[1]); | ||
_this.slots[errv[1]] = node; | ||
_this.slots[slot] = node; | ||
tryConnection(); | ||
_this.refreshSlotsCache(); | ||
} else { | ||
}, | ||
ask: function (node, slot, hostPort) { | ||
debug('command %s is required to ask %s:%s', command.name, hostPort[0], hostPort[1]); | ||
tryConnection(false, node); | ||
}, | ||
clusterDown: tryConnection, | ||
connectionClosed: tryConnection, | ||
maxRedirections: function (redirectionError) { | ||
reject.call(command, redirectionError); | ||
}, | ||
defaults: function () { | ||
reject.call(command, err); | ||
} | ||
} else if (errv[0] === 'CLUSTERDOWN' && _this.options.retryDelayOnClusterDown > 0) { | ||
debug('command %s is rejected because CLUSTERDOWN. Refreshing the slot cache after %dms', command.name, _this.options.retryDelayOnClusterDown); | ||
setTimeout(function () { | ||
_this.refreshSlotsCache(function () { | ||
tryConnection(); | ||
}); | ||
}, _this.options.retryDelayOnClusterDown); | ||
} else { | ||
reject.call(command, err); | ||
} | ||
} else if (err.message === 'Connection is closed.' && _this.options.retryDelayOnFailover > 0) { | ||
debug('command %s is rejected because %s. Refreshing the slot cache...', command.name, err); | ||
setTimeout(function () { | ||
_this.refreshSlotsCache(function () { | ||
tryConnection(); | ||
}); | ||
}, _this.options.retryDelayOnFailover); | ||
} else { | ||
reject.call(command, err); | ||
} | ||
}; | ||
}); | ||
}; | ||
} | ||
tryConnection(); | ||
@@ -293,10 +276,18 @@ | ||
var redis; | ||
if (typeof targetSlot === 'number') { | ||
redis = _this.slots[targetSlot]; | ||
} else if (asking && !random) { | ||
redis = asking; | ||
redis.asking(); | ||
if (node && node.redis) { | ||
redis = node.redis; | ||
} else { | ||
if (typeof targetSlot === 'number') { | ||
redis = _this.slots[targetSlot]; | ||
} | ||
if (asking && !random) { | ||
redis = asking; | ||
redis.asking(); | ||
} | ||
if (random || !redis) { | ||
redis = _this.nodes[_.sample(Object.keys(_this.nodes))]; | ||
} | ||
} | ||
if (random || typeof targetSlot !== 'number' || !redis) { | ||
redis = _this.nodes[_.sample(Object.keys(_this.nodes))]; | ||
if (node && !node.redis) { | ||
node.redis = redis; | ||
} | ||
@@ -308,3 +299,3 @@ redis.sendCommand(command, stream); | ||
stream: stream, | ||
slot: slot | ||
node: node | ||
}); | ||
@@ -318,2 +309,38 @@ } else { | ||
Cluster.prototype.handleError = function (error, ttl, handlers) { | ||
var _this = this; | ||
if (typeof ttl.value === 'undefined') { | ||
ttl.value = this.options.maxRedirections; | ||
} | ||
var errv = error.message.split(' '); | ||
if (errv[0] === 'MOVED' || errv[0] === 'ASK') { | ||
ttl.value -= 1; | ||
if (ttl.value <= 0) { | ||
handlers.maxRedirections(new Error('Too many Cluster redirections. Last error: ' + error)); | ||
return; | ||
} | ||
var hostPort = errv[2].split(':'); | ||
var node = this.createNode(hostPort[1], hostPort[0]); | ||
if (errv[0] === 'MOVED') { | ||
handlers.moved(node, errv[1], hostPort); | ||
} else { | ||
handlers.ask(node, errv[1], hostPort); | ||
} | ||
} else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) { | ||
setTimeout(function () { | ||
_this.refreshSlotsCache(function () { | ||
handlers.clusterDown(); | ||
}); | ||
}, this.options.retryDelayOnClusterDown); | ||
} else if (error.message === 'Connection is closed.' && this.options.retryDelayOnFailover > 0) { | ||
setTimeout(function () { | ||
_this.refreshSlotsCache(function () { | ||
handlers.connectionClosed(); | ||
}); | ||
}, this.options.retryDelayOnFailover); | ||
} else { | ||
handlers.defaults(); | ||
} | ||
}; | ||
Cluster.prototype.getInfoFromNode = function (redis, callback) { | ||
@@ -320,0 +347,0 @@ if (!redis) { |
@@ -17,3 +17,4 @@ 'use strict'; | ||
* @param {string[]} [args=null] - An array of command arguments | ||
* @param {string} [replyEncoding=null] - Set the encoding of the reply, | ||
* @param {object} [options] | ||
* @param {string} [options.replyEncoding=null] - Set the encoding of the reply, | ||
* by default buffer will be returned. | ||
@@ -46,10 +47,17 @@ * @param {function} [callback=null] - The callback that handles the response. | ||
this.args = args ? _.flatten(args) : []; | ||
this.callback = callback; | ||
this.initPromise(); | ||
} | ||
Command.prototype.initPromise = function () { | ||
var _this = this; | ||
this.promise = new Promise(function (resolve, reject) { | ||
var transformer = Command._transformer.argument[_this.name]; | ||
if (transformer) { | ||
_this.args = transformer(_this.args); | ||
if (!_this.transformed) { | ||
_this.transformed = true; | ||
var transformer = Command._transformer.argument[_this.name]; | ||
if (transformer) { | ||
_this.args = transformer(_this.args); | ||
} | ||
_this.stringifyArguments(); | ||
} | ||
_this.stringifyArguments(); | ||
@@ -64,4 +72,4 @@ _this.resolve = _this._convertValue(resolve); | ||
} | ||
}).nodeify(callback); | ||
} | ||
}).nodeify(this.callback); | ||
}; | ||
@@ -68,0 +76,0 @@ Command.prototype.getSlot = function () { |
'use strict'; | ||
var _ = require('lodash'); | ||
var Commander = require('./commander'); | ||
var Command = require('./command'); | ||
var fbuffer = require('flexbuffer'); | ||
var Promise = require('bluebird'); | ||
var utils = require('./utils'); | ||
var commands = require('../commands'); | ||
@@ -13,2 +14,3 @@ function Pipeline(redis) { | ||
this.redis = redis; | ||
this.isCluster = this.redis.constructor.name === 'Cluster'; | ||
this.options = redis.options; | ||
@@ -38,5 +40,107 @@ this._queue = []; | ||
this._result[position] = value; | ||
if (!--this.replyPending) { | ||
this.resolve(this._result); | ||
if (--this.replyPending) { | ||
return; | ||
} | ||
var i; | ||
if (this.isCluster) { | ||
var retriable = true; | ||
var commonError; | ||
var inTransaction; | ||
for (i = 0; i < this._result.length; ++i) { | ||
var error = this._result[i][0]; | ||
var command = this._queue[i]; | ||
if (command.name === 'multi') { | ||
inTransaction = true; | ||
} else if (command.name === 'exec') { | ||
inTransaction = false; | ||
} | ||
if (error) { | ||
if (command.name === 'exec' && error.message === 'EXECABORT Transaction discarded because of previous errors.') { | ||
continue; | ||
} | ||
if (!commonError) { | ||
commonError = { | ||
name: error.name, | ||
message: error.message | ||
}; | ||
} else if (commonError.name !== error.name || commonError.message !== error.message) { | ||
retriable = false; | ||
break; | ||
} | ||
} else if (!inTransaction) { | ||
var commandDef = commands[command.name]; | ||
var isReadOnly = commandDef && _.include(commandDef.flags, 'readonly'); | ||
if (!isReadOnly) { | ||
retriable = false; | ||
break; | ||
} | ||
} | ||
} | ||
if (commonError && retriable) { | ||
var _this = this; | ||
var errv = commonError.message.split(' '); | ||
var queue = this._queue; | ||
inTransaction = false; | ||
this._queue = []; | ||
for (i = 0; i < queue.length; ++i) { | ||
if (errv[0] === 'ASK' && !inTransaction && | ||
queue[i].name !== 'asking' && | ||
(!queue[i - 1] || queue[i - 1].name !== 'asking')) { | ||
var asking = new Command('asking'); | ||
asking.ignore = true; | ||
this.sendCommand(asking); | ||
} | ||
queue[i].initPromise(); | ||
this.sendCommand(queue[i]); | ||
if (queue[i].name === 'multi') { | ||
inTransaction = true; | ||
} else if (queue[i].name === 'exec') { | ||
inTransaction = false; | ||
} | ||
} | ||
var matched = true; | ||
if (typeof this.leftRedirections === 'undefined') { | ||
this.leftRedirections = {}; | ||
} | ||
this.redis.handleError(commonError, this.leftRedirections, { | ||
moved: function (node, slot) { | ||
_this.preferNode = node; | ||
_this.redis.slots[errv[1]] = node; | ||
_this.redis.refreshSlotsCache(); | ||
_this.exec(); | ||
}, | ||
ask: function (node) { | ||
_this.preferNode = node; | ||
_this.exec(); | ||
}, | ||
clusterDown: function () { | ||
_this.exec(); | ||
}, | ||
connectionClosed: function () { | ||
_this.exec(); | ||
}, | ||
maxRedirections: function () { | ||
matched = false; | ||
}, | ||
defaults: function () { | ||
matched = false; | ||
} | ||
}); | ||
if (matched) { | ||
return; | ||
} | ||
} | ||
} | ||
var ignoredCount = 0; | ||
for (i = 0; i < this._queue.length - ignoredCount; ++i) { | ||
if (this._queue[i + ignoredCount].ignore) { | ||
ignoredCount += 1; | ||
} | ||
this._result[i] = this._result[i + ignoredCount]; | ||
} | ||
this.resolve(this._result.slice(0, this._result.length - ignoredCount)); | ||
}; | ||
@@ -71,5 +175,7 @@ | ||
} | ||
var isCluster = this.redis.constructor.name === 'Cluster'; | ||
var sampleKey; | ||
if (!this.nodeifiedPromise) { | ||
this.nodeifiedPromise = true; | ||
this.promise.nodeify(callback); | ||
} | ||
var pipelineSlot; | ||
// Check whether scripts exists and get a sampleKey. | ||
@@ -79,5 +185,19 @@ var scripts = []; | ||
var item = this._queue[i]; | ||
if (isCluster && !sampleKey) { | ||
sampleKey = item.getKeys()[0]; | ||
if (this.isCluster) { | ||
var keys = item.getKeys(); | ||
for (var j = 0; j < keys.length; ++j) { | ||
var slot = utils.calcSlot(keys[j]); | ||
if (typeof pipelineSlot === 'undefined') { | ||
pipelineSlot = slot; | ||
} | ||
if (pipelineSlot !== slot) { | ||
this.reject(new Error('All keys in the pipeline should belong to the same slot(expect "' + keys[j] + '" belongs to slot ' + pipelineSlot + ').')); | ||
return this.promise; | ||
} | ||
} | ||
} | ||
if (this.isCluster && item.isCustomCommand) { | ||
this.reject(new Error('Sending custom commands in pipeline is not supported in Cluster mode.')); | ||
return this.promise; | ||
} | ||
if (item.name !== 'evalsha') { | ||
@@ -92,9 +212,4 @@ continue; | ||
} | ||
var sampleSlot; | ||
if (isCluster) { | ||
if (sampleKey) { | ||
sampleSlot = utils.calcSlot(sampleKey); | ||
} else { | ||
sampleSlot = Math.random() * 16384 | 0; | ||
} | ||
if (this.isCluster && typeof pipelineSlot === 'undefined') { | ||
pipelineSlot = Math.random() * 16384 | 0; | ||
} | ||
@@ -105,9 +220,3 @@ | ||
if (scripts.length) { | ||
var redis; | ||
if (isCluster) { | ||
redis = this.redis.slots[sampleSlot]; | ||
} else { | ||
redis = this.redis; | ||
} | ||
promise = redis.script('exists', scripts.map(function (item) { | ||
promise = this.redis.script('exists', scripts.map(function (item) { | ||
return item.sha; | ||
@@ -122,3 +231,3 @@ })).then(function (results) { | ||
return Promise.all(pending.map(function (script) { | ||
return redis.script('load', script.lua); | ||
return _this.redis.script('load', script.lua); | ||
})); | ||
@@ -134,2 +243,3 @@ }); | ||
var node = { slot: pipelineSlot, redis: _this.preferNode }; | ||
var bufferMode = false; | ||
@@ -155,4 +265,4 @@ var stream = { | ||
} | ||
if (isCluster) { | ||
_this.redis.slots[sampleSlot].stream.write(data); | ||
if (_this.isCluster) { | ||
node.redis.stream.write(data); | ||
} else { | ||
@@ -166,8 +276,8 @@ _this.redis.stream.write(data); | ||
for (var i = 0; i < _this._queue.length; ++i) { | ||
_this.redis.sendCommand(_this._queue[i], stream, sampleSlot); | ||
_this.redis.sendCommand(_this._queue[i], stream, node); | ||
} | ||
return _this.promise; | ||
}).nodeify(callback); | ||
}); | ||
}; | ||
module.exports = Pipeline; |
@@ -18,3 +18,5 @@ 'use strict'; | ||
var result = container.sendCommand(new Command('evalsha', [this.sha].concat(args), options)); | ||
var evalsha = new Command('evalsha', [this.sha].concat(args), options); | ||
evalsha.isCustomCommand = true; | ||
var result = container.sendCommand(evalsha); | ||
if (result instanceof Promise) { | ||
@@ -21,0 +23,0 @@ var _this = this; |
@@ -21,3 +21,5 @@ 'use strict'; | ||
pipeline.exec = function (callback) { | ||
exec.call(pipeline); | ||
if (this._transactions > 0) { | ||
exec.call(pipeline); | ||
} | ||
var promise = exec.call(pipeline); | ||
@@ -24,0 +26,0 @@ return promise.then(function (result) { |
{ | ||
"name": "ioredis", | ||
"version": "1.2.7", | ||
"version": "1.3.0", | ||
"description": "A delightful, performance-focused Redis client for Node and io.js", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -508,8 +508,14 @@ # ioredis | ||
### Transaction and pipeline in Cluster mode | ||
Almost all features that are supported by `Redis` also works in Cluster mode, e.g. custom commands, transaction and pipeline. | ||
However there are something different when use transaction and pipeline in Cluster mode: | ||
Almost all features that are supported by `Redis` also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline. | ||
However there are some differences when using transaction and pipeline in Cluster mode: | ||
0. You can't use `multi` without pipeline(aka `cluster.multi({ pipeline: false })`). This is because when you call `cluster.multi({ pipeline: false })`, ioredis doesn't know which node should the `multi` command to be sent to. | ||
0. With pipeline, cluster uses the first key in the pipeline queue as the sample key to calculate the slot, and all commands in the pipeline will be sent to the node that the slot belongs to. | ||
0. All keys in a pipeline should belong to the same slot since ioredis sends all commands in the pipeline to the same node. For example: | ||
0. You can't use `multi` without pipeline(aka `cluster.multi({ pipeline: false })`). This is because when you call `cluster.multi({ pipeline: false })`, ioredis doesn't know which node should the `multi` command be sent to. | ||
0. Chaining custom commands in the pipeline is not supported in Cluster mode. | ||
When any commands in a pipeline receives a `MOVED` or `ASK` error, ioredis will resend the whole pipeline to the specified node automatically if all of the following conditions are satisfied: | ||
0. All errors received in the pipeline are same. For example, we won't resend the pipeline if we got two `MOVED` error pointing to different nodes. | ||
0. All commands executed successfully are readonly commands. This makes sure that resending the pipeline won't have side effect. | ||
## hiredis | ||
@@ -516,0 +522,0 @@ If [hiredis](https://github.com/redis/hiredis-node) is installed(by `npm install hiredis`), |
@@ -344,22 +344,54 @@ 'use strict'; | ||
describe('pipeline', function () { | ||
it('should use the first key to calculate the slot', function (done) { | ||
it('should throw when not all keys belong to the same slot', function (done) { | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
return slotTable; | ||
} | ||
}); | ||
var pending = 2; | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv.toString() === 'set,foo,bar') { | ||
pending -= 1; | ||
} else if (argv.toString() === 'get,foo2') { | ||
pending -= 1; | ||
if (!pending) { | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
]); | ||
cluster.pipeline().set('foo', 'bar').get('foo2').exec().catch(function (err) { | ||
expect(err.message).to.match(/All keys in the pipeline should belong to the same slot/); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
it('should auto redirect commands on MOVED', function (done) { | ||
var moved = false; | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
return 'bar'; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[1] === 'foo') { | ||
if (argv[0] === 'set') { | ||
expect(moved).to.eql(false); | ||
moved = true; | ||
} | ||
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001'); | ||
} | ||
@@ -371,6 +403,226 @@ }); | ||
]); | ||
cluster.pipeline().set('foo', 'bar').get('foo2').exec(function (err, result) { | ||
cluster.pipeline().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0]).to.eql([null, 'bar']); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
it('should auto redirect commands on ASK', function (done) { | ||
var asked = false; | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'asking') { | ||
asked = true; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
expect(asked).to.eql(true); | ||
return 'bar'; | ||
} | ||
if (argv[0] !== 'asking') { | ||
asked = false; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[1] === 'foo') { | ||
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001'); | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
]); | ||
cluster.pipeline().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0]).to.eql([null, 'bar']); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
it('should not redirect commands on a non-readonly command is successful', function (done) { | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
return 'bar'; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001'); | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
]); | ||
cluster.pipeline().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0][0].message).to.match(/MOVED/); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
it('should retry when redis is down', function (done) { | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
return 'bar'; | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
], { retryDelayOnFailover: 1 }); | ||
stub(cluster, 'refreshSlotsCache', function () { | ||
node2.connect(); | ||
cluster.refreshSlotsCache.restore(); | ||
cluster.refreshSlotsCache.apply(cluster, arguments); | ||
}); | ||
node2.disconnect(); | ||
cluster.pipeline().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0]).to.eql([null, 'bar']); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
}); | ||
describe('transaction', function () { | ||
it('should auto redirect commands on MOVED', function (done) { | ||
var moved = false; | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[1] === 'foo') { | ||
return 'QUEUED'; | ||
} | ||
if (argv[0] === 'exec') { | ||
expect(moved).to.eql(true); | ||
return ['bar', 'OK']; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
moved = true; | ||
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001'); | ||
} | ||
if (argv[0] === 'exec') { | ||
return new Error('EXECABORT Transaction discarded because of previous errors.'); | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
]); | ||
cluster.multi().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0]).to.eql([null, 'bar']); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
it('should auto redirect commands on ASK', function (done) { | ||
var asked = false; | ||
var slotTable = [ | ||
[0, 12181, ['127.0.0.1', 30001]], | ||
[12182, 12183, ['127.0.0.1', 30002]], | ||
[12184, 16383, ['127.0.0.1', 30001]], | ||
]; | ||
var node1 = new MockServer(30001, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'asking') { | ||
asked = true; | ||
} | ||
if (argv[0] === 'multi') { | ||
expect(asked).to.eql(true); | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
expect(asked).to.eql(false); | ||
return 'bar'; | ||
} | ||
if (argv[0] === 'exec') { | ||
expect(asked).to.eql(false); | ||
return ['bar', 'OK']; | ||
} | ||
if (argv[0] !== 'asking') { | ||
asked = false; | ||
} | ||
}); | ||
var node2 = new MockServer(30002, function (argv) { | ||
if (argv[0] === 'cluster' && argv[1] === 'slots') { | ||
return slotTable; | ||
} | ||
if (argv[0] === 'get' && argv[1] === 'foo') { | ||
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001'); | ||
} | ||
if (argv[0] === 'exec') { | ||
return new Error('EXECABORT Transaction discarded because of previous errors.'); | ||
} | ||
}); | ||
var cluster = new Redis.Cluster([ | ||
{ host: '127.0.0.1', port: '30001' } | ||
]); | ||
cluster.multi().get('foo').set('foo', 'bar').exec(function (err, result) { | ||
expect(err).to.eql(null); | ||
expect(result[0]).to.eql([null, 'bar']); | ||
expect(result[1]).to.eql([null, 'OK']); | ||
cluster.disconnect(); | ||
disconnect([node1, node2], done); | ||
}); | ||
}); | ||
}); | ||
}); | ||
@@ -377,0 +629,0 @@ |
@@ -12,4 +12,12 @@ 'use strict'; | ||
this.port = port; | ||
this.handler = handler; | ||
this.connect(); | ||
} | ||
util.inherits(MockServer, EventEmitter); | ||
MockServer.prototype.connect = function () { | ||
var _this = this; | ||
this.handler = handler; | ||
this.socket = net.createServer(function (c) { | ||
@@ -37,8 +45,7 @@ process.nextTick(function () { | ||
}); | ||
this.socket.listen(port); | ||
this.socket.listen(this.port); | ||
enableDestroy(this.socket); | ||
} | ||
}; | ||
util.inherits(MockServer, EventEmitter); | ||
MockServer.prototype.disconnect = function (callback) { | ||
@@ -45,0 +52,0 @@ this.socket.destroy(callback); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
253423
6773
702