Comparing version 0.0.11 to 0.1.0
@@ -1,44 +0,128 @@
var CacheServer = require('./CacheServer')
var poolModule = require('generic-pool')
var util = require('util')
var Q = require('kew')
var CacheInstance = require('./CacheInstance')
var ConsistentHasher = require('./ConsistentHasher')
var common = require('./common')
function CacheCluster(opts) {
CacheInstance.call(this)
function CacheCluster(options) {
options = options || {}
opts = opts || {}
this._opts = {
create: opts.create,
nodesPerRead: opts.nodesPerRead || 3,
nodesPerWrite: opts.nodesPerWrite || 3
}
this._clientsPerKey = options.clientsPerKey || 3
this._clientCtor = options.clientCtor
this._state = {
shouldConnect: false
}
this._servers = {}
this._capacityIntervals = {}
this._capacityWarmUpMs = {}
this._currentCapacities = {}
this._targetCapacities = {}
this._hasher = new ConsistentHasher
}
util.inherits(CacheCluster, CacheInstance)
/**
* Set the capacity for a server
*
* @param {string} uri colon-delimited uri with port
* @param {number} capacity the capacity for the server
* @param {number} msPerCapacityUnit number of milliseconds between capacity units
*/
CacheCluster.prototype.setServerCapacity = function (uri, capacity, msPerCapacityUnit) {
if (!this._servers[uri]) {
this._servers[uri] = new CacheServer(this._clientCtor, uri)
this._servers[uri].on('capacity', this._onCapacity.bind(this, uri))
this._servers[uri].on('error', function () {})
CacheCluster.prototype.setCapacity = function (uri, capacity, opts, warmUpMs) {
if (!capacity && !this._servers[uri]) return
if (capacity > 0 && !this._targetCapacities[uri]) {
var self = this
self._currentCapacities[uri] = 0
self._targetCapacities[uri] = 0
self._setTargetCapacity(uri, capacity, warmUpMs)
// add a cache instance to the cluster
this._opts.create(uri, opts, function (err, cacheInstance) {
if (err) return
self._servers[uri] = cacheInstance
self._setTargetCapacity(uri)
})
} else {
// update the cache capacity for an instance
this._setTargetCapacity(uri, capacity, warmUpMs)
}
}
try {
this._servers[uri].setCapacity(capacity, msPerCapacityUnit)
} catch (e) {
console.error(e.stack)
CacheCluster.prototype._resetCapacityInterval = function (uri) {
if (this._capacityIntervals[uri]) {
// clear any running timers
clearInterval(this._capacityIntervals[uri])
delete this._capacityIntervals[uri]
}
}
CacheCluster.prototype._setTargetCapacity = function (uri, capacity, warmUpMs) {
if (typeof capacity === 'undefined' && typeof warmUpMs === 'undefined') {
// no args means we should use the last known value for this uri
capacity = this._targetCapacities[uri]
warmUpMs = this._capacityWarmUpMs[uri]
}
capacity = Math.floor(capacity)
this._resetCapacityInterval(uri)
// if the current and target capacities match the specified capacity, nothing needs to be done
if (capacity === this._targetCapacities[uri] && capacity === this._currentCapacities[uri]) return
// keep track of the capacities and warm up times for stopping and starting this cluster
this._targetCapacities[uri] = capacity
this._capacityWarmUpMs[uri] = warmUpMs
// if the cluster isn't connected, just exit
if (!this._state.shouldConnect) return
if (!warmUpMs || warmUpMs < 1) {
// warm immediately
this._currentCapacities[uri] = capacity
this._hasher.setNodeCapacity(uri, capacity)
} else {
if (!this._servers[uri]) return
// warm with 1 capacity unit every n millis
var self = this
this._capacityIntervals[uri] = setInterval(function () {
if (!self._servers[uri] || self._targetCapacities[uri] === self._currentCapacities[uri]) {
clearInterval(self._capacityIntervals[uri])
} else {
self._currentCapacities[uri] += (self._currentCapacities[uri] < self._targetCapacities[uri] ? 1 : -1)
self._hasher.setNodeCapacity(uri, self._currentCapacities[uri])
}
}, warmUpMs)
}
}
/**
* Retrieve an array of memcache clients for each key requested
*
* @param {Array.<string>} keys an array of keys
* @return {Object} a map of keys to arrays of memcache clients
*/
CacheCluster.prototype.getClientsForKeys = function (keys) {
CacheCluster.prototype.isAvailable = function () {
for (var key in this._servers) {
if (this._servers[key].isAvailable()) return true
}
return false
}
CacheCluster.prototype.connect = function () {
this._state.shouldConnect = true
for (var uri in this._servers) {
this._servers[uri].connect()
this._setTargetCapacity(uri)
}
this.emit('connect')
}
CacheCluster.prototype.disconnect = function () {
this._state.shouldConnect = false
for (var uri in this._servers) {
this._resetCapacityInterval(uri)
this._servers[uri].disconnect()
}
this.emit('disconnect')
}
CacheCluster.prototype._getCacheInstancesForKeys = function (keys) {
var uris = this._hasher.getNodesForKeys(keys, this._clientsPerKey)
@@ -55,4 +139,4 @@
if (this._servers[uri] && this._servers[uri].getStatus() === common.SERVER_STATUS.CONNECTED) {
clients[key].push(this._servers[uri].getClient())
if (this._servers[uri] && this._servers[uri].isAvailable()) {
clients[key].push(this._servers[uri])
}
@@ -65,19 +149,53 @@ }
CacheCluster.prototype.getStats = function (key) {
var stats = {}
CacheCluster.prototype.destroy = function () {
this.disconnect()
for (var uri in this._servers) {
this._servers[uri].destroy()
delete this._servers[uri]
}
this.emit('destroy')
}
function onError(e) {
console.error(e)
return undefined
}
function chainGetPromise(currentPromise, key, nextCacheInstance) {
// create the defer to grab the key from the next cacheInstance
var promise = nextCacheInstance.get(key)
// no promise currently exists, return the first in the chain
if (!currentPromise) return promise
// return the next in the chain
return currentPromise
.fail(function (e) {
return promise
})
.then(function (data) {
return data || promise
})
}
CacheCluster.prototype.mget = function (keys) {
var clients = this._getCacheInstancesForKeys(keys, this._opts.nodesPerRead)
var i, j
var promises = []
for (var uri in this._servers) {
if (this._servers[uri].getStatus() === common.SERVER_STATUS.CONNECTED) {
(function (uri) {
var defer = Q.defer()
this._servers[uri].getClient().stats(key, defer.makeNodeResolver())
promises.push(defer.promise.then(function (data) {
stats[uri] = data
}))
})(uri)
for (i = 0; i < keys.length; i++) {
var key = keys[i]
if (!key) {
promises.push(undefined)
} else {
stats[uri] = {
status: "disconnected"
var keyClients = clients[key]
var promise = undefined
for (j = 0; j < keyClients.length; j++) {
// for every client available to this
promise = chainGetPromise(promise, key, keyClients[j])
}
promises.push(promise ? promise.fail(onError).then(function (data) {
return data
}) : undefined)
}
@@ -87,41 +205,43 @@ }
return Q.all(promises)
.then(function () {
return stats
.then(function (data) {
return data
})
}
/**
* Retrieve a map of servers to their capacities
*
* @return {Object} a map of uris to their capacities
*/
CacheCluster.prototype.getServerCapacities = function () {
var capacities = {}
CacheCluster.prototype.get = function (key) {
return this.mget([key])
.then(function (results) {
return results[0]
})
}
for (var key in this._servers) {
capacities[key] = {
current: this._servers[key].getCurrentCapacity(),
target: this._servers[key].getTargetCapacity()
}
CacheCluster.prototype.set = function (key, val, maxAgeMs) {
var cacheInstances = this._getCacheInstancesForKeys([key], this._opts.nodesPerWrite)[key]
if (!cacheInstances) return Q.resolve(true)
var promises = []
for (var i = 0; i < cacheInstances.length; i++) {
promises.push(cacheInstances[i].set(key, val, maxAgeMs))
}
return capacities
return Q.all(promises).then(function () {
return true
})
}
/**
* Respond to capacity changes from the cache server by updating the
* consistent hasher
*
* @param {string} uri
* @param {number} capacity
*/
CacheCluster.prototype._onCapacity = function (uri, capacity) {
if (capacity === 0) {
this._servers[uri].close()
delete this._servers[uri]
CacheCluster.prototype.del = function (key) {
var cacheInstances = this._getCacheInstancesForKeys([key], this._opts.nodesPerWrite)[key]
if (!cacheInstances) return Q.resolve(true)
var promises = []
for (var i = 0; i < cacheInstances.length; i++) {
promises.push(cacheInstances[i].del(key))
}
this._hasher.setNodeCapacity(uri, capacity)
return Q.all(promises).then(function () {
return true
})
}
module.exports = CacheCluster
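Taken together, the hunks above appear to replace the 0.0.11 memcache-client surface (setServerCapacity, getClientsForKeys, getStats, getServerCapacities, _onCapacity) with a CacheInstance-based cluster in 0.1.0: each node's capacity warms into the consistent hasher over time, and reads and writes fan out to nodesPerRead/nodesPerWrite instances per key. A minimal usage sketch of that 0.1.0 surface as it reads in this diff; it assumes the package's main export exposes CacheCluster, and the RedisConnection factory target is only a guess prompted by the new redis dependency, not something this diff confirms:

```js
var zcache = require('zcache')

// Hypothetical factory: the diff only shows that opts.create(uri, opts, cb)
// must call back with (err, cacheInstance); the RedisConnection constructor
// below is an assumption, not part of this diff.
function createInstance(uri, opts, callback) {
  callback(null, new zcache.RedisConnection(uri))
}

var cluster = new zcache.CacheCluster({
  create: createInstance,
  nodesPerRead: 2,
  nodesPerWrite: 3
})

cluster.connect()

// Each node ramps toward its target capacity one consistent-hash unit every
// warmUpMs milliseconds (here 100ms) instead of joining at full weight.
cluster.setCapacity('localhost:6379', 10, {}, 100)
cluster.setCapacity('localhost:6380', 10, {}, 100)

cluster.set('userId:1234', 'some value', 60 * 60 * 1000) // key, value, maxAgeMs
  .then(function () {
    return cluster.mget(['userId:1234', 'userId:5678'])
  })
  .then(function (values) {
    console.log(values) // one entry per key; misses resolve to undefined
    cluster.destroy()
  })
```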
{
"name": "zcache"
, "description": "AWS zone-aware caching"
, "version": "0.0.11"
, "version": "0.1.0"
, "homepage": "https://github.com/azulus/zcache"
@@ -19,7 +19,9 @@ , "authors": [
, "dependencies": {
"node-memcache-parser-obvfork": "0.1.1"
"node-memcache-parser-obvfork": "0.1.1",
"generic-pool": "2.0.3",
"kew": "*",
"redis": "0.8.2"
}
, "devDependencies": {
"nodeunit": "0.7.4",
"kew": "*"
"nodeunit": "0.7.4"
}
@@ -26,0 +28,0 @@ , "scripts": {
Wildcard dependency
Quality: Package has a dependency with a floating version range. This can cause issues if the dependency publishes a new major version.
Found 1 instance in 1 package
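The floating range flagged here is the new "kew": "*" entry in dependencies. Pinning it to the version that currently resolves (0.7.0, per the transitive list below) would avoid silently picking up a future breaking release; a possible dependencies block, with the kew version shown purely as an illustration:

```json
"dependencies": {
  "node-memcache-parser-obvfork": "0.1.1",
  "generic-pool": "2.0.3",
  "kew": "0.7.0",
  "redis": "0.8.2"
}
```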
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
+ Added generic-pool@2.0.3
+ Added kew@*
+ Added redis@0.8.2
+ Added generic-pool@2.0.3 (transitive)
+ Added kew@0.7.0 (transitive)
+ Added redis@0.8.2 (transitive)