Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

persistence

Package Overview
Dependencies
Maintainers
11
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

persistence - npm Package Compare versions

Comparing version 1.0.10 to 2.0.0-alpha.4

68

lib/connection.js

@@ -1,4 +0,3 @@

var redisLib = require('redis'),
var Redis = require('ioredis'),
Tracker = require('callback_tracker'),
sentinelLib = require('redis-sentinel'),
logging = require('minilog')('connection');

@@ -20,3 +19,6 @@

this.name = name;
this.config = config;
this.config = {
...config,
enableReadyCheck: true,
};
this.client = null;

@@ -51,4 +53,6 @@ this.subscriber = null;

Connection.prototype.isReady = function() {
return (this.client && this.client.connected &&
this.subscriber && this.subscriber.connected);
return this.client
&& this.client.status === 'ready'
&& this.subscriber
&& this.subscriber.status === 'ready';
};

@@ -59,4 +63,3 @@

this.connectRedis(config, callback);
}
else {
} else {
this.connectSentinel(config, callback);

@@ -71,3 +74,2 @@ }

this.readyListeners.push(ready);
if (this.isReady()) {

@@ -77,3 +79,3 @@ return this.establishDone();

if (this.readyListeners.length == 1) {
if (this.readyListeners.length === 1) {
var tracker = Tracker.create('establish :' + this.name , function() {

@@ -136,3 +138,8 @@ self.establishDone();

Connection.prototype.connectRedis = function (config, callback) {
var client = redisLib.createClient(config.port, config.host);
var client = new Redis({
port: config.port,
host: config.host,
enableReadyCheck: true,
showFriendlyErrorStack: true
});
if (config.redis_auth) {

@@ -142,6 +149,10 @@ client.auth(config.redis_auth);

client.once('ready', function() {
client.on('ready', function() {
logging.info('Redis client "ready" event.');
callback(null, client);
});
client.on('error', function(error) {
logging.info({ message: `Redis client error: ${error.message}`});
});
};

@@ -154,4 +165,3 @@

sentinelMaster = config.id,
sentinels = config.sentinels,
self = this;
sentinels = config.sentinels;

@@ -167,23 +177,17 @@ if (!sentinels || !sentinels.length || !sentinelMaster) {

sentinel = sentinelLib.createClient(sentinels, sentinelMaster, options);
sentinel = new Redis({
sentinels,
name: sentinelMaster,
sentinelPassword: options.auth_pass,
enableReadyCheck: true,
showFriendlyErrorStack: true
});
sentinel.send_command('SENTINEL', ['get-master-addr-by-name', sentinelMaster], function(error, master) {
sentinel.quit();
sentinel.on('ready', function() {
logging.info('Sentinel client "ready" event.');
callback(null, sentinel);
});
if (error) {
callback(error);
}
if (!master || master.length != 2) {
propagateError(callback, new Error("Unknown master " + sentinelMaster));
return;
}
var newConfig = {
host: master[0],
port: master[1],
redis_auth: config.redis_auth
};
self.connectRedis(newConfig, callback);
sentinel.on('error', function(error) {
logging.info({ message: `Sentinel client error: ${error.message}`});
});

@@ -190,0 +194,0 @@ };

@@ -33,20 +33,21 @@ var Persistence = require('./persistence.js'),

//success
var multi = Persistence.redis().multi();
multi.incr(self.scope, function(error, value) {
if(error) {
const multi = Persistence.redis().multi();
multi.incr(self.scope);
if(self.expiry) {
multi.expire(self.scope, self.expiry);
}
multi.exec(function (error, reply) {
if (error) {
self.processing = false;
// attempt to release the lock
idgen.lock.release();
throw new Error(error);
}
var callback = self.listeners.shift();
if(callback) callback(null, value);
const callback = self.listeners.shift();
if (callback) {
callback(null, reply[0][1])
};
self.processing = false;
self.lock.release();
});
if(self.expiry) {
multi.expire(self.scope, self.expiry);
}
multi.exec();
});

@@ -53,0 +54,0 @@ };

@@ -18,30 +18,27 @@ var Persistence = require('./persistence.js'),

List.prototype.info = function(callback) {
var start, end, size;
var multi = Persistence.redis().multi();
multi.lrange(this.name, 0, 0, function(error, values) {
if(error) throw new Error(error);
if(values.length == 1) {
start = JSON.parse(values[0]).id;
}
});
multi.lrange(this.name, -1, -1, function(error, values) {
if(error) throw new Error(error);
if(values.length == 1) {
end = JSON.parse(values[0]).id;
}
});
multi.llen(this.name, function(error, value) {
if(error) throw new Error(error);
size = value;
});
multi.exec(function(error) {
if(callback) callback(error, start, end, size);
});
multi
.lrange(this.name, 0, 0)
.lrange(this.name, -1, -1)
.llen(this.name)
.exec(function(error, replies) {
if (error) {
throw new Error(error)
}
const list = [];
replies.forEach((response) => {
try {
var decodedData = JSON.parse(response[1]);
list.push(decodedData.id ? decodedData.id : decodedData);
} catch (err) {
logging.error(err)
list.push(undefined);
}
});
if (callback) {
const [start, end, size] = list;
callback(error, start, end, size);
}
});
};

@@ -70,3 +67,3 @@

if(startOffset === null) {
if (startOffset === null) {
if(callback) callback('sync-error');

@@ -77,3 +74,3 @@ return;

redis.lrange(this.name, startOffset, -1, function(error, entries) {
if(error) throw error;
if (error) throw error;

@@ -107,7 +104,7 @@ var parsed = [];

if(list.maxLength > 0) {
if (list.maxLength > 0) {
multi.rpush(list.name, JSON.stringify(entry), Persistence.handler);
}
if(list.expiry) {
if (list.expiry) {
multi.expire(list.name, list.expiry, Persistence.handler);

@@ -119,7 +116,7 @@ } else {

if(list.maxLength) {
if (list.maxLength) {
multi.ltrim(list.name, -list.maxLength, -1, Persistence.handler);
}
multi.exec(function(err) {
multi.exec(function(err, replies) {
if(callback) callback(err, entry);

@@ -126,0 +123,0 @@ });

@@ -1,2 +0,3 @@

var Persistence = require('./persistence.js');
var Persistence = require('./persistence.js'),
logging = require('minilog')('lock');

@@ -17,6 +18,6 @@ var DEFAULT_TIMEOUT_SEC = 10;

redis.set(this.scope, 'locked', 'EX', this.timeout, 'NX', function(error, val) {
if(val != 'OK') {
if (val !== 'OK') {
lock.setupExpiry();
}
if(callback) callback(error, (val === 'OK'));
if (callback) callback(error, (val === 'OK'));
});

@@ -33,3 +34,3 @@ };

Lock.prototype.cleanupExpiry = function() {
if(this.timer) {
if (this.timer) {
clearTimeout(this.timer);

@@ -41,7 +42,12 @@ delete this.timer;

Lock.prototype.release = function() {
Persistence.redis().del(this.scope);
this.cleanupExpiry();
this.emit('released');
Persistence
.redis()
.del(this.scope)
.then((function() {
this.cleanupExpiry();
this.emit('released');
}).bind(this))
.catch((err) => logging.error(err));
};
module.exports = Lock;
var logging = require('minilog')('persistence'),
ConnectionHelper = require('./connection_helper.js'),
// defaults
connectionName = 'default',
connection = {},

@@ -10,3 +9,3 @@ connected = false,

function Persistence() { }
var Persistence = { }

@@ -25,11 +24,2 @@ Persistence.connect = function(done) {

});
var timeout = process.env.CONNECT_TIMEOUT || 10000;
setTimeout(function () {
if (!connection.isReady()) {
logging.error('connection not ready after waiting', timeout/1000, 'seconds');
throw new Error('connection not ready');
}
}, timeout);
} else {

@@ -47,4 +37,4 @@ connection.readyListeners.push(done);

}
if(!connection.client || !connection.client.connected) {
throw new Error('Client: Not connected to redis');
if(!connection.client || connection.subscriber.status !== 'ready') {
logging.error('Client: Not connected to redis');
}

@@ -58,4 +48,4 @@ return connection.client;

}
if(!connection.subscriber || !connection.subscriber.connected) {
throw new Error('Pubsub: Not connected to redis');
if(!connection.subscriber || connection.subscriber.status !== 'ready') {
logging.error('Pubsub: Not connected to redis');
}

@@ -72,8 +62,5 @@ return connection.subscriber;

Persistence.applyPolicy = function(multi, key, policy) {
Persistence.applyPolicy = function (multi, key, policy) {
if(policy.maxCount) {
multi.zremrangebyrank(key, 0, -policy.maxCount-1, function(err, res) {
logging.debug('Enforce max count: '+(0-policy.maxCount-1)+' removed '+res);
if(err) throw new Error(err);
});
multi.zremrangebyrank(key, 0, -policy.maxCount-1);
}

@@ -83,6 +70,3 @@

var maxScore = Date.now()-policy.maxAgeSeconds * 1000;
multi.zremrangebyscore(key, 0, maxScore, function(err, res) {
logging.debug('Enforce max age ('+key+'): '+new Date(maxScore).toUTCString()+' removed '+res);
if(err) throw new Error(err);
});
multi.zremrangebyscore(key, 0, maxScore);
}

@@ -93,6 +77,10 @@ };

var multi = Persistence.redis().multi();
var replyCount = 0;
switch(arguments.length) {
case 3:
if (policy) Persistence.applyPolicy(multi, key, policy);
if (policy) {
if (policy.maxCount) replyCount ++
if (policy.maxAgeSeconds) replyCount ++
Persistence.applyPolicy(multi, key, policy);
}
break;

@@ -104,18 +92,19 @@ case 2:

// sync up to 100 messages, starting from the newest
multi.zrange(key, -100, -1, 'WITHSCORES', function (err, replies) {
multi.zrange(key, -100, -1, 'WITHSCORES');
multi.exec(function(err, replies) {
if(err) throw new Error(err);
logging.debug(key+' '+ (replies.length /2) + ' items to sync');
// (nherment) TODO: deserialize the result here because it is being serialized in persistOrdered()
// The problem is that radar_client currently deserializes the response.
// We need to make the client not deserialize the response so that we can deserialize it here.
callback(replies);
callback(replies[replyCount][1]);
});
multi.exec();
};
Persistence.persistOrdered = function(key, value, callback) {
Persistence.redis().zadd(key, Date.now(), JSON.stringify(value), callback);
Persistence
.redis()
.zadd(key, Date.now(), JSON.stringify(value))
.then(() => {
if (callback) callback()
}, (err) => {
if (callback) callback(err)
});
};

@@ -149,23 +138,30 @@

logging.debug('readHashValue:', hash, key);
Persistence.redis().hget(hash, key, function(err, reply) {
if (err) throw new Error(err);
callback(JSON.parse(reply));
});
Persistence.redis()
.hget(hash, key)
.then((reply) => {
callback(JSON.parse(reply));
}, () => {
if (err) throw new Error(err);
})
};
Persistence.readHashAll = function(hash, callback) {
Persistence.redis().hgetall(hash, function (err, replies) {
if(err) throw new Error(err);
if(replies) {
Object.keys(replies).forEach(function(attr) {
try {
replies[attr] = JSON.parse(replies[attr]);
} catch(parseError) {
logging.error('Corrupted key value in redis [' + hash + '][' + attr + ']. ' + parseError.message + ': '+ parseError.stack);
delete replies[attr];
}
});
}
callback(replies);
});
Persistence
.redis()
.hgetall(hash)
.then((replies) => {
if(replies) {
Object.keys(replies).forEach(function(attr) {
try {
replies[attr] = JSON.parse(replies[attr]);
} catch(parseError) {
logging.error('Corrupted key value in redis [' + hash + '][' + attr + ']. ' + parseError.message + ': '+ parseError.stack);
delete replies[attr];
}
});
}
callback(replies);
}, (err) => {
if(err) throw new Error(err);
});
};

@@ -178,6 +174,3 @@

logging.debug('readKey:', key);
multi.get(key, function (err, reply) {
if (err) throw new Error(err);
callback(JSON.parse(reply));
});
multi.get(key);

@@ -188,3 +181,6 @@ if (expireTTL) {

multi.exec();
multi.exec(function (err, replies) {
if (err || replies[0][0]) throw new Error(err);
callback(JSON.parse(replies[0][1]))
});
};

@@ -196,3 +192,3 @@

logging.debug('persistKey:', key, value);
multi.set(key, JSON.stringify(value), Persistence.handler);
multi.set(key, JSON.stringify(value));
if (expireTTL) {

@@ -212,3 +208,3 @@ multi.expire(key, expireTTL);

logging.debug('expire', key, seconds);
Persistence.redis().expire(key, seconds, Persistence.handler);
Persistence.redis().expire(key, seconds, Persistence.handler)
};

@@ -250,16 +246,3 @@

}
if (/READONLY/.test(String(err))) {
logging.debug('Failover happened, about to reconnect.');
Persistence.reconnect(function () {
if (connection.isReady()) {
logging.debug('Connected successfully after failover happened.');
} else {
throw new Error('Failed to reconnect to redis after failover happened.')
}
});
} else {
logging.error(err);
}
logging.error(err);
}

@@ -276,14 +259,4 @@ };

Persistence.reconnect = function (callback) {
Persistence.disconnect(function() {
ConnectionHelper.destroyConnection(configuration, function () {
Persistence.connect(callback);
})
})
}
Persistence.isConnectionReady = () => Persistence.redis() && Persistence.pubsub();
Persistence.isConnectionReady = function () {
return connection.isReady();
}
module.exports = Persistence;
{
"name": "persistence",
"version": "1.0.10",
"version": "2.0.0-alpha.4",
"description": "An abstraction library for redis and sentinel connection management",

@@ -30,5 +30,4 @@ "main": "lib/index.js",

"dependencies": {
"redis": "^0.12.1",
"redis-sentinel": "zendesk/node-redis-sentinel.git"
"ioredis": "^4.14.1"
}
}

@@ -32,3 +32,2 @@ var assert = require('assert'),

};
describe('with redis configuration', function() {

@@ -52,4 +51,6 @@ var helper_config = {

connection.establish(function() {
assert.deepEqual(connection.config, { host: 'localhost', port: 16379 });
ConnectionHelper.destroyConnection(config, done);
assert.deepEqual(connection.config, { host: 'localhost', port: 16379, enableReadyCheck: true });
connection.teardown(function() {
ConnectionHelper.destroyConnection(config, done);
})
});

@@ -63,5 +64,7 @@ });

connection.establish(function() {
assert.deepEqual(connection.config, { host: 'localhost', port: 16379 });
assert.deepEqual(connection.config, { host: 'localhost', port: 16379, enableReadyCheck: true });
assert.deepEqual(connection, ConnectionHelper.connection(config));
ConnectionHelper.destroyConnection(config, done);
connection.teardown(function() {
ConnectionHelper.destroyConnection(config, done);
})
});

@@ -93,6 +96,4 @@ });

it('should connect', function(done) {
var config = JSON.parse(JSON.stringify(configuration));
config.use_connection = 'sentinel';
var connection = ConnectionHelper.connection(config);

@@ -105,4 +106,5 @@ connection.establish(function() {

assert.deepStrictEqual(received_sentinels, expected_sentinels);
ConnectionHelper.destroyConnection(config, done);
connection.teardown(function() {
ConnectionHelper.destroyConnection(config, done);
})
});

@@ -119,7 +121,8 @@ });

assert.deepEqual(connection, ConnectionHelper.connection(config));
ConnectionHelper.destroyConnection(config, done);
connection.teardown(function() {
ConnectionHelper.destroyConnection(config, done);
})
});
});
});
describe('given a connection', function() {

@@ -158,3 +161,2 @@ var connection;

});
});

@@ -82,3 +82,2 @@ var assert = require('assert');

Persistence.readOrderedWithScores(key, undefined, function(replies) {
assert(replies instanceof Array);

@@ -106,3 +105,2 @@ assert.equal(2, replies.length);

Persistence.readOrderedWithScores(key, undefined, function(replies) {
assert(replies instanceof Array);

@@ -109,0 +107,0 @@ assert.equal(2, replies.length);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc