New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

zcache

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zcache - npm Package Compare versions

Comparing version 0.0.11 to 0.1.0

dump.rdb

262

lib/CacheCluster.js

@@ -1,44 +0,128 @@

var CacheServer = require('./CacheServer')
var poolModule = require('generic-pool')
var util = require('util')
var Q = require('kew')
var CacheInstance = require('./CacheInstance')
var ConsistentHasher = require('./ConsistentHasher')
var common = require('./common')
function CacheCluster(opts) {
CacheInstance.call(this)
function CacheCluster(options) {
options = options || {}
opts = opts || {}
this._opts = {
create: opts.create,
nodesPerRead: opts.nodesPerRead || 3,
nodesPerWrite: opts.nodesPerWrite || 3
}
this._clientsPerKey = options.clientsPerKey || 3
this._clientCtor = options.clientCtor
this._state = {
shouldConnect: false
}
this._servers = {}
this._capacityIntervals = {}
this._capacityWarmUpMs = {}
this._currentCapacities = {}
this._targetCapacities = {}
this._hasher = new ConsistentHasher
}
util.inherits(CacheCluster, CacheInstance)
/**
* Set the capacity for a server
*
* @param {string} uri colon-delimited uri with port
* @param {number} capacity the capacity for the server
* @param {number} msPerCapacityUnit number of milliseconds between capacity units
*/
CacheCluster.prototype.setServerCapacity = function (uri, capacity, msPerCapacityUnit) {
if (!this._servers[uri]) {
this._servers[uri] = new CacheServer(this._clientCtor, uri)
this._servers[uri].on('capacity', this._onCapacity.bind(this, uri))
this._servers[uri].on('error', function () {})
CacheCluster.prototype.setCapacity = function (uri, capacity, opts, warmUpMs) {
if (!capacity && !this._servers[uri]) return
if (capacity > 0 && !this._targetCapacities[uri]) {
var self = this
self._currentCapacities[uri] = 0
self._targetCapacities[uri] = 0
self._setTargetCapacity(uri, capacity, warmUpMs)
// add a cache instance to the cluster
this._opts.create(uri, opts, function (err, cacheInstance) {
if (err) return
self._servers[uri] = cacheInstance
self._setTargetCapacity(uri)
})
} else {
// update the cache capacity for an instance
this._setTargetCapacity(uri, capacity, warmUpMs)
}
}
try {
this._servers[uri].setCapacity(capacity, msPerCapacityUnit)
} catch (e) {
console.error(e.stack)
CacheCluster.prototype._resetCapacityInterval = function (uri) {
if (this._capacityIntervals[uri]) {
// clear any running timers
clearInterval(this._capacityIntervals[uri])
delete this._capacityIntervals[uri]
}
}
CacheCluster.prototype._setTargetCapacity = function (uri, capacity, warmUpMs) {
if (typeof capacity === 'undefined' && typeof warmUpMs === 'undefined') {
// no args means we should use the last known value for this uri
capacity = this._targetCapacities[uri]
warmUpMs = this._capacityWarmUpMs[uri]
}
capacity = Math.floor(capacity)
this._resetCapacityInterval(uri)
// if the current and target capacities match the specified capacity, nothing needs to be done
if (capacity === this._targetCapacities[uri] && capacity === this._currentCapacities[uri]) return
// keep track of the capacities and warm up times for stopping and starting this cluster
this._targetCapacities[uri] = capacity
this._capacityWarmUpMs[uri] = warmUpMs
// if the cluster isn't connected, just exist
if (!this._state.shouldConnect) return
if (!warmUpMs || warmUpMs < 1) {
// warm immediately
this._currentCapacities[uri] = capacity
this._hasher.setNodeCapacity(uri, capacity)
} else {
if (!this._servers[uri]) return
// warm with 1 capacity unit every n millis
var self = this
this._capacityIntervals[uri] = setInterval(function () {
if (!self._servers[uri] || self._targetCapacities[uri] === self._currentCapacities[uri]) {
clearInterval(self._capacityIntervals[uri])
} else {
self._currentCapacities[uri] += (self._currentCapacities[uri] < self._targetCapacities[uri] ? 1 : -1)
self._hasher.setNodeCapacity(uri, self._currentCapacities[uri])
}
}, warmUpMs)
}
}
/**
* Retrieve an array of memcache clients for each key requested
*
* @param {Array.<string>} keys an array of keys
* @return {Object} a map of keys to arrays of memcache clients
*/
CacheCluster.prototype.getClientsForKeys = function (keys) {
CacheCluster.prototype.isAvailable = function () {
for (var key in this._servers) {
if (this._servers[key].isAvailable()) return true
}
return false
}
CacheCluster.prototype.connect = function () {
this._state.shouldConnect = true
for (var uri in this._servers) {
this._servers[uri].connect()
this._setTargetCapacity(uri)
}
this.emit('connect')
}
CacheCluster.prototype.disconnect = function () {
this._state.shouldConnect = false
for (var uri in this._servers) {
this._resetCapacityInterval(uri)
this._servers[uri].disconnect()
}
this.emit('disconnect')
}
CacheCluster.prototype._getCacheInstancesForKeys = function (keys) {
var uris = this._hasher.getNodesForKeys(keys, this._clientsPerKey)

@@ -55,4 +139,4 @@

if (this._servers[uri] && this._servers[uri].getStatus() === common.SERVER_STATUS.CONNECTED) {
clients[key].push(this._servers[uri].getClient())
if (this._servers[uri] && this._servers[uri].isAvailable()) {
clients[key].push(this._servers[uri])
}

@@ -65,19 +149,53 @@ }

CacheCluster.prototype.getStats = function (key) {
var stats = {}
CacheCluster.prototype.destroy = function () {
this.disconnect()
for (var uri in this._servers) {
this._servers[uri].destroy()
delete this._servers[uri]
}
this.emit('destroy')
}
function onError(e) {
console.error(e)
return undefined
}
function chainGetPromise(currentPromise, key, nextCacheInstance) {
// create the defer to grab the key from the next cacheInstance
var promise = nextCacheInstance.get(key)
// no promise currently exists, return the first in the chain
if (!currentPromise) return promise
// return the next in the chain
return currentPromise
.fail(function (e) {
return promise
})
.then(function (data) {
return data || promise
})
}
CacheCluster.prototype.mget = function (keys) {
var clients = this._getCacheInstancesForKeys(keys, this._opts.nodesPerRead)
var i, j
var promises = []
for (var uri in this._servers) {
if (this._servers[uri].getStatus() === common.SERVER_STATUS.CONNECTED) {
(function (uri) {
var defer = Q.defer()
this._servers[uri].getClient().stats(key, defer.makeNodeResolver())
promises.push(defer.promise.then(function (data) {
stats[uri] = data
}))
})(uri)
for (i = 0; i < keys.length; i++) {
var key = keys[i]
if (!key) {
promises.push(undefined)
} else {
stats[uri] = {
status: "disconnected"
var keyClients = clients[key]
var promise = undefined
for (j = 0; j < keyClients.length; j++) {
// for every client available to this
promise = chainGetPromise(promise, key, keyClients[j])
}
promises.push(promise ? promise.fail(onError).then(function (data) {
return data
}) : undefined)
}

@@ -87,41 +205,43 @@ }

return Q.all(promises)
.then(function () {
return stats
.then(function (data) {
return data
})
}
/**
* Retrieve a map of servers to their capacities
*
* @return {Object} a map of uris to their capacities
*/
CacheCluster.prototype.getServerCapacities = function () {
var capacities = {}
CacheCluster.prototype.get = function (key) {
return this.mget([key])
.then(function (results) {
return results[0]
})
}
for (var key in this._servers) {
capacities[key] = {
current: this._servers[key].getCurrentCapacity(),
target: this._servers[key].getTargetCapacity()
}
CacheCluster.prototype.set = function (key, val, maxAgeMs) {
var cacheInstances = this._getCacheInstancesForKeys([key], this._opts.nodesPerWrite)[key]
if (!cacheInstances) return Q.resolve(true)
var promises = []
for (var i = 0; i < cacheInstances.length; i++) {
promises.push(cacheInstances[i].set(key, val, maxAgeMs))
}
return capacities
return Q.all(promises).then(function () {
return true
})
}
/**
* Respond to capacity changes from the cache server by updating the
* consistent hasher
*
* @param {string} uri
* @param {number} capacity
*/
CacheCluster.prototype._onCapacity = function (uri, capacity) {
if (capacity === 0) {
this._servers[uri].close()
delete this._servers[uri]
CacheCluster.prototype.del = function (key) {
var cacheInstances = this._getCacheInstancesForKeys([key], this._opts.nodesPerWrite)[key]
if (!cacheInstances) return Q.resolve(true)
var promises = []
for (var i = 0; i < cacheInstances.length; i++) {
promises.push(cacheInstances[i].del(key))
}
this._hasher.setNodeCapacity(uri, capacity)
return Q.all(promises).then(function () {
return true
})
}
module.exports = CacheCluster
{
"name": "zcache"
, "description": "AWS zone-aware caching"
, "version": "0.0.11"
, "version": "0.1.0"
, "homepage": "https://github.com/azulus/zcache"

@@ -19,7 +19,9 @@ , "authors": [

, "dependencies": {
"node-memcache-parser-obvfork": "0.1.1"
"node-memcache-parser-obvfork": "0.1.1",
"generic-pool": "2.0.3",
"kew": "*",
"redis": "0.8.2"
}
, "devDependencies": {
"nodeunit": "0.7.4",
"kew": "*"
"nodeunit": "0.7.4"
}

@@ -26,0 +28,0 @@ , "scripts": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc