Comparing version 0.4.1 to 0.5.0
@@ -14,2 +14,3 @@ /* | ||
const mod_errors = require('./errors'); | ||
const mod_cset = require('./set'); | ||
@@ -20,2 +21,3 @@ module.exports = { | ||
ConnectionPool: mod_pool.ConnectionPool, | ||
ConnectionSet: mod_cset.ConnectionSet, | ||
Resolver: mod_resolver.Resolver, | ||
@@ -22,0 +24,0 @@ StaticIpResolver: mod_resolver.StaticIpResolver, |
@@ -17,2 +17,3 @@ /* | ||
const mod_pool = require('./pool'); | ||
const mod_cset = require('./set'); | ||
const mod_os = require('os'); | ||
@@ -22,2 +23,3 @@ | ||
this.pm_pools = {}; | ||
this.pm_sets = {}; | ||
} | ||
@@ -36,2 +38,13 @@ | ||
CueBallPoolMonitor.prototype.registerSet = function (set) { | ||
mod_assert.ok(set instanceof mod_cset.ConnectionSet); | ||
this.pm_sets[set.cs_uuid] = set; | ||
}; | ||
CueBallPoolMonitor.prototype.unregisterSet = function (set) { | ||
mod_assert.ok(set instanceof mod_cset.ConnectionSet); | ||
mod_assert.ok(this.pm_sets[set.cs_uuid]); | ||
delete (this.pm_sets[set.cs_uuid]); | ||
}; | ||
CueBallPoolMonitor.prototype.toKangOptions = function () { | ||
@@ -41,12 +54,26 @@ var self = this; | ||
function listTypes() { | ||
return (['pool']); | ||
return (['pool', 'set']); | ||
} | ||
function listObjects(type) { | ||
mod_assert.strictEqual(type, 'pool'); | ||
return (Object.keys(self.pm_pools)); | ||
if (type === 'pool') { | ||
return (Object.keys(self.pm_pools)); | ||
} else if (type === 'set') { | ||
return (Object.keys(self.pm_sets)); | ||
} else { | ||
throw (new Error('Invalid type "' + type + '"')); | ||
} | ||
} | ||
function get(type, id) { | ||
mod_assert.strictEqual(type, 'pool'); | ||
if (type === 'pool') { | ||
return (getPool(id)); | ||
} else if (type === 'set') { | ||
return (getSet(id)); | ||
} else { | ||
throw (new Error('Invalid type "' + type + '"')); | ||
} | ||
} | ||
function getPool(id) { | ||
var pool = self.pm_pools[id]; | ||
@@ -74,4 +101,6 @@ mod_assert.object(pool); | ||
obj.dead_backends = Object.keys(pool.p_dead); | ||
obj.last_rebalance = Math.round( | ||
pool.p_lastRebalance.getTime() / 1000); | ||
if (pool.p_lastRebalance !== undefined) { | ||
obj.last_rebalance = Math.round( | ||
pool.p_lastRebalance.getTime() / 1000); | ||
} | ||
obj.resolvers = pool.p_resolver.r_resolvers; | ||
@@ -89,2 +118,42 @@ obj.state = pool.getState(); | ||
function getSet(id) { | ||
var cset = self.pm_sets[id]; | ||
mod_assert.object(cset); | ||
var obj = {}; | ||
obj.backends = cset.cs_backends; | ||
obj.fsms = {}; | ||
obj.connections = Object.keys(cset.cs_connections); | ||
var ks = cset.cs_keys; | ||
Object.keys(cset.cs_fsms).forEach(function (k) { | ||
if (ks.indexOf(k) === -1) | ||
ks.push(k); | ||
}); | ||
ks.forEach(function (k) { | ||
var conns = cset.cs_fsms[k] || []; | ||
obj.fsms[k] = {}; | ||
conns.forEach(function (fsm) { | ||
var s = fsm.getState(); | ||
if (obj.fsms[k][s] === undefined) | ||
obj.fsms[k][s] = 0; | ||
++obj.fsms[k][s]; | ||
}); | ||
}); | ||
obj.dead_backends = Object.keys(cset.cs_dead); | ||
if (cset.cs_lastRebalance !== undefined) { | ||
obj.last_rebalance = Math.round( | ||
cset.cs_lastRebalance.getTime() / 1000); | ||
} | ||
obj.resolvers = cset.cs_resolver.r_resolvers; | ||
obj.state = cset.getState(); | ||
obj.counters = cset.cs_counters; | ||
obj.target = cset.cs_target; | ||
obj.maximum = cset.cs_max; | ||
obj.options = {}; | ||
obj.options.domain = cset.cs_resolver.r_domain; | ||
obj.options.service = cset.cs_resolver.r_service; | ||
obj.options.defaultPort = cset.cs_resolver.r_defport; | ||
return (obj); | ||
} | ||
function stats() { | ||
@@ -91,0 +160,0 @@ return ({}); |
774
lib/pool.js
@@ -31,575 +31,5 @@ /* | ||
const Queue = require('./queue'); | ||
const ConnectionFSM = require('./connection-fsm'); | ||
/* | ||
* ConnectionFSM is the state machine for a "connection" -- an abstract entity | ||
* that is managed by a ConnectionPool. ConnectionFSMs are associated with a | ||
* particular 'backend', and hold a backing object cf_conn. Typically this | ||
* backing object is a TCP socket, but may be any EventEmitter that emits | ||
* 'close' and 'error'. | ||
*/ | ||
function ConnectionFSM(options) { | ||
mod_assert.object(options, 'options'); | ||
mod_assert.object(options.pool, 'options.pool'); | ||
mod_assert.func(options.constructor, 'options.constructor'); | ||
mod_assert.object(options.backend, 'options.backend'); | ||
mod_assert.object(options.log, 'options.log'); | ||
mod_assert.optionalFunc(options.checker, 'options.checker'); | ||
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout'); | ||
this.cf_pool = options.pool; | ||
this.cf_constructor = options.constructor; | ||
this.cf_backend = options.backend; | ||
this.cf_claimed = false; | ||
this.cf_claimStack = []; | ||
this.cf_releaseStack = []; | ||
this.cf_lastError = undefined; | ||
this.cf_conn = undefined; | ||
this.cf_shadow = undefined; | ||
this.cf_closeAfter = false; | ||
this.cf_oldListeners = {}; | ||
this.cf_checkTimeout = options.checkTimeout; | ||
this.cf_checker = options.checker; | ||
this.cf_lastCheck = new Date(); | ||
this.cf_log = options.log.child({ | ||
backend: this.cf_backend.key | ||
}); | ||
mod_assert.object(options.recovery, 'options.recovery'); | ||
var connectRecov = options.recovery.default; | ||
var initialRecov = options.recovery.default; | ||
if (options.recovery.connect) { | ||
initialRecov = options.recovery.connect; | ||
connectRecov = options.recovery.connect; | ||
} | ||
if (options.recovery.initial) | ||
initialRecov = options.recovery.initial; | ||
mod_utils.assertRecovery(connectRecov, 'recovery.connect'); | ||
mod_utils.assertRecovery(initialRecov, 'recovery.initial'); | ||
this.cf_initialRecov = initialRecov; | ||
this.cf_connectRecov = connectRecov; | ||
this.cf_retries = initialRecov.retries; | ||
this.cf_retriesLeft = initialRecov.retries; | ||
this.cf_minDelay = initialRecov.delay; | ||
this.cf_delay = initialRecov.delay; | ||
this.cf_maxDelay = initialRecov.maxDelay || Infinity; | ||
this.cf_timeout = initialRecov.timeout; | ||
this.cf_maxTimeout = initialRecov.maxTimeout || Infinity; | ||
/* | ||
* If our parent pool thinks this backend is dead, resume connection | ||
* attempts with the maximum delay and timeout. Something is going | ||
* wrong, let's not retry too aggressively and make it worse. | ||
*/ | ||
if (this.cf_pool.p_dead[this.cf_backend.key] === true) { | ||
/* | ||
* We might be given an infinite maxDelay or maxTimeout. If | ||
* we are, then multiply it by 2^(retries) to get to what the | ||
* value would have been before. | ||
*/ | ||
var mult = 1 << this.cf_retries; | ||
this.cf_delay = this.cf_maxDelay; | ||
if (!isFinite(this.cf_delay)) | ||
this.cf_delay = initialRecov.delay * mult; | ||
this.cf_timeout = this.cf_maxTimeout; | ||
if (!isFinite(this.cf_timeout)) | ||
this.cf_timeout = initialRecov.timeout * mult; | ||
/* Keep retrying a failed backend forever */ | ||
this.cf_retries = Infinity; | ||
this.cf_retriesLeft = Infinity; | ||
} | ||
this.allStateEvent('closeAsserted'); | ||
FSM.call(this, 'init'); | ||
} | ||
mod_util.inherits(ConnectionFSM, FSM); | ||
/* | ||
* Return true if this connection was closed due to retry exhaustion. | ||
*/ | ||
ConnectionFSM.prototype.retriesExhausted = function () { | ||
return (this.isInState('closed') && this.cf_retriesLeft <= 0); | ||
}; | ||
/* | ||
* Mark this Connection as "claimed"; in use by a particular client of the | ||
* pool. | ||
* | ||
* Normally this will be called by the pool itself, which will give the 'stack' | ||
* argument as a copy of the stack trace from its caller. | ||
* | ||
* We keep track of the stack trace of our last claimer and releaser to aid | ||
* in debugging. | ||
*/ | ||
ConnectionFSM.prototype.claim = function (stack, cb) { | ||
mod_assert.ok(this.cf_claimed === false); | ||
mod_assert.strictEqual(this.getState(), 'idle'); | ||
if (typeof (stack) === 'function') { | ||
cb = stack; | ||
stack = undefined; | ||
} | ||
if (stack === undefined) { | ||
var e = {}; | ||
Error.captureStackTrace(e); | ||
stack = e.stack; | ||
} | ||
this.cf_claimStack = stack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
this.cf_claimed = true; | ||
if (cb) { | ||
var self = this; | ||
this.onState('busy', function () { | ||
/* | ||
* Give the client our ConnectionHandle, and the | ||
* backing object. | ||
* | ||
* They use the ConnectionHandle to call release(). | ||
*/ | ||
cb(null, self.cf_shadow, self.cf_conn); | ||
}); | ||
} | ||
this.emit('claimAsserted'); | ||
}; | ||
/* | ||
* Mark this Connection as "free" and ready to be re-used. This is normally | ||
* called via the ConnectionHandle. | ||
*/ | ||
ConnectionFSM.prototype.release = function (cb) { | ||
mod_assert.ok(this.cf_claimed === true); | ||
mod_assert.ok(['busy', 'ping'].indexOf(this.getState()) !== -1, | ||
'connection is not held'); | ||
var e = {}; | ||
Error.captureStackTrace(e); | ||
this.cf_releaseStack = e.stack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
if (cb) | ||
this.onState('idle', cb); | ||
this.emit('releaseAsserted'); | ||
}; | ||
ConnectionFSM.prototype.close = function (cb) { | ||
if (cb) | ||
this.onState('closed', cb); | ||
this.emit('closeAsserted'); | ||
}; | ||
ConnectionFSM.prototype.start = function () { | ||
this.emit('startAsserted'); | ||
}; | ||
ConnectionFSM.prototype.closeAfterRelease = function () { | ||
this.cf_closeAfter = true; | ||
}; | ||
ConnectionFSM.prototype.state_init = function (on) { | ||
this.validTransitions(['connect', 'closed']); | ||
var self = this; | ||
on(this, 'startAsserted', function () { | ||
self.gotoState('connect'); | ||
}); | ||
on(this, 'closeAsserted', function () { | ||
self.gotoState('closed'); | ||
}); | ||
}; | ||
ConnectionFSM.prototype.state_connect = function (on, once, timeout) { | ||
this.validTransitions(['error', 'idle', 'closed']); | ||
var self = this; | ||
timeout(this.cf_timeout, function () { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
self.gotoState('error'); | ||
}); | ||
this.cf_conn = this.cf_constructor(this.cf_backend); | ||
mod_assert.object(this.cf_conn, 'constructor return value'); | ||
this.cf_conn.cf_fsm = this; | ||
once(this.cf_conn, 'connect', function () { | ||
self.gotoState('idle'); | ||
}); | ||
once(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-connect'); | ||
}); | ||
once(this.cf_conn, 'connectError', function (err) { | ||
self.cf_lastError = err; | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-connect'); | ||
}); | ||
once(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-connect'); | ||
}); | ||
once(this.cf_conn, 'timeout', function () { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('timeout-during-connect'); | ||
}); | ||
once(this.cf_conn, 'connectTimeout', function (err) { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('timeout-during-connect'); | ||
}); | ||
once(this, 'closeAsserted', function () { | ||
self.gotoState('closed'); | ||
}); | ||
}; | ||
ConnectionFSM.prototype.state_closed = function (on) { | ||
this.validTransitions([]); | ||
if (this.cf_conn && this.cf_conn.destroy) | ||
this.cf_conn.destroy(); | ||
this.cf_conn = undefined; | ||
this.cf_closeAfter = false; | ||
this.cf_lastError = undefined; | ||
this.cf_log.trace('ConnectionFSM closed'); | ||
on(this, 'closeAsserted', function () { }); | ||
}; | ||
ConnectionFSM.prototype.state_error = function (on, once, timeout) { | ||
this.validTransitions(['delay', 'closed']); | ||
var self = this; | ||
on(this, 'closeAsserted', function () { | ||
self.gotoState('closed'); | ||
}); | ||
if (this.cf_conn && this.cf_conn.destroy) | ||
this.cf_conn.destroy(); | ||
this.cf_conn = undefined; | ||
if (this.cf_shadow) { | ||
this.cf_shadow.sh_error = true; | ||
this.cf_shadow = undefined; | ||
} | ||
/* | ||
* If the closeAfter flag is set, and this is a connection to a "dead" | ||
* backend (i.e., a "monitor" watching to see when it comes back), then | ||
* exit now. For an ordinary backend, we don't want to do this, | ||
* because we want to give ourselves the opportunity to run out of | ||
* retries. | ||
* | ||
* Otherwise, in a situation where we have two connections that were | ||
* created at the same time, one to a failing backend that's already | ||
* declared dead, and one to a different failing backend not yet | ||
* declared, we may never learn that the second backend is down and | ||
* declare it dead. The already declared dead backend may exit first | ||
* during a pool reshuffle and cause this one to exit prematurely | ||
* (there's a race in who exits first and causes the planner to engage) | ||
*/ | ||
if (this.cf_retries === Infinity && this.cf_closeAfter) { | ||
this.cf_retriesLeft = 0; | ||
this.gotoState('closed'); | ||
return; | ||
} | ||
if (this.cf_retries !== Infinity) | ||
--this.cf_retriesLeft; | ||
if (this.cf_retries === Infinity || this.cf_retriesLeft > 0) { | ||
this.gotoState('delay'); | ||
} else { | ||
this.cf_log.warn(this.cf_lastError, 'failed to connect to ' + | ||
'backend %s (%j)', this.cf_backend.key, this.cf_backend); | ||
this.cf_pool._incrCounter('retries-exhausted'); | ||
this.gotoState('closed'); | ||
} | ||
}; | ||
ConnectionFSM.prototype.state_delay = function (on, once, timeout) { | ||
this.validTransitions(['connect', 'closed']); | ||
var delay = this.cf_delay; | ||
this.cf_delay *= 2; | ||
this.cf_timeout *= 2; | ||
if (this.cf_timeout > this.cf_maxTimeout) | ||
this.cf_timeout = this.cf_maxTimeout; | ||
if (this.cf_delay > this.cf_maxDelay) | ||
this.cf_delay = this.cf_maxDelay; | ||
var self = this; | ||
var t = timeout(delay, function () { | ||
self.gotoState('connect'); | ||
}); | ||
t.unref(); | ||
once(this, 'closeAsserted', function () { | ||
self.gotoState('closed'); | ||
}); | ||
}; | ||
ConnectionFSM.prototype.state_idle = function (on, once, timeout) { | ||
this.validTransitions(['busy', 'error', 'closed']); | ||
var self = this; | ||
this.cf_claimed = false; | ||
this.cf_claimStack = []; | ||
this.cf_log.trace('connected, idling'); | ||
if (this.cf_shadow) { | ||
this.cf_shadow.sh_claimed = false; | ||
this.cf_shadow.sh_releaseStack = this.cf_releaseStack; | ||
this.cf_shadow = undefined; | ||
} | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var newCount = self.cf_conn.listeners(evt).filter( | ||
function (h) { return (typeof (h) === 'function'); }). | ||
length; | ||
var oldCount = self.cf_oldListeners[evt]; | ||
if (oldCount !== undefined && newCount > oldCount) { | ||
var info = {}; | ||
info.stack = self.cf_releaseStack; | ||
info.handlers = self.cf_conn.listeners(evt).map( | ||
function (f) { return (f.toString()); }); | ||
info.event = evt; | ||
self.cf_log.warn(info, 'connection claimer looks ' + | ||
'like it leaked event handlers'); | ||
} | ||
}); | ||
/* | ||
* Reset retries and retry delay to their defaults since we are now | ||
* connected. | ||
*/ | ||
this.cf_retries = this.cf_connectRecov.retries; | ||
this.cf_retriesLeft = this.cf_connectRecov.retries; | ||
this.cf_minDelay = this.cf_connectRecov.delay; | ||
this.cf_delay = this.cf_connectRecov.delay; | ||
this.cf_maxDelay = this.cf_connectRecov.maxDelay || Infinity; | ||
this.cf_timeout = this.cf_connectRecov.timeout; | ||
this.cf_maxTimeout = this.cf_connectRecov.maxTimeout || Infinity; | ||
if (this.cf_closeAfter === true) { | ||
this.cf_closeAfter = false; | ||
this.cf_lastError = undefined; | ||
this.gotoState('closed'); | ||
on(this, 'closeAsserted', function () { }); | ||
return; | ||
} | ||
this.cf_conn.unref(); | ||
once(this, 'claimAsserted', function () { | ||
self.gotoState('busy'); | ||
}); | ||
once(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-idle'); | ||
}); | ||
once(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-idle'); | ||
}); | ||
once(this.cf_conn, 'end', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('end-during-idle'); | ||
}); | ||
once(this, 'closeAsserted', function () { | ||
self.gotoState('closed'); | ||
}); | ||
if (this.cf_checkTimeout !== undefined) { | ||
var now = new Date(); | ||
var sinceLast = (now - this.cf_lastCheck); | ||
var delay; | ||
if (sinceLast > this.cf_checkTimeout) { | ||
delay = 1000; | ||
} else { | ||
delay = this.cf_checkTimeout - sinceLast; | ||
if (delay < 1000) | ||
delay = 1000; | ||
} | ||
var t = timeout(delay, function () { | ||
self.gotoState('ping'); | ||
}); | ||
t.unref(); | ||
} | ||
}; | ||
ConnectionFSM.prototype.state_ping = function (on, once, timeout) { | ||
this.validTransitions(['error', 'closed', 'idle']); | ||
this.cf_lastCheck = new Date(); | ||
this.cf_claimStack = [ | ||
'ConnectionFSM.prototype.state_ping', | ||
'(periodic_health_check)']; | ||
this.cf_claimed = true; | ||
var self = this; | ||
this.cf_conn.ref(); | ||
this.cf_releaseStack = []; | ||
this.cf_log.trace('doing health check'); | ||
/* | ||
* Write down the count of event handlers on the backing object so that | ||
* we can spot if the client leaked any common ones in release(). | ||
*/ | ||
this.cf_oldListeners = {}; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var count = self.cf_conn.listeners(evt).filter( | ||
function (h) { return (typeof (h) === 'function'); }). | ||
length; | ||
self.cf_oldListeners[evt] = count; | ||
}); | ||
/* | ||
* The ConnectionHandle is a one-time use object that proxies calls to | ||
* our release() and close() functions. We use it so that we can assert | ||
* that this particular client only releases us once. If we only | ||
* asserted on our current state, there could be a race where we get | ||
* claimed by a different client in the meantime. | ||
*/ | ||
this.cf_shadow = new ConnectionHandle(this); | ||
once(this, 'releaseAsserted', function () { | ||
if (self.cf_closeAfter === true) { | ||
self.gotoState('closed'); | ||
} else { | ||
self.gotoState('idle'); | ||
} | ||
}); | ||
once(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-ping'); | ||
}); | ||
once(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-ping'); | ||
}); | ||
once(this.cf_conn, 'end', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('end-during-ping'); | ||
}); | ||
once(this, 'closeAsserted', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
}); | ||
var t = timeout(this.cf_checkTimeout, function () { | ||
var info = {}; | ||
info.stack = self.cf_claimStack; | ||
self.cf_log.warn(info, 'health check is taking too ' + | ||
'long to run (has been more than %d ms)', | ||
self.cf_checkTimeout); | ||
}); | ||
t.unref(); | ||
this.cf_checker.call(undefined, this.cf_shadow, this.cf_conn); | ||
}; | ||
ConnectionFSM.prototype.state_busy = function (on, once, timeout) { | ||
this.validTransitions(['error', 'closed', 'idle']); | ||
var self = this; | ||
this.cf_conn.ref(); | ||
this.cf_releaseStack = []; | ||
this.cf_log.trace('busy, claimed by %s', | ||
this.cf_claimStack[1].split(' ')[0]); | ||
/* | ||
* Write down the count of event handlers on the backing object so that | ||
* we can spot if the client leaked any common ones in release(). | ||
*/ | ||
this.cf_oldListeners = {}; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var count = self.cf_conn.listeners(evt).filter( | ||
function (h) { return (typeof (h) === 'function'); }). | ||
length; | ||
self.cf_oldListeners[evt] = count; | ||
}); | ||
/* | ||
* The ConnectionHandle is a one-time use object that proxies calls to | ||
* our release() and close() functions. We use it so that we can assert | ||
* that this particular client only releases us once. If we only | ||
* asserted on our current state, there could be a race where we get | ||
* claimed by a different client in the meantime. | ||
*/ | ||
this.cf_shadow = new ConnectionHandle(this); | ||
once(this, 'releaseAsserted', function () { | ||
if (self.cf_closeAfter === true) { | ||
self.gotoState('closed'); | ||
} else { | ||
self.gotoState('idle'); | ||
} | ||
}); | ||
once(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-busy'); | ||
}); | ||
once(this.cf_conn, 'end', function () { | ||
self.cf_closeAfter = true; | ||
self.cf_pool._incrCounter('end-during-busy'); | ||
}); | ||
once(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-busy'); | ||
}); | ||
once(this, 'closeAsserted', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
self.gotoState('error'); | ||
}); | ||
if (this.cf_checkTimeout !== undefined) { | ||
var t = timeout(this.cf_checkTimeout, function () { | ||
var info = {}; | ||
info.stack = self.cf_claimStack; | ||
self.cf_log.warn(info, 'connection held for longer ' + | ||
'than checkTimeout (%d ms), may have been leaked', | ||
self.cf_checkTimeout); | ||
}); | ||
t.unref(); | ||
} | ||
}; | ||
function ConnectionHandle(cf) { | ||
this.sh_cf = cf; | ||
this.sh_claimed = true; | ||
this.sh_error = false; | ||
this.sh_releaseStack = []; | ||
} | ||
ConnectionHandle.prototype.close = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
} | ||
return (this.sh_cf.close.apply(this.sh_cf, arguments)); | ||
}; | ||
ConnectionHandle.prototype.release = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
} | ||
return (this.sh_cf.release.apply(this.sh_cf, arguments)); | ||
}; | ||
ConnectionHandle.prototype.closeAfterRelease = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
} | ||
return (this.sh_cf.closeAfterRelease.apply(this.sh_cf, | ||
arguments)); | ||
}; | ||
/* | ||
* A ConnectionPool holds a pool of ConnectionFSMs that are kept up to date | ||
@@ -675,2 +105,3 @@ * based on the output of a Resolver. At any given time the pool may contain: | ||
this.p_inRebalance = false; | ||
this.p_rebalScheduled = false; | ||
this.p_startedResolver = false; | ||
@@ -749,9 +180,8 @@ | ||
CueBallConnectionPool.prototype.state_starting = | ||
function (on, once, timeout, onState) { | ||
this.validTransitions(['failed', 'running', 'stopping']); | ||
CueBallConnectionPool.prototype.state_starting = function (S) { | ||
S.validTransitions(['failed', 'running', 'stopping']); | ||
mod_monitor.monitor.registerPool(this); | ||
on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
@@ -763,10 +193,12 @@ var self = this; | ||
'pool will start up in "failed" state'); | ||
this.gotoState('failed'); | ||
S.gotoState('failed'); | ||
return; | ||
} | ||
onState(this.p_resolver, 'failed', function () { | ||
self.p_log.warn('underlying resolver failed, moving pool ' + | ||
'to "failed" state'); | ||
self.gotoState('failed'); | ||
S.on(this.p_resolver, 'stateChanged', function (state) { | ||
if (state === 'failed') { | ||
self.p_log.warn('underlying resolver failed, moving ' + | ||
'pool to "failed" state'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
@@ -786,7 +218,7 @@ | ||
on(this, 'connectedToBackend', function () { | ||
self.gotoState('running'); | ||
S.on(this, 'connectedToBackend', function () { | ||
S.gotoState('running'); | ||
}); | ||
on(this, 'closedBackend', function (fsm) { | ||
S.on(this, 'closedBackend', function (fsm) { | ||
var dead = Object.keys(self.p_dead).length; | ||
@@ -798,39 +230,39 @@ if (dead >= self.p_keys.length) { | ||
'"failed" state'); | ||
self.gotoState('failed'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionPool.prototype.state_failed = function (on) { | ||
this.validTransitions(['running', 'stopping']); | ||
on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
CueBallConnectionPool.prototype.state_failed = function (S) { | ||
S.validTransitions(['running', 'stopping']); | ||
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
S.on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
var self = this; | ||
on(this, 'connectedToBackend', function () { | ||
S.on(this, 'connectedToBackend', function () { | ||
mod_assert.ok(!self.p_resolver.isInState('failed')); | ||
self.p_log.info('successfully connected to a backend, ' + | ||
'moving back to running state'); | ||
self.gotoState('running'); | ||
S.gotoState('running'); | ||
}); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionPool.prototype.state_running = function (on) { | ||
this.validTransitions(['failed', 'stopping']); | ||
CueBallConnectionPool.prototype.state_running = function (S) { | ||
S.validTransitions(['failed', 'stopping']); | ||
var self = this; | ||
on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
on(this.p_rebalTimer, 'timeout', this.rebalance.bind(this)); | ||
on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
S.on(this.p_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.p_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
S.on(this.p_rebalTimer, 'timeout', this.rebalance.bind(this)); | ||
S.on(this.p_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
on(this, 'closedBackend', function (fsm) { | ||
S.on(this, 'closedBackend', function (fsm) { | ||
var dead = Object.keys(self.p_dead).length; | ||
@@ -842,29 +274,31 @@ if (dead >= self.p_keys.length) { | ||
'"failed" state'); | ||
self.gotoState('failed'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionPool.prototype.state_stopping = | ||
function (on, once, timeout, onState) { | ||
this.validTransitions(['stopping.backends']); | ||
var self = this; | ||
CueBallConnectionPool.prototype.state_stopping = function (S) { | ||
S.validTransitions(['stopping.backends']); | ||
if (this.p_startedResolver) { | ||
onState(this.p_resolver, 'stopped', function () { | ||
self.gotoState('stopping.backends'); | ||
S.on(this.p_resolver, 'stateChanged', function (s) { | ||
if (s === 'stopped') { | ||
S.gotoState('stopping.backends'); | ||
} | ||
}); | ||
this.p_resolver.stop(); | ||
if (this.p_resolver.isInState('stopped')) { | ||
S.gotoState('stopping.backends'); | ||
} | ||
} else { | ||
this.gotoState('stopping.backends'); | ||
S.gotoState('stopping.backends'); | ||
} | ||
}; | ||
CueBallConnectionPool.prototype.state_stopping.backends = function () { | ||
this.validTransitions(['stopped']); | ||
CueBallConnectionPool.prototype.state_stopping.backends = function (S) { | ||
S.validTransitions(['stopped']); | ||
var conns = this.p_connections; | ||
var self = this; | ||
var fsms = []; | ||
@@ -880,3 +314,3 @@ Object.keys(conns).forEach(function (k) { | ||
}, function () { | ||
self.gotoState('stopped'); | ||
S.gotoState('stopped'); | ||
}); | ||
@@ -886,3 +320,6 @@ function closeBackend(fsm, cb) { | ||
fsm.closeAfterRelease(); | ||
fsm.onState('closed', cb); | ||
fsm.on('stateChanged', function (st) { | ||
if (st === 'closed') | ||
cb(); | ||
}); | ||
} else { | ||
@@ -894,4 +331,4 @@ fsm.close(cb); | ||
CueBallConnectionPool.prototype.state_stopped = function () { | ||
this.validTransitions([]); | ||
CueBallConnectionPool.prototype.state_stopped = function (S) { | ||
S.validTransitions([]); | ||
mod_monitor.monitor.unregisterPool(this); | ||
@@ -905,2 +342,6 @@ this.p_keys = []; | ||
CueBallConnectionPool.prototype.isDeclaredDead = function (backend) { | ||
return (this.p_dead[backend] === true); | ||
}; | ||
CueBallConnectionPool.prototype.reshuffle = function () { | ||
@@ -918,2 +359,20 @@ var taken = this.p_keys.pop(); | ||
CueBallConnectionPool.prototype.rebalance = function () { | ||
if (this.p_keys.length < 1) | ||
return; | ||
if (this.isInState('stopping') || this.isInState('stopped')) | ||
return; | ||
if (this.p_rebalScheduled !== false) | ||
return; | ||
this.p_rebalScheduled = true; | ||
var self = this; | ||
setImmediate(function () { | ||
self._rebalance(); | ||
}); | ||
}; | ||
/* | ||
@@ -927,14 +386,9 @@ * Rebalance the pool, by looking at the distribution of connections to | ||
*/ | ||
CueBallConnectionPool.prototype.rebalance = function () { | ||
CueBallConnectionPool.prototype._rebalance = function () { | ||
var self = this; | ||
if (this.p_keys.length < 1) | ||
return; | ||
if (this.isInState('stopping') || this.isInState('stopped')) | ||
return; | ||
if (this.p_inRebalance !== false) | ||
return; | ||
this.p_inRebalance = true; | ||
this.p_rebalScheduled = false; | ||
@@ -1036,8 +490,6 @@ var total = 0; | ||
fsm.on('stateChanged', function (newState) { | ||
var doRebalance = false; | ||
if (fsm.p_initq_node) { | ||
/* These transitions mean we're still starting up. */ | ||
if (newState === 'delay' || newState === 'error' || | ||
newState === 'connect') | ||
if (newState === 'init' || newState === 'delay' || | ||
newState === 'error' || newState === 'connect') | ||
return; | ||
@@ -1056,3 +508,3 @@ /* | ||
delete (self.p_dead[key]); | ||
doRebalance = true; | ||
self.rebalance(); | ||
} | ||
@@ -1062,3 +514,3 @@ } | ||
if (newState === 'idle') { | ||
if (newState === 'idle' && fsm.isInState('idle')) { | ||
/* | ||
@@ -1110,3 +562,3 @@ * This backend has just become available, either | ||
self.emit('closedBackend', key, fsm); | ||
doRebalance = true; | ||
self.rebalance(); | ||
} | ||
@@ -1123,7 +575,4 @@ | ||
/* Also rebalance, in case we were closed or died. */ | ||
doRebalance = true; | ||
self.rebalance(); | ||
} | ||
if (doRebalance) | ||
self.rebalance(); | ||
}); | ||
@@ -1134,26 +583,2 @@ | ||
CueBallConnectionPool.prototype.claimSync = function () { | ||
if (this.isInState('stopping') || this.isInState('stopped')) | ||
throw (new mod_errors.PoolStoppingError(this)); | ||
if (this.isInState('failed')) | ||
throw (new mod_errors.PoolFailedError(this)); | ||
var e = {}; | ||
Error.captureStackTrace(e); | ||
/* If there are idle connections sitting around, take one. */ | ||
if (this.p_idleq.length > 0) { | ||
var fsm = this.p_idleq.shift(); | ||
delete (fsm.p_idleq_node); | ||
fsm.claim(e.stack); | ||
mod_assert.ok(fsm.cf_shadow); | ||
return ({ | ||
handle: fsm.cf_shadow, | ||
connection: fsm.cf_conn | ||
}); | ||
} | ||
throw (new mod_errors.NoBackendsError(this)); | ||
}; | ||
CueBallConnectionPool.prototype.claim = function (options, cb) { | ||
@@ -1169,7 +594,9 @@ var self = this; | ||
mod_assert.optionalNumber(options.timeout, 'options.timeout'); | ||
var timeout = options.timeout || Infinity; | ||
var timeout = options.timeout; | ||
if (timeout === undefined) | ||
timeout = Infinity; | ||
mod_assert.optionalBool(options.errorOnEmpty, 'options.errorOnEmpty'); | ||
var errOnEmpty = options.errorOnEmpty; | ||
if (this.isInState('stoppping') || this.isInState('stopped')) { | ||
if (this.isInState('stopping') || this.isInState('stopped')) { | ||
setImmediate(function () { | ||
@@ -1202,4 +629,19 @@ if (!done) | ||
delete (fsm.p_idleq_node); | ||
fsm.claim(e.stack, cb); | ||
return (undefined); | ||
fsm.claim(e.stack, function (err, hdl, conn) { | ||
if (err) { | ||
if (!done) | ||
cb(err); | ||
done = true; | ||
return; | ||
} | ||
if (done) { | ||
hdl.release(); | ||
return; | ||
} | ||
done = true; | ||
cb(err, hdl, conn); | ||
}); | ||
return ({ | ||
cancel: function () { done = true; } | ||
}); | ||
} | ||
@@ -1206,0 +648,0 @@ |
@@ -101,46 +101,45 @@ /* | ||
CueBallResolver.prototype.state_stopped = function (on) { | ||
var self = this; | ||
on(this, 'startAsserted', function () { | ||
self.gotoState('starting'); | ||
CueBallResolver.prototype.state_stopped = function (S) { | ||
S.on(this, 'startAsserted', function () { | ||
S.gotoState('starting'); | ||
}); | ||
}; | ||
CueBallResolver.prototype.state_starting = function (on) { | ||
CueBallResolver.prototype.state_starting = function (S) { | ||
var self = this; | ||
this.r_fsm.start(); | ||
on(this.r_fsm, 'updated', function (err) { | ||
S.on(this.r_fsm, 'updated', function (err) { | ||
if (err) { | ||
self.r_lastError = err; | ||
self.gotoState('failed'); | ||
S.gotoState('failed'); | ||
} else { | ||
self.gotoState('running'); | ||
S.gotoState('running'); | ||
} | ||
}); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallResolver.prototype.state_running = function (on) { | ||
var self = this; | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
CueBallResolver.prototype.state_running = function (S) { | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallResolver.prototype.state_failed = function (on) { | ||
var self = this; | ||
on(this.r_fsm, 'updated', function (err) { | ||
CueBallResolver.prototype.state_failed = function (S) { | ||
S.on(this.r_fsm, 'updated', function (err) { | ||
if (!err) | ||
self.gotoState('running'); | ||
S.gotoState('running'); | ||
}); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('stopping'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallResolver.prototype.state_stopping = function (on) { | ||
CueBallResolver.prototype.state_stopping = function (S) { | ||
this.r_fsm.stop(); | ||
this.gotoState('stopped'); | ||
S.immediate(function () { | ||
S.gotoState('stopped'); | ||
}); | ||
}; | ||
@@ -287,11 +286,10 @@ | ||
CueBallDNSResolver.prototype.state_init = function (on) { | ||
var self = this; | ||
CueBallDNSResolver.prototype.state_init = function (S) { | ||
this.r_stopping = false; | ||
on(this, 'startAsserted', function () { | ||
self.gotoState('check_ns'); | ||
S.on(this, 'startAsserted', function () { | ||
S.gotoState('check_ns'); | ||
}); | ||
}; | ||
CueBallDNSResolver.prototype.state_check_ns = function (on, once) { | ||
CueBallDNSResolver.prototype.state_check_ns = function (S) { | ||
var self = this; | ||
@@ -303,3 +301,3 @@ if (this.r_resolvers.length > 0) { | ||
if (notIp.length === 0) { | ||
this.gotoState('srv'); | ||
S.gotoState('srv'); | ||
return; | ||
@@ -322,3 +320,3 @@ } | ||
} | ||
this.gotoState('bootstrap_ns'); | ||
S.gotoState('bootstrap_ns'); | ||
} else { | ||
@@ -329,3 +327,3 @@ mod_fs.readFile('/etc/resolv.conf', 'ascii', | ||
self.r_resolvers = ['8.8.8.8', '8.8.4.4']; | ||
self.gotoState('srv'); | ||
S.gotoState('srv'); | ||
return; | ||
@@ -341,3 +339,3 @@ } | ||
}); | ||
self.gotoState('srv'); | ||
S.gotoState('srv'); | ||
}); | ||
@@ -347,3 +345,3 @@ } | ||
CueBallDNSResolver.prototype.state_bootstrap_ns = function (on, once) { | ||
CueBallDNSResolver.prototype.state_bootstrap_ns = function (S) { | ||
var self = this; | ||
@@ -374,6 +372,6 @@ this.r_bootstrap.on('added', function (k, srv) { | ||
}); | ||
self.gotoState('srv'); | ||
S.gotoState('srv'); | ||
} else { | ||
once(this.r_bootstrap, 'added', function () { | ||
self.gotoState('srv'); | ||
S.on(this.r_bootstrap, 'added', function () { | ||
S.gotoState('srv'); | ||
}); | ||
@@ -384,10 +382,10 @@ this.r_bootstrap.start(); | ||
CueBallDNSResolver.prototype.state_srv = function () { | ||
CueBallDNSResolver.prototype.state_srv = function (S) { | ||
var r = this.r_srvRetry; | ||
r.delay = r.minDelay; | ||
r.count = r.max; | ||
this.gotoState('srv_try'); | ||
S.gotoState('srv_try'); | ||
}; | ||
CueBallDNSResolver.prototype.state_srv_try = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_srv_try = function (S) { | ||
var self = this; | ||
@@ -397,3 +395,3 @@ | ||
var req = this.resolve(name, 'SRV', this.r_srvRetry.timeout); | ||
once(req, 'answers', function (ans, ttl) { | ||
S.on(req, 'answers', function (ans, ttl) { | ||
var d = new Date(); | ||
@@ -404,5 +402,5 @@ d.setTime(d.getTime() + 1000*ttl); | ||
self.r_srvs = ans; | ||
self.gotoState('aaaa'); | ||
S.gotoState('aaaa'); | ||
}); | ||
once(req, 'error', function (err) { | ||
S.on(req, 'error', function (err) { | ||
self.r_lastError = err; | ||
@@ -433,5 +431,5 @@ | ||
self.gotoState('aaaa'); | ||
S.gotoState('aaaa'); | ||
} else { | ||
self.gotoState('srv_error'); | ||
S.gotoState('srv_error'); | ||
} | ||
@@ -442,8 +440,8 @@ }); | ||
CueBallDNSResolver.prototype.state_srv_error = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_srv_error = function (S) { | ||
var self = this; | ||
var r = self.r_srvRetry; | ||
if (--r.count > 0) { | ||
timeout(r.delay, function () { | ||
self.gotoState('srv_try'); | ||
S.timeout(r.delay, function () { | ||
S.gotoState('srv_try'); | ||
}); | ||
@@ -474,13 +472,13 @@ | ||
self.gotoState('aaaa'); | ||
S.gotoState('aaaa'); | ||
} | ||
}; | ||
CueBallDNSResolver.prototype.state_aaaa = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_aaaa = function (S) { | ||
this.r_srvRem = this.r_srvs.slice(); | ||
this.r_nextV6 = undefined; | ||
this.gotoState('aaaa_next'); | ||
S.gotoState('aaaa_next'); | ||
}; | ||
CueBallDNSResolver.prototype.state_aaaa_next = function () { | ||
CueBallDNSResolver.prototype.state_aaaa_next = function (S) { | ||
var r = this.r_retry; | ||
@@ -493,10 +491,10 @@ r.delay = r.minDelay; | ||
this.r_srv = srv; | ||
this.gotoState('aaaa_try'); | ||
S.gotoState('aaaa_try'); | ||
} else { | ||
/* Lookups are all done, proceed on through. */ | ||
this.gotoState('a'); | ||
S.gotoState('a'); | ||
} | ||
}; | ||
CueBallDNSResolver.prototype.state_aaaa_try = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_aaaa_try = function (S) { | ||
var self = this; | ||
@@ -511,3 +509,3 @@ var srv = this.r_srv; | ||
}); | ||
self.gotoState('aaaa_next'); | ||
S.gotoState('aaaa_next'); | ||
return; | ||
@@ -517,3 +515,3 @@ } | ||
var req = this.resolve(srv.name, 'AAAA', this.r_retry.timeout); | ||
once(req, 'answers', function (ans, ttl) { | ||
S.on(req, 'answers', function (ans, ttl) { | ||
var d = new Date(); | ||
@@ -527,7 +525,7 @@ d.setTime(d.getTime() + 1000*ttl); | ||
}); | ||
self.gotoState('aaaa_next'); | ||
S.gotoState('aaaa_next'); | ||
}); | ||
once(req, 'error', function (err) { | ||
S.on(req, 'error', function (err) { | ||
self.r_lastError = err; | ||
self.gotoState('aaaa_error'); | ||
S.gotoState('aaaa_error'); | ||
}); | ||
@@ -537,8 +535,8 @@ req.send(); | ||
CueBallDNSResolver.prototype.state_aaaa_error = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_aaaa_error = function (S) { | ||
var self = this; | ||
var r = self.r_retry; | ||
if (--r.count > 0) { | ||
timeout(r.delay, function () { | ||
self.gotoState('aaaa_try'); | ||
S.timeout(r.delay, function () { | ||
S.gotoState('aaaa_try'); | ||
}); | ||
@@ -560,13 +558,13 @@ | ||
self.gotoState('aaaa_next'); | ||
S.gotoState('aaaa_next'); | ||
} | ||
}; | ||
CueBallDNSResolver.prototype.state_a = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_a = function (S) { | ||
this.r_srvRem = this.r_srvs.slice(); | ||
this.r_nextV4 = undefined; | ||
this.gotoState('a_next'); | ||
S.gotoState('a_next'); | ||
}; | ||
CueBallDNSResolver.prototype.state_a_next = function () { | ||
CueBallDNSResolver.prototype.state_a_next = function (S) { | ||
var r = this.r_retry; | ||
@@ -579,10 +577,10 @@ r.delay = r.minDelay; | ||
this.r_srv = srv; | ||
this.gotoState('a_try'); | ||
S.gotoState('a_try'); | ||
} else { | ||
/* Lookups are all done, proceed on through. */ | ||
this.gotoState('process'); | ||
S.gotoState('process'); | ||
} | ||
}; | ||
CueBallDNSResolver.prototype.state_a_try = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_a_try = function (S) { | ||
var self = this; | ||
@@ -597,3 +595,3 @@ var srv = this.r_srv; | ||
}); | ||
self.gotoState('a_next'); | ||
S.gotoState('a_next'); | ||
return; | ||
@@ -603,3 +601,3 @@ } | ||
var req = this.resolve(srv.name, 'A', this.r_retry.timeout); | ||
once(req, 'answers', function (ans, ttl) { | ||
S.on(req, 'answers', function (ans, ttl) { | ||
var d = new Date(); | ||
@@ -613,7 +611,7 @@ d.setTime(d.getTime() + 1000*ttl); | ||
}); | ||
self.gotoState('a_next'); | ||
S.gotoState('a_next'); | ||
}); | ||
once(req, 'error', function (err) { | ||
S.on(req, 'error', function (err) { | ||
self.r_lastError = err; | ||
self.gotoState('a_error'); | ||
S.gotoState('a_error'); | ||
}); | ||
@@ -623,8 +621,8 @@ req.send(); | ||
CueBallDNSResolver.prototype.state_a_error = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_a_error = function (S) { | ||
var self = this; | ||
var r = self.r_retry; | ||
if (--r.count > 0) { | ||
timeout(r.delay, function () { | ||
self.gotoState('a_try'); | ||
S.timeout(r.delay, function () { | ||
S.gotoState('a_try'); | ||
}); | ||
@@ -646,7 +644,7 @@ | ||
self.gotoState('a_next'); | ||
S.gotoState('a_next'); | ||
} | ||
}; | ||
CueBallDNSResolver.prototype.state_process = function () { | ||
CueBallDNSResolver.prototype.state_process = function (S) { | ||
var self = this; | ||
@@ -682,3 +680,3 @@ | ||
this.emit('updated', err); | ||
this.gotoState('sleep'); | ||
S.gotoState('sleep'); | ||
return; | ||
@@ -708,6 +706,6 @@ } | ||
this.emit('updated'); | ||
this.gotoState('sleep'); | ||
S.gotoState('sleep'); | ||
}; | ||
CueBallDNSResolver.prototype.state_sleep = function (on, once, timeout) { | ||
CueBallDNSResolver.prototype.state_sleep = function (S) { | ||
var self = this; | ||
@@ -718,3 +716,3 @@ var now = new Date(); | ||
if (this.r_stopping) { | ||
this.gotoState('init'); | ||
S.gotoState('init'); | ||
return; | ||
@@ -735,12 +733,12 @@ } | ||
if (minDelay < 0) { | ||
this.gotoState(state); | ||
S.gotoState(state); | ||
} else { | ||
self.r_log.trace({state: state, delay: minDelay}, | ||
'sleeping until next TTL expiry'); | ||
var t = timeout(minDelay, function () { | ||
self.gotoState(state); | ||
var t = S.timeout(minDelay, function () { | ||
S.gotoState(state); | ||
}); | ||
t.unref(); | ||
on(this, 'stopAsserted', function () { | ||
self.gotoState('init'); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('init'); | ||
}); | ||
@@ -838,5 +836,6 @@ } | ||
addns.forEach(function (rr) { | ||
if (rr.type !== type) { | ||
if (rr.type !== 'A' && rr.type !== 'AAAA') { | ||
if (rr.type === 'CNAME' || | ||
rr.type === 'DNAME') | ||
rr.type === 'DNAME' || | ||
rr.type === 'OPT') | ||
return; | ||
@@ -843,0 +842,0 @@ self.r_log.warn('got unsupported ' + |
418
lib/set.js
@@ -24,2 +24,3 @@ /* | ||
const mod_errors = require('./errors'); | ||
const mod_monitor = require('./pool-monitor'); | ||
@@ -40,11 +41,4 @@ const FSM = mod_mooremachine.FSM; | ||
mod_assert.optionalArrayOfString(options.resolvers, | ||
'options.resolvers'); | ||
mod_assert.optionalObject(options.resolver, 'options.resolver'); | ||
mod_assert.string(options.domain, 'options.domain'); | ||
this.cs_domain = options.domain; | ||
mod_assert.optionalString(options.service, 'options.service'); | ||
mod_assert.optionalNumber(options.maxDNSConcurrency, | ||
'options.maxDNSConcurrency'); | ||
mod_assert.optionalNumber(options.defaultPort, 'options.defaultPort'); | ||
mod_assert.object(options.resolver, 'options.resolver'); | ||
this.cs_resolver = options.resolver; | ||
@@ -69,28 +63,56 @@ mod_assert.object(options.recovery, 'options.recovery'); | ||
/* Array of string keys for all currently available backends. */ | ||
this.cs_keys = []; | ||
/* Map of backend key => backend info objects. */ | ||
this.cs_backends = {}; | ||
this.cs_connections = {}; | ||
/* Map of backend key => array of ConnectionFSM instances. */ | ||
this.cs_fsms = {}; | ||
/* Map of backend key => bool, if true the backend is declared dead. */ | ||
this.cs_dead = {}; | ||
/* | ||
* Map of backend key => integer, latest serial number for connections | ||
* to each backend. We use the serial numbers to generate the per- | ||
* connection keys. | ||
*/ | ||
this.cs_serials = {}; | ||
/* | ||
* Map of connection key => connection instance (as returned by | ||
* cs_constructor). | ||
*/ | ||
this.cs_connections = {}; | ||
/* For debugging, track when we last rebalanced. */ | ||
this.cs_lastRebalance = undefined; | ||
/* | ||
* If true, we are currently doing a rebalance. Rebalancing may involve | ||
* closing connections or opening new ones, which can then trigger a | ||
* recursive call into rebalance(), so we use this flag to avoid | ||
* problems caused by this. | ||
*/ | ||
this.cs_inRebalance = false; | ||
this.cs_startedResolver = false; | ||
/* | ||
* If true, a rebalance() is scheduled for the next turn of the event | ||
* loop (in a setImmediate). | ||
*/ | ||
this.cs_rebalScheduled = false; | ||
/* Debugging counters. */ | ||
this.cs_counters = {}; | ||
var self = this; | ||
if (options.resolver !== undefined && options.resolver !== null) { | ||
this.p_resolver = options.resolver; | ||
this.p_resolver_custom = true; | ||
} else { | ||
this.p_resolver = new mod_resolver.Resolver({ | ||
resolvers: options.resolvers, | ||
domain: options.domain, | ||
service: options.service, | ||
maxDNSConcurrency: options.maxDNSConcurrency, | ||
defaultPort: options.defaultPort, | ||
log: this.p_log, | ||
recovery: options.recovery | ||
}); | ||
this.p_resolver_custom = false; | ||
} | ||
this.cs_rebalTimer = new EventEmitter(); | ||
this.cs_rebalTimerInst = setInterval(function () { | ||
self.cs_rebalTimer.emit('timeout'); | ||
}, 10000); | ||
this.cs_rebalTimerInst.unref(); | ||
this.cs_shuffleTimer = new EventEmitter(); | ||
this.cs_shuffleTimerInst = setInterval(function () { | ||
self.cs_shuffleTimer.emit('timeout'); | ||
}, 60000); | ||
this.cs_shuffleTimerInst.unref(); | ||
FSM.call(this, 'starting'); | ||
@@ -113,8 +135,11 @@ } | ||
delete (this.cs_backends[k]); | ||
(this.cs_connections[k] || []).forEach(function (fsm) { | ||
if (!fsm.isInState('busy')) | ||
fsm.close(); | ||
var self = this; | ||
var cks = Object.keys(this.cs_connections).filter(function (ck) { | ||
return (ck.indexOf(k + '.') === 0); | ||
}); | ||
delete (this.cs_connections[k]); | ||
this.rebalance(); | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
delete (self.cs_connections[ck]); | ||
self.assertEmit('removed', ck, conn); | ||
}); | ||
}; | ||
@@ -126,3 +151,332 @@ | ||
CueBallConnectionSet.prototype.state_starting = function (on) { | ||
CueBallConnectionSet.prototype.state_starting = function (S) { | ||
S.validTransitions(['failed', 'running', 'stopping']); | ||
mod_monitor.monitor.registerSet(this); | ||
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
var self = this; | ||
if (this.cs_resolver.isInState('failed')) { | ||
this.cs_log.warn('resolver has already failed, cset will ' + | ||
'start up in "failed" state'); | ||
S.gotoState('failed'); | ||
return; | ||
} | ||
S.on(this.cs_resolver, 'stateChanged', function (st) { | ||
if (st === 'failed') { | ||
self.cs_log.warn('underlying resolver failed, moving ' + | ||
'cset to "failed" state'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
if (this.cs_resolver.isInState('running')) { | ||
var backends = this.cs_resolver.list(); | ||
Object.keys(backends).forEach(function (k) { | ||
var backend = backends[k]; | ||
self.on_resolver_added(k, backend); | ||
}); | ||
} | ||
S.on(this, 'connectedToBackend', function () { | ||
S.gotoState('running'); | ||
}); | ||
S.on(this, 'closedBackend', function (fsm) { | ||
var dead = Object.keys(self.cs_dead).length; | ||
if (dead >= self.cs_keys.length) { | ||
self.cs_log.error( | ||
{ dead: dead }, | ||
'cset has exhausted all retries, now moving to ' + | ||
'"failed" state'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionSet.prototype.state_failed = function (S) { | ||
S.validTransitions(['running', 'stopping']); | ||
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
S.on(this.cs_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
var self = this; | ||
S.on(this, 'connectedToBackend', function () { | ||
mod_assert.ok(!self.cs_resolver.isInState('failed')); | ||
self.cs_log.info('successfully connected to a backend, ' + | ||
'moving back to running state'); | ||
S.gotoState('running'); | ||
}); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionSet.prototype.state_running = function (S) { | ||
S.validTransitions(['failed', 'stopping']); | ||
var self = this; | ||
S.on(this.cs_resolver, 'added', this.on_resolver_added.bind(this)); | ||
S.on(this.cs_resolver, 'removed', this.on_resolver_removed.bind(this)); | ||
S.on(this.cs_rebalTimer, 'timeout', this.rebalance.bind(this)); | ||
S.on(this.cs_shuffleTimer, 'timeout', this.reshuffle.bind(this)); | ||
S.on(this, 'closedBackend', function (fsm) { | ||
var dead = Object.keys(self.cs_dead).length; | ||
if (dead >= self.cs_keys.length) { | ||
self.cs_log.error( | ||
{ dead: dead }, | ||
'pool has exhausted all retries, now moving to ' + | ||
'"failed" state'); | ||
S.gotoState('failed'); | ||
} | ||
}); | ||
S.on(this, 'stopAsserted', function () { | ||
S.gotoState('stopping'); | ||
}); | ||
}; | ||
CueBallConnectionSet.prototype.state_stopping = function (S) { | ||
S.validTransitions(['stopped']); | ||
var conns = this.cs_fsms; | ||
var fsms = []; | ||
Object.keys(conns).forEach(function (k) { | ||
conns[k].forEach(function (fsm) { | ||
fsms.push(fsm); | ||
}); | ||
}); | ||
mod_vasync.forEachParallel({ | ||
func: closeBackend, | ||
inputs: fsms | ||
}, function () { | ||
S.gotoState('stopped'); | ||
}); | ||
function closeBackend(fsm, cb) { | ||
if (fsm.isInState('busy')) { | ||
fsm.closeAfterRelease(); | ||
fsm.on('stateChanged', function (s) { | ||
if (s === 'closed') | ||
cb(); | ||
}); | ||
} else { | ||
fsm.close(cb); | ||
} | ||
} | ||
}; | ||
CueBallConnectionSet.prototype.state_stopped = function (S) { | ||
S.validTransitions([]); | ||
mod_monitor.monitor.unregisterSet(this); | ||
this.cs_keys = []; | ||
this.cs_fsms = {}; | ||
this.cs_connections = {}; | ||
this.cs_backends = {}; | ||
clearInterval(this.cs_rebalTimerInst); | ||
clearInterval(this.cs_shuffleTimerInst); | ||
}; | ||
CueBallConnectionSet.prototype.isDeclaredDead = function (backend) { | ||
return (this.cs_dead[backend] === true); | ||
}; | ||
CueBallConnectionSet.prototype.reshuffle = function () { | ||
var taken = this.cs_keys.pop(); | ||
var idx = Math.floor(Math.random() * (this.cs_keys.length + 1)); | ||
this.cs_keys.splice(idx, 0, taken); | ||
this.rebalance(); | ||
}; | ||
/* Stop and kill everything. */ | ||
CueBallConnectionSet.prototype.stop = function () { | ||
this.emit('stopAsserted'); | ||
}; | ||
CueBallConnectionSet.prototype.setTarget = function (target) { | ||
this.cs_target = target; | ||
this.rebalance(); | ||
}; | ||
CueBallConnectionSet.prototype.rebalance = function () { | ||
if (this.cs_keys.length < 1) | ||
return; | ||
if (this.isInState('stopping') || this.isInState('stopped')) | ||
return; | ||
if (this.cs_rebalScheduled !== false) | ||
return; | ||
this.p_rebalScheduled = true; | ||
var self = this; | ||
setImmediate(function () { | ||
self._rebalance(); | ||
}); | ||
}; | ||
/* | ||
* Rebalance the set, by looking at the distribution of connections to | ||
* backends amongst the "init" and "idle" queues. | ||
* | ||
* If the connections are not evenly distributed over the available backends, | ||
* then planRebalance() will return a plan to take us back to an even | ||
* distribution, which we then apply. | ||
*/ | ||
CueBallConnectionSet.prototype._rebalance = function () { | ||
var self = this; | ||
if (this.cs_inRebalance !== false) | ||
return; | ||
this.cs_inRebalance = true; | ||
this.cs_rebalScheduled = false; | ||
var conns = {}; | ||
var total = 0; | ||
this.cs_keys.forEach(function (k) { | ||
conns[k] = []; | ||
}); | ||
Object.keys(this.cs_fsms).forEach(function (k) { | ||
conns[k] = self.cs_fsms[k].slice(); | ||
total += self.cs_fsms[k].length; | ||
}); | ||
var plan = mod_utils.planRebalance( | ||
conns, this.cs_dead, this.cs_target, this.cs_max, true); | ||
if (plan.remove.length > 0 || plan.add.length > 0) { | ||
this.cs_log.trace('rebalancing cset, remove %d, ' + | ||
'add %d (target = %d, total = %d)', | ||
plan.remove.length, plan.add.length, this.cs_target, | ||
total); | ||
} | ||
plan.remove.forEach(function (fsm) { | ||
var k = fsm.cf_backend.key; | ||
var cks = Object.keys(self.cs_connections).filter( | ||
function (ck) { | ||
return (ck.indexOf(k + '.') === 0); | ||
}); | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
delete (self.cs_connections[ck]); | ||
self.emit('removed', ck, conn); | ||
}); | ||
if (cks.length === 0) { | ||
var fsmIdx = self.cs_fsms[k].indexOf(fsm); | ||
if (fsmIdx > 0 || self.cs_keys.indexOf(k) === -1) { | ||
fsm.close(); | ||
--total; | ||
} else { | ||
fsm.closeAfterRelease(); | ||
} | ||
} | ||
}); | ||
plan.add.forEach(function (k) { | ||
/* Make sure we *never* exceed our socket limit. */ | ||
if (++total > self.cs_max) | ||
return; | ||
self.addConnection(k); | ||
}); | ||
this.cs_inRebalance = false; | ||
this.cs_lastRebalance = new Date(); | ||
}; | ||
CueBallConnectionSet.prototype.assertEmit = function () { | ||
var args = arguments; | ||
var event = args[0]; | ||
if (this.listeners(event).length < 1) { | ||
throw (new Error('Event "' + event + '" on ConnectionSet ' + | ||
'must be handled')); | ||
} | ||
return (this.emit.apply(this, args)); | ||
}; | ||
CueBallConnectionSet.prototype.addConnection = function (key) { | ||
if (this.isInState('stopping') || this.isInState('stopped')) | ||
return; | ||
var backend = this.cs_backends[key]; | ||
backend.key = key; | ||
var fsm = new ConnectionFSM({ | ||
constructor: this.cs_constructor, | ||
backend: backend, | ||
log: this.cs_log, | ||
pool: this, | ||
recovery: this.cs_recovery, | ||
doRef: false | ||
}); | ||
if (this.cs_fsms[key] === undefined) | ||
this.cs_fsms[key] = []; | ||
if (this.cs_serials[key] === undefined) | ||
this.cs_serials[key] = 1; | ||
this.cs_fsms[key].push(fsm); | ||
var serial; | ||
var ckey; | ||
var self = this; | ||
fsm.on('stateChanged', function (newState) { | ||
if (newState === 'idle' && fsm.isInState('idle')) { | ||
if (serial === undefined) { | ||
self.emit('connectedToBackend', key, fsm); | ||
} | ||
if (ckey !== undefined && self.cs_connections[ckey]) { | ||
var conn = self.cs_connections[ckey]; | ||
delete (self.cs_connections[ckey]); | ||
self.assertEmit('removed', ckey, conn); | ||
} | ||
conn = fsm.getConnection(); | ||
serial = self.cs_serials[key]++; | ||
ckey = key + '.' + serial; | ||
conn.cs_serial = serial; | ||
fsm.cs_serial = serial; | ||
self.cs_connections[ckey] = conn; | ||
self.assertEmit('added', ckey, conn); | ||
self.rebalance(); | ||
return; | ||
} | ||
if (newState === 'closed') { | ||
if (self.cs_connections[ckey]) { | ||
conn = self.cs_connections[ckey]; | ||
delete (self.cs_connections[ckey]); | ||
self.assertEmit('removed', ckey, conn); | ||
} | ||
if (self.cs_fsms[key]) { | ||
var idx = self.cs_fsms[key].indexOf(fsm); | ||
self.cs_fsms[key].splice(idx, 1); | ||
} | ||
if (fsm.retriesExhausted()) { | ||
self.cs_dead[key] = true; | ||
} | ||
self.emit('closedBackend', fsm); | ||
self.rebalance(); | ||
return; | ||
} | ||
}); | ||
fsm.start(); | ||
}; | ||
CueBallConnectionSet.prototype.getConnections = function () { | ||
var self = this; | ||
return (Object.keys(this.cs_connections).map(function (k) { | ||
return (self.cs_connections[k]); | ||
})); | ||
}; | ||
CueBallConnectionSet.prototype._incrCounter = function (counter) { | ||
if (this.cs_counters[counter] === undefined) | ||
this.cs_counters[counter] = 0; | ||
++this.cs_counters[counter]; | ||
}; |
@@ -102,4 +102,6 @@ /* | ||
* - `max` -- a Number, maximum socket ceiling | ||
* - `singleton` -- optional Boolean (default false), create only a single | ||
* connection per distinct backend. used for Sets. | ||
*/ | ||
function planRebalance(inSpares, dead, target, max) { | ||
function planRebalance(inSpares, dead, target, max, singleton) { | ||
var replacements = 0; | ||
@@ -135,4 +137,11 @@ var wantedSpares = {}; | ||
if (dead[k] !== true) { | ||
++wantedSpares[k]; | ||
++done; | ||
if (singleton) { | ||
if (wantedSpares[k] === 0) { | ||
wantedSpares[k] = 1; | ||
++done; | ||
} | ||
} else { | ||
++wantedSpares[k]; | ||
++done; | ||
} | ||
continue; | ||
@@ -161,5 +170,13 @@ } | ||
if (dead[k] !== true) { | ||
++wantedSpares[k]; | ||
++done; | ||
continue; | ||
if (singleton) { | ||
if (wantedSpares[k] === 0) { | ||
wantedSpares[k] = 1; | ||
++done; | ||
continue; | ||
} | ||
} else { | ||
++wantedSpares[k]; | ||
++done; | ||
continue; | ||
} | ||
} | ||
@@ -182,5 +199,11 @@ /* | ||
var empties = keys.filter(function (kk) { | ||
return (dead[kk] !== true || | ||
wantedSpares[kk] === undefined || | ||
wantedSpares[kk] === 0); | ||
if (singleton) { | ||
return (dead[kk] !== true && ( | ||
wantedSpares[kk] === undefined || | ||
wantedSpares[kk] === 0)); | ||
} else { | ||
return (dead[kk] !== true || | ||
wantedSpares[kk] === undefined || | ||
wantedSpares[kk] === 0); | ||
} | ||
}); | ||
@@ -187,0 +210,0 @@ if (count + 1 <= max) { |
{ | ||
"name": "cueball", | ||
"version": "0.4.1", | ||
"version": "0.5.0", | ||
"description": "", | ||
@@ -12,4 +12,4 @@ "main": "lib/index.js", | ||
"ipaddr.js": ">=1.1.0 <2.0.0", | ||
"mooremachine": ">=1.4.2 <2.0.0", | ||
"named-client": "git+https://github.com/arekinath/node-named-client.git#v0.3.5", | ||
"mooremachine": ">=2.0.0 <3.0.0", | ||
"named-client": "git+https://github.com/arekinath/node-named-client.git#v0.3.6", | ||
"node-uuid": ">=1.4.7 <2.0.0", | ||
@@ -16,0 +16,0 @@ "posix-getopt": ">=1.2.0 <2.0.0", |
137
README.md
@@ -198,2 +198,7 @@ cueball | ||
If a client determines that a connection must be closed immediately (e.g. due | ||
to a protocol error making it impossible to continue using it safely), it must | ||
call the `.close()` method on the *handle*, not any `.destroy()` or similar | ||
method on the connection itself. | ||
Calling `claim()` on a Pool that is in the "stopping", "stopped" or "failed" | ||
@@ -416,2 +421,70 @@ states will result in the callback being called with an error on the next run of | ||
## Connection interface | ||
Objects returned by a `constructor` function (such as supplied to the | ||
`ConnectionPool` constructor) must obey a subset of the node.js socket | ||
interface. In particular they must support the following events and methods: | ||
### `constructor(backend)` | ||
Parameters: | ||
- `backend` -- an Object, with properties: | ||
- `key` -- a String, the backend key as supplied via the Resolver interface. | ||
Can be used to uniquely identify the backend. | ||
- `address` -- a String, address of the backend (IPv4 or IPv6) | ||
- `port` -- a Number, TCP or UDP port number | ||
Returns an object obeying the Connection interface. | ||
### Event `connect` (required) | ||
At construction, the connection object must immediately attempt to make a | ||
connection to the backend specified by the first argument to the constructor. | ||
When the connection succeeds, it must emit the event `connect`. No arguments are | ||
required. | ||
### Event `error` | ||
Connection objects may emit `error` at any time in response to a fatal error. | ||
The connection will be immediately terminated (by calling `.destroy()`) upon the | ||
emission of any `error` event. | ||
The `error` event should be emitted with an Object as the first parameter. This | ||
is expected to have `Error` on its prototype chain (`obj instanceof Error` | ||
should be `true`). | ||
May also be emitted as `connectError` only in the state before `connect` has | ||
been emitted. | ||
### Event `close` (required) | ||
Connection objects must emit `close` as the final event they emit after the | ||
connection has ended. No events may be emitted after `close`. | ||
### `#ref()` (required) | ||
Marks the connection as "referenced", meaning that it should keep the Node | ||
process running even if no event handlers are registered on it. | ||
It is generally acceptable for long-lived server processes (which never intend | ||
to exit in normal operation) to provide a Connection that implements `ref()` and | ||
`unref()` as no-ops. | ||
### `#unref()` (required) | ||
Marks the connection as "unreferenced", allowing the Node process to exit if it | ||
is the only remaining source of events. | ||
### `#destroy()` (required) | ||
Immediately disconnects the connection and proceeds to emit `close`. | ||
### Event `timeout` | ||
Optional. Equivalent to emitting `error` with a ConnectionTimeoutError as an | ||
argument. | ||
May also be emitted as `connectTimeout` only in the state before `connect` has | ||
been emitted. | ||
## Errors | ||
@@ -553,3 +626,67 @@ | ||
ConnectionSet | ||
------------- | ||
Cueball also includes an alternative to the ConnectionPool, named ConnectionSet. | ||
This is a more low-level API which is useful for implementing clients for | ||
protocols that are not as strictly connection-oriented. | ||
Key differences to ConnectionPool: | ||
- Each backend in a ConnectionSet has a maximum of 1 connection open to it | ||
(it's expected to be used with protocols that multiplex operations over a | ||
single socket.) | ||
- No support for leases (claim/release). ConnectionSet does not track whether | ||
connections are busy or not, and expects its consumer to manage this. | ||
ConnectionSets have an identical state graph to ConnectionPools. | ||
### `new mod_cueball.ConnectionSet(options)` | ||
Parameters | ||
- `options` -- Object, with keys: | ||
- `resolver` -- Object, an instance of the Resolver interface | ||
- `constructor` -- Function, same as in ConnectionPool | ||
- `recovery` -- Object, a recovery spec (see below) | ||
- `log` -- optional Object, a `bunyan`-style logger to use | ||
- `target` -- optional Number, target number of connections to be made | ||
available in the entire set | ||
- `maximum` -- optional Number, maximum number of connections per host | ||
### Event `'added'` | ||
Emitted when a new connection becomes available in the set. This event *must* | ||
have a handler on it at all times. | ||
Parameters | ||
- `key` -- String, a unique key to identify this connection | ||
- `connection` -- Object, the connection as returned by the constructor | ||
### Event `removed` | ||
Emitted when an existing connection should be removed from the pool. This event | ||
*must* have a handler on it at all times. The handler is obligated to take all | ||
necessary actions to drain the connection of outstanding requests and then close | ||
it. The emission of this event must cause the connection object to emit | ||
`'close'` as soon as possible. | ||
Parameters | ||
- `key` -- String, a unique key to identify the connection | ||
### `ConnectionSet#stop()` | ||
Stops the ConnectionSet, disconnecting all available connections (by first | ||
emitting `'removed'` for them.) | ||
### `ConnectionSet#setTarget(target)` | ||
Sets the target number of connections in the ConnectionSet. Will trigger an | ||
async operation to add or remove connections in order to meet the new target. | ||
Parameters: | ||
- `target` -- Number | ||
### `ConnectionSet#getConnections()` | ||
Returns all the currently open connections in the Set, as an Array. | ||
Tools | ||
@@ -556,0 +693,0 @@ ----- |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable and can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable and can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
141784
3330
734
8
+ Addedmooremachine@2.3.0(transitive)
- Removedmooremachine@1.4.2(transitive)
Updatedmooremachine@>=2.0.0 <3.0.0
Updatednamed-client@git+https://github.com/arekinath/node-named-client.git#v0.3.6