Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cueball

Package Overview
Dependencies
Maintainers
1
Versions
71
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cueball - npm Package Compare versions

Comparing version 0.4.1 to 0.5.0

lib/connection-fsm.js

2

lib/index.js

@@ -14,2 +14,3 @@ /*

const mod_errors = require('./errors');
const mod_cset = require('./set');

@@ -20,2 +21,3 @@ module.exports = {

ConnectionPool: mod_pool.ConnectionPool,
ConnectionSet: mod_cset.ConnectionSet,
Resolver: mod_resolver.Resolver,

@@ -22,0 +24,0 @@ StaticIpResolver: mod_resolver.StaticIpResolver,

@@ -17,2 +17,3 @@ /*

const mod_pool = require('./pool');
const mod_cset = require('./set');
const mod_os = require('os');

@@ -22,2 +23,3 @@

this.pm_pools = {};
this.pm_sets = {};
}

@@ -36,2 +38,13 @@

CueBallPoolMonitor.prototype.registerSet = function (set) {
mod_assert.ok(set instanceof mod_cset.ConnectionSet);
this.pm_sets[set.cs_uuid] = set;
};
CueBallPoolMonitor.prototype.unregisterSet = function (set) {
mod_assert.ok(set instanceof mod_cset.ConnectionSet);
mod_assert.ok(this.pm_sets[set.cs_uuid]);
delete (this.pm_sets[set.cs_uuid]);
};
CueBallPoolMonitor.prototype.toKangOptions = function () {

@@ -41,12 +54,26 @@ var self = this;

function listTypes() {
return (['pool']);
return (['pool', 'set']);
}
function listObjects(type) {
mod_assert.strictEqual(type, 'pool');
return (Object.keys(self.pm_pools));
if (type === 'pool') {
return (Object.keys(self.pm_pools));
} else if (type === 'set') {
return (Object.keys(self.pm_sets));
} else {
throw (new Error('Invalid type "' + type + '"'));
}
}
function get(type, id) {
mod_assert.strictEqual(type, 'pool');
if (type === 'pool') {
return (getPool(id));
} else if (type === 'set') {
return (getSet(id));
} else {
throw (new Error('Invalid type "' + type + '"'));
}
}
function getPool(id) {
var pool = self.pm_pools[id];

@@ -74,4 +101,6 @@ mod_assert.object(pool);

obj.dead_backends = Object.keys(pool.p_dead);
obj.last_rebalance = Math.round(
pool.p_lastRebalance.getTime() / 1000);
if (pool.p_lastRebalance !== undefined) {
obj.last_rebalance = Math.round(
pool.p_lastRebalance.getTime() / 1000);
}
obj.resolvers = pool.p_resolver.r_resolvers;

@@ -89,2 +118,42 @@ obj.state = pool.getState();

function getSet(id) {
var cset = self.pm_sets[id];
mod_assert.object(cset);
var obj = {};
obj.backends = cset.cs_backends;
obj.fsms = {};
obj.connections = Object.keys(cset.cs_connections);
var ks = cset.cs_keys;
Object.keys(cset.cs_fsms).forEach(function (k) {
if (ks.indexOf(k) === -1)
ks.push(k);
});
ks.forEach(function (k) {
var conns = cset.cs_fsms[k] || [];
obj.fsms[k] = {};
conns.forEach(function (fsm) {
var s = fsm.getState();
if (obj.fsms[k][s] === undefined)
obj.fsms[k][s] = 0;
++obj.fsms[k][s];
});
});
obj.dead_backends = Object.keys(cset.cs_dead);
if (cset.cs_lastRebalance !== undefined) {
obj.last_rebalance = Math.round(
cset.cs_lastRebalance.getTime() / 1000);
}
obj.resolvers = cset.cs_resolver.r_resolvers;
obj.state = cset.getState();
obj.counters = cset.cs_counters;
obj.target = cset.cs_target;
obj.maximum = cset.cs_max;
obj.options = {};
obj.options.domain = cset.cs_resolver.r_domain;
obj.options.service = cset.cs_resolver.r_service;
obj.options.defaultPort = cset.cs_resolver.r_defport;
return (obj);
}
function stats() {

@@ -91,0 +160,0 @@ return ({});

774

lib/pool.js

@@ -31,575 +31,5 @@ /*

const Queue = require('./queue');
const ConnectionFSM = require('./connection-fsm');
/*
* ConnectionFSM is the state machine for a "connection" -- an abstract entity
* that is managed by a ConnectionPool. ConnectionFSMs are associated with a
* particular 'backend', and hold a backing object cf_conn. Typically this
* backing object is a TCP socket, but may be any EventEmitter that emits
* 'close' and 'error'.
*/
function ConnectionFSM(options) {
mod_assert.object(options, 'options');
mod_assert.object(options.pool, 'options.pool');
mod_assert.func(options.constructor, 'options.constructor');
mod_assert.object(options.backend, 'options.backend');
mod_assert.object(options.log, 'options.log');
mod_assert.optionalFunc(options.checker, 'options.checker');
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout');
this.cf_pool = options.pool;
this.cf_constructor = options.constructor;
this.cf_backend = options.backend;
this.cf_claimed = false;
this.cf_claimStack = [];
this.cf_releaseStack = [];
this.cf_lastError = undefined;
this.cf_conn = undefined;
this.cf_shadow = undefined;
this.cf_closeAfter = false;
this.cf_oldListeners = {};
this.cf_checkTimeout = options.checkTimeout;
this.cf_checker = options.checker;
this.cf_lastCheck = new Date();
this.cf_log = options.log.child({
backend: this.cf_backend.key
});
mod_assert.object(options.recovery, 'options.recovery');
var connectRecov = options.recovery.default;
var initialRecov = options.recovery.default;
if (options.recovery.connect) {
initialRecov = options.recovery.connect;
connectRecov = options.recovery.connect;
}
if (options.recovery.initial)
initialRecov = options.recovery.initial;
mod_utils.assertRecovery(connectRecov, 'recovery.connect');
mod_utils.assertRecovery(initialRecov, 'recovery.initial');
this.cf_initialRecov = initialRecov;
this.cf_connectRecov = connectRecov;
this.cf_retries = initialRecov.retries;
this.cf_retriesLeft = initialRecov.retries;
this.cf_minDelay = initialRecov.delay;
this.cf_delay = initialRecov.delay;
this.cf_maxDelay = initialRecov.maxDelay || Infinity;
this.cf_timeout = initialRecov.timeout;
this.cf_maxTimeout = initialRecov.maxTimeout || Infinity;
/*
* If our parent pool thinks this backend is dead, resume connection
* attempts with the maximum delay and timeout. Something is going
* wrong, let's not retry too aggressively and make it worse.
*/
if (this.cf_pool.p_dead[this.cf_backend.key] === true) {
/*
* We might be given an infinite maxDelay or maxTimeout. If
* we are, then multiply it by 2^(retries) to get to what the
* value would have been before.
*/
var mult = 1 << this.cf_retries;
this.cf_delay = this.cf_maxDelay;
if (!isFinite(this.cf_delay))
this.cf_delay = initialRecov.delay * mult;
this.cf_timeout = this.cf_maxTimeout;
if (!isFinite(this.cf_timeout))
this.cf_timeout = initialRecov.timeout * mult;
/* Keep retrying a failed backend forever */
this.cf_retries = Infinity;
this.cf_retriesLeft = Infinity;
}
this.allStateEvent('closeAsserted');
FSM.call(this, 'init');
}
mod_util.inherits(ConnectionFSM, FSM);
/*
* Return true if this connection was closed due to retry exhaustion.
*/
ConnectionFSM.prototype.retriesExhausted = function () {
return (this.isInState('closed') && this.cf_retriesLeft <= 0);
};
/*
* Mark this Connection as "claimed"; in use by a particular client of the
* pool.
*
* Normally this will be called by the pool itself, which will give the 'stack'
* argument as a copy of the stack trace from its caller.
*
* We keep track of the stack trace of our last claimer and releaser to aid
* in debugging.
*/
ConnectionFSM.prototype.claim = function (stack, cb) {
mod_assert.ok(this.cf_claimed === false);
mod_assert.strictEqual(this.getState(), 'idle');
if (typeof (stack) === 'function') {
cb = stack;
stack = undefined;
}
if (stack === undefined) {
var e = {};
Error.captureStackTrace(e);
stack = e.stack;
}
this.cf_claimStack = stack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
this.cf_claimed = true;
if (cb) {
var self = this;
this.onState('busy', function () {
/*
* Give the client our ConnectionHandle, and the
* backing object.
*
* They use the ConnectionHandle to call release().
*/
cb(null, self.cf_shadow, self.cf_conn);
});
}
this.emit('claimAsserted');
};
/*
* Mark this Connection as "free" and ready to be re-used. This is normally
* called via the ConnectionHandle.
*/
ConnectionFSM.prototype.release = function (cb) {
mod_assert.ok(this.cf_claimed === true);
mod_assert.ok(['busy', 'ping'].indexOf(this.getState()) !== -1,
'connection is not held');
var e = {};
Error.captureStackTrace(e);
this.cf_releaseStack = e.stack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
if (cb)
this.onState('idle', cb);
this.emit('releaseAsserted');
};
ConnectionFSM.prototype.close = function (cb) {
if (cb)
this.onState('closed', cb);
this.emit('closeAsserted');
};
ConnectionFSM.prototype.start = function () {
this.emit('startAsserted');
};
ConnectionFSM.prototype.closeAfterRelease = function () {
this.cf_closeAfter = true;
};
ConnectionFSM.prototype.state_init = function (on) {
this.validTransitions(['connect', 'closed']);
var self = this;
on(this, 'startAsserted', function () {
self.gotoState('connect');
});
on(this, 'closeAsserted', function () {
self.gotoState('closed');
});
};
ConnectionFSM.prototype.state_connect = function (on, once, timeout) {
this.validTransitions(['error', 'idle', 'closed']);
var self = this;
timeout(this.cf_timeout, function () {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
self.gotoState('error');
});
this.cf_conn = this.cf_constructor(this.cf_backend);
mod_assert.object(this.cf_conn, 'constructor return value');
this.cf_conn.cf_fsm = this;
once(this.cf_conn, 'connect', function () {
self.gotoState('idle');
});
once(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
self.gotoState('error');
self.cf_pool._incrCounter('error-during-connect');
});
once(this.cf_conn, 'connectError', function (err) {
self.cf_lastError = err;
self.gotoState('error');
self.cf_pool._incrCounter('error-during-connect');
});
once(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('close-during-connect');
});
once(this.cf_conn, 'timeout', function () {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
self.gotoState('error');
self.cf_pool._incrCounter('timeout-during-connect');
});
once(this.cf_conn, 'connectTimeout', function (err) {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
self.gotoState('error');
self.cf_pool._incrCounter('timeout-during-connect');
});
once(this, 'closeAsserted', function () {
self.gotoState('closed');
});
};
ConnectionFSM.prototype.state_closed = function (on) {
this.validTransitions([]);
if (this.cf_conn && this.cf_conn.destroy)
this.cf_conn.destroy();
this.cf_conn = undefined;
this.cf_closeAfter = false;
this.cf_lastError = undefined;
this.cf_log.trace('ConnectionFSM closed');
on(this, 'closeAsserted', function () { });
};
ConnectionFSM.prototype.state_error = function (on, once, timeout) {
this.validTransitions(['delay', 'closed']);
var self = this;
on(this, 'closeAsserted', function () {
self.gotoState('closed');
});
if (this.cf_conn && this.cf_conn.destroy)
this.cf_conn.destroy();
this.cf_conn = undefined;
if (this.cf_shadow) {
this.cf_shadow.sh_error = true;
this.cf_shadow = undefined;
}
/*
* If the closeAfter flag is set, and this is a connection to a "dead"
* backend (i.e., a "monitor" watching to see when it comes back), then
* exit now. For an ordinary backend, we don't want to do this,
* because we want to give ourselves the opportunity to run out of
* retries.
*
* Otherwise, in a situation where we have two connections that were
* created at the same time, one to a failing backend that's already
* declared dead, and one to a different failing backend not yet
* declared, we may never learn that the second backend is down and
* declare it dead. The already declared dead backend may exit first
* during a pool reshuffle and cause this one to exit prematurely
* (there's a race in who exits first and causes the planner to engage)
*/
if (this.cf_retries === Infinity && this.cf_closeAfter) {
this.cf_retriesLeft = 0;
this.gotoState('closed');
return;
}
if (this.cf_retries !== Infinity)
--this.cf_retriesLeft;
if (this.cf_retries === Infinity || this.cf_retriesLeft > 0) {
this.gotoState('delay');
} else {
this.cf_log.warn(this.cf_lastError, 'failed to connect to ' +
'backend %s (%j)', this.cf_backend.key, this.cf_backend);
this.cf_pool._incrCounter('retries-exhausted');
this.gotoState('closed');
}
};
ConnectionFSM.prototype.state_delay = function (on, once, timeout) {
this.validTransitions(['connect', 'closed']);
var delay = this.cf_delay;
this.cf_delay *= 2;
this.cf_timeout *= 2;
if (this.cf_timeout > this.cf_maxTimeout)
this.cf_timeout = this.cf_maxTimeout;
if (this.cf_delay > this.cf_maxDelay)
this.cf_delay = this.cf_maxDelay;
var self = this;
var t = timeout(delay, function () {
self.gotoState('connect');
});
t.unref();
once(this, 'closeAsserted', function () {
self.gotoState('closed');
});
};
ConnectionFSM.prototype.state_idle = function (on, once, timeout) {
this.validTransitions(['busy', 'error', 'closed']);
var self = this;
this.cf_claimed = false;
this.cf_claimStack = [];
this.cf_log.trace('connected, idling');
if (this.cf_shadow) {
this.cf_shadow.sh_claimed = false;
this.cf_shadow.sh_releaseStack = this.cf_releaseStack;
this.cf_shadow = undefined;
}
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var newCount = self.cf_conn.listeners(evt).filter(
function (h) { return (typeof (h) === 'function'); }).
length;
var oldCount = self.cf_oldListeners[evt];
if (oldCount !== undefined && newCount > oldCount) {
var info = {};
info.stack = self.cf_releaseStack;
info.handlers = self.cf_conn.listeners(evt).map(
function (f) { return (f.toString()); });
info.event = evt;
self.cf_log.warn(info, 'connection claimer looks ' +
'like it leaked event handlers');
}
});
/*
* Reset retries and retry delay to their defaults since we are now
* connected.
*/
this.cf_retries = this.cf_connectRecov.retries;
this.cf_retriesLeft = this.cf_connectRecov.retries;
this.cf_minDelay = this.cf_connectRecov.delay;
this.cf_delay = this.cf_connectRecov.delay;
this.cf_maxDelay = this.cf_connectRecov.maxDelay || Infinity;
this.cf_timeout = this.cf_connectRecov.timeout;
this.cf_maxTimeout = this.cf_connectRecov.maxTimeout || Infinity;
if (this.cf_closeAfter === true) {
this.cf_closeAfter = false;
this.cf_lastError = undefined;
this.gotoState('closed');
on(this, 'closeAsserted', function () { });
return;
}
this.cf_conn.unref();
once(this, 'claimAsserted', function () {
self.gotoState('busy');
});
once(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
self.gotoState('error');
self.cf_pool._incrCounter('error-during-idle');
});
once(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('close-during-idle');
});
once(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('end-during-idle');
});
once(this, 'closeAsserted', function () {
self.gotoState('closed');
});
if (this.cf_checkTimeout !== undefined) {
var now = new Date();
var sinceLast = (now - this.cf_lastCheck);
var delay;
if (sinceLast > this.cf_checkTimeout) {
delay = 1000;
} else {
delay = this.cf_checkTimeout - sinceLast;
if (delay < 1000)
delay = 1000;
}
var t = timeout(delay, function () {
self.gotoState('ping');
});
t.unref();
}
};
ConnectionFSM.prototype.state_ping = function (on, once, timeout) {
this.validTransitions(['error', 'closed', 'idle']);
this.cf_lastCheck = new Date();
this.cf_claimStack = [
'ConnectionFSM.prototype.state_ping',
'(periodic_health_check)'];
this.cf_claimed = true;
var self = this;
this.cf_conn.ref();
this.cf_releaseStack = [];
this.cf_log.trace('doing health check');
/*
* Write down the count of event handlers on the backing object so that
* we can spot if the client leaked any common ones in release().
*/
this.cf_oldListeners = {};
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var count = self.cf_conn.listeners(evt).filter(
function (h) { return (typeof (h) === 'function'); }).
length;
self.cf_oldListeners[evt] = count;
});
/*
* The ConnectionHandle is a one-time use object that proxies calls to
* our release() and close() functions. We use it so that we can assert
* that this particular client only releases us once. If we only
* asserted on our current state, there could be a race where we get
* claimed by a different client in the meantime.
*/
this.cf_shadow = new ConnectionHandle(this);
once(this, 'releaseAsserted', function () {
if (self.cf_closeAfter === true) {
self.gotoState('closed');
} else {
self.gotoState('idle');
}
});
once(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
self.gotoState('error');
self.cf_pool._incrCounter('error-during-ping');
});
once(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('close-during-ping');
});
once(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('end-during-ping');
});
once(this, 'closeAsserted', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
});
var t = timeout(this.cf_checkTimeout, function () {
var info = {};
info.stack = self.cf_claimStack;
self.cf_log.warn(info, 'health check is taking too ' +
'long to run (has been more than %d ms)',
self.cf_checkTimeout);
});
t.unref();
this.cf_checker.call(undefined, this.cf_shadow, this.cf_conn);
};
ConnectionFSM.prototype.state_busy = function (on, once, timeout) {
this.validTransitions(['error', 'closed', 'idle']);
var self = this;
this.cf_conn.ref();
this.cf_releaseStack = [];
this.cf_log.trace('busy, claimed by %s',
this.cf_claimStack[1].split(' ')[0]);
/*
* Write down the count of event handlers on the backing object so that
* we can spot if the client leaked any common ones in release().
*/
this.cf_oldListeners = {};
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var count = self.cf_conn.listeners(evt).filter(
function (h) { return (typeof (h) === 'function'); }).
length;
self.cf_oldListeners[evt] = count;
});
/*
* The ConnectionHandle is a one-time use object that proxies calls to
* our release() and close() functions. We use it so that we can assert
* that this particular client only releases us once. If we only
* asserted on our current state, there could be a race where we get
* claimed by a different client in the meantime.
*/
this.cf_shadow = new ConnectionHandle(this);
once(this, 'releaseAsserted', function () {
if (self.cf_closeAfter === true) {
self.gotoState('closed');
} else {
self.gotoState('idle');
}
});
once(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
self.gotoState('error');
self.cf_pool._incrCounter('error-during-busy');
});
once(this.cf_conn, 'end', function () {
self.cf_closeAfter = true;
self.cf_pool._incrCounter('end-during-busy');
});
once(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
self.cf_pool._incrCounter('close-during-busy');
});
once(this, 'closeAsserted', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
self.gotoState('error');
});
if (this.cf_checkTimeout !== undefined) {
var t = timeout(this.cf_checkTimeout, function () {
var info = {};
info.stack = self.cf_claimStack;
self.cf_log.warn(info, 'connection held for longer ' +
'than checkTimeout (%d ms), may have been leaked',
self.cf_checkTimeout);
});
t.unref();
}
};
function ConnectionHandle(cf) {
this.sh_cf = cf;
this.sh_claimed = true;
this.sh_error = false;
this.sh_releaseStack = [];
}
ConnectionHandle.prototype.close = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
}
return (this.sh_cf.close.apply(this.sh_cf, arguments));
};
ConnectionHandle.prototype.release = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
}
return (this.sh_cf.release.apply(this.sh_cf, arguments));
};
ConnectionHandle.prototype.closeAfterRelease = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
}
return (this.sh_cf.closeAfterRelease.apply(this.sh_cf,
arguments));
};
/*
* A ConnectionPool holds a pool of ConnectionFSMs that are kept up to date

@@ -675,2 +105,3 @@ * based on the output of a Resolver. At any given time the pool may contain:

this.p_inRebalance = false;
this.p_rebalScheduled = false;
this.p_startedResolver = false;

@@ -749,9 +180,8 @@

CueBallConnectionPool.prototype.state_starting =
function (on, once, timeout, onState) {
this.validTransitions(['failed', 'running', 'stopping']);
CueBallConnectionPool.prototype.state_starting = function (S) {
S.validTransitions(['failed', 'running', 'stopping']);
mod_monitor.monitor.registerPool(this);
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));

@@ -763,10 +193,12 @@ var self = this;

'pool will start up in "failed" state');
this.gotoState('failed');
S.gotoState('failed');
return;
}
onState(this.p_resolver, 'failed', function () {
self.p_log.warn('underlying resolver failed, moving pool ' +
'to "failed" state');
self.gotoState('failed');
S.on(this.p_resolver, 'stateChanged', function (state) {
if (state === 'failed') {
self.p_log.warn('underlying resolver failed, moving ' +
'pool to "failed" state');
S.gotoState('failed');
}
});

@@ -786,7 +218,7 @@

on(this, 'connectedToBackend', function () {
self.gotoState('running');
S.on(this, 'connectedToBackend', function () {
S.gotoState('running');
});
on(this, 'closedBackend', function (fsm) {
S.on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.p_dead).length;

@@ -798,39 +230,39 @@ if (dead >= self.p_keys.length) {

'"failed" state');
self.gotoState('failed');
S.gotoState('failed');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_failed = function (on) {
this.validTransitions(['running', 'stopping']);
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
CueBallConnectionPool.prototype.state_failed = function (S) {
S.validTransitions(['running', 'stopping']);
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
S.on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
var self = this;
on(this, 'connectedToBackend', function () {
S.on(this, 'connectedToBackend', function () {
mod_assert.ok(!self.p_resolver.isInState('failed'));
self.p_log.info('successfully connected to a backend, ' +
'moving back to running state');
self.gotoState('running');
S.gotoState('running');
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_running = function (on) {
this.validTransitions(['failed', 'stopping']);
CueBallConnectionPool.prototype.state_running = function (S) {
S.validTransitions(['failed', 'stopping']);
var self = this;
on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
on(this.p_rebalTimer, 'timeout', this.rebalance.bind(this));
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this));
S.on(this.p_rebalTimer, 'timeout', this.rebalance.bind(this));
S.on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this));
on(this, 'closedBackend', function (fsm) {
S.on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.p_dead).length;

@@ -842,29 +274,31 @@ if (dead >= self.p_keys.length) {

'"failed" state');
self.gotoState('failed');
S.gotoState('failed');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionPool.prototype.state_stopping =
function (on, once, timeout, onState) {
this.validTransitions(['stopping.backends']);
var self = this;
CueBallConnectionPool.prototype.state_stopping = function (S) {
S.validTransitions(['stopping.backends']);
if (this.p_startedResolver) {
onState(this.p_resolver, 'stopped', function () {
self.gotoState('stopping.backends');
S.on(this.p_resolver, 'stateChanged', function (s) {
if (s === 'stopped') {
S.gotoState('stopping.backends');
}
});
this.p_resolver.stop();
if (this.p_resolver.isInState('stopped')) {
S.gotoState('stopping.backends');
}
} else {
this.gotoState('stopping.backends');
S.gotoState('stopping.backends');
}
};
CueBallConnectionPool.prototype.state_stopping.backends = function () {
this.validTransitions(['stopped']);
CueBallConnectionPool.prototype.state_stopping.backends = function (S) {
S.validTransitions(['stopped']);
var conns = this.p_connections;
var self = this;
var fsms = [];

@@ -880,3 +314,3 @@ Object.keys(conns).forEach(function (k) {

}, function () {
self.gotoState('stopped');
S.gotoState('stopped');
});

@@ -886,3 +320,6 @@ function closeBackend(fsm, cb) {

fsm.closeAfterRelease();
fsm.onState('closed', cb);
fsm.on('stateChanged', function (st) {
if (st === 'closed')
cb();
});
} else {

@@ -894,4 +331,4 @@ fsm.close(cb);

CueBallConnectionPool.prototype.state_stopped = function () {
this.validTransitions([]);
CueBallConnectionPool.prototype.state_stopped = function (S) {
S.validTransitions([]);
mod_monitor.monitor.unregisterPool(this);

@@ -905,2 +342,6 @@ this.p_keys = [];

CueBallConnectionPool.prototype.isDeclaredDead = function (backend) {
return (this.p_dead[backend] === true);
};
CueBallConnectionPool.prototype.reshuffle = function () {

@@ -918,2 +359,20 @@ var taken = this.p_keys.pop();

CueBallConnectionPool.prototype.rebalance = function () {
if (this.p_keys.length < 1)
return;
if (this.isInState('stopping') || this.isInState('stopped'))
return;
if (this.p_rebalScheduled !== false)
return;
this.p_rebalScheduled = true;
var self = this;
setImmediate(function () {
self._rebalance();
});
};
/*

@@ -927,14 +386,9 @@ * Rebalance the pool, by looking at the distribution of connections to

*/
CueBallConnectionPool.prototype.rebalance = function () {
CueBallConnectionPool.prototype._rebalance = function () {
var self = this;
if (this.p_keys.length < 1)
return;
if (this.isInState('stopping') || this.isInState('stopped'))
return;
if (this.p_inRebalance !== false)
return;
this.p_inRebalance = true;
this.p_rebalScheduled = false;

@@ -1036,8 +490,6 @@ var total = 0;

fsm.on('stateChanged', function (newState) {
var doRebalance = false;
if (fsm.p_initq_node) {
/* These transitions mean we're still starting up. */
if (newState === 'delay' || newState === 'error' ||
newState === 'connect')
if (newState === 'init' || newState === 'delay' ||
newState === 'error' || newState === 'connect')
return;

@@ -1056,3 +508,3 @@ /*

delete (self.p_dead[key]);
doRebalance = true;
self.rebalance();
}

@@ -1062,3 +514,3 @@ }

if (newState === 'idle') {
if (newState === 'idle' && fsm.isInState('idle')) {
/*

@@ -1110,3 +562,3 @@ * This backend has just become available, either

self.emit('closedBackend', key, fsm);
doRebalance = true;
self.rebalance();
}

@@ -1123,7 +575,4 @@

/* Also rebalance, in case we were closed or died. */
doRebalance = true;
self.rebalance();
}
if (doRebalance)
self.rebalance();
});

@@ -1134,26 +583,2 @@

CueBallConnectionPool.prototype.claimSync = function () {
if (this.isInState('stopping') || this.isInState('stopped'))
throw (new mod_errors.PoolStoppingError(this));
if (this.isInState('failed'))
throw (new mod_errors.PoolFailedError(this));
var e = {};
Error.captureStackTrace(e);
/* If there are idle connections sitting around, take one. */
if (this.p_idleq.length > 0) {
var fsm = this.p_idleq.shift();
delete (fsm.p_idleq_node);
fsm.claim(e.stack);
mod_assert.ok(fsm.cf_shadow);
return ({
handle: fsm.cf_shadow,
connection: fsm.cf_conn
});
}
throw (new mod_errors.NoBackendsError(this));
};
CueBallConnectionPool.prototype.claim = function (options, cb) {

@@ -1169,7 +594,9 @@ var self = this;

mod_assert.optionalNumber(options.timeout, 'options.timeout');
var timeout = options.timeout || Infinity;
var timeout = options.timeout;
if (timeout === undefined)
timeout = Infinity;
mod_assert.optionalBool(options.errorOnEmpty, 'options.errorOnEmpty');
var errOnEmpty = options.errorOnEmpty;
if (this.isInState('stoppping') || this.isInState('stopped')) {
if (this.isInState('stopping') || this.isInState('stopped')) {
setImmediate(function () {

@@ -1202,4 +629,19 @@ if (!done)

delete (fsm.p_idleq_node);
fsm.claim(e.stack, cb);
return (undefined);
fsm.claim(e.stack, function (err, hdl, conn) {
if (err) {
if (!done)
cb(err);
done = true;
return;
}
if (done) {
hdl.release();
return;
}
done = true;
cb(err, hdl, conn);
});
return ({
cancel: function () { done = true; }
});
}

@@ -1206,0 +648,0 @@

@@ -101,46 +101,45 @@ /*

CueBallResolver.prototype.state_stopped = function (on) {
var self = this;
on(this, 'startAsserted', function () {
self.gotoState('starting');
CueBallResolver.prototype.state_stopped = function (S) {
S.on(this, 'startAsserted', function () {
S.gotoState('starting');
});
};
CueBallResolver.prototype.state_starting = function (on) {
CueBallResolver.prototype.state_starting = function (S) {
var self = this;
this.r_fsm.start();
on(this.r_fsm, 'updated', function (err) {
S.on(this.r_fsm, 'updated', function (err) {
if (err) {
self.r_lastError = err;
self.gotoState('failed');
S.gotoState('failed');
} else {
self.gotoState('running');
S.gotoState('running');
}
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallResolver.prototype.state_running = function (on) {
var self = this;
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
CueBallResolver.prototype.state_running = function (S) {
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallResolver.prototype.state_failed = function (on) {
var self = this;
on(this.r_fsm, 'updated', function (err) {
CueBallResolver.prototype.state_failed = function (S) {
S.on(this.r_fsm, 'updated', function (err) {
if (!err)
self.gotoState('running');
S.gotoState('running');
});
on(this, 'stopAsserted', function () {
self.gotoState('stopping');
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallResolver.prototype.state_stopping = function (on) {
CueBallResolver.prototype.state_stopping = function (S) {
this.r_fsm.stop();
this.gotoState('stopped');
S.immediate(function () {
S.gotoState('stopped');
});
};

@@ -287,11 +286,10 @@

CueBallDNSResolver.prototype.state_init = function (on) {
var self = this;
CueBallDNSResolver.prototype.state_init = function (S) {
this.r_stopping = false;
on(this, 'startAsserted', function () {
self.gotoState('check_ns');
S.on(this, 'startAsserted', function () {
S.gotoState('check_ns');
});
};
CueBallDNSResolver.prototype.state_check_ns = function (on, once) {
CueBallDNSResolver.prototype.state_check_ns = function (S) {
var self = this;

@@ -303,3 +301,3 @@ if (this.r_resolvers.length > 0) {

if (notIp.length === 0) {
this.gotoState('srv');
S.gotoState('srv');
return;

@@ -322,3 +320,3 @@ }

}
this.gotoState('bootstrap_ns');
S.gotoState('bootstrap_ns');
} else {

@@ -329,3 +327,3 @@ mod_fs.readFile('/etc/resolv.conf', 'ascii',

self.r_resolvers = ['8.8.8.8', '8.8.4.4'];
self.gotoState('srv');
S.gotoState('srv');
return;

@@ -341,3 +339,3 @@ }

});
self.gotoState('srv');
S.gotoState('srv');
});

@@ -347,3 +345,3 @@ }

CueBallDNSResolver.prototype.state_bootstrap_ns = function (on, once) {
CueBallDNSResolver.prototype.state_bootstrap_ns = function (S) {
var self = this;

@@ -374,6 +372,6 @@ this.r_bootstrap.on('added', function (k, srv) {

});
self.gotoState('srv');
S.gotoState('srv');
} else {
once(this.r_bootstrap, 'added', function () {
self.gotoState('srv');
S.on(this.r_bootstrap, 'added', function () {
S.gotoState('srv');
});

@@ -384,10 +382,10 @@ this.r_bootstrap.start();

CueBallDNSResolver.prototype.state_srv = function () {
CueBallDNSResolver.prototype.state_srv = function (S) {
var r = this.r_srvRetry;
r.delay = r.minDelay;
r.count = r.max;
this.gotoState('srv_try');
S.gotoState('srv_try');
};
CueBallDNSResolver.prototype.state_srv_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_srv_try = function (S) {
var self = this;

@@ -397,3 +395,3 @@

var req = this.resolve(name, 'SRV', this.r_srvRetry.timeout);
once(req, 'answers', function (ans, ttl) {
S.on(req, 'answers', function (ans, ttl) {
var d = new Date();

@@ -404,5 +402,5 @@ d.setTime(d.getTime() + 1000*ttl);

self.r_srvs = ans;
self.gotoState('aaaa');
S.gotoState('aaaa');
});
once(req, 'error', function (err) {
S.on(req, 'error', function (err) {
self.r_lastError = err;

@@ -433,5 +431,5 @@

self.gotoState('aaaa');
S.gotoState('aaaa');
} else {
self.gotoState('srv_error');
S.gotoState('srv_error');
}

@@ -442,8 +440,8 @@ });

CueBallDNSResolver.prototype.state_srv_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_srv_error = function (S) {
var self = this;
var r = self.r_srvRetry;
if (--r.count > 0) {
timeout(r.delay, function () {
self.gotoState('srv_try');
S.timeout(r.delay, function () {
S.gotoState('srv_try');
});

@@ -474,13 +472,13 @@

self.gotoState('aaaa');
S.gotoState('aaaa');
}
};
CueBallDNSResolver.prototype.state_aaaa = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa = function (S) {
this.r_srvRem = this.r_srvs.slice();
this.r_nextV6 = undefined;
this.gotoState('aaaa_next');
S.gotoState('aaaa_next');
};
CueBallDNSResolver.prototype.state_aaaa_next = function () {
CueBallDNSResolver.prototype.state_aaaa_next = function (S) {
var r = this.r_retry;

@@ -493,10 +491,10 @@ r.delay = r.minDelay;

this.r_srv = srv;
this.gotoState('aaaa_try');
S.gotoState('aaaa_try');
} else {
/* Lookups are all done, proceed on through. */
this.gotoState('a');
S.gotoState('a');
}
};
CueBallDNSResolver.prototype.state_aaaa_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa_try = function (S) {
var self = this;

@@ -511,3 +509,3 @@ var srv = this.r_srv;

});
self.gotoState('aaaa_next');
S.gotoState('aaaa_next');
return;

@@ -517,3 +515,3 @@ }

var req = this.resolve(srv.name, 'AAAA', this.r_retry.timeout);
once(req, 'answers', function (ans, ttl) {
S.on(req, 'answers', function (ans, ttl) {
var d = new Date();

@@ -527,7 +525,7 @@ d.setTime(d.getTime() + 1000*ttl);

});
self.gotoState('aaaa_next');
S.gotoState('aaaa_next');
});
once(req, 'error', function (err) {
S.on(req, 'error', function (err) {
self.r_lastError = err;
self.gotoState('aaaa_error');
S.gotoState('aaaa_error');
});

@@ -537,8 +535,8 @@ req.send();

CueBallDNSResolver.prototype.state_aaaa_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_aaaa_error = function (S) {
var self = this;
var r = self.r_retry;
if (--r.count > 0) {
timeout(r.delay, function () {
self.gotoState('aaaa_try');
S.timeout(r.delay, function () {
S.gotoState('aaaa_try');
});

@@ -560,13 +558,13 @@

self.gotoState('aaaa_next');
S.gotoState('aaaa_next');
}
};
CueBallDNSResolver.prototype.state_a = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a = function (S) {
this.r_srvRem = this.r_srvs.slice();
this.r_nextV4 = undefined;
this.gotoState('a_next');
S.gotoState('a_next');
};
CueBallDNSResolver.prototype.state_a_next = function () {
CueBallDNSResolver.prototype.state_a_next = function (S) {
var r = this.r_retry;

@@ -579,10 +577,10 @@ r.delay = r.minDelay;

this.r_srv = srv;
this.gotoState('a_try');
S.gotoState('a_try');
} else {
/* Lookups are all done, proceed on through. */
this.gotoState('process');
S.gotoState('process');
}
};
CueBallDNSResolver.prototype.state_a_try = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a_try = function (S) {
var self = this;

@@ -597,3 +595,3 @@ var srv = this.r_srv;

});
self.gotoState('a_next');
S.gotoState('a_next');
return;

@@ -603,3 +601,3 @@ }

var req = this.resolve(srv.name, 'A', this.r_retry.timeout);
once(req, 'answers', function (ans, ttl) {
S.on(req, 'answers', function (ans, ttl) {
var d = new Date();

@@ -613,7 +611,7 @@ d.setTime(d.getTime() + 1000*ttl);

});
self.gotoState('a_next');
S.gotoState('a_next');
});
once(req, 'error', function (err) {
S.on(req, 'error', function (err) {
self.r_lastError = err;
self.gotoState('a_error');
S.gotoState('a_error');
});

@@ -623,8 +621,8 @@ req.send();

CueBallDNSResolver.prototype.state_a_error = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_a_error = function (S) {
var self = this;
var r = self.r_retry;
if (--r.count > 0) {
timeout(r.delay, function () {
self.gotoState('a_try');
S.timeout(r.delay, function () {
S.gotoState('a_try');
});

@@ -646,7 +644,7 @@

self.gotoState('a_next');
S.gotoState('a_next');
}
};
CueBallDNSResolver.prototype.state_process = function () {
CueBallDNSResolver.prototype.state_process = function (S) {
var self = this;

@@ -682,3 +680,3 @@

this.emit('updated', err);
this.gotoState('sleep');
S.gotoState('sleep');
return;

@@ -708,6 +706,6 @@ }

this.emit('updated');
this.gotoState('sleep');
S.gotoState('sleep');
};
CueBallDNSResolver.prototype.state_sleep = function (on, once, timeout) {
CueBallDNSResolver.prototype.state_sleep = function (S) {
var self = this;

@@ -718,3 +716,3 @@ var now = new Date();

if (this.r_stopping) {
this.gotoState('init');
S.gotoState('init');
return;

@@ -735,12 +733,12 @@ }

if (minDelay < 0) {
this.gotoState(state);
S.gotoState(state);
} else {
self.r_log.trace({state: state, delay: minDelay},
'sleeping until next TTL expiry');
var t = timeout(minDelay, function () {
self.gotoState(state);
var t = S.timeout(minDelay, function () {
S.gotoState(state);
});
t.unref();
on(this, 'stopAsserted', function () {
self.gotoState('init');
S.on(this, 'stopAsserted', function () {
S.gotoState('init');
});

@@ -838,5 +836,6 @@ }

addns.forEach(function (rr) {
if (rr.type !== type) {
if (rr.type !== 'A' && rr.type !== 'AAAA') {
if (rr.type === 'CNAME' ||
rr.type === 'DNAME')
rr.type === 'DNAME' ||
rr.type === 'OPT')
return;

@@ -843,0 +842,0 @@ self.r_log.warn('got unsupported ' +

@@ -24,2 +24,3 @@ /*

const mod_errors = require('./errors');
const mod_monitor = require('./pool-monitor');

@@ -40,11 +41,4 @@ const FSM = mod_mooremachine.FSM;

mod_assert.optionalArrayOfString(options.resolvers,
'options.resolvers');
mod_assert.optionalObject(options.resolver, 'options.resolver');
mod_assert.string(options.domain, 'options.domain');
this.cs_domain = options.domain;
mod_assert.optionalString(options.service, 'options.service');
mod_assert.optionalNumber(options.maxDNSConcurrency,
'options.maxDNSConcurrency');
mod_assert.optionalNumber(options.defaultPort, 'options.defaultPort');
mod_assert.object(options.resolver, 'options.resolver');
this.cs_resolver = options.resolver;

@@ -69,28 +63,56 @@ mod_assert.object(options.recovery, 'options.recovery');

/* Array of string keys for all currently available backends. */
this.cs_keys = [];
/* Map of backend key => backend info objects. */
this.cs_backends = {};
this.cs_connections = {};
/* Map of backend key => array of ConnectionFSM instances. */
this.cs_fsms = {};
/* Map of backend key => bool, if true the backend is declared dead. */
this.cs_dead = {};
/*
* Map of backend key => integer, latest serial number for connections
* to each backend. We use the serial numbers to generate the per-
* connection keys.
*/
this.cs_serials = {};
/*
* Map of connection key => connection instance (as returned by
* cs_constructor).
*/
this.cs_connections = {};
/* For debugging, track when we last rebalanced. */
this.cs_lastRebalance = undefined;
/*
* If true, we are currently doing a rebalance. Rebalancing may involve
* closing connections or opening new ones, which can then trigger a
* recursive call into rebalance(), so we use this flag to avoid
* problems caused by this.
*/
this.cs_inRebalance = false;
this.cs_startedResolver = false;
/*
* If true, a rebalance() is scheduled for the next turn of the event
* loop (in a setImmediate).
*/
this.cs_rebalScheduled = false;
/* Debugging counters. */
this.cs_counters = {};
var self = this;
if (options.resolver !== undefined && options.resolver !== null) {
this.p_resolver = options.resolver;
this.p_resolver_custom = true;
} else {
this.p_resolver = new mod_resolver.Resolver({
resolvers: options.resolvers,
domain: options.domain,
service: options.service,
maxDNSConcurrency: options.maxDNSConcurrency,
defaultPort: options.defaultPort,
log: this.p_log,
recovery: options.recovery
});
this.p_resolver_custom = false;
}
this.cs_rebalTimer = new EventEmitter();
this.cs_rebalTimerInst = setInterval(function () {
self.cs_rebalTimer.emit('timeout');
}, 10000);
this.cs_rebalTimerInst.unref();
this.cs_shuffleTimer = new EventEmitter();
this.cs_shuffleTimerInst = setInterval(function () {
self.cs_shuffleTimer.emit('timeout');
}, 60000);
this.cs_shuffleTimerInst.unref();
FSM.call(this, 'starting');

@@ -113,8 +135,11 @@ }

delete (this.cs_backends[k]);
(this.cs_connections[k] || []).forEach(function (fsm) {
if (!fsm.isInState('busy'))
fsm.close();
var self = this;
var cks = Object.keys(this.cs_connections).filter(function (ck) {
return (ck.indexOf(k + '.') === 0);
});
delete (this.cs_connections[k]);
this.rebalance();
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
delete (self.cs_connections[ck]);
self.assertEmit('removed', ck, conn);
});
};

@@ -126,3 +151,332 @@

CueBallConnectionSet.prototype.state_starting = function (on) {
CueBallConnectionSet.prototype.state_starting = function (S) {
S.validTransitions(['failed', 'running', 'stopping']);
mod_monitor.monitor.registerSet(this);
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this));
var self = this;
if (this.cs_resolver.isInState('failed')) {
this.cs_log.warn('resolver has already failed, cset will ' +
'start up in "failed" state');
S.gotoState('failed');
return;
}
S.on(this.cs_resolver, 'stateChanged', function (st) {
if (st === 'failed') {
self.cs_log.warn('underlying resolver failed, moving ' +
'cset to "failed" state');
S.gotoState('failed');
}
});
if (this.cs_resolver.isInState('running')) {
var backends = this.cs_resolver.list();
Object.keys(backends).forEach(function (k) {
var backend = backends[k];
self.on_resolver_added(k, backend);
});
}
S.on(this, 'connectedToBackend', function () {
S.gotoState('running');
});
S.on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.cs_dead).length;
if (dead >= self.cs_keys.length) {
self.cs_log.error(
{ dead: dead },
'cset has exhausted all retries, now moving to ' +
'"failed" state');
S.gotoState('failed');
}
});
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionSet.prototype.state_failed = function (S) {
S.validTransitions(['running', 'stopping']);
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this));
S.on(this.cs_shuffleTimer, 'timeout', this.reshuffle.bind(this));
var self = this;
S.on(this, 'connectedToBackend', function () {
mod_assert.ok(!self.cs_resolver.isInState('failed'));
self.cs_log.info('successfully connected to a backend, ' +
'moving back to running state');
S.gotoState('running');
});
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionSet.prototype.state_running = function (S) {
S.validTransitions(['failed', 'stopping']);
var self = this;
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this));
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this));
S.on(this.cs_rebalTimer, 'timeout', this.rebalance.bind(this));
S.on(this.cs_shuffleTimer, 'timeout', this.reshuffle.bind(this));
S.on(this, 'closedBackend', function (fsm) {
var dead = Object.keys(self.cs_dead).length;
if (dead >= self.cs_keys.length) {
self.cs_log.error(
{ dead: dead },
'pool has exhausted all retries, now moving to ' +
'"failed" state');
S.gotoState('failed');
}
});
S.on(this, 'stopAsserted', function () {
S.gotoState('stopping');
});
};
CueBallConnectionSet.prototype.state_stopping = function (S) {
S.validTransitions(['stopped']);
var conns = this.cs_fsms;
var fsms = [];
Object.keys(conns).forEach(function (k) {
conns[k].forEach(function (fsm) {
fsms.push(fsm);
});
});
mod_vasync.forEachParallel({
func: closeBackend,
inputs: fsms
}, function () {
S.gotoState('stopped');
});
function closeBackend(fsm, cb) {
if (fsm.isInState('busy')) {
fsm.closeAfterRelease();
fsm.on('stateChanged', function (s) {
if (s === 'closed')
cb();
});
} else {
fsm.close(cb);
}
}
};
CueBallConnectionSet.prototype.state_stopped = function (S) {
S.validTransitions([]);
mod_monitor.monitor.unregisterSet(this);
this.cs_keys = [];
this.cs_fsms = {};
this.cs_connections = {};
this.cs_backends = {};
clearInterval(this.cs_rebalTimerInst);
clearInterval(this.cs_shuffleTimerInst);
};
CueBallConnectionSet.prototype.isDeclaredDead = function (backend) {
return (this.cs_dead[backend] === true);
};
CueBallConnectionSet.prototype.reshuffle = function () {
var taken = this.cs_keys.pop();
var idx = Math.floor(Math.random() * (this.cs_keys.length + 1));
this.cs_keys.splice(idx, 0, taken);
this.rebalance();
};
/* Stop and kill everything. */
CueBallConnectionSet.prototype.stop = function () {
this.emit('stopAsserted');
};
CueBallConnectionSet.prototype.setTarget = function (target) {
this.cs_target = target;
this.rebalance();
};
CueBallConnectionSet.prototype.rebalance = function () {
if (this.cs_keys.length < 1)
return;
if (this.isInState('stopping') || this.isInState('stopped'))
return;
if (this.cs_rebalScheduled !== false)
return;
this.p_rebalScheduled = true;
var self = this;
setImmediate(function () {
self._rebalance();
});
};
/*
* Rebalance the set, by looking at the distribution of connections to
* backends amongst the "init" and "idle" queues.
*
* If the connections are not evenly distributed over the available backends,
* then planRebalance() will return a plan to take us back to an even
* distribution, which we then apply.
*/
CueBallConnectionSet.prototype._rebalance = function () {
var self = this;
if (this.cs_inRebalance !== false)
return;
this.cs_inRebalance = true;
this.cs_rebalScheduled = false;
var conns = {};
var total = 0;
this.cs_keys.forEach(function (k) {
conns[k] = [];
});
Object.keys(this.cs_fsms).forEach(function (k) {
conns[k] = self.cs_fsms[k].slice();
total += self.cs_fsms[k].length;
});
var plan = mod_utils.planRebalance(
conns, this.cs_dead, this.cs_target, this.cs_max, true);
if (plan.remove.length > 0 || plan.add.length > 0) {
this.cs_log.trace('rebalancing cset, remove %d, ' +
'add %d (target = %d, total = %d)',
plan.remove.length, plan.add.length, this.cs_target,
total);
}
plan.remove.forEach(function (fsm) {
var k = fsm.cf_backend.key;
var cks = Object.keys(self.cs_connections).filter(
function (ck) {
return (ck.indexOf(k + '.') === 0);
});
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
delete (self.cs_connections[ck]);
self.emit('removed', ck, conn);
});
if (cks.length === 0) {
var fsmIdx = self.cs_fsms[k].indexOf(fsm);
if (fsmIdx > 0 || self.cs_keys.indexOf(k) === -1) {
fsm.close();
--total;
} else {
fsm.closeAfterRelease();
}
}
});
plan.add.forEach(function (k) {
/* Make sure we *never* exceed our socket limit. */
if (++total > self.cs_max)
return;
self.addConnection(k);
});
this.cs_inRebalance = false;
this.cs_lastRebalance = new Date();
};
CueBallConnectionSet.prototype.assertEmit = function () {
var args = arguments;
var event = args[0];
if (this.listeners(event).length < 1) {
throw (new Error('Event "' + event + '" on ConnectionSet ' +
'must be handled'));
}
return (this.emit.apply(this, args));
};
CueBallConnectionSet.prototype.addConnection = function (key) {
if (this.isInState('stopping') || this.isInState('stopped'))
return;
var backend = this.cs_backends[key];
backend.key = key;
var fsm = new ConnectionFSM({
constructor: this.cs_constructor,
backend: backend,
log: this.cs_log,
pool: this,
recovery: this.cs_recovery,
doRef: false
});
if (this.cs_fsms[key] === undefined)
this.cs_fsms[key] = [];
if (this.cs_serials[key] === undefined)
this.cs_serials[key] = 1;
this.cs_fsms[key].push(fsm);
var serial;
var ckey;
var self = this;
fsm.on('stateChanged', function (newState) {
if (newState === 'idle' && fsm.isInState('idle')) {
if (serial === undefined) {
self.emit('connectedToBackend', key, fsm);
}
if (ckey !== undefined && self.cs_connections[ckey]) {
var conn = self.cs_connections[ckey];
delete (self.cs_connections[ckey]);
self.assertEmit('removed', ckey, conn);
}
conn = fsm.getConnection();
serial = self.cs_serials[key]++;
ckey = key + '.' + serial;
conn.cs_serial = serial;
fsm.cs_serial = serial;
self.cs_connections[ckey] = conn;
self.assertEmit('added', ckey, conn);
self.rebalance();
return;
}
if (newState === 'closed') {
if (self.cs_connections[ckey]) {
conn = self.cs_connections[ckey];
delete (self.cs_connections[ckey]);
self.assertEmit('removed', ckey, conn);
}
if (self.cs_fsms[key]) {
var idx = self.cs_fsms[key].indexOf(fsm);
self.cs_fsms[key].splice(idx, 1);
}
if (fsm.retriesExhausted()) {
self.cs_dead[key] = true;
}
self.emit('closedBackend', fsm);
self.rebalance();
return;
}
});
fsm.start();
};
CueBallConnectionSet.prototype.getConnections = function () {
var self = this;
return (Object.keys(this.cs_connections).map(function (k) {
return (self.cs_connections[k]);
}));
};
CueBallConnectionSet.prototype._incrCounter = function (counter) {
if (this.cs_counters[counter] === undefined)
this.cs_counters[counter] = 0;
++this.cs_counters[counter];
};

@@ -102,4 +102,6 @@ /*

* - `max` -- a Number, maximum socket ceiling
* - `singleton` -- optional Boolean (default false), create only a single
* connection per distinct backend. used for Sets.
*/
function planRebalance(inSpares, dead, target, max) {
function planRebalance(inSpares, dead, target, max, singleton) {
var replacements = 0;

@@ -135,4 +137,11 @@ var wantedSpares = {};

if (dead[k] !== true) {
++wantedSpares[k];
++done;
if (singleton) {
if (wantedSpares[k] === 0) {
wantedSpares[k] = 1;
++done;
}
} else {
++wantedSpares[k];
++done;
}
continue;

@@ -161,5 +170,13 @@ }

if (dead[k] !== true) {
++wantedSpares[k];
++done;
continue;
if (singleton) {
if (wantedSpares[k] === 0) {
wantedSpares[k] = 1;
++done;
continue;
}
} else {
++wantedSpares[k];
++done;
continue;
}
}

@@ -182,5 +199,11 @@ /*

var empties = keys.filter(function (kk) {
return (dead[kk] !== true ||
wantedSpares[kk] === undefined ||
wantedSpares[kk] === 0);
if (singleton) {
return (dead[kk] !== true && (
wantedSpares[kk] === undefined ||
wantedSpares[kk] === 0));
} else {
return (dead[kk] !== true ||
wantedSpares[kk] === undefined ||
wantedSpares[kk] === 0);
}
});

@@ -187,0 +210,0 @@ if (count + 1 <= max) {

{
"name": "cueball",
"version": "0.4.1",
"version": "0.5.0",
"description": "",

@@ -12,4 +12,4 @@ "main": "lib/index.js",

"ipaddr.js": ">=1.1.0 <2.0.0",
"mooremachine": ">=1.4.2 <2.0.0",
"named-client": "git+https://github.com/arekinath/node-named-client.git#v0.3.5",
"mooremachine": ">=2.0.0 <3.0.0",
"named-client": "git+https://github.com/arekinath/node-named-client.git#v0.3.6",
"node-uuid": ">=1.4.7 <2.0.0",

@@ -16,0 +16,0 @@ "posix-getopt": ">=1.2.0 <2.0.0",

@@ -198,2 +198,7 @@ cueball

If a client determines that a connection must be closed immediately (e.g. due
to a protocol error making it impossible to continue using it safely), it must
call the `.close()` method on the *handle*, not any `.destroy()` or similar
method on the connection itself.
Calling `claim()` on a Pool that is in the "stopping", "stopped" or "failed"

@@ -416,2 +421,70 @@ states will result in the callback being called with an error on the next run of

## Connection interface
Objects returned by a `constructor` function (such as supplied to the
`ConnectionPool` constructor) must obey a subset of the node.js socket
interface. In particular they must support the following events and methods:
### `constructor(backend)`
Parameters:
- `backend` -- an Object, with properties:
- `key` -- a String, the backend key as supplied via the Resolver interface.
Can be used to uniquely identify the backend.
- `address` -- a String, address of the backend (IPv4 or IPv6)
- `port` -- a Number, TCP or UDP port number
Returns an object obeying the Connection interface.
### Event `connect` (required)
At construction, the connection object must immediately attempt to make a
connection to the backend specified by the first argument to the constructor.
When the connection succeeds, it must emit the event `connect`. No arguments are
required.
### Event `error`
Connection objects may emit `error` at any time in response to a fatal error.
The connection will be immediately terminated (by calling `.destroy()`) upon the
emission of any `error` event.
The `error` event should be emitted with an Object as the first parameter. This
is expected to have `Error` on its prototype chain (`obj instanceof Error`
should be `true`).
May also be emitted as `connectError` only in the state before `connect` has
been emitted.
### Event `close` (required)
Connection objects must emit `close` as the final event they emit after the
connection has ended. No events may be emitted after `close`.
### `#ref()` (required)
Marks the connection as "referenced", meaning that it should keep the Node
process running even if no event handlers are registered on it.
It is generally acceptable for long-lived server processes (which never intend
to exit in normal operation) to provide a Connection that implements `ref()` and
`unref()` as no-ops.
### `#unref()` (required)
Marks the connection as "unreferenced", allowing the Node process to exit if it
is the only remaining source of events.
### `#destroy()` (required)
Immediately disconnects the connection and proceeds to emit `close`.
### Event `timeout`
Optional. Equivalent to emitting `error` with a ConnectionTimeoutError as an
argument.
May also be emitted as `connectTimeout` only in the state before `connect` has
been emitted.
## Errors

@@ -553,3 +626,67 @@

ConnectionSet
-------------
Cueball also includes an alternative to the ConnectionPool, named ConnectionSet.
This is a more low-level API which is useful for implementing clients for
protocols that are not as strictly connection-oriented.
Key differences to ConnectionPool:
- Each backend in a ConnectionSet has a maximum of 1 connection open to it
(it's expected to be used with protocols that multiplex operations over a
single socket.)
- No support for leases (claim/release). ConnectionSet does not track whether
connections are busy or not, and expects its consumer to manage this.
ConnectionSets have an identical state graph to ConnectionPools.
### `new mod_cueball.ConnectionSet(options)`
Parameters
- `options` -- Object, with keys:
- `resolver` -- Object, an instance of the Resolver interface
- `constructor` -- Function, same as in ConnectionPool
- `recovery` -- Object, a recovery spec (see below)
- `log` -- optional Object, a `bunyan`-style logger to use
- `target` -- optional Number, target number of connections to be made
available in the entire set
- `maximum` -- optional Number, maximum number of connections per host
### Event `'added'`
Emitted when a new connection becomes available in the set. This event *must*
have a handler on it at all times.
Parameters
- `key` -- String, a unique key to identify this connection
- `connection` -- Object, the connection as returned by the constructor
### Event `removed`
Emitted when an existing connection should be removed from the pool. This event
*must* have a handler on it at all times. The handler is obligated to take all
necessary actions to drain the connection of outstanding requests and then close
it. The emission of this event must cause the connection object to emit
`'close'` as soon as possible.
Parameters
- `key` -- String, a unique key to identify the connection
### `ConnectionSet#stop()`
Stops the ConnectionSet, disconnecting all available connections (by first
emitting `'removed'` for them.)
### `ConnectionSet#setTarget(target)`
Sets the target number of connections in the ConnectionSet. Will trigger an
async operation to add or remove connections in order to meet the new target.
Parameters:
- `target` -- Number
### `ConnectionSet#getConnections()`
Returns all the currently open connections in the Set, as an Array.
Tools

@@ -556,0 +693,0 @@ -----

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc