memcached
Advanced tools
Comparing version 0.0.3 to 0.0.4
@@ -11,10 +11,18 @@ var EventEmitter = require('events').EventEmitter | ||
// The constructor | ||
/** | ||
* Constructs a new memcached client | ||
* | ||
* @constructor | ||
* @param {Mixed} args Array, string or object with servers | ||
* @param {Object} options options | ||
* @api public | ||
*/ | ||
function Client(args, options){ | ||
if(!(this && this.hasOwnProperty && (this instanceof Client))) this = new Client(); | ||
var servers = [] | ||
, weights = {} | ||
, key; | ||
// Parse down the connection arguments | ||
@@ -33,3 +41,3 @@ switch (Object.prototype.toString.call(args)){ | ||
} | ||
if (!servers.length) throw new Error('No servers where supplied in the arguments'); | ||
@@ -80,3 +88,3 @@ | ||
, undefined; | ||
// Creates or generates a new connection for the give server, the callback will receive the connection | ||
@@ -87,10 +95,10 @@ // if the operation was successful | ||
if (server in this.issues && this.issues[server].failed) return callback(false, false); | ||
// fetch from connection pool | ||
if (server in this.connections) return this.connections[server].allocate(callback); | ||
// No connection factory created yet, so we must build one | ||
var serverTokens = /(.*):(\d+){1,}$/.exec(server).reverse() | ||
, memcached = this; | ||
serverTokens.pop(); | ||
@@ -102,3 +110,3 @@ | ||
, Manager = this; | ||
// config the Stream | ||
@@ -113,3 +121,3 @@ S.streamID = sid++; | ||
S.tokens = serverTokens; | ||
// Add the event listeners | ||
@@ -121,6 +129,6 @@ Utils.fuse(S, { | ||
, data: Utils.curry(memcached, private.buffer, S) | ||
, timeout: function streamTimeout(){ memcached.connectionIssue('timeout', S, callback) } | ||
, timeout: function streamTimeout(){ Manager.remove(this) } | ||
, end: S.end | ||
}); | ||
// connect the net.Stream [port, hostname] | ||
@@ -130,7 +138,7 @@ S.connect.apply(S, serverTokens); | ||
}); | ||
// now that we have setup our connection factory we can allocate a new connection | ||
this.connections[server].allocate(callback); | ||
}; | ||
// Creates a multi stream, so it's easier to query agains | ||
@@ -143,3 +151,3 @@ // multiple memcached servers. | ||
, i; | ||
// gets all servers based on the supplied keys, | ||
@@ -161,3 +169,3 @@ // or just gives all servers if we don't have keys | ||
} | ||
i = servers.length; | ||
@@ -168,8 +176,7 @@ while(i--){ | ||
}; | ||
// Executes the command on the net.Stream, if no server is supplied it will use the query.key to get | ||
// the server from the HashRing | ||
memcached.command = function memcachedCommand(queryCompiler, server){ | ||
// generate a regular query, | ||
// generate a regular query, | ||
var query = queryCompiler() | ||
@@ -179,9 +186,9 @@ , redundancy = this.redundancy && this.redundancy < this.servers.length | ||
, memcached = this; | ||
// validate the arguments | ||
if (query.validation && !Utils.validateArg(query, this)) return; | ||
// fetch servers | ||
server = server ? server : redundancy && queryRedundancy ? (redundancy = this.HashRing.createRange(query.key, (this.redundancy + 1), true)).shift() : this.HashRing.getNode(query.key); | ||
// check if the server is still alive | ||
@@ -198,3 +205,3 @@ if (server in this.issues && this.issues[server].failed) return query.callback && query.callback(false, false); | ||
if (S.readyState !== 'open') return query.callback && query.callback('Connection readyState is set to ' + S.readySate); | ||
// used for request timing | ||
@@ -205,7 +212,7 @@ query.start = Date.now(); | ||
}); | ||
// if we have redundancy enabled and the query is used for redundancy, than we are going loop over | ||
// the servers, check if we can reach them, and connect to the correct net connection. | ||
// because all redundancy queries are executed with "no reply" we do not need to store the callback | ||
// as there will be no value to parse. | ||
// as there will be no value to parse. | ||
if (redundancy && queryRedundancy){ | ||
@@ -215,3 +222,3 @@ queryRedundancy = queryCompiler(queryRedundancy); | ||
if (server in memcached.issues && memcached.issues[server].failed) return; | ||
memcached.connect(server, function allocateMemcachedConnection(error, S){ | ||
@@ -224,3 +231,3 @@ if (!S || error || S.readyState !== 'open') return; | ||
}; | ||
// Logs all connection issues, and handles them off. Marking all requests as cache misses. | ||
@@ -231,7 +238,7 @@ memcached.connectionIssue = function connectionIssue(error, S, callback){ | ||
if (callback) callback(false, false); | ||
var issues | ||
, server = S.server | ||
, memcached = this; | ||
// check for existing issue logs, or create a new log | ||
@@ -249,3 +256,3 @@ if (server in this.issues){ | ||
}); | ||
// proxy the events | ||
@@ -261,3 +268,3 @@ Utils.fuse(issues, { | ||
memcached.connections[server].end(); | ||
if (this.failOverServers && this.failOverServers.length){ | ||
@@ -271,7 +278,7 @@ memcached.HashRing.replaceServer(server, this.failOverServers.shift()); | ||
} | ||
// log the issue | ||
issues.log(error); | ||
}; | ||
// Kills all active connections | ||
@@ -284,3 +291,3 @@ memcached.end = function endMemcached(){ | ||
}; | ||
// These do not need to be publicly available as it's one of the most important | ||
@@ -295,3 +302,3 @@ // parts of the whole client, the parser commands: | ||
, 'SERVER_ERROR': function(tokens, dataSet, err, queue, S, memcached){ memcached.connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false] } | ||
// keyword based responses | ||
@@ -303,3 +310,3 @@ , 'STORED': function(tokens, dataSet){ return [CONTINUE, true] } | ||
, 'END': function(tokens, dataSet, err, queue){ if (!queue.length) queue.push(false); return [FLUSH, true] } | ||
// value parsing: | ||
@@ -313,3 +320,3 @@ , 'VALUE': function(tokens, dataSet, err, queue){ | ||
, tmp; | ||
switch (flag){ | ||
@@ -325,3 +332,3 @@ case FLAG_JSON: | ||
} | ||
// Add to queue as multiple get key key key key key returns multiple values | ||
@@ -335,3 +342,3 @@ if (!multi){ | ||
} | ||
return [BUFFER, false] | ||
@@ -346,3 +353,3 @@ } | ||
var versionTokens = /(\d+)(?:\.)(\d+)(?:\.)(\d+)$/.exec(tokens.pop()); | ||
return [CONTINUE, { | ||
@@ -365,3 +372,3 @@ server: this.server | ||
}; | ||
// Parses down result sets | ||
@@ -372,6 +379,6 @@ private.resultParsers = { | ||
var response = {}; | ||
// add references to the retrieved server | ||
response.server = this.server; | ||
// Fill the object | ||
@@ -381,6 +388,6 @@ resultSet.forEach(function(statSet){ | ||
}); | ||
return response; | ||
} | ||
// the settings uses the same parse format as the regular stats | ||
@@ -391,15 +398,14 @@ , 'stats settings': function(){ return private.resultParsers.stats.apply(this, arguments) } | ||
var response = {}; | ||
// add references to the retrieved server | ||
response.server = this.server; | ||
// Fill the object | ||
resultSet.forEach(function(statSet){ | ||
var identifier = statSet[0].split(':'); | ||
if (!response[identifier[0]]) response[identifier[0]] = {}; | ||
response[identifier[0]][identifier[1]] = statSet[1]; | ||
}); | ||
return response; | ||
@@ -409,23 +415,23 @@ } | ||
var response = {}; | ||
// add references to the retrieved server | ||
response.server = this.server; | ||
// Fill the object | ||
resultSet.forEach(function(statSet){ | ||
var identifier = statSet[0].split(':'); | ||
if (!response[identifier[1]]) response[identifier[1]] = {}; | ||
response[identifier[1]][identifier[2]] = statSet[1]; | ||
}); | ||
return response; | ||
} | ||
}; | ||
// Generates a RegExp that can be used to check if a chunk is memcached response identifier | ||
private.allCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + '|\\d' + ')'); | ||
private.bufferedCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + ')'); | ||
// When working with large chunks of responses, node chunks it in to pieces. So we might have | ||
@@ -451,3 +457,3 @@ // half responses. So we are going to buffer up the buffer and user our buffered buffer to query | ||
}; | ||
// The actual parsers function that scan over the responseBuffer in search of Memcached response | ||
@@ -465,8 +471,8 @@ // identifiers. Once we have found one, we will send it to the dedicated parsers that will transform | ||
, tmp; | ||
while(S.bufferArray.length && private.allCommands.test(S.bufferArray[0])){ | ||
token = S.bufferArray.shift(); | ||
tokenSet = token.split(' '); | ||
// special case for digit only's these are responses from INCR and DECR | ||
@@ -481,6 +487,6 @@ if (/^\d+$/.test(tokenSet[0])) tokenSet.unshift('INCRDECR'); | ||
} | ||
// check for dedicated parser | ||
if (private.parsers[tokenSet[0]]){ | ||
// fetch the response content | ||
@@ -496,3 +502,3 @@ if (tokenSet[0] == 'VALUE') { | ||
resultSet = private.parsers[tokenSet[0]].call(S, tokenSet, dataSet || token, err, queue, this); | ||
// check how we need to handle the resultSet response | ||
@@ -502,7 +508,7 @@ switch(resultSet.shift()){ | ||
break; | ||
case FLUSH: | ||
metaData = S.metaData.shift(); | ||
resultSet = queue; | ||
// if we have a callback, call it | ||
@@ -513,3 +519,3 @@ if (metaData && metaData.callback){ | ||
metaData, err.length ? err : err[0], | ||
// see if optional parsing needs to be applied to make the result set more readable | ||
@@ -520,10 +526,10 @@ private.resultParsers[metaData.type] ? private.resultParsers[metaData.type].call(S, resultSet, err) : | ||
} | ||
queue.length = err.length = 0; | ||
break; | ||
case CONTINUE: | ||
default: | ||
metaData = S.metaData.shift(); | ||
if (metaData && metaData.callback){ | ||
@@ -533,3 +539,3 @@ metaData.execution = Date.now() - metaData.start; | ||
} | ||
err.length = 0; | ||
@@ -546,7 +552,7 @@ break; | ||
} | ||
// cleanup | ||
dataSet = '' | ||
tokenSet = metaData = undefined; | ||
// check if we need to remove an empty item from the array, as splitting on /r/n might cause an empty | ||
@@ -557,7 +563,7 @@ // item at the end.. | ||
}; | ||
// Small wrapper function that only executes errors when we have a callback | ||
private.errorResponse = function errorResponse(error, callback){ | ||
if (typeof callback == 'function') callback(error, false); | ||
return false; | ||
@@ -569,3 +575,3 @@ }; | ||
if (Array.isArray(key)) return this.getMulti.apply(this, arguments); | ||
this.command(function getCommand(noreply){ return { | ||
@@ -579,3 +585,3 @@ key: key | ||
}; | ||
// the difference between get and gets is that gets, also returns a cas value | ||
@@ -592,3 +598,3 @@ // and gets doesn't support multi-gets at this moment. | ||
}; | ||
// Handles get's with multiple keys | ||
@@ -600,16 +606,16 @@ memcached.getMulti = function getMulti(keys, callback){ | ||
, calls | ||
// handle multiple responses and cache them untill we receive all. | ||
, handle = function(err, results){ | ||
if (err) errors.push(err); | ||
// add all responses to the array | ||
(Array.isArray(results) ? results : [results]).forEach(function(value){ Utils.merge(responses, value) }); | ||
if (!--calls) callback(errors.length ? errors : false, responses); | ||
}; | ||
this.multi(keys, function(server, key, index, totals){ | ||
if (!calls) calls = totals; | ||
memcached.command(function getMultiCommand(noreply){ return { | ||
@@ -625,3 +631,3 @@ callback: handle | ||
}; | ||
// As all command nearly use the same syntax we are going to proxy them all to this | ||
@@ -636,3 +642,3 @@ // function to ease maintenance. This is possible because most set commands will use the same | ||
, length; | ||
if (Buffer.isBuffer(value)){ | ||
@@ -647,6 +653,6 @@ flag = FLAG_BINARY; | ||
} | ||
length = Buffer.byteLength(value); | ||
if (length > memcached.maxValue) return private.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback); | ||
memcached.command(function settersCommand(noreply){ return { | ||
@@ -667,3 +673,3 @@ key: key | ||
}; | ||
// Curry the function and so we can tell the type our private set function | ||
@@ -673,15 +679,15 @@ memcached.set = Utils.curry(false, private.setters, 'set', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]); | ||
memcached.add = Utils.curry(false, private.setters, 'add', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]); | ||
memcached.cas = function checkandset(key, value, cas, lifetime, callback){ | ||
private.setters.call(this, 'cas', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, lifetime, callback, cas); | ||
}; | ||
memcached.append = function append(key, value, callback){ | ||
private.setters.call(this, 'append', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback); | ||
}; | ||
memcached.prepend = function prepend(key, value, callback){ | ||
private.setters.call(this, 'prepend', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback); | ||
}; | ||
// Small handler for incr and decr's | ||
@@ -700,7 +706,7 @@ private.incrdecr = function incrdecr(type, key, value, callback){ | ||
}; | ||
// Curry the function and so we can tell the type our private incrdecr | ||
memcached.increment = memcached.incr = Utils.curry(false, private.incrdecr, 'incr'); | ||
memcached.decrement = memcached.decr = Utils.curry(false, private.incrdecr, 'decr'); | ||
// Deletes the keys from the servers | ||
@@ -719,3 +725,3 @@ memcached.del = function del(key, callback){ | ||
memcached['delete'] = memcached.del; | ||
// Small wrapper that handle single keyword commands such as FLUSH ALL, VERSION and STAT | ||
@@ -727,3 +733,3 @@ private.singles = function singles(type, callback){ | ||
, calls | ||
// handle multiple servers | ||
@@ -733,10 +739,10 @@ , handle = function(err, results){ | ||
if (results) responses = responses.concat(results); | ||
// multi calls should ALWAYS return an array! | ||
if (!--calls) callback(errors, responses); | ||
}; | ||
this.multi(false, function(server, keys, index, totals){ | ||
if (!calls) calls = totals; | ||
memcached.command(function singlesCommand(noreply){ return { | ||
@@ -751,3 +757,3 @@ callback: handle | ||
}; | ||
// Curry the function and so we can tell the type our private singles | ||
@@ -760,3 +766,3 @@ memcached.version = Utils.curry(false, private.singles, 'version'); | ||
memcached.items = Utils.curry(false, private.singles, 'stats items'); | ||
// You need to use the items dump to get the correct server and slab settings | ||
@@ -776,5 +782,5 @@ // see simple_cachedump.js for an example | ||
}; | ||
})(Client); | ||
module.exports = Client; |
{ | ||
"name": "memcached" | ||
, "version": "0.0.3" | ||
, "version": "0.0.4" | ||
, "author": "Arnout Kazemier" | ||
@@ -5,0 +5,0 @@ , "description": "A fully featured Memcached API client, supporting both single and clustered Memcached servers through consistent hashing and failover/failure. Memcached is rewrite of nMemcached, which will be deprecated in the near future." |
1970
425655