Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ioredis

Package Overview
Dependencies
Maintainers
1
Versions
228
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ioredis - npm Package Compare versions

Comparing version 0.0.3 to 0.0.4

.idea/.name

44

lib/command.js
var _ = require('lodash');
var Promise = require('bluebird');
var fbuffer = require('flexbuffer');
var utils = require('./utils');
/**

@@ -107,8 +107,16 @@ * Command instance

// Convert buffer/buffer[] to string/string[]
if (_this.replyEncoding) {
value = convertBufferToString(value, _this.replyEncoding);
var result = value;
var transformer;
try {
if (_this.replyEncoding) {
result = utils.convertBufferToString(value, _this.replyEncoding);
}
transformer = Command.transformer.reply[_this.name];
if (transformer) {
result = transformer(result);
}
resolve(result);
} catch (err) {
_this.reject(err);
}
var transformer = Command.transformer.reply[_this.name];
resolve(transformer ? transformer(value) : value);
return _this.promise;

@@ -124,3 +132,5 @@ };

// Commands that will turn current connection into subscriber mode
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe'],
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe'],
// Commands that may make current connection quit subscriber mode
EXIT_SUBSCRIBER_MODE: ['unsubscribe', 'punsubscribe'],
// Commands that will make client disconnect from server TODO shutdown?

@@ -136,21 +146,1 @@ WILL_DISCONNECT: ['quit']

module.exports = Command;
function convertBufferToString(value, encoding) {
if (value instanceof Buffer) {
return value.toString(encoding);
}
if (Array.isArray(value)) {
var res = [];
for (var i = 0; i < value.length; ++i) {
if (value[i] instanceof Buffer) {
value[i] = value[i].toString(encoding);
} else if (Array.isArray(value[i])) {
res[i] = convertBufferToString(value[i], encoding);
} else {
res[i] = value[i];
}
}
return res;
}
return value;
}

@@ -8,2 +8,4 @@ var _ = require('lodash');

var Command = require('./command');
var SubscriptionSet = require('./subscription_set');
var SubscriptionQueue = require('./subscription_queue');

@@ -32,3 +34,3 @@ /**

* To work around this, when this option is `true`,
* ioRedis will check the status of the Redis server,
* ioredis will check the status of the Redis server,
* and when the Redis server is able to process commands,

@@ -91,2 +93,4 @@ * a `ready` event will be emitted.

this.offlineQueue = new Queue();
this.subscriptionSet = new SubscriptionSet();
this.subscriptionQueue = new SubscriptionQueue();

@@ -93,0 +97,0 @@ // disconnected -> connected -> ready -> closing -> closed

@@ -79,3 +79,3 @@ var _ = require('lodash');

* Most of the time you won't invoke this method directly.
* However when you want to send a command that is not supported by ioRedis yet,
* However when you want to send a command that is not supported by ioredis yet,
* this command will be useful.

@@ -110,24 +110,41 @@ *

if (this.status === 'ready') {
var writable = (this.status === 'ready') || ((this.status === 'connected') && _.includes(Command.FLAGS.VALID_WHEN_LOADING, command.name));
if (writable) {
this.connection.write(command.toWritable());
this.commandQueue.push(command);
} else if (this.status === 'connected' && _.includes(Command.FLAGS.VALID_WHEN_LOADING, command.name)) {
this.connection.write(command.toWritable());
this.commandQueue.push(command);
} else {
if (this.options.enableOfflineQueue) {
this.offlineQueue.push(command);
// Subscribe commands are special
var isSubscribeCommand = _.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name);
var isUnsubscribeCommand = !isSubscribeCommand && _.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name);
if (isSubscribeCommand || isUnsubscribeCommand) {
if (isSubscribeCommand && !this.condition.mode.subscriber) {
this.condition.mode.subscriber = true;
}
var channels = command.args;
if (isUnsubscribeCommand && channels.length === 0) {
channels = this.subscriptionSet.channels(command.name);
}
command.remainReplies = channels.length;
for (var i = 0; i < channels.length; ++i) {
var channel = channels[i];
if (isSubscribeCommand) {
this.subscriptionSet.add(command.name, channel);
} else {
this.subscriptionSet.del(command.name, channel);
}
this.subscriptionQueue.push(command.name, channel, command);
}
} else {
command.reject(new Error('Stream isn\'t writeable and enableOfflineQueue options is false'));
this.commandQueue.push(command);
}
}
if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name)) {
if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) {
this.manuallyClosing = true;
}
} else if (this.options.enableOfflineQueue) {
this.offlineQueue.push(command);
} else {
command.reject(new Error('Stream isn\'t writeable and enableOfflineQueue options is false'));
}
if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) {
this.manuallyClosing = true;
}
return command.promise;
};

@@ -25,5 +25,3 @@ var Queue = require('fastqueue');

if (self.condition.auth) {
console.log(self.condition.auth);
self.sendCommand(new Command('auth', [self.condition.auth], 'utf8', function (err, res) {
console.log('auth', err, res);
if (res === 'OK') {

@@ -145,14 +143,14 @@ return;

var callback = function () {
callback_count--;
if (callback_count === 0) {
self.emit('ready');
}
callback_count--;
if (callback_count === 0) {
self.emit('ready');
}
};
Object.keys(self.subscription_set).forEach(function (key) {
var parts = key.split(" ");
if (exports.debug_mode) {
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
}
callback_count++;
self.send_command(parts[0] + "scribe", [parts[1]], callback);
var parts = key.split(" ");
if (exports.debug_mode) {
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
}
callback_count++;
self.send_command(parts[0] + "scribe", [parts[1]], callback);
});

@@ -159,0 +157,0 @@ }

@@ -72,27 +72,3 @@ var _ = require('lodash');

var replyType = Array.isArray(reply) ? reply[0] : null;
if (this.condition.mode.subscriber) {
// If the reply is a message/pmessage,
// then just emit it instead of considering it as a reply
// TODO current we assume chennel name is a utf8 string, which may be incorrect.
if (utils.bufferEqual(sharedBuffers.message, replyType)) {
this.emit('message', reply[1].toString(), reply[2]); // channel, message
return;
}
if (utils.bufferEqual(sharedBuffers.pmessage, replyType)) {
this.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
return;
}
// Some commands still valid in the subscriber mode(like unsubscribe, ping, quit etc).
// So we don't return here.
}
var associatedCommand = this.commandQueue.shift();
if (!associatedCommand) {
this.emit('error', new Error('ioRedis command queue state error. If you can reproduce this, please report it.'));
return;
}
if (this.commandQueue.length === 0) {

@@ -102,12 +78,20 @@ this.commandQueue = new Queue();

try {
associatedCommand.resolve(reply);
} catch (err) {
associatedCommand.reject(err);
}
if (this.condition.mode.subscriber && !associatedCommand) {
// If the reply is a message/pmessage,
// then just emit it instead of considering it as a reply
if (replyType) {
_(['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe']).some(function (type) {
if (utils.bufferEqual(sharedBuffers[type], replyType)) {
// TODO support binary channel name
// TODO current we assume channel name is a utf8 string,
// maybe binary channel name should also be supported.
var replyType = Array.isArray(reply) ? reply[0].toString() : null;
switch (replyType) {
case 'message':
this.emit('message', reply[1].toString(), reply[2]); // channel, message
break;
case 'pmessage':
this.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
break;
case 'subscribe':
case 'psubscribe':
case 'unsubscribe':
case 'punsubscribe':
var channel = reply[1].toString();

@@ -117,13 +101,20 @@ var count = reply[2];

this.condition.mode.subscriber = false;
debug('All subscriptions removed, exiting pub/sub mode');
} else {
this.condition.mode.subscriber = true;
}
this.emit(type, channel, count);
return true;
}
});
var command = this.subscriptionQueue.shift(replyType, channel);
if (command) {
command.remainReplies -= 1;
if (command.remainReplies === 0) {
command.resolve(count);
}
}
break;
default:
this.emit('error', new Error('Subscription queue state error. If you can reproduce this, please report it.'));
}
} else if (!associatedCommand) {
this.emit('error', new Error('Command queue state error. If you can reproduce this, please report it.'));
} else {
associatedCommand.resolve(reply);
}
};

@@ -0,1 +1,8 @@

/** Test if two buffers are equal
*
* @param {Buffer} a
* @param {Buffer} b
* @return {Boolean} Whether the two buffers are equal
* @public
*/
exports.bufferEqual = function (a, b) {

@@ -15,1 +22,32 @@ if (typeof a.equals === 'function') {

};
/** Convert a buffer to string, supports buffer array
*
* @param {*} value - The input value
* @param {string} encoding - string encoding
* @return {*} The result
* @example
* var input = [new Buffer('foo'), [new Buffer('bar')]];
* var res = convertBufferToString(input, 'utf8');
* expect(res).to.eql(['foo', ['bar']]);
* @public
*/
exports.convertBufferToString = function (value, encoding) {
if (value instanceof Buffer) {
return value.toString(encoding);
}
if (Array.isArray(value)) {
var res = [];
for (var i = 0; i < value.length; ++i) {
if (value[i] instanceof Buffer) {
value[i] = value[i].toString(encoding);
} else if (Array.isArray(value[i])) {
res[i] = exports.convertBufferToString(value[i], encoding);
} else {
res[i] = value[i];
}
}
return res;
}
return value;
};
{
"name": "ioredis",
"version": "0.0.3",
"version": "0.0.4",
"description": "A delightful, performance-focused Redis client for Node and io.js",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -8,3 +8,3 @@ ioredis

**[WIP]** A delightful, performance-focused Redis client for Node and io.js
**[Work In Progress]** A delightful, performance-focused Redis client for Node and io.js

@@ -78,2 +78,33 @@ Support Redis >= 2.6.12 and (Node.js >= 0.11.6 or io.js).

Pub/Sub
-------
Here is a simple example of the API for publish / subscribe.
This program opens two client connections, subscribes to a channel on one of them,
and publishes to that channel on the other:
```javascript
var Redis = require('ioredis');
var redis = new Redis();
var pub = new Redis();
redis.subscribe('news', 'music', function (err, count) {
// Now both channel 'news' and 'music' are subscribed successfully.
// `count` arg represents the number of channels we are currently subscribed to.
pub.publish('news', 'Hello world!');
pub.publish('music', 'Hello again!');
});
redis.on('message', function (channel, message) {
// Receive message Hello world! from channel news
// Receive message Hello again! from channel music
console.log('Receive message %s from channel %s', message, channel);
});
```
When a client issues a SUBSCRIBE or PSUBSCRIBE, that connection is put into a "subscriber" mode.
At that point, only commands that modify the subscription set are valid.
When the subscription set is empty, the connection is put back into regular mode.
If you need to send regular commands to Redis while in subscriber mode, just open another connection.
Handle Binary Data

@@ -94,1 +125,26 @@ ------------

```
Motivation
----------------------
Firstly we used the Redis client [node_redis](https://github.com/mranney/node_redis),
however over a period of time we found out it's not robust enough for us to use
in the production environment. The library has some not trivial bugs and many unresolved
issues in the GitHub(165 so far), for instance:
```javascript
var redis = require('redis');
var client = redis.createClient();
client.set('foo', 'message');
client.set('bar', 'Hello world');
client.mget('foo', 'bar');
client.subscribe('channel');
client.on('message', function (msg) {
// Will print "Hello world", although no `publish` in invoked.
console.log('received ', msg);
});
```
I submited some pull requests but sadly none of them has been merged, so here ioredis is.
var Redis = require('./');
var redis = new Redis();
var pub = new Redis();
redis.subscribe('news', 'music', function (err, count) {
// Now both channel 'news' and 'music' are subscribed successfully.
// `count` arg represents the number of channels we are currently subscribed to.
redis.eval("return {1,2,{3,'Hello World!'}}", 0, function (err, result) {
console.log(err, result);
pub.publish('news', 'Hello world!');
pub.publish('music', 'Hello again!');
});
redis.on('message', function (channel, message) {
console.log('Receive message %s from channel %s', message, channel);
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc