Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cueball

Package Overview
Dependencies
Maintainers
1
Versions
71
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cueball - npm Package Compare versions

Comparing version 0.3.12 to 0.4.0

bin/cbresolve

113

lib/agent.js

@@ -33,2 +33,4 @@ /*

'options.resolvers');
mod_assert.optionalNumber(options.tcpKeepAliveInitialDelay,
'options.tcpKeepAliveInitialDelay');
mod_assert.optionalObject(options.log, 'options.log');

@@ -47,2 +49,4 @@

this.tcpKAID = options.tcpKeepAliveInitialDelay;
this.pools = {};

@@ -74,2 +78,8 @@ this.resolvers = options.resolvers;

var USE_SECURECONNECT = false;
if (/^v0\.[0-9]\./.test(process.version) ||
/^v0\.10\./.test(process.version)) {
USE_SECURECONNECT = true;
}
/*

@@ -83,5 +93,16 @@ * Sets up a duplex stream to be used for the given HTTP request.

*/
CueBallAgent.prototype.addRequest = function (req, options) {
CueBallAgent.prototype.addRequest = function (req, optionsOrHost, port) {
var self = this;
var options;
mod_assert.object(req, 'req');
if (typeof (optionsOrHost) === 'string') {
options = {};
options.host = optionsOrHost;
options.port = port;
} else {
options = optionsOrHost;
mod_assert.object(options, 'options');
}
var host = options.host || options.hostname;
mod_assert.string(host, 'hostname');
if (this.pools[host] === undefined) {

@@ -93,18 +114,3 @@ var poolOpts = {

domain: host,
constructor: function (backend) {
var opts = {
host: backend.address || backend.name,
port: backend.port || self.defaultPort,
servername: backend.name || host
};
PASS_FIELDS.forEach(function (k) {
if (options.hasOwnProperty(k))
opts[k] = options[k];
});
if (self.protocol === 'https:') {
return (mod_tls.connect(opts));
} else {
return (mod_net.createConnection(opts));
}
},
constructor: constructSocket,
maximum: this.maximum,

@@ -115,2 +121,52 @@ spares: this.spares,

};
function constructSocket(backend) {
var opts = {
host: backend.address || backend.name,
port: backend.port || self.defaultPort,
servername: backend.name || host
};
PASS_FIELDS.forEach(function (k) {
if (options.hasOwnProperty(k))
opts[k] = options[k];
});
var nsock;
if (self.protocol === 'https:') {
nsock = mod_tls.connect(opts);
/*
* In older versions of node, TLS sockets don't
* quite obey the socket interface -- they emit
* the event 'secureConnect' instead of
* 'connect' and they don't support ref/unref.
*
* We polyfill these here.
*/
if (USE_SECURECONNECT) {
nsock.on('secureConnect',
nsock.emit.bind(nsock, 'connect'));
}
if (nsock.unref === undefined) {
nsock.unref = function () {
nsock.socket.unref();
};
nsock.ref = function () {
nsock.socket.ref();
};
}
} else {
nsock = mod_net.createConnection(opts);
}
if (self.tcpKAID !== undefined) {
nsock.on('connect', function () {
if (USE_SECURECONNECT &&
self.protocol === 'https:') {
nsock.socket.setKeepAlive(true,
self.tcpKAID);
} else {
nsock.setKeepAlive(true,
self.tcpKAID);
}
});
}
return (nsock);
}
if (this.cba_ping !== undefined) {

@@ -133,3 +189,13 @@ poolOpts.checkTimeout = this.cba_pingInterval || 30000;

if (err) {
req.emit('error', err);
/*
* addRequest has no way to give an async error back
* to the rest of the http stack, except to create a
* fake socket here and make it emit 'error' in the
* next event loop :(
*/
var fakesock = new FakeSocket();
req.onSocket(fakesock);
setImmediate(function () {
fakesock.emit('error', err);
});
return;

@@ -232,3 +298,14 @@ }

function FakeSocket() {
EventEmitter.call(this);
}
mod_util.inherits(FakeSocket, EventEmitter);
FakeSocket.prototype.read = function () {
return (null);
};
FakeSocket.prototype.destroy = function () {
return;
};
function PingAgent(options) {

@@ -235,0 +312,0 @@ mod_assert.object(options, 'options');

@@ -13,3 +13,5 @@ /*

ConnectionTimeoutError: ConnectionTimeoutError,
ConnectionClosedError: ConnectionClosedError
ConnectionClosedError: ConnectionClosedError,
PoolFailedError: PoolFailedError,
PoolStoppingError: PoolStoppingError
};

@@ -40,2 +42,22 @@

function PoolFailedError(pool) {
if (Error.captureStackTrace)
Error.captureStackTrace(this, PoolFailedError);
this.pool = pool;
this.name = 'PoolFailedError';
this.message = 'Pool ' + pool.p_uuid + ' (' + pool.p_domain + ') ' +
'has failed and cannot take new requests.';
}
mod_util.inherits(PoolFailedError, Error);
function PoolStoppingError(pool) {
if (Error.captureStackTrace)
Error.captureStackTrace(this, PoolStoppingError);
this.pool = pool;
this.name = 'PoolStoppingError';
this.message = 'Pool ' + pool.p_uuid + ' (' + pool.p_domain + ') ' +
'is stopping and cannot take new requests.';
}
mod_util.inherits(PoolStoppingError, Error);
function ConnectionTimeoutError(fsm) {

@@ -42,0 +64,0 @@ if (Error.captureStackTrace)

@@ -20,2 +20,4 @@ /*

Resolver: mod_resolver.Resolver,
StaticIpResolver: mod_resolver.StaticIpResolver,
resolverForIpOrDomain: mod_resolver.resolverForIpOrDomain,

@@ -27,3 +29,5 @@ poolMonitor: mod_pmonitor.monitor,

ConnectionTimeoutError: mod_errors.ConnectionTimeoutError,
ConnectionClosedError: mod_errors.ConnectionClosedError
ConnectionClosedError: mod_errors.ConnectionClosedError,
PoolStoppingError: mod_errors.PoolStoppingError,
PoolFailedError: mod_errors.PoolFailedError
};

@@ -54,8 +54,22 @@ /*

obj.connections = {};
var ks = pool.p_keys;
Object.keys(pool.p_connections).forEach(function (k) {
obj.connections[k] = pool.p_connections[k].length;
if (ks.indexOf(k) === -1)
ks.push(k);
});
ks.forEach(function (k) {
var conns = pool.p_connections[k] || [];
obj.connections[k] = {};
conns.forEach(function (fsm) {
var s = fsm.getState();
if (obj.connections[k][s] === undefined)
obj.connections[k][s] = 0;
++obj.connections[k][s];
});
});
obj.dead_backends = Object.keys(pool.p_dead);
obj.last_rebalance = Math.round(
pool.p_lastRebalance.getTime() / 1000);
obj.resolvers = pool.p_resolver.r_resolvers;
obj.state = pool.getState();
obj.options = {};

@@ -62,0 +76,0 @@ obj.options.domain = pool.p_resolver.r_domain;

@@ -90,2 +90,27 @@ /*

/*
* If our parent pool thinks this backend is dead, resume connection
* attempts with the maximum delay and timeout. Something is going
* wrong, let's not retry too aggressively and make it worse.
*/
if (this.cf_pool.p_dead[this.cf_backend.key] === true) {
/*
* We might be given an infinite maxDelay or maxTimeout. If
* we are, then multiply it by 2^(retries) to get to what the
* value would have been before.
*/
var mult = 1 << this.cf_retries;
this.cf_delay = this.cf_maxDelay;
if (!isFinite(this.cf_delay))
this.cf_delay = initialRecov.delay * mult;
this.cf_timeout = this.cf_maxTimeout;
if (!isFinite(this.cf_timeout))
this.cf_timeout = initialRecov.timeout * mult;
/* Keep retrying a failed backend forever */
this.cf_retries = Infinity;
this.cf_retriesLeft = Infinity;
}
this.allStateEvent('closeAsserted');
FSM.call(this, 'init');

@@ -96,2 +121,9 @@ }

/*
* Return true if this connection was closed due to retry exhaustion.
*/
ConnectionFSM.prototype.retriesExhausted = function () {
return (this.isInState('closed') && this.cf_retriesLeft <= 0);
};
/*
* Mark this Connection as "claimed"; in use by a particular client of the

@@ -169,10 +201,15 @@ * pool.

ConnectionFSM.prototype.state_init = function (on, once) {
ConnectionFSM.prototype.state_init = function (on) {
this.validTransitions(['connect', 'closed']);
var self = this;
once(this, 'startAsserted', function () {
on(this, 'startAsserted', function () {
self.gotoState('connect');
});
on(this, 'closeAsserted', function () {
self.gotoState('closed');
});
};
ConnectionFSM.prototype.state_connect = function (on, once, timeout) {
this.validTransitions(['error', 'idle', 'closed']);
var self = this;

@@ -214,3 +251,4 @@ timeout(this.cf_timeout, function () {

ConnectionFSM.prototype.state_closed = function () {
ConnectionFSM.prototype.state_closed = function (on) {
this.validTransitions([]);
if (this.cf_conn && this.cf_conn.destroy)

@@ -221,6 +259,8 @@ this.cf_conn.destroy();

this.cf_lastError = undefined;
this.cf_log.trace('closed');
this.cf_log.trace('ConnectionFSM closed');
on(this, 'closeAsserted', function () { });
};
ConnectionFSM.prototype.state_error = function (on, once, timeout) {
this.validTransitions(['delay', 'closed']);
if (this.cf_conn && this.cf_conn.destroy)

@@ -235,3 +275,27 @@ this.cf_conn.destroy();

if (this.cf_retries === Infinity || --this.cf_retriesLeft > 0) {
/*
* If the closeAfter flag is set, and this is a connection to a "dead"
* backend (i.e., a "monitor" watching to see when it comes back), then
* exit now. For an ordinary backend, we don't want to do this,
* because we want to give ourselves the opportunity to run out of
* retries.
*
* Otherwise, in a situation where we have two connections that were
* created at the same time, one to a failing backend that's already
* declared dead, and one to a different failing backend not yet
* declared, we may never learn that the second backend is down and
* declare it dead. The already declared dead backend may exit first
* during a pool reshuffle and cause this one to exit prematurely
* (there's a race in who exits first and causes the planner to engage)
*/
if (this.cf_retries === Infinity && this.cf_closeAfter) {
this.cf_retriesLeft = 0;
this.gotoState('closed');
return;
}
if (this.cf_retries !== Infinity)
--this.cf_retriesLeft;
if (this.cf_retries === Infinity || this.cf_retriesLeft > 0) {
this.gotoState('delay');

@@ -246,2 +310,3 @@ } else {

ConnectionFSM.prototype.state_delay = function (on, once, timeout) {
this.validTransitions(['connect', 'closed']);
var delay = this.cf_delay;

@@ -267,2 +332,3 @@

ConnectionFSM.prototype.state_idle = function (on, once, timeout) {
this.validTransitions(['busy', 'error', 'closed']);
var self = this;

@@ -308,4 +374,2 @@

this.cf_retriesLeft = this.cf_retries;
if (this.cf_closeAfter === true) {

@@ -331,2 +395,6 @@ this.cf_closeAfter = false;

});
once(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
});
once(this, 'closeAsserted', function () {

@@ -354,2 +422,3 @@ self.gotoState('closed');

ConnectionFSM.prototype.state_ping = function (on, once, timeout) {
this.validTransitions(['error', 'closed', 'idle']);
this.cf_lastCheck = new Date();

@@ -404,2 +473,6 @@

});
once(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
});
once(this, 'closeAsserted', function () {

@@ -422,2 +495,3 @@ self.cf_lastError = new mod_errors.ConnectionClosedError(self);

ConnectionFSM.prototype.state_busy = function (on, once, timeout) {
this.validTransitions(['error', 'closed', 'idle']);
var self = this;

@@ -462,2 +536,6 @@ this.cf_conn.ref();

});
once(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
});
once(this.cf_conn, 'close', function () {

@@ -542,3 +620,2 @@ self.cf_lastError = new mod_errors.ConnectionClosedError(self);

function CueBallConnectionPool(options) {
EventEmitter.call(this);
mod_assert.object(options);

@@ -553,3 +630,5 @@

'options.resolvers');
mod_assert.optionalObject(options.resolver, 'options.resolver');
mod_assert.string(options.domain, 'options.domain');
this.p_domain = options.domain;
mod_assert.optionalString(options.service, 'options.service');

@@ -586,5 +665,7 @@ mod_assert.optionalNumber(options.maxDNSConcurrency,

this.p_connections = {};
this.p_dead = {};
this.p_lastRebalance = undefined;
this.p_inRebalance = false;
this.p_startedResolver = false;

@@ -596,30 +677,18 @@ this.p_idleq = new Queue();

var self = this;
this.p_resolver = new mod_resolver.Resolver({
resolvers: options.resolvers,
domain: options.domain,
service: options.service,
maxDNSConcurrency: options.maxDNSConcurrency,
defaultPort: options.defaultPort,
log: this.p_log,
recovery: options.recovery
});
this.p_resolver.on('added', function (k, backend) {
backend.key = k;
self.p_keys.push(k);
mod_utils.shuffle(self.p_keys);
self.p_backends[k] = backend;
self.rebalance();
});
this.p_resolver.on('removed', function (k) {
var idx = self.p_keys.indexOf(k);
if (idx !== -1)
self.p_keys.splice(idx, 1);
delete (self.p_backends[k]);
(self.p_connections[k] || []).forEach(function (fsm) {
if (fsm.getState() !== 'busy')
fsm.close();
if (options.resolver !== undefined && options.resolver !== null) {
this.p_resolver = options.resolver;
this.p_resolver_custom = true;
} else {
this.p_resolver = new mod_resolver.Resolver({
resolvers: options.resolvers,
domain: options.domain,
service: options.service,
maxDNSConcurrency: options.maxDNSConcurrency,
defaultPort: options.defaultPort,
log: this.p_log,
recovery: options.recovery
});
delete (self.p_connections[k]);
self.rebalance();
});
this.p_resolver_custom = false;
}
/*

@@ -630,31 +699,199 @@ * Periodically rebalance() so that we catch any connections that

*/
this.p_rebalTimer = setInterval(function () {
self.rebalance();
this.p_rebalTimer = new EventEmitter();
this.p_rebalTimerInst = setInterval(function () {
self.p_rebalTimer.emit('timeout');
}, 10000);
this.p_rebalTimer.unref();
this.p_rebalTimerInst.unref();
this.p_resolver.start();
this.p_shuffleTimer = new EventEmitter();
this.p_shuffleTimerInst = setInterval(function () {
self.p_shuffleTimer.emit('timeout');
}, 60000);
this.p_shuffleTimerInst.unref();
mod_monitor.monitor.registerPool(this);
FSM.call(this, 'starting');
}
mod_util.inherits(CueBallConnectionPool, EventEmitter);
mod_util.inherits(CueBallConnectionPool, FSM);
/* Stop and kill everything. */
CueBallConnectionPool.prototype.stop = function () {
mod_monitor.monitor.unregisterPool(this);
CueBallConnectionPool.prototype.on_resolver_added = function (k, backend) {
backend.key = k;
var idx = Math.floor(Math.random() * (this.p_keys.length + 1));
this.p_keys.splice(idx, 0, k);
this.p_backends[k] = backend;
this.rebalance();
};
clearInterval(this.p_rebalTimer);
this.p_resolver.stop();
CueBallConnectionPool.prototype.on_resolver_removed = function (k) {
var idx = this.p_keys.indexOf(k);
if (idx !== -1)
this.p_keys.splice(idx, 1);
delete (this.p_backends[k]);
(this.p_connections[k] || []).forEach(function (fsm) {
if (!fsm.isInState('busy'))
fsm.close();
});
delete (this.p_connections[k]);
this.rebalance();
};
CueBallConnectionPool.prototype.state_starting =
function (on, once, timeout, onState) {
this.validTransitions(['failed', 'running', 'stopping']);
mod_monitor.monitor.registerPool(this);
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
var self = this;
if (this.p_resolver.isInState('failed')) {
this.p_log.warn('pre-provided resolver has already failed, ' +
'pool will start up in "failed" state');
this.gotoState('failed');
return;
}
onState(this.p_resolver, 'failed', function () {
self.p_log.warn('underlying resolver failed, moving pool ' +
'to "failed" state');
self.gotoState('failed');
});
if (this.p_resolver.isInState('running')) {
var backends = this.p_resolver.list();
Object.keys(backends).forEach(function (k) {
var backend = backends[k];
self.on_resolver_added(k, backend);
});
} else if (this.p_resolver.isInState('stopped') &&
!this.p_resolver_custom) {
this.p_resolver.start();
this.p_startedResolver = true;
}
on(this, 'connectedToBackend', function () {
self.gotoState('running');
});
on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.p_dead).length;
if (dead >= self.p_keys.length) {
self.p_log.error(
{ dead: dead },
'pool has exhausted all retries, now moving to ' +
'"failed" state');
self.gotoState('failed');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_failed = function (on) {
this.validTransitions(['running', 'stopping']);
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
var self = this;
on(this, 'connectedToBackend', function () {
mod_assert.ok(!self.p_resolver.isInState('failed'));
self.p_log.info('successfully connected to a backend, ' +
'moving back to running state');
self.gotoState('running');
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_running = function (on) {
this.validTransitions(['failed', 'stopping']);
var self = this;
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
on(this.p_rebalTimer, 'timeout', this.rebalance.bind(this));
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.p_dead).length;
if (dead >= self.p_keys.length) {
self.p_log.error(
{ dead: dead },
'pool has exhausted all retries, now moving to ' +
'"failed" state');
self.gotoState('failed');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_stopping =
function (on, once, timeout, onState) {
this.validTransitions(['stopping.backends']);
var self = this;
if (this.p_startedResolver) {
onState(this.p_resolver, 'stopped', function () {
self.gotoState('stopping.backends');
});
this.p_resolver.stop();
} else {
this.gotoState('stopping.backends');
}
};
CueBallConnectionPool.prototype.state_stopping.backends = function () {
this.validTransitions(['stopped']);
var conns = this.p_connections;
var self = this;
var fsms = [];
Object.keys(conns).forEach(function (k) {
conns[k].forEach(function (fsm) {
mod_assert.ok(fsm.getState() !== 'busy');
fsm.close();
fsms.push(fsm);
});
});
mod_vasync.forEachParallel({
func: closeBackend,
inputs: fsms
}, function () {
self.gotoState('stopped');
});
function closeBackend(fsm, cb) {
if (fsm.isInState('busy')) {
fsm.closeAfterRelease();
fsm.onState('closed', cb);
} else {
fsm.close(cb);
}
}
};
CueBallConnectionPool.prototype.state_stopped = function () {
this.validTransitions([]);
mod_monitor.monitor.unregisterPool(this);
this.p_keys = [];
this.p_connections = {};
this.p_backends = {};
clearInterval(this.p_rebalTimerInst);
clearInterval(this.p_shuffleTimerInst);
};
CueBallConnectionPool.prototype.reshuffle = function () {
var taken = this.p_keys.pop();
var idx = Math.floor(Math.random() * (this.p_keys.length + 1));
this.p_keys.splice(idx, 0, taken);
this.rebalance();
};
/* Stop and kill everything. */
CueBallConnectionPool.prototype.stop = function () {
this.emit('stopAsserted');
};
/*

@@ -674,2 +911,5 @@ * Rebalance the pool, by looking at the distribution of connections to

if (this.isInState('stopping') || this.isInState('stopped'))
return;
if (this.p_inRebalance !== false)

@@ -685,2 +925,9 @@ return;

});
Object.keys(this.p_connections).forEach(function (k) {
if (conns[k] === undefined &&
self.p_connections[k] !== undefined &&
self.p_connections[k].length > 0) {
conns[k] = (self.p_connections[k] || []).slice();
}
});
var spares = this.p_idleq.length + this.p_initq.length -

@@ -691,2 +938,4 @@ this.p_waiters.length;

var busy = total - spares;
if (busy < 0)
busy = 0;
var extras = this.p_waiters.length - this.p_initq.length;

@@ -700,6 +949,8 @@ if (extras < 0)

var plan = mod_utils.planRebalance(conns, total, target);
var plan = mod_utils.planRebalance(conns, self.p_dead, target,
self.p_max);
if (plan.remove.length > 0 || plan.add.length > 0) {
this.p_log.trace('rebalancing pool, remove %d, ' +
'add %d (busy = %d, idle = %d, target = %d)',
'add %d (busy = %d, spares = %d, target = %d)',
plan.remove.length, plan.add.length,

@@ -709,14 +960,29 @@ busy, spares, target);

plan.remove.forEach(function (fsm) {
if (fsm.getState() === 'busy') {
fsm.closeAfterRelease();
} else {
/*
* Only tell the FSM to quit *right now* if either:
* 1. it's idle
* 2. there are other FSMs for this backend
* 2. it is connected to a backend that has been
* removed from the resolver
* Otherwise get it to quit gracefully once it's done
* doing whatever it's doing (using closeAfterRelease).
* This way we when we have a failing backend that we
* want to mark as "dead" ASAP, we don't give up early
* and never figure out if it's actually dead or not.
*/
var fsmIdx = self.p_connections[fsm.cf_backend.key].
indexOf(fsm);
if (fsm.isInState('idle') || fsmIdx > 0 ||
self.p_keys.indexOf(fsm.cf_backend.key) === -1) {
fsm.close();
--total;
} else {
fsm.closeAfterRelease();
}
});
plan.add.forEach(function (k) {
if (total > self.p_max)
/* Make sure we *never* exceed our socket limit. */
if (++total > self.p_max)
return;
self.addConnection(k);
++total;
});

@@ -729,3 +995,7 @@

CueBallConnectionPool.prototype.addConnection = function (key) {
if (this.isInState('stopping') || this.isInState('stopped'))
return;
var backend = this.p_backends[key];
backend.key = key;

@@ -749,2 +1019,4 @@ var fsm = new ConnectionFSM({

fsm.on('stateChanged', function (newState) {
var doRebalance = false;
if (fsm.p_initq_node) {

@@ -761,2 +1033,11 @@ /* These transitions mean we're still starting up. */

delete (fsm.p_initq_node);
if (newState === 'idle') {
self.emit('connectedToBackend', key, fsm);
if (self.p_dead[key] !== undefined) {
delete (self.p_dead[key]);
doRebalance = true;
}
}
}

@@ -807,2 +1088,7 @@

}
if (fsm.retriesExhausted()) {
self.p_dead[key] = true;
}
self.emit('closedBackend', key, fsm);
doRebalance = true;
}

@@ -819,4 +1105,7 @@

/* Also rebalance, in case we were closed or died. */
doRebalance = true;
}
if (doRebalance)
self.rebalance();
}
});

@@ -828,2 +1117,7 @@

CueBallConnectionPool.prototype.claimSync = function () {
if (this.isInState('stopping') || this.isInState('stopped'))
throw (new mod_errors.PoolStoppingError(this));
if (this.isInState('failed'))
throw (new mod_errors.PoolFailedError(this));
var e = {};

@@ -849,2 +1143,4 @@ Error.captureStackTrace(e);

var self = this;
var done = false;
if (typeof (options) === 'function' && cb === undefined) {

@@ -860,2 +1156,23 @@ cb = options;

if (this.isInState('stoppping') || this.isInState('stopped')) {
setImmediate(function () {
if (!done)
cb(new mod_errors.PoolStoppingError(self));
done = true;
});
return ({
cancel: function () { done = true; }
});
}
if (this.isInState('failed')) {
setImmediate(function () {
if (!done)
cb(new mod_errors.PoolFailedError(self));
done = true;
});
return ({
cancel: function () { done = true; }
});
}
var e = {};

@@ -873,4 +1190,10 @@ Error.captureStackTrace(e);

if (errOnEmpty && this.p_resolver.count() < 1) {
cb(new mod_errors.NoBackendsError(self));
return (undefined);
setImmediate(function () {
if (!done)
cb(new mod_errors.NoBackendsError(self));
done = true;
});
return ({
cancel: function () { done = true; }
});
}

@@ -880,3 +1203,2 @@

var timer;
var done = false;
var waiter = function () {

@@ -883,0 +1205,0 @@ if (timer !== undefined)

@@ -10,3 +10,14 @@ /*

module.exports = {
Resolver: CueBallResolver
/* This name is for compatibility with pre-0.4 cueball */
Resolver: CueBallDNSResolver,
DNSResolver: CueBallDNSResolver,
StaticIpResolver: CueBallStaticResolver,
resolverForIpOrDomain: resolverForIpOrDomain,
/* exposed for testing only */
ResolverFSM: CueBallResolver,
configForIpOrDomain: configForIpOrDomain,
parseIpOrDomain: parseIpOrDomain
};

@@ -26,2 +37,3 @@

const mod_crypto = require('crypto');
const mod_verror = require('verror');

@@ -32,13 +44,12 @@ const FSM = mod_mooremachine.FSM;

/*
* The CueBallResolver takes a domain (plus service name and default port) and
* resolves it, emitting events 'added' and 'removed' as new hosts behind this
* domain + service are added or removed from DNS.
* Cueball provides two types of resolvers: the primary interface is the
* DNS-based resolver (called just "Resolver" for historical reasons) that
* uses DNS servers to locate backends. This is appropriate for most server
* deployments. The static resolver emits a pre-configured set of IP addresses
* and is intended for development environments and debugging tools where the
* user may want to target specific instances.
*
* Its basic workflow is to query for SRV records, then AAAA, then A, then
* work out which events to emit (if any). After this it sleeps.
* At each step it records the TTL for the information collected, and when a
* TTL expires, we resume the workflow at the point where the expiring
* information was gathered (e.g. if an SRV record expired we would re-query
* all the AAAA and A records, but if an A record expired, only that stage
* would be re-run).
* Resolvers take a domain (plus service name and default port) and resolve it,
* emitting events 'added' and 'removed' as new hosts behind this domain +
* service are discovered (or discovered to be gone).
*

@@ -50,2 +61,102 @@ * The 'added' event receives both a key and an object. The key is a unique

*
* The factory method resolverForDomain can be used for programs that intend to
* support both DNS-based resolution or static IP resolution, depending on
* whether the user provides a DNS hostname or an IP address.
*/
function CueBallResolver(fsm, options) {
mod_assert.object(options, 'options');
mod_assert.object(fsm, 'fsm');
this.r_fsm = fsm;
this.r_fsm.on('added', this.emit.bind(this, 'added'));
this.r_fsm.on('removed', this.emit.bind(this, 'removed'));
mod_assert.optionalObject(options.log, 'options.log');
this.r_log = options.log || mod_bunyan.createLogger({
name: 'CueBallResolver'
});
FSM.call(this, 'stopped');
}
mod_util.inherits(CueBallResolver, FSM);
CueBallResolver.prototype.start = function () {
this.emit('startAsserted');
};
CueBallResolver.prototype.stop = function () {
this.emit('stopAsserted');
};
CueBallResolver.prototype.count = function () {
return (this.r_fsm.count());
};
CueBallResolver.prototype.list = function () {
return (this.r_fsm.list());
};
CueBallResolver.prototype.getLastError = function () {
return (this.r_lastError);
};
CueBallResolver.prototype.state_stopped = function (on) {
var self = this;
on(this, 'startAsserted', function () {
self.gotoState('starting');
});
};
CueBallResolver.prototype.state_starting = function (on) {
var self = this;
this.r_fsm.start();
on(this.r_fsm, 'updated', function (err) {
if (err) {
self.r_lastError = err;
self.gotoState('failed');
} else {
self.gotoState('running');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallResolver.prototype.state_running = function (on) {
var self = this;
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallResolver.prototype.state_failed = function (on) {
var self = this;
on(this.r_fsm, 'updated', function (err) {
if (!err)
self.gotoState('running');
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
});
};
CueBallResolver.prototype.state_stopping = function (on) {
this.r_fsm.stop();
this.gotoState('stopped');
};
/*
* DNS-based Resolver
*
* The basic workflow for the DNS-based resolver is to query for SRV records,
* then AAAA, then A, then work out which events to emit (if any). After this it
* sleeps. At each step it records the TTL for the information collected, and
* when a TTL expires, we resume the workflow at the point where the expiring
* information was gathered (e.g. if an SRV record expired we would re-query all
* the AAAA and A records, but if an A record expired, only that stage would be
* re-run).
*
* When SRV record lookup succeeds, the ports will be set based on the contents

@@ -69,4 +180,3 @@ * of these records. If SRV records are not available, then the 'domain' will

*/
function CueBallResolver(options) {
function CueBallDNSResolver(options) {
mod_assert.object(options);

@@ -89,3 +199,3 @@ mod_assert.optionalArrayOfString(options.resolvers,

this.r_log = options.log || mod_bunyan.createLogger({
name: 'CueBallResolver'
name: 'CueBallDNSResolver'
});

@@ -139,5 +249,10 @@ this.r_log = this.r_log.child({domain: this.r_domain});

this.r_nsclient = new mod_nsc.DnsClient({
concurrency: this.r_maxres
});
this.r_nsclient = CueBallDNSResolver.globalNSClients[this.r_maxres];
if (this.r_nsclient === undefined) {
this.r_nsclient = new mod_nsc.DnsClient({
concurrency: this.r_maxres
});
CueBallDNSResolver.globalNSClients[this.r_maxres] =
this.r_nsclient;
}

@@ -147,12 +262,16 @@ this.r_stopping = false;

FSM.call(this, 'init');
return (new CueBallResolver(this, options));
}
mod_util.inherits(CueBallResolver, FSM);
mod_util.inherits(CueBallDNSResolver, FSM);
CueBallResolver.bootstrapResolvers = {};
CueBallDNSResolver.bootstrapResolvers = {};
CueBallResolver.prototype.start = function () {
CueBallDNSResolver.globalNSClients = {};
CueBallDNSResolver.prototype.start = function () {
this.emit('startAsserted');
};
CueBallResolver.prototype.stop = function (cb) {
CueBallDNSResolver.prototype.stop = function (cb) {
this.r_stopping = true;

@@ -162,7 +281,7 @@ this.emit('stopAsserted');

CueBallResolver.prototype.count = function () {
CueBallDNSResolver.prototype.count = function () {
return (Object.keys(this.r_backends).length);
};
CueBallResolver.prototype.list = function () {
CueBallDNSResolver.prototype.list = function () {
var self = this;

@@ -176,3 +295,3 @@ var ret = {};

CueBallResolver.prototype.state_init = function (on) {
CueBallDNSResolver.prototype.state_init = function (on) {
var self = this;

@@ -185,3 +304,3 @@ this.r_stopping = false;

CueBallResolver.prototype.state_check_ns = function (on, once) {
CueBallDNSResolver.prototype.state_check_ns = function (on, once) {
var self = this;

@@ -198,5 +317,6 @@ if (this.r_resolvers.length > 0) {

this.r_resolvers = [];
this.r_bootstrap = CueBallResolver.bootstrapResolvers[notIp[0]];
this.r_bootstrap =
CueBallDNSResolver.bootstrapResolvers[notIp[0]];
if (this.r_bootstrap === undefined) {
this.r_bootstrap = new CueBallResolver({
this.r_bootstrap = new CueBallDNSResolver({
domain: notIp[0],

@@ -208,3 +328,3 @@ service: '_dns._udp',

});
CueBallResolver.bootstrapResolvers[notIp[0]] =
CueBallDNSResolver.bootstrapResolvers[notIp[0]] =
this.r_bootstrap;

@@ -234,3 +354,3 @@ }

CueBallResolver.prototype.state_bootstrap_ns = function (on, once) {
CueBallDNSResolver.prototype.state_bootstrap_ns = function (on, once) {
var self = this;

@@ -270,3 +390,3 @@ this.r_bootstrap.on('added', function (k, srv) {

CueBallResolver.prototype.state_srv = function () {
CueBallDNSResolver.prototype.state_srv = function () {
var r = this.r_srvRetry;

@@ -278,3 +398,3 @@ r.delay = r.minDelay;

CueBallResolver.prototype.state_srv_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_srv_try = function (on, once, timeout) {
var self = this;

@@ -326,3 +446,3 @@

CueBallResolver.prototype.state_srv_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_srv_error = function (on, once, timeout) {
var self = this;

@@ -362,3 +482,3 @@ var r = self.r_srvRetry;

CueBallResolver.prototype.state_aaaa = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa = function (on, once, timeout) {
this.r_srvRem = this.r_srvs.slice();

@@ -369,3 +489,3 @@ this.r_nextV6 = undefined;

CueBallResolver.prototype.state_aaaa_next = function () {
CueBallDNSResolver.prototype.state_aaaa_next = function () {
var r = this.r_retry;

@@ -385,3 +505,3 @@ r.delay = r.minDelay;

CueBallResolver.prototype.state_aaaa_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa_try = function (on, once, timeout) {
var self = this;

@@ -419,3 +539,3 @@ var srv = this.r_srv;

CueBallResolver.prototype.state_aaaa_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa_error = function (on, once, timeout) {
var self = this;

@@ -446,3 +566,3 @@ var r = self.r_retry;

CueBallResolver.prototype.state_a = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a = function (on, once, timeout) {
this.r_srvRem = this.r_srvs.slice();

@@ -453,3 +573,3 @@ this.r_nextV4 = undefined;

CueBallResolver.prototype.state_a_next = function () {
CueBallDNSResolver.prototype.state_a_next = function () {
var r = this.r_retry;

@@ -469,3 +589,3 @@ r.delay = r.minDelay;

CueBallResolver.prototype.state_a_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a_try = function (on, once, timeout) {
var self = this;

@@ -503,3 +623,3 @@ var srv = this.r_srv;

CueBallResolver.prototype.state_a_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a_error = function (on, once, timeout) {
var self = this;

@@ -530,3 +650,3 @@ var r = self.r_retry;

CueBallResolver.prototype.state_process = function () {
CueBallDNSResolver.prototype.state_process = function () {
var self = this;

@@ -557,5 +677,7 @@

if (newKeys.length === 0) {
this.r_log.warn(this.r_lastError, 'failed to find any ' +
'backend records for (%s.)%s (more details in TRACE)',
var err = new mod_verror.VError(this.r_lastError,
'failed to find any DNS records for (%s.)%s',
this.r_service, this.r_domain);
this.r_log.warn(err, 'finished processing');
this.emit('updated', err);
this.gotoState('sleep');

@@ -585,6 +707,7 @@ return;

this.emit('updated');
this.gotoState('sleep');
};
CueBallResolver.prototype.state_sleep = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_sleep = function (on, once, timeout) {
var self = this;

@@ -662,3 +785,3 @@ var now = new Date();

CueBallResolver.prototype.resolve = function (domain, type, timeout) {
CueBallDNSResolver.prototype.resolve = function (domain, type, timeout) {
var opts = {};

@@ -690,3 +813,12 @@ opts.domain = domain;

if (type === 'A' || type === 'AAAA') {
ans = answers.map(function (a) {
ans = [];
answers.forEach(function (a) {
if (a.type !== type) {
if (a.type === 'CNAME' ||
a.type === 'DNAME')
return;
self.r_log.warn('got unsupported ' +
'answer rrtype: %s', a.type);
return;
}
if (minTTL === undefined ||

@@ -696,3 +828,3 @@ a.ttl < minTTL) {

}
return ({
ans.push({
name: a.name,

@@ -707,2 +839,10 @@ address: a.target

addns.forEach(function (rr) {
if (rr.type !== type) {
if (rr.type === 'CNAME' ||
rr.type === 'DNAME')
return;
self.r_log.warn('got unsupported ' +
'additional rrtype: %s', rr.type);
return;
}
if (rr.target) {

@@ -718,3 +858,12 @@ if (minTTL === undefined ||

});
ans = answers.map(function (a) {
ans = [];
answers.forEach(function (a) {
if (a.type !== type) {
if (a.type === 'CNAME' ||
a.type === 'DNAME')
return;
self.r_log.warn('got unsupported ' +
'answer rrtype: %s', a.type);
return;
}
if (minTTL === undefined ||

@@ -727,3 +876,3 @@ a.ttl < minTTL) {

obj.additionals = cache[a.target];
return (obj);
ans.push(obj);
});

@@ -737,1 +886,194 @@

};
/*
* Static IP Resolver
*
* This Resolver implementation emits a fixed list of IP addresses. This is
* useful in development environments and debugging tools, where users may want
* to target specific service instances.
*/
function CueBallStaticResolver(options) {
mod_assert.object(options, 'options');
mod_assert.arrayOfObject(options.backends, 'options.backends');
this.sr_backends = options.backends.map(function (backend, i) {
mod_assert.string(backend.address,
'options.backends[' + i + '].address');
mod_assert.ok(mod_net.isIP(backend.address),
'options.backends[' + i +
'].address must be an IP address');
mod_assert.number(backend.port,
'options.backends[' + i + '].port');
return ({
'name': backend.address + ':' + backend.port,
'address': backend.address,
'port': backend.port
});
});
this.sr_state = 'idle';
EventEmitter.call(this);
return (new CueBallResolver(this, options));
}
mod_util.inherits(CueBallStaticResolver, EventEmitter);
CueBallStaticResolver.prototype.start = function ()
{
var self = this;
mod_assert.equal(this.sr_state, 'idle',
'cannot call start() again without calling stop()');
this.sr_state = 'started';
setImmediate(function () {
self.sr_backends.forEach(function (be) {
self.emit('added', srvKey(be), be);
});
self.emit('updated');
});
};
CueBallStaticResolver.prototype.stop = function ()
{
mod_assert.equal(this.sr_state, 'started',
'cannot call stop() again without calling start()');
this.sr_state = 'idle';
};
CueBallStaticResolver.prototype.count = function ()
{
return (this.sr_backends.length);
};
CueBallStaticResolver.prototype.list = function ()
{
var ret = {};
this.sr_backends.forEach(function (be) {
ret[srvKey(be)] = be;
});
return (ret);
};
/*
* resolverForIpOrDomain(args): given an input string of the form:
*
* HOSTNAME[:PORT]
*
* where HOSTNAME may be either a DNS domain or IP address and PORT is an
* integer representing a TCP port, return an appropriate resolver instance that
* either uses the static IP resolver (if HOSTNAME is determined to be an IP
* address) or the DNS-based Resolver class (otherwise).
*
* Named arguments include:
*
* input the input string (described above)
*
* resolverConfig configuration properties passed to the resolver's
* constructor
*
* This is the appropriate interface for constructing a resolver from
* user-specified configuration because it allows users to specify IP addresses
* or DNS names interchangeably, which is what most users expect.
*
* If the input is well-formed but invalid (e.g., has the correct JavaScript
* types, but the port number is out of range, or the HOSTNAME portion cannot be
* interpreted as either an IP address or a DNS domain), then an Error object is
* returned.
*/
function resolverForIpOrDomain(args)
{
var speccfg, cons, rcfg;
speccfg = configForIpOrDomain(args);
if (speccfg instanceof Error) {
return (speccfg);
}
cons = speccfg.cons;
rcfg = speccfg.mergedConfig;
return (new cons(rcfg));
}
/*
* Implements the guts of resolverForIpOrDomain().
*/
function configForIpOrDomain(args)
{
var rcfg, speccfg, k;
mod_assert.object(args, 'args');
mod_assert.string(args.input, 'args.input');
mod_assert.optionalObject(args.resolverConfig, 'args.resolverConfig');
rcfg = {};
if (args.resolverConfig) {
for (k in args.resolverConfig) {
rcfg[k] = args.resolverConfig[k];
}
}
speccfg = parseIpOrDomain(args.input);
if (speccfg instanceof Error) {
return (speccfg);
}
for (k in speccfg.config) {
rcfg[k] = speccfg.config[k];
}
speccfg.mergedConfig = rcfg;
return (speccfg);
}
/*
* Implements the parsing part of resolverForIpOrDomain().
*/
function parseIpOrDomain(str)
{
var colon, first, port, ret;
colon = str.lastIndexOf(':');
if (colon == -1) {
first = str;
port = undefined;
} else {
first = str.substr(0, colon);
port = parseInt(str.substr(colon + 1), 10);
if (isNaN(port) || port < 0 || port > 65535) {
return (new Error('unsupported port in input: ' + str));
}
}
ret = {};
if (mod_net.isIP(first) === 0) {
ret['kind'] = 'dns';
ret['cons'] = CueBallDNSResolver;
/* XXX validate DNS domain? */
ret['config'] = {
'domain': first
};
if (port !== undefined) {
ret['config']['defaultPort'] = port;
}
} else {
ret['kind'] = 'static';
ret['cons'] = CueBallStaticResolver;
ret['config'] = {
'backends': [ {
'address': first,
'port': port
} ]
};
}
return (ret);
}

238

lib/utils.js

@@ -33,4 +33,4 @@ /*

mod_assert.ok(obj.maxTimeout === undefined ||
obj.timeout < obj.maxTimeout,
name + '.maxTimeout must be > timeout');
obj.timeout <= obj.maxTimeout,
name + '.maxTimeout must be >= timeout');
delete (ks.maxTimeout);

@@ -43,6 +43,30 @@ mod_assert.number(obj.delay, name + '.delay');

mod_assert.ok(obj.maxDelay === undefined ||
obj.delay < obj.maxDelay,
name + '.maxDelay must be > delay');
obj.delay <= obj.maxDelay,
name + '.maxDelay must be >= delay');
delete (ks.maxDelay);
mod_assert.deepEqual(Object.keys(ks), []);
var mult;
if (obj.maxDelay === undefined) {
mod_assert.ok(obj.retries < 32,
name + '.maxDelay is required when retries >= 32 ' +
'(exponential increase becomes unreasonably large)');
mult = 1 << obj.retries;
var maxDelay = obj.delay * mult;
mod_assert.ok(maxDelay < 1000 * 3600 * 24,
name + '.maxDelay is required with given values of ' +
'retries and delay (effective unspecified maxDelay is ' +
' > 1 day)');
}
if (obj.maxTimeout === undefined) {
mod_assert.ok(obj.retries < 32,
name + '.maxTimeout is required when retries >= 32 ' +
'(exponential increase becomes unreasonably large)');
mult = 1 << obj.retries;
var maxTimeout = obj.timeout * mult;
mod_assert.ok(maxTimeout < 1000 * 3600 * 24,
name + '.maxTimeout is required with given values of ' +
'retries and timeout (effective unspecified maxTimeout ' +
'is > 1 day)');
}
}

@@ -63,120 +87,138 @@

function planRebalance(inSpares, total, target) {
var spares = {};
var keyCount = 0;
var buckets = {};
/*
* `planRebalance(connections, dead, target, max)`
*
* Takes an abstract representation of the state of a connection pool and
* returns a 'plan' for what to do next to bring it to an ideal balanced state.
*
* Returns a 'plan': an Object with properties:
* - `add` -- Array of String, backend keys that should be added
* - `remove` -- Array of Object, connections to be closed
*
* Parameters:
* - `connections` -- an Object, map of String (backend id) to Array of Object
* (connections), list of currently open connections
* - `dead` -- an Object, map of String (backend id) to Boolean, true when a
* a given backend is declared dead
* - `target` -- a Number, target number of connections we want to have
* - `max` -- a Number, maximum socket ceiling
*/
function planRebalance(inSpares, dead, target, max) {
var replacements = 0;
var wantedSpares = {};
mod_assert.object(inSpares, 'connections');
mod_assert.number(total, 'count');
mod_assert.number(target, 'target');
mod_assert.number(max, 'max');
mod_assert.ok(total >= 0, 'count must be >= 0');
mod_assert.ok(target >= 0, 'target must be >= 0');
mod_assert.ok(max >= target, 'max must be >= target');
Object.keys(inSpares).forEach(function (key) {
mod_assert.array(inSpares[key], 'connections[' + key + ']');
spares[key] = inSpares[key].slice();
++keyCount;
var count = spares[key].length;
if (buckets[count] === undefined)
buckets[count] = [];
buckets[count].push(key);
});
var keys = Object.keys(inSpares);
var plan = { add: [], remove: [] };
var max = Math.round(target / keyCount);
if (max < 1)
max = 1;
function bucketsBySize() {
return (Object.keys(buckets).map(function (cnt) {
return (parseInt(cnt, 10));
}).sort().reverse());
/*
* Build up the number of FSMs we *want* to have for each backend in
* the wantedSpares map.
*
* First, we want to have the "target" number of connections, spread
* evenly across all the backends. If we find any dead backends along
* the way, make sure we have exactly 1 connection to each and we
* request a replacement for each time we wanted to use it.
*/
var done = 0;
for (var i = 0; i < target; ++i) {
var k = keys.shift();
keys.push(k);
if (wantedSpares[k] === undefined)
wantedSpares[k] = 0;
if (dead[k] !== true) {
++wantedSpares[k];
++done;
continue;
}
if (wantedSpares[k] === 0) {
wantedSpares[k] = 1;
++done;
}
++replacements;
}
function findFatBuckets() {
return (bucketsBySize().filter(function (cnt) {
return (cnt > max);
}));
}
/* Apply the max cap. */
if (done + replacements > max)
replacements = max - done;
/*
* First, cull any excess beyond the target size, working from the
* biggest buckets down to the smallest ones.
* Now try to allocate replacements. These proceed similarly to the
* first allocation, round-robin across all available backends.
*/
var bkts = bucketsBySize();
while (total > target) {
var b = bkts[0];
mod_assert.number(b, 'a backend key');
mod_assert.ok(b > 0, 'backend key must not be zero');
var ks = buckets[b];
mod_assert.arrayOfString(ks, 'at least one key of order ' + b);
while (total > target && ks.length > 0) {
var k = ks.shift();
--total;
var fsm = spares[k].shift();
mod_assert.ok(fsm, 'a spare ' + k + ' connection is ' +
'required');
plan.remove.push(fsm);
if (buckets[b - 1] === undefined)
buckets[b - 1] = [];
buckets[b - 1].push(k);
for (i = 0; i < replacements; ++i) {
k = keys.shift();
keys.push(k);
if (wantedSpares[k] === undefined)
wantedSpares[k] = 0;
if (dead[k] !== true) {
++wantedSpares[k];
++done;
continue;
}
if (ks.length === 0)
delete (buckets[b]);
bkts = bucketsBySize();
}
/*
* We can make replacements for a replacement (and so on) as
* long as we have room under our max socket cap and we haven't
* already tried every backend available.
*
* If this one is marked as dead, though, and we don't have room
* to add both it and a replacement, AND there are backends we
* haven't tried yet or that are alive, skip this one and use
* one of those.
*
* In this way we guarantee that even if our socket cap
* prevents us from making a double-replacement, we still try
* all the backends at least once.
*/
var count = done + replacements - i;
var empties = keys.filter(function (kk) {
return (dead[kk] !== true ||
wantedSpares[kk] === undefined ||
wantedSpares[kk] === 0);
});
if (count + 1 <= max) {
if (wantedSpares[k] === 0) {
wantedSpares[k] = 1;
++done;
}
if (empties.length > 0)
++replacements;
/* Now, cull any buckets that are bigger than the average cap (max). */
var fatbkts = findFatBuckets();
while (fatbkts.length > 0) {
b = fatbkts[0];
mod_assert.number(b, 'a backend key');
mod_assert.ok(b > 0, 'backend key must not be zero');
ks = buckets[b];
mod_assert.arrayOfString(ks, 'at least one key of order ' + b);
while (ks.length > 0) {
k = ks.shift();
--total;
fsm = spares[k].shift();
mod_assert.ok(fsm, 'a spare ' + k + ' connection is ' +
'required');
plan.remove.push(fsm);
if (buckets[b - 1] === undefined)
buckets[b - 1] = [];
buckets[b - 1].push(k);
} else if (count <= max && empties.length > 0) {
++replacements;
}
delete (buckets[b]);
fatbkts = findFatBuckets();
}
/*
* Finally, if we're now under target, add evenly to the lowest buckets
* until we hit it.
* Now calculate the difference between what we want and what we have:
* this will be our plan to return for what to do next.
*/
while (total < target) {
var bs = Object.keys(buckets).map(function (cnt) {
return (parseInt(cnt, 10));
}).sort();
b = bs[0];
ks = buckets[b];
var i = 0;
while (total < target && i < ks.length) {
var kk = ks[i];
plan.add.push(kk);
++total;
if (buckets[b + 1] === undefined)
buckets[b + 1] = [];
buckets[b + 1].push(kk);
++i;
keys = Object.keys(inSpares).reverse();
keys.forEach(function (key) {
var have = (inSpares[key] || []).length;
var want = wantedSpares[key] || 0;
var list = inSpares[key].slice();
while (have > want) {
plan.remove.push(list.shift());
--have;
}
delete (buckets[b]);
}
});
keys.reverse();
keys.forEach(function (key) {
var have = (inSpares[key] || []).length;
var want = wantedSpares[key] || 0;
while (have < want) {
plan.add.push(key);
++have;
}
});
return (plan);
}
{
"name": "cueball",
"version": "0.3.12",
"version": "0.4.0",
"description": "",

@@ -9,10 +9,15 @@ "main": "lib/index.js",

"bunyan": ">=1.5.1 <2.0.0",
"cmdutil": ">=1.0.0 <2.0.0",
"extsprintf": ">=1.3.0 <2.0.0",
"ipaddr.js": ">=1.1.0 <2.0.0",
"mooremachine": ">=1.2.0 <2.0.0",
"named-client": "git://github.com/arekinath/node-named-client#v0.3.3",
"mooremachine": ">=1.4.0 <2.0.0",
"named-client": "git+https://github.com/arekinath/node-named-client.git#v0.3.5",
"node-uuid": ">=1.4.7 <2.0.0",
"posix-getopt": ">=1.2.0 <2.0.0",
"restify-clients": ">=1.1.2 <2.0.0",
"vasync": ">=1.6.3 <2.0.0"
"vasync": ">=1.6.3 <2.0.0",
"verror": ">=1.6.1 <2.0.0"
},
"devDependencies": {
"jsprim": ">=1.3.0 <2.0.0",
"tape": ">=4.4.0 <5.0.0",

@@ -19,0 +24,0 @@ "sinon": ">=1.17.3 <2.0.0",

@@ -78,6 +78,8 @@ cueball

- `maximum` -- optional Number, maximum number of connections per host
- `tcpKeepAliveInitialDelay` -- optional Number, if supplied, enable TCP
level keep-alives with the given initial delay (in milliseconds)
- `ping` -- optional String, URL path to use for health checking. Connection
is considered still viable if this URL returns a non-5xx response code.
- `pingInterval` -- optional Number, interval between health check pings
- `errorOnEmpty` -- optional Boolean,
- `errorOnEmpty` -- optional Boolean

@@ -88,3 +90,5 @@ ## Pool

Creates a new pool of connections.
Creates a new pool of connections. There are two ways of using a
ConnectionPool. You can either provide your own resolver directly, or provide
parameters with which to create the default, DNS-based resolver.

@@ -94,5 +98,5 @@ Parameters

- `options` -- Object, with keys:
- `constructor` -- Function(backend) -> object, must open a new connection
- `constructor` -- Function(backend) -> object, must open a new connection
to the given backend and return it
- `domain` -- String, name to look up to find backends
- `domain` -- String, name to look up to find backends.
- `recovery` -- Object, a recovery spec (see below)

@@ -102,3 +106,3 @@ - `service` -- optional String, name of SRV service (e.g. `_http._tcp`)

- `resolvers` -- optional Array of String, either containing IP addresses to
use as nameservers, or a single string for Dynamic Resolver mode (default
use as nameservers, or a single string for Dynamic Resolver mode (default
uses system resolvers from `/etc/resolv.conf`)

@@ -108,3 +112,3 @@ - `log` -- optional Object, a `bunyan`-style logger to use

- `maximum` -- optional Number, maximum number of connections per host
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue
at once (default 5)

@@ -115,3 +119,55 @@ - `checkTimeout` -- optional Number, milliseconds of idle time before

connections
- `resolver` -- optional instance of an object meeting the Resolver interface
below. You would typically obtain this object by either creating your own
Resolver directly or using the `resolverForIpOrDomain` function.
Do not confuse `resolvers` (the list of IP addresses for the DNS resolvers to
contact) with `resolver` (a custom object meeting the Resolver interface below).
If you want to use a custom resolver, then you must specify the `resolver`
property. In that case, the `resolvers`, `maxDNSConcurrency`, `defaultPort`,
and `recovery` options are ignored, and the `domain` and `service` properties
are used only for logging.
Otherwise, if want to use the default DNS-based resolver, do not specify the
`resolver` property. A resolver instance will be created based on the other
configuration properties.
### Pool states
ConnectionPool exposes the `mooremachine` FSM interface, with the following
state graph:
| (from failed)
.stop() v
+--------+ connect ok +-------+ +--------+
init --> |starting| +------------> |running| +---> |stopping|
+--------+ +-------+ +--------+
+ ^ + +
resolver | | | |
failed | | | |
OR | +------+ | | v
retries +----> |failed| +-----+ | +-------+
exhausted +------+ connect ok | |stopped|
+ ^ | +-------+
| | |
.stop()| +----------------+
| all retries exhausted
Pools begin their life in the "starting" state. Once they have successfully made
one connection to any backend, they proceed to the "running" state. Otherwise,
if their underlying Resolver enters the "failed" state, or they exhaust their
retry policy attempting to connect to all their backends, they enter the
"failed" state.
A "running" pool can then either be stopped by calling the `.stop()` method, at
which point it enters the "stopping" state and begins tearing down its
connections; or all of its connections become disconnected and it exhausts its
retry policy, in which case it enters the "failed" state.
Failed pools can re-enter the "running" state at any time if they make a
successful connection to a backend and their underlying Resolver is no longer
"failed". A "failed" pool can also have the `.stop()` method called, in which
case it proceeds much as from "running".
### `ConnectionPool#stop()`

@@ -135,11 +191,10 @@

out
- `handle` -- Object, handle to be used to release the connection back to
- `handle` -- Object, handle to be used to release the connection back to
the pool when work is complete
- `connection` -- Object, the actual connection (as returned by the
- `connection` -- Object, the actual connection (as returned by the
`constructor` given to `new ConnectionPool()`)
Returns either `undefined` (if the callback was called immediately), or a
"waiter handle", which is an Object having a `cancel()` method. The `cancel()`
method may be called at any time up to when the `callback` is run, to cancel
the request to the pool and relinquish any queue positions held.
Returns a "waiter handle", which is an Object having a `cancel()` method. The
`cancel()` method may be called at any time up to when the `callback` is run, to
cancel the request to the pool and relinquish any queue positions held.

@@ -150,6 +205,11 @@ When a client is done with a connection, they must call `handle.release()` to

Calling `claim()` on a Pool that is in the "stopping", "stopped" or "failed"
states will result in the callback being called with an error on the next run of
the event loop.
### `ConnectionPool#claimSync()`
Claims a connection from the pool, only if an idle one is immediately
available. Otherwise, throws an Error.
available. Otherwise, throws an Error. Always throws an Error if called on a
Pool that is "stopping", "stopped" or "failed".

@@ -162,21 +222,45 @@ Returns an Object with keys:

### `new mod_cueball.Resolver(options)`
### `mod_cueball.Resolver` interface
Creates a "resolver" -- an object which tracks a given service in DNS and
emits events when backends are added or removed.
An interface for all "resolvers", objects which take in some kind of
configuration (e.g. a DNS name) and track a list of "backends" for that
name. A "backend" is an IP/port pair that describes an endpoint that can
be connected to to reach a given service.
Parameters
Resolver exposes the `mooremachine` FSM interface, with the following state
graph:
- `options` -- Object, with keys:
- `domain` -- String, name to look up to find backends
- `recovery` -- Object, a recovery spec (see below)
- `service` -- optional String, name of SRV service (e.g. `_http._tcp`)
- `defaultPort` -- optional Number, port to use for plain A/AAAA records
- `resolvers` -- optional Array of String, either containing IP addresses to
use as nameservers, or a single string for Dynamic Resolver mode (default
uses system resolvers from `/etc/resolv.conf`)
- `log` -- optional Object, a `bunyan`-style logger to use
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue
at once (default 5)
.start() error
+-------+ +--------+ +------+
init -> |stopped| +---> |starting| +---> |failed|
+---+---+ +---+----+ +------+
^ | +
| | ok |
| v |
+---+----+ +---+---+ |
|stopping| <--+ |running| <--------+
+--------+ +-------+ retry success
.stop()
Resolvers begin their life "stopped". When the user calls `.start()`, they
begin the process of resolving the name/configuration they were given into
backends.
If the initial attempt to resolve the name/configuration fails, the Resolver
enters the "failed" state, but continues retrying. If it succeeds, or if any
later retry succeeds, it moves to the "running" state. The reason why the
"failed" state exists is so that commandline tools and other short-lived
processes can make use of it to decide when to "give up" on a name resolution.
Once an attempt has succeeded, the Resolver will begin emitting `added` and
`removed` events (see below) describing the backends that it has found.
In the "running" state, the Resolver continues to monitor the source of its
backends (e.g. in DNS by retrying once the TTL expires) and emitting these
events when changes occur.
Finally, when the `.stop()` method is called, the Resolver transitions to
"stopping", stops monitoring and emitting events, and comes to rest in the
"stopped" state where it started.
### `Resolver#start()`

@@ -192,5 +276,23 @@

### `Resolver#getLastError()`
Returns the last error experienced by the Resolver. This is particularly useful
when the Resolver is in the "failed" state, to produce a log message or user
interface text.
### `Resolver#getState()`
Returns the current state of the Resolver as a string (see diagram above).
Inherited from `mod_mooremachine.FSM`.
### `Resolver#onState(state, cb)`
Registers an event handler to run when the Resolver enters the given state.
Inherited from `mod_mooremachine.FSM`.
### Event `Resolver->added(key, backend)`
Emitted when a new backend has been found in DNS.
Emitted when a new backend has been found.

@@ -207,3 +309,3 @@ Parameters

Emitted when an existing backend has been removed from DNS.
Emitted when an existing backend has been removed.

@@ -213,2 +315,113 @@ Parameters

## DNS-based name resolver
### `new mod_cueball.DNSResolver(options)`
Creates a Resolver that looks up a name in DNS. This Resolver prefers SRV
records if they are available, and falls back to A/AAAA records if they cannot
be found.
Parameters
- `options` -- Object, with keys:
- `domain` -- String, name to look up to find backends
- `recovery` -- Object, a recovery spec (see below)
- `service` -- optional String, name of SRV service (e.g. `_http._tcp`)
- `defaultPort` -- optional Number, port to use for plain A/AAAA records
- `resolvers` -- optional Array of String, either containing IP addresses to
use as nameservers, or a single string for Dynamic Resolver mode (default
uses system resolvers from `/etc/resolv.conf`)
- `log` -- optional Object, a `bunyan`-style logger to use
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue
at once (default 5)
## Static IP resolver
### `new mod_cueball.StaticIpResolver(options)`
Creates a new static IP resolver. This object matches the Resolver interface
above, but emits a fixed list of IP addresses when started. This list never
changes. This is intended for development environments and debugging tools,
where a user may have provided an explicit IP address rather than a DNS name to
contact. See also: `resolverForIpOrDomain()`.
Parameters
- `options` -- Object, with keys:
- `backends` -- Array of objects, each having properties:
- `address` -- String, an IP address to emit as a backend
- `port` -- Number, a port number for this backend
This object provides the same `start()` and `stop()` methods as the Resolver
class, as well as the same `added` and `removed` events.
## Picking the right resolver
### `resolverForIpOrDomain(options)`
Services that use DNS for service discovery would typically use a DNS-based
resolver. But in development environments or with debugging tools, it's useful
to be able to point a cueball-using program at an instance located at a specific
IP address and port. That's what the Static IP resolver is for.
To make this easy for programs that want to support connecting to either
hostnames or IP addresses, this function is provided to take configuration
(expected to come from a user, via an environment variable, command-line
option, or other configuration source), determine whether an IP address or DNS
name was specified, and return either a DNS-based or static resolver. If the
input appears to be neither a valid IPv4 nor IPv6 address nor DNS name, or the
port number is not valid, then an Error is returned (not thrown). (If the
input is missing or has the wrong type, an Error object is thrown, since this
is a programmer error.)
Parameters
- `options` -- Object, with keys:
- `input` -- String, either an IP address or DNS name, with optional port
suffix
- `resolverConfig` -- Object, a set of additional properties to pass to
the resolver constructor.
The `input` string has the form `HOSTNAME[:PORT]`, where the `[:PORT]` suffix is
optional, and `HOSTNAME` may be either an IP address or DNS name.
**Example:** create a resolver that will emit one backend for an instance at IP
127.0.0.1 port 2020:
var resolver = mod_cueball.resolverForIpOrDomain({
'input': '127.0.0.1:2020',
'resolverConfig': {
'recovery': {
'default': {
'retries': 1,
'timeout': 1000,
'delay': 1000,
'maxDelay': 1000
}
}
}
})
/* check whether resolver is an Error */
**Example:** create a resolver that will track instances associated with DNS
name `mydomain.example.com`:
var resolver = mod_cueball.resolverForIpOrDomain({
'input': 'mydomain.example.com',
'resolverConfig': {
'recovery': {
'default': {
'retries': 1,
'timeout': 1000,
'delay': 1000,
'maxDelay': 1000
}
}
}
});
/* check whether resolver is an Error */
In these examples, the `input` string is assumed to come from a user
cueball does the expected thing when given an IP address or DNS name.
## Errors

@@ -245,3 +458,3 @@

To specify the retry and timeout behaviour of Cueball DNS and pooled
To specify the retry and timeout behaviour of Cueball DNS and pooled
connections, the "recovery spec object" is a required argument to most

@@ -277,3 +490,3 @@ constructors in the API.

The `delay` field indicates a time to wait between retry attempts. After each
The `delay` field indicates a time to wait between retry attempts. After each
failure, it will be doubled until it exceeds the value of `maxDelay`.

@@ -283,3 +496,3 @@

- `retries` finite Number >= 0, number of retry attempts
- `timeout` finite Number > 0, milliseconds to wait before declaring an
- `timeout` finite Number > 0, milliseconds to wait before declaring an
attempt a failure

@@ -302,2 +515,3 @@ - `maxTimeout` Number > `timeout` (can be `Infinity`), maximum value of

Dynamic Resolver mode

@@ -352,1 +566,48 @@ ---------------------

nameservers we find for the next `napi.coal.joyent.us` lookup as well.
Tools
-----
The `cbresolve` tool is provided to show how cueball would resolve a given
configuration. The output format is not committed. It may change in the
future.
usage: cbresolve HOSTNAME[:PORT] # for DNS-based lookup
cbresolve -S | --static IP[:PORT]... # for static IPs
Locate services in DNS using Cueball resolver.
The following options are available for DNS-based lookups:
-f, --follow periodically re-resolve and report changes
-p, --port PORT default backend port
-r, --resolvers IP[,IP...] list of DNS resolvers
-s, --service SERVICE "service" name (for SRV)
-t, --timeout TIMEOUT timeout for lookups
**Example:** resolve DNS name "1.moray.us-east.joyent.us":
$ cbresolve 1.moray.emy-10.joyent.us
domain: 1.moray.emy-10.joyent.us
timeout: 5000 milliseconds
172.27.10.218 80 lLbminikNKjfy+iwDobYBuod7Hs=
172.27.10.219 80 iJMaVRehJ2zKfiS55H/lUUFPb9o=
**Example:** resolve IP/port "127.0.0.1:2020". This is only useful for seeing
how cueball would parse your input:
$ cbresolve --static 127.0.0.1:2020
using static IP resolver
127.0.0.1 2020 xBut/f1D52k1TpDN/miW82qXw6k=
**Example: resolve DNS name "1.moray.us-east.joyent.us" and watch for changes:
$ cbresolve --follow 1.moray.emy-10.joyent.us
domain: 1.moray.emy-10.joyent.us
timeout: 5000 milliseconds
2016-06-23T00:45:00.312Z added 172.27.10.218:80 (lLbminikNKjfy+iwDobYBuod7Hs=)
2016-06-23T00:45:00.314Z added 172.27.10.219:80 (iJMaVRehJ2zKfiS55H/lUUFPb9o=)
2016-06-23T00:49:00.478Z removed 172.27.10.218:80 (lLbminikNKjfy+iwDobYBuod7Hs=)
In this example, one of the DNS entries was removed a few minutes after the
program was started.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc