Comparing version 0.0.3 to 0.0.4
var _ = require('lodash'); | ||
var Promise = require('bluebird'); | ||
var fbuffer = require('flexbuffer'); | ||
var utils = require('./utils'); | ||
/** | ||
@@ -107,8 +107,16 @@ * Command instance | ||
// Convert buffer/buffer[] to string/string[] | ||
if (_this.replyEncoding) { | ||
value = convertBufferToString(value, _this.replyEncoding); | ||
var result = value; | ||
var transformer; | ||
try { | ||
if (_this.replyEncoding) { | ||
result = utils.convertBufferToString(value, _this.replyEncoding); | ||
} | ||
transformer = Command.transformer.reply[_this.name]; | ||
if (transformer) { | ||
result = transformer(result); | ||
} | ||
resolve(result); | ||
} catch (err) { | ||
_this.reject(err); | ||
} | ||
var transformer = Command.transformer.reply[_this.name]; | ||
resolve(transformer ? transformer(value) : value); | ||
return _this.promise; | ||
@@ -124,3 +132,5 @@ }; | ||
// Commands that will turn current connection into subscriber mode | ||
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe'], | ||
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe'], | ||
// Commands that may make current connection quit subscriber mode | ||
EXIT_SUBSCRIBER_MODE: ['unsubscribe', 'punsubscribe'], | ||
// Commands that will make client disconnect from server TODO shutdown? | ||
@@ -136,21 +146,1 @@ WILL_DISCONNECT: ['quit'] | ||
module.exports = Command; | ||
function convertBufferToString(value, encoding) { | ||
if (value instanceof Buffer) { | ||
return value.toString(encoding); | ||
} | ||
if (Array.isArray(value)) { | ||
var res = []; | ||
for (var i = 0; i < value.length; ++i) { | ||
if (value[i] instanceof Buffer) { | ||
value[i] = value[i].toString(encoding); | ||
} else if (Array.isArray(value[i])) { | ||
res[i] = convertBufferToString(value[i], encoding); | ||
} else { | ||
res[i] = value[i]; | ||
} | ||
} | ||
return res; | ||
} | ||
return value; | ||
} |
@@ -8,2 +8,4 @@ var _ = require('lodash'); | ||
var Command = require('./command'); | ||
var SubscriptionSet = require('./subscription_set'); | ||
var SubscriptionQueue = require('./subscription_queue'); | ||
@@ -32,3 +34,3 @@ /** | ||
* To work around this, when this option is `true`, | ||
* ioRedis will check the status of the Redis server, | ||
* ioredis will check the status of the Redis server, | ||
* and when the Redis server is able to process commands, | ||
@@ -91,2 +93,4 @@ * a `ready` event will be emitted. | ||
this.offlineQueue = new Queue(); | ||
this.subscriptionSet = new SubscriptionSet(); | ||
this.subscriptionQueue = new SubscriptionQueue(); | ||
@@ -93,0 +97,0 @@ // disconnected -> connected -> ready -> closing -> closed |
@@ -79,3 +79,3 @@ var _ = require('lodash'); | ||
* Most of the time you won't invoke this method directly. | ||
* However when you want to send a command that is not supported by ioRedis yet, | ||
* However when you want to send a command that is not supported by ioredis yet, | ||
* this command will be useful. | ||
@@ -110,24 +110,41 @@ * | ||
if (this.status === 'ready') { | ||
var writable = (this.status === 'ready') || ((this.status === 'connected') && _.includes(Command.FLAGS.VALID_WHEN_LOADING, command.name)); | ||
if (writable) { | ||
this.connection.write(command.toWritable()); | ||
this.commandQueue.push(command); | ||
} else if (this.status === 'connected' && _.includes(Command.FLAGS.VALID_WHEN_LOADING, command.name)) { | ||
this.connection.write(command.toWritable()); | ||
this.commandQueue.push(command); | ||
} else { | ||
if (this.options.enableOfflineQueue) { | ||
this.offlineQueue.push(command); | ||
// Subscribe commands are special | ||
var isSubscribeCommand = _.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name); | ||
var isUnsubscribeCommand = !isSubscribeCommand && _.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name); | ||
if (isSubscribeCommand || isUnsubscribeCommand) { | ||
if (isSubscribeCommand && !this.condition.mode.subscriber) { | ||
this.condition.mode.subscriber = true; | ||
} | ||
var channels = command.args; | ||
if (isUnsubscribeCommand && channels.length === 0) { | ||
channels = this.subscriptionSet.channels(command.name); | ||
} | ||
command.remainReplies = channels.length; | ||
for (var i = 0; i < channels.length; ++i) { | ||
var channel = channels[i]; | ||
if (isSubscribeCommand) { | ||
this.subscriptionSet.add(command.name, channel); | ||
} else { | ||
this.subscriptionSet.del(command.name, channel); | ||
} | ||
this.subscriptionQueue.push(command.name, channel, command); | ||
} | ||
} else { | ||
command.reject(new Error('Stream isn\'t writeable and enableOfflineQueue options is false')); | ||
this.commandQueue.push(command); | ||
} | ||
} | ||
if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name)) { | ||
if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) { | ||
this.manuallyClosing = true; | ||
} | ||
} else if (this.options.enableOfflineQueue) { | ||
this.offlineQueue.push(command); | ||
} else { | ||
command.reject(new Error('Stream isn\'t writeable and enableOfflineQueue options is false')); | ||
} | ||
if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) { | ||
this.manuallyClosing = true; | ||
} | ||
return command.promise; | ||
}; |
@@ -25,5 +25,3 @@ var Queue = require('fastqueue'); | ||
if (self.condition.auth) { | ||
console.log(self.condition.auth); | ||
self.sendCommand(new Command('auth', [self.condition.auth], 'utf8', function (err, res) { | ||
console.log('auth', err, res); | ||
if (res === 'OK') { | ||
@@ -145,14 +143,14 @@ return; | ||
var callback = function () { | ||
callback_count--; | ||
if (callback_count === 0) { | ||
self.emit('ready'); | ||
} | ||
callback_count--; | ||
if (callback_count === 0) { | ||
self.emit('ready'); | ||
} | ||
}; | ||
Object.keys(self.subscription_set).forEach(function (key) { | ||
var parts = key.split(" "); | ||
if (exports.debug_mode) { | ||
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]); | ||
} | ||
callback_count++; | ||
self.send_command(parts[0] + "scribe", [parts[1]], callback); | ||
var parts = key.split(" "); | ||
if (exports.debug_mode) { | ||
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]); | ||
} | ||
callback_count++; | ||
self.send_command(parts[0] + "scribe", [parts[1]], callback); | ||
}); | ||
@@ -159,0 +157,0 @@ } |
@@ -72,27 +72,3 @@ var _ = require('lodash'); | ||
var replyType = Array.isArray(reply) ? reply[0] : null; | ||
if (this.condition.mode.subscriber) { | ||
// If the reply is a message/pmessage, | ||
// then just emit it instead of considering it as a reply | ||
// TODO current we assume chennel name is a utf8 string, which may be incorrect. | ||
if (utils.bufferEqual(sharedBuffers.message, replyType)) { | ||
this.emit('message', reply[1].toString(), reply[2]); // channel, message | ||
return; | ||
} | ||
if (utils.bufferEqual(sharedBuffers.pmessage, replyType)) { | ||
this.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message | ||
return; | ||
} | ||
// Some commands still valid in the subscriber mode(like unsubscribe, ping, quit etc). | ||
// So we don't return here. | ||
} | ||
var associatedCommand = this.commandQueue.shift(); | ||
if (!associatedCommand) { | ||
this.emit('error', new Error('ioRedis command queue state error. If you can reproduce this, please report it.')); | ||
return; | ||
} | ||
if (this.commandQueue.length === 0) { | ||
@@ -102,12 +78,20 @@ this.commandQueue = new Queue(); | ||
try { | ||
associatedCommand.resolve(reply); | ||
} catch (err) { | ||
associatedCommand.reject(err); | ||
} | ||
if (this.condition.mode.subscriber && !associatedCommand) { | ||
// If the reply is a message/pmessage, | ||
// then just emit it instead of considering it as a reply | ||
if (replyType) { | ||
_(['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe']).some(function (type) { | ||
if (utils.bufferEqual(sharedBuffers[type], replyType)) { | ||
// TODO support binary channel name | ||
// TODO current we assume channel name is a utf8 string, | ||
// maybe binary channel name should also be supported. | ||
var replyType = Array.isArray(reply) ? reply[0].toString() : null; | ||
switch (replyType) { | ||
case 'message': | ||
this.emit('message', reply[1].toString(), reply[2]); // channel, message | ||
break; | ||
case 'pmessage': | ||
this.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message | ||
break; | ||
case 'subscribe': | ||
case 'psubscribe': | ||
case 'unsubscribe': | ||
case 'punsubscribe': | ||
var channel = reply[1].toString(); | ||
@@ -117,13 +101,20 @@ var count = reply[2]; | ||
this.condition.mode.subscriber = false; | ||
debug('All subscriptions removed, exiting pub/sub mode'); | ||
} else { | ||
this.condition.mode.subscriber = true; | ||
} | ||
this.emit(type, channel, count); | ||
return true; | ||
} | ||
}); | ||
var command = this.subscriptionQueue.shift(replyType, channel); | ||
if (command) { | ||
command.remainReplies -= 1; | ||
if (command.remainReplies === 0) { | ||
command.resolve(count); | ||
} | ||
} | ||
break; | ||
default: | ||
this.emit('error', new Error('Subscription queue state error. If you can reproduce this, please report it.')); | ||
} | ||
} else if (!associatedCommand) { | ||
this.emit('error', new Error('Command queue state error. If you can reproduce this, please report it.')); | ||
} else { | ||
associatedCommand.resolve(reply); | ||
} | ||
}; | ||
@@ -0,1 +1,8 @@ | ||
/** Test if two buffers are equal | ||
* | ||
* @param {Buffer} a | ||
* @param {Buffer} b | ||
* @return {Boolean} Whether the two buffers are equal | ||
* @public | ||
*/ | ||
exports.bufferEqual = function (a, b) { | ||
@@ -15,1 +22,32 @@ if (typeof a.equals === 'function') { | ||
}; | ||
/** Convert a buffer to string, supports buffer array | ||
* | ||
* @param {*} value - The input value | ||
* @param {string} encoding - string encoding | ||
* @return {*} The result | ||
* @example | ||
* var input = [new Buffer('foo'), [new Buffer('bar')]]; | ||
* var res = convertBufferToString(input, 'utf8'); | ||
* expect(res).to.eql(['foo', ['bar']]); | ||
* @public | ||
*/ | ||
exports.convertBufferToString = function (value, encoding) { | ||
if (value instanceof Buffer) { | ||
return value.toString(encoding); | ||
} | ||
if (Array.isArray(value)) { | ||
var res = []; | ||
for (var i = 0; i < value.length; ++i) { | ||
if (value[i] instanceof Buffer) { | ||
value[i] = value[i].toString(encoding); | ||
} else if (Array.isArray(value[i])) { | ||
res[i] = exports.convertBufferToString(value[i], encoding); | ||
} else { | ||
res[i] = value[i]; | ||
} | ||
} | ||
return res; | ||
} | ||
return value; | ||
}; |
{ | ||
"name": "ioredis", | ||
"version": "0.0.3", | ||
"version": "0.0.4", | ||
"description": "A delightful, performance-focused Redis client for Node and io.js", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -8,3 +8,3 @@ ioredis | ||
**[WIP]** A delightful, performance-focused Redis client for Node and io.js | ||
**[Work In Progress]** A delightful, performance-focused Redis client for Node and io.js | ||
@@ -78,2 +78,33 @@ Support Redis >= 2.6.12 and (Node.js >= 0.11.6 or io.js). | ||
Pub/Sub | ||
------- | ||
Here is a simple example of the API for publish / subscribe. | ||
This program opens two client connections, subscribes to a channel on one of them, | ||
and publishes to that channel on the other: | ||
```javascript | ||
var Redis = require('ioredis'); | ||
var redis = new Redis(); | ||
var pub = new Redis(); | ||
redis.subscribe('news', 'music', function (err, count) { | ||
// Now both channel 'news' and 'music' are subscribed successfully. | ||
// `count` arg represents the number of channels we are currently subscribed to. | ||
pub.publish('news', 'Hello world!'); | ||
pub.publish('music', 'Hello again!'); | ||
}); | ||
redis.on('message', function (channel, message) { | ||
// Receive message Hello world! from channel news | ||
// Receive message Hello again! from channel music | ||
console.log('Receive message %s from channel %s', message, channel); | ||
}); | ||
``` | ||
When a client issues a SUBSCRIBE or PSUBSCRIBE, that connection is put into a "subscriber" mode. | ||
At that point, only commands that modify the subscription set are valid. | ||
When the subscription set is empty, the connection is put back into regular mode. | ||
If you need to send regular commands to Redis while in subscriber mode, just open another connection. | ||
Handle Binary Data | ||
@@ -94,1 +125,26 @@ ------------ | ||
``` | ||
Motivation | ||
---------------------- | ||
Firstly we used the Redis client [node_redis](https://github.com/mranney/node_redis), | ||
however over a period of time we found out it's not robust enough for us to use | ||
in the production environment. The library has some not trivial bugs and many unresolved | ||
issues in the GitHub(165 so far), for instance: | ||
```javascript | ||
var redis = require('redis'); | ||
var client = redis.createClient(); | ||
client.set('foo', 'message'); | ||
client.set('bar', 'Hello world'); | ||
client.mget('foo', 'bar'); | ||
client.subscribe('channel'); | ||
client.on('message', function (msg) { | ||
// Will print "Hello world", although no `publish` in invoked. | ||
console.log('received ', msg); | ||
}); | ||
``` | ||
I submited some pull requests but sadly none of them has been merged, so here ioredis is. |
13
test.js
var Redis = require('./'); | ||
var redis = new Redis(); | ||
var pub = new Redis(); | ||
redis.subscribe('news', 'music', function (err, count) { | ||
// Now both channel 'news' and 'music' are subscribed successfully. | ||
// `count` arg represents the number of channels we are currently subscribed to. | ||
redis.eval("return {1,2,{3,'Hello World!'}}", 0, function (err, result) { | ||
console.log(err, result); | ||
pub.publish('news', 'Hello world!'); | ||
pub.publish('music', 'Hello again!'); | ||
}); | ||
redis.on('message', function (channel, message) { | ||
console.log('Receive message %s from channel %s', message, channel); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
668836
55
2403
148