Comparing version 1.0.0 to 1.0.1
module.exports = require('./lib/redis'); | ||
var Redis = module.exports; | ||
var redis = Redis(); | ||
redis.get('foo', function (err, b) { | ||
console.log(err, b); | ||
}); |
@@ -1,44 +0,153 @@ | ||
function Command (commandName, args, callback) { | ||
this.commandName = commandName; | ||
this.args = args; | ||
this.callback = callback; | ||
this.subscriber = ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe'].indexOf(commandName) !== -1; | ||
var _ = require('lodash'); | ||
var Promise = require('bluebird'); | ||
var fbuffer = require('flexbuffer'); | ||
var utils = require('./utils'); | ||
/** | ||
* Command instance | ||
* | ||
* It's rare that you need to create a Command instance yourself. | ||
* | ||
* @constructor | ||
* @param {string} name - Command name | ||
* @param {string[]} [args=null] - An array of command arguments | ||
* @param {string} [replyEncoding=null] - Set the encoding of the reply, | ||
* by default buffer will be returned. | ||
* @param {function} [callback=null] - The callback that handles the response. | ||
* If omit, the response will be handled via Promise. | ||
* @example | ||
* ```js | ||
* var infoCommand = new Command('info', null, function (err, result) { | ||
* console.log('result', result); | ||
* }); | ||
* | ||
* redis.sendCommand(infoCommand); | ||
* | ||
* // When no callback provided, Command instance will have a `promise` property, | ||
* // which will resolve/reject with the result of the command. | ||
* var getCommand = new Command('get', ['foo']); | ||
* getCommand.promise.then(function (result) { | ||
* console.log('result', result); | ||
* }); | ||
* ``` | ||
* | ||
* @see {@link Redis#sendCommand} which can send a Command instance to Redis | ||
* @public | ||
*/ | ||
function Command(name, args, replyEncoding, callback) { | ||
var _this = this; | ||
this.promise = new Promise(function (resolve, reject) { | ||
_this.resolve = _this._convertValue(resolve); | ||
_this.reject = reject; | ||
_this.name = name; | ||
_this.args = args ? _.flatten(args) : []; | ||
var transformer = Command._transformer.argument[_this.name]; | ||
if (transformer) { | ||
_this.args = transformer(_this.args); | ||
} | ||
_this.replyEncoding = replyEncoding; | ||
}).nodeify(callback); | ||
} | ||
Command.prototype.toString = function () { | ||
return Command.multiBulk([commandName].concat(this.args)); | ||
/** | ||
* Convert command to writable buffer or string | ||
* | ||
* @return {string|Buffer} | ||
* @see {@link Redis#sendCommand} | ||
* @public | ||
*/ | ||
Command.prototype.toWritable = function () { | ||
var bufferMode = false; | ||
var i; | ||
for (i = 0; i < this.args.length; ++i) { | ||
if (this.args[i] instanceof Buffer) { | ||
bufferMode = true; | ||
break; | ||
} | ||
} | ||
var result; | ||
var commandStr = '*' + (this.args.length + 1) + '\r\n$' + this.name.length + '\r\n' + this.name + '\r\n'; | ||
if (bufferMode) { | ||
var resultBuffer = new fbuffer.FlexBuffer(); | ||
resultBuffer.write(commandStr); | ||
for (i = 0; i < this.args.length; ++i) { | ||
var arg = this.args[i]; | ||
if (arg instanceof Buffer || arg instanceof String) { | ||
if (arg.length === 0) { | ||
resultBuffer.write('$0\r\n\r\n'); | ||
} else { | ||
resultBuffer.write('$' + arg.length + '\r\n'); | ||
resultBuffer.write(arg); | ||
resultBuffer.write('\r\n'); | ||
} | ||
} else { | ||
resultBuffer.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n'); | ||
} | ||
} | ||
result = resultBuffer.getBuffer(); | ||
} else { | ||
result = commandStr; | ||
for (i = 0; i < this.args.length; ++i) { | ||
result += '$' + Buffer.byteLength(this.args[i]) + '\r\n' + this.args[i] + '\r\n'; | ||
} | ||
} | ||
return result; | ||
}; | ||
Command.prototype.resolve = function (value) { | ||
this.callback(null, value); | ||
/** | ||
* Convert the value from buffer to the target encoding. | ||
* | ||
* @param {function} resolve - The resolve function of the Promise | ||
* @return {function} A funtion to transform and resolve a value | ||
* @private | ||
*/ | ||
Command.prototype._convertValue = function (resolve) { | ||
var _this = this; | ||
return function (value) { | ||
// Convert buffer/buffer[] to string/string[] | ||
var result = value; | ||
var transformer; | ||
try { | ||
if (_this.replyEncoding) { | ||
result = utils.convertBufferToString(value, _this.replyEncoding); | ||
} | ||
transformer = Command._transformer.reply[_this.name]; | ||
if (transformer) { | ||
result = transformer(result); | ||
} | ||
resolve(result); | ||
} catch (err) { | ||
_this.reject(err); | ||
} | ||
return _this.promise; | ||
}; | ||
}; | ||
Command.prototype.reject = function (err) { | ||
this.callback(err); | ||
Command.FLAGS = { | ||
// Commands that can be processed when Redis is loading data from disk | ||
VALID_WHEN_LOADING: ['info', 'auth', 'select', 'subscribe', 'unsubscribe', 'psubscribe', 'pubsubscribe', 'publish', 'shutdown', 'replconf', 'role', 'pubsub', 'command', 'latency'], | ||
// Commands that can be processed when client is in the subscriber mode | ||
VALID_IN_SUBSCRIBER_MODE: ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'ping', 'quit'], | ||
// Commands that will turn current connection into subscriber mode | ||
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe'], | ||
// Commands that may make current connection quit subscriber mode | ||
EXIT_SUBSCRIBER_MODE: ['unsubscribe', 'punsubscribe'], | ||
// Commands that will make client disconnect from server TODO shutdown? | ||
WILL_DISCONNECT: ['quit'] | ||
}; | ||
Command.prototype.promise = function () { | ||
if (this.callback) { | ||
return null; | ||
} | ||
var _this = this; | ||
return new Promise(function (resolve, reject) { | ||
_this.resolve = resolve; | ||
_this.reject = reject; | ||
}); | ||
Command._transformer = { | ||
argument: {}, | ||
reply: {} | ||
}; | ||
Command.multiBulk = function (values) { | ||
var str = '*' + values.length + '\r\n'; | ||
for (var i = 0; i < values.length; ++i) { | ||
str += this.bulk(values[i]); | ||
} | ||
return str; | ||
Command.setArgumentTransformer = function (name, func) { | ||
Command._transformer.argument[name] = func; | ||
}; | ||
Command.bulk = function (value) { | ||
return '$' + Buffer.byteLength(value) + '\r\n' + value + '\r\n'; | ||
Command.setReplyTransformer = function (name, func) { | ||
Command._transformer.reply[name] = func; | ||
}; | ||
module.exports = Command; |
680
lib/redis.js
var _ = require('lodash'); | ||
// TODO benchmark | ||
var Queue = require('fastqueue'); | ||
var util = require('util'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var net = require('net'); | ||
var Promise = require('bluebird'); | ||
var url = require('url'); | ||
var Pipeline = require('./pipeline'); | ||
var Queue = require('fastqueue'); | ||
var Command = require('./command'); | ||
var Commander = require('./commander'); | ||
var Script = require('./script'); | ||
var SubscriptionSet = require('./subscription_set'); | ||
var SubscriptionQueue = require('./subscription_queue'); | ||
var utils = require('./utils'); | ||
var eventHandler = require('./redis/event_handler'); | ||
var debug = require('debug')('ioredis:redis'); | ||
function Redis(port, host) { | ||
if (!(this instanceof Redis)) return new Redis(port, host); | ||
/** | ||
* Creates a Redis instance | ||
* | ||
* @constructor | ||
* @param {(number|string|Object)} [port=6379] - Port of the Redis server, | ||
* or a URL string(see the examples below), | ||
* or the `options` object(see the third argument). | ||
* @param {string|Object} [host=localhost] - Host of the Redis server, | ||
* when the first argument is a URL string, | ||
* this argument is an object represents the options. | ||
* @param {Object} [options] - Other options. | ||
* @param {number} [options.port=6379] - Port of the Redis server. | ||
* @param {string} [options.host=localhost] - Host of the Redis server. | ||
* @param {string} [options.family=4] - Version of IP stack. Defaults to 4. | ||
* @param {string} [options.path=null] - Local domain socket path. If set the `port`, `host` | ||
* and `family` will be ignored. | ||
* @param {string} [options.auth=null] - If set, client will send AUTH command | ||
* with the value of this option when connected. | ||
* @param {boolean} [options.enableReadyCheck=true] - When a connection is established to | ||
* the Redis server, the server might still be loading the database from disk. | ||
* While loading, the server not respond to any commands. | ||
* To work around this, when this option is `true`, | ||
* ioredis will check the status of the Redis server, | ||
* and when the Redis server is able to process commands, | ||
* a `ready` event will be emitted. | ||
* @param {boolean} [options.enableOfflineQueue=true] - By default, | ||
* if there is no active connection to the Redis server, | ||
* commands are added to a queue and are executed once the connection is "ready" | ||
* (when `enableReadyCheck` is `true`, | ||
* "ready" means the Redis server has loaded the database from disk, otherwise means the connection | ||
* to the Redis server has been established). If this option is false, | ||
* when execute the command when the connection isn't ready, an error will be returned. | ||
* @param {boolean} [options.lazyConnect=false] - By default, | ||
* When a new `Redis` instance is created, it will connect to Redis server automatically. | ||
* If you want to keep disconnected util a command is called, you can pass the `lazyConnect` option to | ||
* the constructor: | ||
* ```javascript | ||
* var redis = new Redis({ lazyConnect: true }); | ||
* // No attempting to connect to the Redis server here. | ||
* // Now let's connect to the Redis server | ||
* redis.get('foo', function () { | ||
* }); | ||
* ``` | ||
* @param {function} [options.retryStrategy] - See "Quick Start" section | ||
* @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) | ||
* @example | ||
* ```js | ||
* var Redis = require('ioredis'); | ||
* | ||
* var redis = new Redis(); | ||
* // or: var redis = Redis(); | ||
* | ||
* var redisOnPort6380 = new Redis(6380); | ||
* var anotherRedis = new Redis(6380, '192.168.100.1'); | ||
* var unixSocketRedis = new Redis({ path: '/tmp/echo.sock' }); | ||
* var unixSocketRedis2 = new Redis('/tmp/echo.sock'); | ||
* var urlRedis = new Redis('redis://user:password@redis-service.com:6379/'); | ||
* var urlRedis2 = new Redis('//localhost:6379'); | ||
* var authedRedis = new Redis(6380, '192.168.100.1', { auth: 'password' }); | ||
* ``` | ||
*/ | ||
function Redis(port, host, options) { | ||
if (!(this instanceof Redis)) { | ||
return new Redis(port, host, options); | ||
} | ||
EventEmitter.call(this); | ||
Commander.call(this); | ||
if (typeof port === 'object') { | ||
this.options = port; | ||
// Redis(options) | ||
this.options = _.cloneDeep(port); | ||
} else if (typeof port === 'string' && !utils.isInt(port)) { | ||
// Redis(url, options) | ||
var parsedOptions = {}; | ||
var parsedURL = url.parse(port, true, true); | ||
if (parsedURL.hostname) { | ||
parsedOptions.port = parsedURL.port; | ||
parsedOptions.host = parsedURL.hostname; | ||
if (parsedURL.auth) { | ||
parsedOptions.password = parsedURL.auth.split(':')[1]; | ||
} | ||
if (parsedURL.path) { | ||
parsedOptions.db = parseInt(parsedURL.path.slice(1), 10); | ||
} | ||
} else { | ||
parsedOptions.path = port; | ||
} | ||
this.options = _.defaults(host ? _.cloneDeep(host) : {}, parsedOptions); | ||
} else { | ||
this.options = { | ||
port: port, | ||
host: host | ||
}; | ||
// Redis(port, host, options) or Redis(port, options) | ||
if (host && typeof host === 'object') { | ||
this.options = _.defaults(_.cloneDeep(host), { port: port }); | ||
} else { | ||
this.options = _.defaults(options ? _.cloneDeep(options) : {}, { port: port, host: host }); | ||
} | ||
} | ||
_.defaults(this.options, Redis.defaultOptions); | ||
this.parser = require('./parser/javascript'); | ||
_.defaults(this.options, Redis._defaultOptions); | ||
if (typeof this.options.port === 'string') { | ||
this.options.port = parseInt(this.options.port, 10); | ||
} | ||
// disconnected -> connecting -> connected | ||
this.status = 'disconnected'; | ||
if (this.options.parser === 'javascript') { | ||
this.Parser = require('./parsers/javascript'); | ||
} else { | ||
try { | ||
this.Parser = require('./parsers/hiredis'); | ||
} catch (e) { | ||
if (this.options.parser === 'hiredis') { | ||
throw e; | ||
} | ||
this.Parser = require('./parsers/javascript'); | ||
} | ||
} | ||
this.connect(); | ||
this.commandQueue = new Queue(); | ||
this.offlineQueue = new Queue(); | ||
this.scriptsSet = {}; | ||
this.subscriptionQueue = new SubscriptionQueue(); | ||
if (this.options.sentinels) { | ||
this._sentinel = new Sentinel(this.options.sentinels, this.options.role, this.options.name); | ||
} | ||
this.retryAttempts = 0; | ||
// disconnected(or inactive) -> connected -> ready -> closing -> closed | ||
if (this.options.lazyConnect) { | ||
this.status = 'inactive'; | ||
} else { | ||
this.status = 'disconnected'; | ||
this.connect(); | ||
} | ||
} | ||
util.inherits(Redis, EventEmitter); | ||
_.extend(Redis.prototype, Commander.prototype); | ||
Redis.defaultOptions = { | ||
/** | ||
* Create a Redis instance | ||
* | ||
* @deprecated | ||
*/ | ||
Redis.prototype.createClient = util.deprecate(function () { | ||
return Redis.apply(this, arguments); | ||
}, 'Redis.createClient: Use new Redis() instead'); | ||
/** | ||
* Default options | ||
* | ||
* @var _defaultOptions | ||
* @private | ||
*/ | ||
Redis._defaultOptions = { | ||
port: 6379, | ||
host: '127.0.0.1' | ||
host: 'localhost', | ||
family: 4, | ||
enableOfflineQueue: true, | ||
enableReadyCheck: true, | ||
retryStrategy: function (times) { | ||
var delay = Math.min(times * 2, 2000); | ||
return delay; | ||
}, | ||
parser: 'auto', | ||
lazyConnect: false, | ||
password: null, | ||
db: 0, | ||
role: 'master', | ||
sentinel: null, | ||
roleRetryDelay: 500, | ||
name: null | ||
}; | ||
/** | ||
* Create a connection to Redis. | ||
* This method will be invoked automatically when creating a new Redis instance. | ||
* @public | ||
*/ | ||
Redis.prototype.connect = function () { | ||
if (this.options.path) { | ||
this.connectionOptions = _.pick(this.options, ['path']); | ||
this.condition = { | ||
select: this.options.db, | ||
auth: this.options.password, | ||
mode: { | ||
subscriber: false, | ||
monitor: false | ||
} | ||
}; | ||
if (this._sentinel) { | ||
var _this = this; | ||
this._sentinel.removeAllListeners(); | ||
this._sentinel.once('connect', function (connection) { | ||
debug('received connection from sentinel'); | ||
_this.connection = connection; | ||
bindEvent(_this); | ||
}); | ||
this._sentinel.on('error', function (error) { | ||
_this.silentEmit('error', error); | ||
}); | ||
this._sentinel.connect(this.options.role); | ||
} else { | ||
this.connectionOptions = _.pick(this.options, ['port', 'host', 'family']); | ||
var connectionOptions; | ||
if (this.options.path) { | ||
connectionOptions = _.pick(this.options, ['path']); | ||
} else { | ||
connectionOptions = _.pick(this.options, ['port', 'host', 'family']); | ||
} | ||
this.connection = net.createConnection(connectionOptions); | ||
bindEvent(this); | ||
} | ||
var stream = this.stream = net.createConnection(this.connectionOptions); | ||
stream.on('connect', this.onConnect.bind(this)); | ||
stream.on('data', this.onData.bind(this)); | ||
stream.on('error', this.onError.bind(this)); | ||
stream.on('close', this.onClose.bind(this)); | ||
stream.on('end', this.onEnd.bind(this)); | ||
stream.on('drain', this.onDrain.bind(this)); | ||
function bindEvent(self) { | ||
self.connection.once('connect', eventHandler.connectHandler(self)); | ||
self.connection.once('error', eventHandler.errorHandler(self)); | ||
self.connection.once('close', eventHandler.closeHandler(self)); | ||
self.connection.on('data', eventHandler.dataHandler(self)); | ||
} | ||
}; | ||
Redis.prototype.sendCommand = function (commandName, args, callback) { | ||
var arg, command_obj, i, il, elem_count, buffer_args, stream = this.stream, command_str = "", buffered_writes = 0, last_arg_type, lcaseCommand; | ||
/** | ||
* Disconnect from Redis. | ||
* | ||
* This method closes the connection immediately, | ||
* and may lose some pending replies that haven't written to clien. | ||
* If you want to wait for the pending replies, use Redis#quit instead. | ||
* @public | ||
*/ | ||
Redis.prototype.disconnect = function (options) { | ||
options = options || {}; | ||
this.manuallyClosing = !options.reconnect; | ||
if (typeof command !== 'string') { | ||
throw new Error('First argument to send_command must be the command name string, not ' + typeof command); | ||
if (this.connection) { | ||
this.connection.end(); | ||
} | ||
if (!Array.isArray(args)) { | ||
throw new Error('Second argument must be an array'); | ||
if (this._sentinel) { | ||
this._sentinel.disconnect(options); | ||
} | ||
if (callback && typeof callback !== 'function') { | ||
throw new Error('Last argument must be a callback or undefined'); | ||
} | ||
args = _.flatten(args); | ||
}; | ||
var command = new Command(commandName, args, callback); | ||
/** | ||
* Create a new instance, using the same options. | ||
* | ||
* @example | ||
* ```js | ||
* var redis = new Redis(6380); | ||
* var anotherRedis = redis.duplicate(); | ||
* ``` | ||
* | ||
* @public | ||
*/ | ||
Redis.prototype.duplicate = function (override) { | ||
return new Redis(_.defaults(override || {}, this.options)); | ||
}; | ||
if (this.mode.subscriber && !command.subscriber) { | ||
command.reject(new Error('Connection in subscriber mode, only subscriber commands may be used')); | ||
/** | ||
* Flush offline queue and command queue with error. | ||
* | ||
* @param {Error} error - The error object to send to the commands | ||
* @private | ||
*/ | ||
Redis.prototype.flushQueue = function (error) { | ||
var item; | ||
while (this.offlineQueue.length > 0) { | ||
item = this.offlineQueue.shift(); | ||
item.command.reject(error); | ||
} | ||
this.offlineQueue = new Queue(); | ||
if (this.status === 'connected') { | ||
this.stream.write(command.toString()); | ||
} else { | ||
if (this.options.enableOfflineQueue) { | ||
this.offlineQueue.push(command); | ||
} else { | ||
command.reject(new Error('Stream isn\'t writeable and enableofflineQueue options is false')); | ||
} | ||
var command; | ||
while (this.commandQueue.length > 0) { | ||
command = this.commandQueue.shift(); | ||
command.reject(error); | ||
} | ||
this.commandQueue = new Queue(); | ||
}; | ||
this.commandQueue.push(command); | ||
/** | ||
* Check whether Redis has finished loading the persistent data and is able to | ||
* process commands. | ||
* | ||
* @param {Function} callback | ||
* @private | ||
*/ | ||
Redis.prototype._readyCheck = function (callback) { | ||
var _this = this; | ||
this.sendCommand(new Command('info', null, 'utf8', function (err, res) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (typeof res !== 'string') { | ||
return callback(null, res); | ||
} | ||
return command.promise(); | ||
}; | ||
var info = {}; | ||
Redis.prototype.initParser = function () { | ||
var self = this; | ||
var lines = res.split('\r\n'); | ||
for (var i = 0; i < lines.length; ++i) { | ||
var parts = lines[i].split(':'); | ||
if (parts[1]) { | ||
info[parts[0]] = parts[1]; | ||
} | ||
} | ||
// return_buffers sends back Buffers from parser to callback. detect_buffers sends back Buffers from parser, but | ||
// converts to Strings if the input arguments are not Buffers. | ||
this.reply_parser = new this.parser.Parser({ | ||
return_buffers: self.options.return_buffers || self.options.detect_buffers || false | ||
}); | ||
// "reply error" is an error sent back by Redis | ||
this.reply_parser.on("reply error", function (reply) { | ||
if (reply instanceof Error) { | ||
self.return_error(reply); | ||
if (!info.loading || info.loading === '0') { | ||
callback(null, info); | ||
} else { | ||
self.return_error(new Error(reply)); | ||
var retryTime = (info.loading_eta_seconds || 1) * 1000; | ||
debug('Redis server still loading, trying again in ' + retryTime + 'ms'); | ||
setTimeout(function () { | ||
_this._readyCheck(callback); | ||
}, retryTime); | ||
} | ||
}); | ||
this.reply_parser.on("reply", function (reply) { | ||
self.return_reply(reply); | ||
}); | ||
// "error" is bad. Somehow the parser got confused. It'll try to reset and continue. | ||
this.reply_parser.on("error", function (err) { | ||
self.emit("error", new Error("Redis reply parser error: " + err.stack)); | ||
}); | ||
})); | ||
}; | ||
Redis.prototype.return_error = function (err) { | ||
var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength(); | ||
/** | ||
* Emit only when there's at least one listener. | ||
* | ||
* @param {string} eventName - Event to emit | ||
* @param {...*} arguments - Arguments | ||
* @return {boolean} Returns true if event had listeners, false otherwise. | ||
* @private | ||
*/ | ||
Redis.prototype.silentEmit = function (eventName) { | ||
if (this.listeners(eventName).length > 0) { | ||
return this.emit.apply(this, arguments); | ||
} | ||
return false; | ||
}; | ||
if (this.pub_sub_mode === false && queue_len === 0) { | ||
this.command_queue = new Queue(); | ||
this.emit("idle"); | ||
Redis.prototype.defineCommand = function (name, definition) { | ||
var script = new Script(definition.lua, definition.numberOfKeys); | ||
this.scriptsSet[name] = script; | ||
this[name] = function () { | ||
var args = _.toArray(arguments); | ||
var callback; | ||
if (typeof args[args.length - 1] === 'function') { | ||
callback = args.pop(); | ||
} | ||
if (this.should_buffer && queue_len <= this.command_queue_low_water) { | ||
this.emit("drain"); | ||
this.should_buffer = false; | ||
return script.execute(this, args, 'utf8', callback); | ||
}; | ||
this[name + 'Buffer'] = function () { | ||
var args = _.toArray(arguments); | ||
var callback; | ||
if (typeof args[args.length - 1] === 'function') { | ||
callback = args.pop(); | ||
} | ||
if (command_obj && typeof command_obj.callback === "function") { | ||
try { | ||
command_obj.callback(err); | ||
} catch (callback_err) { | ||
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going | ||
process.nextTick(function () { | ||
throw callback_err; | ||
}); | ||
} | ||
} else { | ||
console.log("node_redis: no callback to send error: " + err.message); | ||
// this will probably not make it anywhere useful, but we might as well throw | ||
process.nextTick(function () { | ||
throw err; | ||
}); | ||
} | ||
return script.execute(this, args, null, callback); | ||
}; | ||
}; | ||
Redis.prototype.return_reply = function (reply) { | ||
var replyType; | ||
if (Array.isArray(reply) && reply.length > 0 && reply[0]) { | ||
replyType = reply[0].toString(); | ||
} | ||
/** | ||
* Listen for all requests received by the server in real time. | ||
* | ||
* This command will create a new connection to Redis and send a | ||
* MONITOR command via the new connection in order to avoid disturbing | ||
* the current connection. | ||
* | ||
* @param {function} [callback] The callback function. If omit, a promise will be returned. | ||
* @example | ||
* ```js | ||
* var redis = new Redis(); | ||
* redis.monitor(function (err, monitor) { | ||
* // Entering monitoring mode. | ||
* monitor.on('monitor', function (time, args) { | ||
* console.log(time + ": " + util.inspect(args)); | ||
* }); | ||
* }); | ||
* | ||
* // supports promise as well as other commands | ||
* redis.monitor().then(function (monitor) { | ||
* monitor.on('monitor', function (time, args) { | ||
* console.log(time + ": " + util.inspect(args)); | ||
* }); | ||
* }); | ||
* ``` | ||
* @public | ||
*/ | ||
Redis.prototype.monitor = function (callback) { | ||
var monitorInstance = this.duplicate({ lazyConnect: false }); | ||
monitorInstance.options.enableReadyCheck = false; | ||
monitorInstance.condition.mode.monitoring = true; | ||
monitorInstance.prevCondition = monitorInstance.condition; | ||
var associatedCommand; | ||
return new Promise(function (resolve) { | ||
monitorInstance.once('monitoring', function () { | ||
resolve(monitorInstance); | ||
}); | ||
}).nodeify(callback); | ||
}; | ||
var isPubSubMessage = _.includes(['message', 'pmessage'], replyType); | ||
if (!(this.mode.subscriber && isPubSubMessage)) { | ||
associatedCommand = this.commandQueue.shift(); | ||
Redis.prototype.pipeline = function () { | ||
var pipeline = new Pipeline(this); | ||
return pipeline; | ||
}; | ||
var multi = Redis.prototype.multi; | ||
Redis.prototype.multi = function (options) { | ||
if (options && options.pipeline === false) { | ||
multi.call(this); | ||
} else { | ||
var pipeline = new Pipeline(this); | ||
pipeline.multi(); | ||
var exec = pipeline.exec; | ||
pipeline.exec = function (callback) { | ||
exec.call(pipeline); | ||
var promise = exec.call(pipeline); | ||
return promise.then(function (result) { | ||
var execResult = result[result.length - 1]; | ||
if (execResult[0]) { | ||
throw execResult[0]; | ||
} | ||
return utils.wrapMultiResult(execResult[1]); | ||
}).nodeify(callback); | ||
}; | ||
return pipeline; | ||
} | ||
}; | ||
var queueLength = this.commandQueue.length; | ||
var exec = Redis.prototype.exec; | ||
Redis.prototype.exec = function (callback) { | ||
var wrapper = function (err, results) { | ||
if (Array.isArray(results)) { | ||
results = utils.wrapMultiResult(results); | ||
} | ||
callback(err, results); | ||
}; | ||
exec.call(this, wrapper); | ||
}; | ||
if (this.commandQueue.length === 0) { | ||
// TODO | ||
this.commandQueue = new Queue(); | ||
/** | ||
* Send a command to Redis | ||
* | ||
* This method is used internally by the `Redis#set`, `Redis#lpush` etc. | ||
* Most of the time you won't invoke this method directly. | ||
* However when you want to send a command that is not supported by ioredis yet, | ||
* this command will be useful. | ||
* | ||
* @method sendCommand | ||
* @memberOf Redis# | ||
* @param {Command} command - The Command instance to send. | ||
* @see {@link Command} | ||
* @example | ||
* ```js | ||
* var redis = new Redis(); | ||
* | ||
* // Use callback | ||
* var get = new Command('get', ['foo'], 'utf8', function (err, result) { | ||
* console.log(result); | ||
* }); | ||
* redis.sendCommand(get); | ||
* | ||
* // Use promise | ||
* var set = new Command('set', ['foo', 'bar'], 'utf8'); | ||
* set.promise.then(function (result) { | ||
* console.log(result); | ||
* }); | ||
* redis.sendCommand(set); | ||
* ``` | ||
* @private | ||
*/ | ||
Redis.prototype.sendCommand = function (command, stream) { | ||
if (this.status === 'inactive') { | ||
this.status = 'disconnected'; | ||
this.connect(); | ||
} | ||
if (this.condition.mode.subscriber && !_.includes(Command.FLAGS.VALID_IN_SUBSCRIBER_MODE, command.name)) { | ||
command.reject(new Error('Connection in subscriber mode, only subscriber commands may be used')); | ||
return command.promise; | ||
} | ||
if (associatedCommand && !associatedCommand.subscriber) { | ||
associatedCommand.resolve(reply); | ||
} else if (this.mode.subscriber || (associatedCommand && associatedCommand.subscriber)) { | ||
switch (replyType) { | ||
case 'message': | ||
this.emit('message', reply[1].toString(), reply[2]); // channel, message | ||
break; | ||
case 'pmessage': | ||
this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message | ||
break; | ||
case 'subscribe': | ||
case 'unsubscribe': | ||
case 'psubscribe': | ||
case 'punsubscribe': | ||
if (associatedCommand) { | ||
associatedCommand.resolve(reply); | ||
var writable = this.connection && this.connection.writable && ((this.status === 'ready') || ((this.status === 'connected') && _.includes(Command.FLAGS.VALID_WHEN_LOADING, command.name))); | ||
if (writable) { | ||
debug('write command[%d] -> %s(%s)', this.condition.select, command.name, command.args); | ||
(stream || this.connection).write(command.toWritable()); | ||
// Subscribe commands are special | ||
var isSubscribeCommand = _.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name); | ||
var isUnsubscribeCommand = !isSubscribeCommand && _.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name); | ||
if (isSubscribeCommand || isUnsubscribeCommand) { | ||
if (isSubscribeCommand && !this.condition.mode.subscriber) { | ||
this.condition.mode.subscriber = new SubscriptionSet(); | ||
} | ||
var channel = reply[1].toString(); | ||
var count = reply[2]; | ||
if (count === 0) { | ||
this.mode.subscriber = false; | ||
debug('All subscriptions removed, exiting pub/sub mode'); | ||
} else { | ||
this.mode.subscriber = true; | ||
var channels = command.args; | ||
if (isUnsubscribeCommand && channels.length === 0) { | ||
channels = this.condition.mode.subscriber.channels(command.name); | ||
} | ||
this.emit(replyType, channel, count); | ||
break; | ||
default: | ||
this.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply)); | ||
command.remainReplies = channels.length; | ||
for (var i = 0; i < channels.length; ++i) { | ||
var channel = channels[i]; | ||
if (isSubscribeCommand) { | ||
this.condition.mode.subscriber.add(command.name, channel); | ||
} else { | ||
this.condition.mode.subscriber.del(command.name, channel); | ||
} | ||
this.subscriptionQueue.push(command.name, channel, command); | ||
} | ||
} else { | ||
this.commandQueue.push(command); | ||
} | ||
} else if (this.mode.monitor) { | ||
var len = reply.indexOf(' '); | ||
var timestamp = reply.slice(0, len); | ||
var argindex = reply.indexOf('"'); | ||
var args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) { | ||
return elem.replace(/\\"/g, '"'); | ||
if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) { | ||
this.manuallyClosing = true; | ||
} | ||
if (command.name === 'select' && utils.isInt(command.args[0])) { | ||
this.condition.select = parseInt(command.args[0], 10); | ||
debug('switch to db [%d]', this.condition.select); | ||
} | ||
} else if (this.options.enableOfflineQueue) { | ||
debug('queue command[%d] -> %s(%s)', this.condition.select, command.name, command.args); | ||
this.offlineQueue.push({ | ||
command: command, | ||
stream: stream, | ||
select: this.condition.select | ||
}); | ||
this.emit('monitor', timestamp, args); | ||
if (command.name === 'select' && utils.isInt(command.args[0])) { | ||
this.condition.select = parseInt(command.args[0], 10); | ||
debug('switch to db [%d]', this.condition.select); | ||
} | ||
} else { | ||
throw new Error("node_redis command queue state error. If you can reproduce this, please report it."); | ||
command.reject(new Error('Stream isn\'t writeable and enableOfflineQueue options is false')); | ||
} | ||
return command.promise; | ||
}; | ||
_.assign(Redis.prototype, require('./mixin/commands')); | ||
_.assign(Redis.prototype, require('./mixin/events')); | ||
_.assign(Redis.prototype, require('./redis/prototype/parser')); | ||
module.exports = Redis; | ||
Redis.Command = Command; | ||
Redis.Command.setArgumentTransformer('mset', function (args) { | ||
if (args.length === 1) { | ||
if (typeof Map !== 'undefined' && args[0] instanceof Map) { | ||
return utils.convertMapToArray(args[0]); | ||
} | ||
if ( typeof args[0] === 'object' && args[0] !== null) { | ||
return utils.convertObjectToArray(args[0]); | ||
} | ||
} | ||
return args; | ||
}); | ||
Redis.Command.setArgumentTransformer('hmset', function (args) { | ||
if (args.length === 2) { | ||
if (typeof Map !== 'undefined' && args[1] instanceof Map) { | ||
return [args[0]].concat(utils.convertMapToArray(args[1])); | ||
} | ||
if ( typeof args[1] === 'object' && args[1] !== null) { | ||
return [args[0]].concat(utils.convertObjectToArray(args[1])); | ||
} | ||
} | ||
return args; | ||
}); | ||
Redis.Command.setReplyTransformer('hgetall', function (result) { | ||
if (Array.isArray(result)) { | ||
var obj = {}; | ||
for (var i = 0; i < result.length; i += 2) { | ||
obj[result[i]] = result[i + 1]; | ||
} | ||
return obj; | ||
} | ||
return result; | ||
}); | ||
Redis.Cluster = require('./redis_cluster'); | ||
Redis.ReplyError = require('./reply_error'); | ||
var Sentinel = require('./sentinel'); |
{ | ||
"name": "ioredis", | ||
"version": "1.0.0", | ||
"version": "1.0.1", | ||
"description": "A delightful, performance-focused Redis client for Node and io.js", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "mocha" | ||
"test": "NODE_ENV=test mocha", | ||
"test:debug": "NODE_ENV=test DEBUG=ioredis:* mocha", | ||
"test:cov": "NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -R spec", | ||
"generate-docs": "jsdoc2md lib/redis.js lib/redis_cluster.js > API.md", | ||
"bench": "matcha benchmark.js" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "git://github.com/luin/ioredis.git" | ||
}, | ||
"keywords": [ | ||
@@ -17,6 +25,23 @@ "redis", | ||
"dependencies": { | ||
"bluebird": "^2.9.21", | ||
"debug": "^2.1.3", | ||
"fastqueue": "^0.1.0", | ||
"flexbuffer": "0.0.6", | ||
"ioredis-commands": "^3.0.0", | ||
"lodash": "^3.6.0" | ||
}, | ||
"devDependencies": { | ||
"chai": "^2.2.0", | ||
"codeclimate-test-reporter": "0.0.4", | ||
"istanbul": "^0.3.13", | ||
"jsdoc": "^3.3.0-beta3", | ||
"matcha": "^0.6.0", | ||
"mocha": "^2.2.1", | ||
"server-destroy": "^1.0.0", | ||
"sinon": "^1.14.1" | ||
}, | ||
"engines": { | ||
"node": ">= 0.11.16", | ||
"iojs": ">= 1.0.0" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No README
QualityPackage does not have a README. This may indicate a failed publish or a low quality package.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
147768
45
3701
0
500
0
6
8
4
+ Addedbluebird@^2.9.21
+ Addeddebug@^2.1.3
+ Addedflexbuffer@0.0.6
+ Addedbluebird@2.11.0(transitive)
+ Addeddebug@2.6.9(transitive)
+ Addedflexbuffer@0.0.6(transitive)
+ Addedms@2.0.0(transitive)