Comparing version 1.3.2 to 2.0.0
@@ -291,2 +291,3 @@ /* | ||
socket.once('free', onFree); | ||
socket.once('close', onClose); | ||
socket.once('agentRemove', onAgentRemove); | ||
@@ -296,2 +297,11 @@ req.onSocket(socket); | ||
function onClose() { | ||
sock.removeListener('free', onFree); | ||
sock.removeListener('agentRemove', onAgentRemove); | ||
req.removeListener('abort', onAbort); | ||
conn.release(); | ||
conn = undefined; | ||
sock = undefined; | ||
} | ||
function onAbort() { | ||
@@ -303,2 +313,3 @@ if (waiter !== undefined) { | ||
if (conn !== undefined) { | ||
sock.removeListener('close', onClose); | ||
sock.removeListener('free', onFree); | ||
@@ -312,2 +323,3 @@ sock.removeListener('agentRemove', onAgentRemove); | ||
function onFree() { | ||
sock.removeListener('close', onClose); | ||
sock.removeListener('agentRemove', onAgentRemove); | ||
@@ -320,2 +332,3 @@ req.removeListener('abort', onAbort); | ||
function onAgentRemove() { | ||
sock.removeListener('close', onClose); | ||
sock.removeListener('free', onFree); | ||
@@ -322,0 +335,0 @@ req.removeListener('abort', onAbort); |
@@ -9,3 +9,6 @@ /* | ||
module.exports = ConnectionFSM; | ||
module.exports = { | ||
ConnectionSlotFSM: ConnectionSlotFSM, | ||
CueBallClaimHandle: CueBallClaimHandle | ||
}; | ||
@@ -33,42 +36,96 @@ const mod_events = require('events'); | ||
/* | ||
* ConnectionFSM is the state machine for a "connection" -- an abstract entity | ||
* that is managed by a ConnectionPool. ConnectionFSMs are associated with a | ||
* particular 'backend', and hold a backing object cf_conn. Typically this | ||
* backing object is a TCP socket, but may be any EventEmitter that emits | ||
* 'close' and 'error'. | ||
* This file contains 3 key FSMs used to manage connections: | ||
* - SocketMgrFSM | ||
* - ConnectionSlotFSM | ||
* - CueBallClaimHandle (also an FSM, despite the name) | ||
* | ||
* Only the ConnectionSlotFSM and CueBallClaimHandle are exported and used | ||
* elsewhere in cueball. The SocketMgrFSM is an implementation detail of the | ||
* ConnectionSlotFSM (which is the only thing that constructs and manages it). | ||
* | ||
* Pools and Sets construct one ConnectionSlotFSM for each "slot" they have | ||
* (a "slot" being a position in their list of connections, sort of like a | ||
* vnode versus pnode in consistent hashing terminology). | ||
* | ||
* Pools and Sets also can construct ClaimHandles, which attempt to "claim" a | ||
* slot. Once claimed, the ClaimHandle will call a callback and then hold that | ||
* slot until a release() or close() method on it is called. It is used to | ||
* manage the process of "giving out" connections to downstream users of | ||
* cueball (this process involves multiple steps and possible state-change | ||
* races, so it is encapsulated in an FSM). | ||
* | ||
* Note on state transition diagrams in this file: | ||
* | ||
* In the condition notation on edges: | ||
* 'connect' -- means "when the event 'connect' is emitted" | ||
* .connect() -- means "when the signal function .connect() is called" | ||
* unwanted -- means "if the flag 'unwanted' is true" or | ||
* "when the flag 'unwanted' changes to true" | ||
* | ||
* Generally the first condition on an edge is the trigger condition -- we will | ||
* only transition when it changes. The subsequent conditions joined by && are | ||
* evaluated only after a change happens in the first condition. | ||
*/ | ||
function ConnectionFSM(options) { | ||
/* | ||
* The "Socket Manager FSM" (SocketMgrFSM) is an abstraction for a "socket"/ | ||
* connection, which handles retry and backoff, as well as de-duplication of | ||
* error and closure related events. | ||
* | ||
* A single SocketMgrFSM may construct multiple sockets over its lifetime -- | ||
* it creates them by calling the "constructor" function passed in from the | ||
* public-facing API (e.g. the ConnectionPool options). Every time we enter the | ||
* "connecting" state we will construct a new socket, and we destroy it in | ||
* "error" or "closed". | ||
* | ||
* A SocketMgrFSM is constructed and managed only by a ConnectionSlotFSM | ||
* (there is a 1:1 relationship between the two). Many points in the SocketMgr | ||
* state graph stop and wait for signals from the SlotFSM on how to proceed. | ||
* | ||
* These signals are "signal functions" in the mooremachine style -- see | ||
* SocketMgrFSM.prototype.connect() and .retry(). | ||
* | ||
* +------------+ | ||
* | | | ||
* | failed | | ||
* +------+ | | | ||
* | init | +------------+ | ||
* +------+ ^ | ||
* | | | ||
* | |retries | ||
* v |exhausted | ||
* +------------+ +--+---------+ | ||
* | | timeout| | | ||
* +-----> | connecting | <--------+ backoff | <--+ | ||
* | | | | | | | ||
* | +--+----+----+ +------------+ | | ||
* | | | ^ | | ||
* | 'connect'| | timeout/'error' | | | ||
* | | +---------+ | | | ||
* | v | | .retry() | | ||
* | +------------+ | +--+---------+ | | ||
* | | | +---> | | | | ||
* | | connected +--------> | error | | | ||
* | | | 'error' | | | | ||
* | +--+---------+ +------------+ | | ||
* | 'close'| | | ||
* | .close()| | | ||
* | | | | ||
* | v | | ||
* | +------------+ | | ||
* | | | | | ||
* +-------+ closed +----------------------------+ | ||
* .connect()| | .retry() | ||
* +------------+ | ||
*/ | ||
function SocketMgrFSM(options) { | ||
mod_assert.object(options, 'options'); | ||
mod_assert.object(options.pool, 'options.pool'); | ||
mod_assert.func(options.constructor, 'options.constructor'); | ||
mod_assert.object(options.backend, 'options.backend'); | ||
mod_assert.object(options.log, 'options.log'); | ||
mod_assert.optionalFunc(options.checker, 'options.checker'); | ||
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout'); | ||
mod_assert.object(options.slot, 'options.slot'); | ||
mod_assert.bool(options.monitor, 'options.monitor'); | ||
mod_assert.object(options.pool, 'options.pool'); | ||
this.cf_pool = options.pool; | ||
this.cf_constructor = options.constructor; | ||
this.cf_backend = options.backend; | ||
this.cf_claimed = false; | ||
this.cf_claimStack = []; | ||
this.cf_releaseStack = []; | ||
this.cf_lastError = undefined; | ||
this.cf_conn = undefined; | ||
this.cf_shadow = undefined; | ||
this.cf_closeAfter = false; | ||
mod_assert.optionalBool(options.doRef, 'options.doRef'); | ||
this.cf_doRef = options.doRef; | ||
if (this.cf_doRef === undefined && this.cf_doRef !== null) | ||
this.cf_doRef = true; | ||
this.cf_oldListeners = {}; | ||
this.cf_checkTimeout = options.checkTimeout; | ||
this.cf_checker = options.checker; | ||
this.cf_lastCheck = new Date(); | ||
this.cf_log = options.log.child({ | ||
component: 'CueBallConnectionFSM', | ||
backend: this.cf_backend.key, | ||
address: this.cf_backend.address, | ||
port: this.cf_backend.port | ||
}); | ||
mod_assert.object(options.recovery, 'options.recovery'); | ||
@@ -87,187 +144,177 @@ | ||
this.cf_initialRecov = initialRecov; | ||
this.cf_connectRecov = connectRecov; | ||
this.sm_initialRecov = initialRecov; | ||
this.sm_connectRecov = connectRecov; | ||
this.cf_retries = initialRecov.retries; | ||
this.cf_retriesLeft = initialRecov.retries; | ||
this.cf_minDelay = initialRecov.delay; | ||
this.cf_delay = initialRecov.delay; | ||
this.cf_maxDelay = initialRecov.maxDelay || Infinity; | ||
this.cf_timeout = initialRecov.timeout; | ||
this.cf_maxTimeout = initialRecov.maxTimeout || Infinity; | ||
this.sm_pool = options.pool; | ||
this.sm_backend = options.backend; | ||
this.sm_constructor = options.constructor; | ||
this.sm_slot = options.slot; | ||
this.sm_log = options.log.child({ | ||
component: 'CueBallSocketMgrFSM', | ||
backend: this.sm_backend.key, | ||
address: this.sm_backend.address, | ||
port: this.sm_backend.port | ||
}); | ||
this.sm_lastError = undefined; | ||
this.sm_socket = undefined; | ||
/* | ||
* If our parent pool thinks this backend is dead, resume connection | ||
* attempts with the maximum delay and timeout. Something is going | ||
* wrong, let's not retry too aggressively and make it worse. | ||
* We start off in the "init" state, waiting for the SlotFSM to call | ||
* connect(). | ||
*/ | ||
if (this.cf_pool.isDeclaredDead(this.cf_backend.key)) { | ||
/* | ||
* We might be given an infinite maxDelay or maxTimeout. If | ||
* we are, then multiply it by 2^(retries) to get to what the | ||
* value would have been before. | ||
*/ | ||
var mult = 1 << this.cf_retries; | ||
this.cf_delay = this.cf_maxDelay; | ||
if (!isFinite(this.cf_delay)) | ||
this.cf_delay = initialRecov.delay * mult; | ||
this.cf_timeout = this.cf_maxTimeout; | ||
if (!isFinite(this.cf_timeout)) | ||
this.cf_timeout = initialRecov.timeout * mult; | ||
/* Keep retrying a failed backend forever */ | ||
this.cf_retries = Infinity; | ||
this.cf_retriesLeft = Infinity; | ||
} | ||
FSM.call(this, 'init'); | ||
this.allStateEvent('closeAsserted'); | ||
FSM.call(this, 'init'); | ||
this.sm_monitor = undefined; | ||
this.setMonitor(options.monitor); | ||
} | ||
mod_util.inherits(ConnectionFSM, FSM); | ||
mod_util.inherits(SocketMgrFSM, FSM); | ||
/* | ||
* Return true if this connection was closed due to retry exhaustion. | ||
* Sets whether we are in "monitor" mode or not. In "monitor" mode the | ||
* SocketMgrFSM has infinite retries and does not use exponential back-off | ||
* (timeout and delay are fixed at their max values). | ||
*/ | ||
ConnectionFSM.prototype.retriesExhausted = function () { | ||
return (this.isInState('closed') && this.cf_retriesLeft <= 0); | ||
SocketMgrFSM.prototype.setMonitor = function (value) { | ||
mod_assert.ok(this.isInState('init') || this.isInState('connected')); | ||
if (value === this.sm_monitor) | ||
return; | ||
this.sm_monitor = value; | ||
this.resetBackoff(); | ||
}; | ||
ConnectionFSM.prototype.getConnection = function () { | ||
mod_assert.ok(this.isInState('idle')); | ||
return (this.cf_conn); | ||
}; | ||
SocketMgrFSM.prototype.resetBackoff = function () { | ||
var initialRecov = this.sm_initialRecov; | ||
/* | ||
* Mark this Connection as "claimed"; in use by a particular client of the | ||
* pool. | ||
* | ||
* Normally this will be called by the pool itself, which will give the 'stack' | ||
* argument as a copy of the stack trace from its caller. | ||
* | ||
* We keep track of the stack trace of our last claimer and releaser to aid | ||
* in debugging. | ||
*/ | ||
ConnectionFSM.prototype.claim = function (stack, cb) { | ||
mod_assert.ok(this.cf_claimed === false); | ||
mod_assert.strictEqual(this.getState(), 'idle'); | ||
if (typeof (stack) === 'function') { | ||
cb = stack; | ||
stack = undefined; | ||
this.sm_retries = initialRecov.retries; | ||
this.sm_retriesLeft = initialRecov.retries; | ||
this.sm_minDelay = initialRecov.delay; | ||
this.sm_delay = initialRecov.delay; | ||
this.sm_maxDelay = initialRecov.maxDelay || Infinity; | ||
this.sm_timeout = initialRecov.timeout; | ||
this.sm_maxTimeout = initialRecov.maxTimeout || Infinity; | ||
if (this.sm_monitor === true) { | ||
var mult = 1 << this.sm_retries; | ||
this.sm_delay = this.sm_maxDelay; | ||
if (!isFinite(this.sm_delay)) | ||
this.sm_delay = initialRecov.delay * mult; | ||
this.sm_timeout = this.sm_maxTimeout; | ||
if (!isFinite(this.sm_timeout)) | ||
this.sm_timeout = initialRecov.timeout * mult; | ||
/* Keep retrying a failed backend forever */ | ||
this.sm_retries = Infinity; | ||
this.sm_retriesLeft = Infinity; | ||
} | ||
mod_assert.func(cb, 'callback'); | ||
if (stack === undefined) { | ||
var e = mod_utils.maybeCaptureStackTrace(); | ||
stack = e.stack; | ||
} | ||
this.cf_claimStack = stack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
this.cf_claimed = true; | ||
var self = this; | ||
this.on('stateChanged', onStateChanged); | ||
function onStateChanged(st) { | ||
if (st === 'busy' && self.isInState('busy')) { | ||
self.removeListener('stateChanged', onStateChanged); | ||
cb(null, self.cf_shadow, self.cf_conn); | ||
} else if (st === 'error' && self.cf_lastError !== undefined) { | ||
self.removeListener('stateChanged', onStateChanged); | ||
var err = new mod_verror.VError(self.cf_lastError, | ||
'Connection error during claim on backend %s:%d', | ||
self.cf_backend.address, self.cf_backend.port); | ||
cb(err); | ||
} else if (st !== 'busy') { | ||
self.removeListener('stateChanged', onStateChanged); | ||
cb(new mod_verror.VError('Claimed connection entered ' + | ||
'state "%s" during claim, instead of "busy"', st)); | ||
} | ||
} | ||
this.emit('claimAsserted'); | ||
}; | ||
/* | ||
* Mark this Connection as "free" and ready to be re-used. This is normally | ||
* called via the ConnectionHandle. | ||
*/ | ||
ConnectionFSM.prototype.release = function (cb) { | ||
mod_assert.ok(this.cf_claimed === true); | ||
mod_assert.ok(['busy', 'ping'].indexOf(this.getState()) !== -1, | ||
'connection is not held'); | ||
SocketMgrFSM.prototype.connect = function () { | ||
mod_assert.ok(this.isInState('init') || this.isInState('closed'), | ||
'SocketMgrFSM#connect may only be called in state "init" or ' + | ||
'"closed" (is in "' + this.getState() + '")'); | ||
this.emit('connectAsserted'); | ||
}; | ||
var e = mod_utils.maybeCaptureStackTrace(); | ||
this.cf_releaseStack = e.stack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
this.once('stateChanged', function (st) { | ||
mod_assert.notStrictEqual(st, 'busy'); | ||
if (cb) | ||
cb(null); | ||
}); | ||
this.emit('releaseAsserted'); | ||
SocketMgrFSM.prototype.retry = function () { | ||
mod_assert.ok(this.isInState('closed') || this.isInState('error'), | ||
'SocketMgrFSM#retry may only be called in state "closed" or ' + | ||
'"error" (is in "' + this.getState() + '")'); | ||
this.emit('retryAsserted'); | ||
}; | ||
ConnectionFSM.prototype.close = function (cb) { | ||
if (cb) { | ||
this.on('stateChanged', function (st) { | ||
if (st === 'closed') | ||
cb(); | ||
}); | ||
} | ||
SocketMgrFSM.prototype.close = function () { | ||
mod_assert.ok(this.isInState('connected') || this.isInState('backoff'), | ||
'SocketMgrFSM#close may only be called in state "connected" or ' + | ||
'"backoff" (is in "' + this.getState() + '")'); | ||
this.emit('closeAsserted'); | ||
}; | ||
ConnectionFSM.prototype.start = function () { | ||
this.emit('startAsserted'); | ||
SocketMgrFSM.prototype.getLastError = function () { | ||
return (this.sm_lastError); | ||
}; | ||
ConnectionFSM.prototype.closeAfterRelease = function () { | ||
this.cf_closeAfter = true; | ||
SocketMgrFSM.prototype.getSocket = function () { | ||
mod_assert.ok(this.isInState('connected'), 'Sockets may only be ' + | ||
'retrieved from SocketMgrFSMs in "connected" state (is in ' + | ||
'state "' + this.getState() + '")'); | ||
return (this.sm_socket); | ||
}; | ||
ConnectionFSM.prototype.state_init = function (S) { | ||
S.validTransitions(['connect', 'closed']); | ||
S.on(this, 'startAsserted', function () { | ||
S.gotoState('connect'); | ||
SocketMgrFSM.prototype.state_init = function (S) { | ||
S.validTransitions(['connecting']); | ||
S.on(this, 'connectAsserted', function () { | ||
S.gotoState('connecting'); | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
S.gotoState('closed'); | ||
}); | ||
}; | ||
ConnectionFSM.prototype.state_connect = function (S) { | ||
S.validTransitions(['error', 'idle', 'closed']); | ||
SocketMgrFSM.prototype.state_connecting = function (S) { | ||
S.validTransitions(['connected', 'error']); | ||
var self = this; | ||
S.timeout(this.cf_timeout, function () { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.timeout(this.sm_timeout, function () { | ||
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.gotoState('error'); | ||
self.sm_pool._incrCounter('timeout-during-connect'); | ||
}); | ||
this.cf_log.trace('calling constructor to open new connection'); | ||
this.cf_conn = this.cf_constructor(this.cf_backend); | ||
mod_assert.object(this.cf_conn, 'constructor return value'); | ||
this.cf_conn.cf_fsm = this; | ||
S.on(this.cf_conn, 'connect', function () { | ||
S.gotoState('idle'); | ||
this.sm_log.trace('calling constructor to open new connection'); | ||
this.sm_socket = this.sm_constructor(this.sm_backend); | ||
mod_assert.object(this.sm_socket, 'constructor return value'); | ||
this.sm_socket.sm_fsm = this; | ||
S.on(this.sm_socket, 'connect', function () { | ||
S.gotoState('connected'); | ||
}); | ||
S.on(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
S.on(this.sm_socket, 'error', function socketMgrErrorListener(err) { | ||
self.sm_lastError = err; | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-connect'); | ||
self.sm_pool._incrCounter('error-during-connect'); | ||
}); | ||
S.on(this.cf_conn, 'connectError', function (err) { | ||
self.cf_lastError = err; | ||
S.on(this.sm_socket, 'connectError', function (err) { | ||
self.sm_lastError = err; | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-connect'); | ||
self.sm_pool._incrCounter('error-during-connect'); | ||
}); | ||
S.on(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.on(this.sm_socket, 'close', function () { | ||
self.sm_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-connect'); | ||
self.sm_pool._incrCounter('close-during-connect'); | ||
}); | ||
S.on(this.cf_conn, 'timeout', function () { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.on(this.sm_socket, 'timeout', function () { | ||
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('timeout-during-connect'); | ||
self.sm_pool._incrCounter('timeout-during-connect'); | ||
}); | ||
S.on(this.cf_conn, 'connectTimeout', function (err) { | ||
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.on(this.sm_socket, 'connectTimeout', function () { | ||
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('timeout-during-connect'); | ||
self.sm_pool._incrCounter('timeout-during-connect'); | ||
}); | ||
}; | ||
SocketMgrFSM.prototype.state_connected = function (S) { | ||
S.validTransitions(['error', 'closed']); | ||
var self = this; | ||
if (typeof (self.sm_socket.localPort) === 'number') { | ||
this.sm_log = this.sm_log.child({ | ||
localPort: this.sm_socket.localPort | ||
}); | ||
} | ||
this.sm_log.trace('connected'); | ||
this.resetBackoff(); | ||
S.on(this.sm_socket, 'error', function socketMgrErrorListener(err) { | ||
self.sm_lastError = err; | ||
S.gotoState('error'); | ||
self.sm_pool._incrCounter('error-while-connected'); | ||
}); | ||
S.on(this.sm_socket, 'close', function () { | ||
S.gotoState('closed'); | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
@@ -278,128 +325,349 @@ S.gotoState('closed'); | ||
ConnectionFSM.prototype.state_closed = function (S) { | ||
S.validTransitions([]); | ||
this.cf_log.trace('closed'); | ||
if (this.cf_conn) { | ||
this.cf_conn.destroy(); | ||
this.cf_log = this.cf_log.child({ localPort: null }); | ||
SocketMgrFSM.prototype.state_error = function (S) { | ||
S.validTransitions(['backoff']); | ||
var log = this.sm_log; | ||
if (this.sm_socket) { | ||
this.sm_socket.destroy(); | ||
this.sm_log = this.sm_log.child({ localPort: null }); | ||
} | ||
this.cf_conn = undefined; | ||
S.on(this, 'closeAsserted', function () { }); | ||
}; | ||
this.sm_socket = undefined; | ||
ConnectionFSM.prototype.state_error = function (S) { | ||
S.validTransitions(['delay', 'closed']); | ||
log.trace(this.sm_lastError, 'got error from connection'); | ||
S.on(this, 'closeAsserted', function () { }); | ||
S.on(this, 'retryAsserted', function () { | ||
S.gotoState('backoff'); | ||
}); | ||
}; | ||
var log = this.cf_log; | ||
if (this.cf_conn) { | ||
this.cf_conn.destroy(); | ||
this.cf_log = this.cf_log.child({ localPort: null }); | ||
} | ||
this.cf_conn = undefined; | ||
SocketMgrFSM.prototype.state_backoff = function (S) { | ||
S.validTransitions(['failed', 'connecting']); | ||
if (this.cf_shadow) { | ||
this.cf_shadow.sh_error = true; | ||
this.cf_shadow = undefined; | ||
} | ||
/* | ||
* If the closeAfter flag is set, and this is a connection to a "dead" | ||
* backend (i.e., a "monitor" watching to see when it comes back), then | ||
* exit now. For an ordinary backend, we don't want to do this, | ||
* because we want to give ourselves the opportunity to run out of | ||
* retries. | ||
* | ||
* Otherwise, in a situation where we have two connections that were | ||
* created at the same time, one to a failing backend that's already | ||
* declared dead, and one to a different failing backend not yet | ||
* declared, we may never learn that the second backend is down and | ||
* declare it dead. The already declared dead backend may exit first | ||
* during a pool reshuffle and cause this one to exit prematurely | ||
* (there's a race in who exits first and causes the planner to engage) | ||
* Unfortunately, "retries" actually means "attempts" in the cueball | ||
* API. To preserve compatibility we have to compare to 1 here instead | ||
* of 0. | ||
*/ | ||
if (this.cf_retries === Infinity && this.cf_closeAfter) { | ||
this.cf_retriesLeft = 0; | ||
log.trace('backoff monitor shut down'); | ||
S.gotoState('closed'); | ||
if (this.sm_retriesLeft !== Infinity && this.sm_retriesLeft <= 1) { | ||
S.gotoState('failed'); | ||
return; | ||
} | ||
var delay = this.sm_delay; | ||
if (this.sm_retries !== Infinity) { | ||
--this.sm_retriesLeft; | ||
this.sm_delay *= 2; | ||
this.sm_timeout *= 2; | ||
if (this.sm_timeout > this.sm_maxTimeout) | ||
this.sm_timeout = this.sm_maxTimeout; | ||
if (this.sm_delay > this.sm_maxDelay) | ||
this.sm_delay = this.sm_maxDelay; | ||
} | ||
var t = S.timeout(delay, function () { | ||
S.gotoState('connecting'); | ||
}); | ||
/* | ||
* If this backend has been removed from the Resolver, we should not | ||
* attempt any kind of reconnection. Exit now. | ||
* If we're in monitor mode, unref the backoff timer. That way we | ||
* don't block a commandline app from exiting that wants to exit as | ||
* soon as a pool enters 'failed'. | ||
*/ | ||
if (!this.cf_pool.shouldRetryBackend(this.cf_backend.key)) { | ||
log.trace('pool no longer wants us, closing'); | ||
if (this.sm_monitor) | ||
t.unref(); | ||
S.on(this, 'closeAsserted', function () { | ||
S.gotoState('closed'); | ||
return; | ||
}); | ||
}; | ||
SocketMgrFSM.prototype.state_closed = function (S) { | ||
S.validTransitions(['backoff', 'connecting']); | ||
var log = this.sm_log; | ||
if (this.sm_socket) { | ||
this.sm_socket.destroy(); | ||
this.sm_log = this.sm_log.child({ localPort: null }); | ||
} | ||
this.sm_socket = undefined; | ||
if (this.cf_retries !== Infinity) | ||
--this.cf_retriesLeft; | ||
log.trace('connection closed'); | ||
if (this.cf_retries === Infinity || this.cf_retriesLeft > 0) { | ||
log.trace(this.cf_lastError, 'failed to connect to ' + | ||
'backend, %d retries left: delaying before retry', | ||
this.cf_retriesLeft); | ||
S.gotoState('delay'); | ||
S.on(this, 'retryAsserted', function () { | ||
S.gotoState('backoff'); | ||
}); | ||
S.on(this, 'connectAsserted', function () { | ||
S.gotoState('connecting'); | ||
}); | ||
}; | ||
SocketMgrFSM.prototype.state_failed = function (S) { | ||
S.validTransitions([]); | ||
this.sm_log.warn(this.sm_lastError, 'failed to connect to ' + | ||
'backend, retries exhausted'); | ||
this.sm_pool._incrCounter('retries-exhausted'); | ||
}; | ||
/* | ||
* ClaimHandle is a state machine representing a "claim handle". We give | ||
* these out to clients of the Cueball pool, so that they can call either | ||
* release() or close() on them when their use of the connection is complete. | ||
* | ||
* They also handle the process of finding a Slot in the pool to fulfill the | ||
* claim and getting the connection successfully returned to the user. | ||
* | ||
* Some ancilliary responsibilities relate to the user interface features we | ||
* support, like detecting leaked event handlers on re-useable connections and | ||
* cancelling the process mid-way. | ||
* | ||
* A ClaimHandle is created immediately when .claim() is called on a pool. | ||
* The pool then chooses slots that it will attempt to use to fulfill the claim. | ||
* Each time the pool tries another slot, it calls .try() on the ClaimHandle. | ||
* The SlotFSM will then either call .accept() or .reject() on the handle. If | ||
* .reject() is called, we try another slot. If .accept() is called, we proceed | ||
* to state "claimed". From "claimed", the end-user of the cueball pool calls | ||
* either one of .release() or .close() to relinquish their claim. | ||
* | ||
* timeout +--------+ | ||
* +--------------------> | | | ||
* | | | | ||
* | | failed | | ||
* +----+----+ .fail() | | | ||
* | +---------------> | | | ||
* | | +--------+ | ||
* init -> | waiting | | ||
* | | .cancel() | ||
* | +---------------------+ | ||
* | | | | ||
* +-+-------+ | | ||
* .try() | ^ | | ||
* | | | | ||
* | | v | ||
* | | +-----------+ | ||
* | | | | | ||
* | | | cancelled | | ||
* | | | | | ||
* | | .reject() +-----------+ | ||
* v | && !cancel ^ | ||
* +------+---+ | | ||
* | | | | ||
* | claiming +--------------------+ | ||
* | | .reject() && ^ | ||
* +----+-----+ cancel | | ||
* .accept() | | | ||
* | +------------+ | ||
* | | | ||
* v | | ||
* +---------+ cancel | | ||
* | +--------+ +--------+ | ||
* | claimed | | | | ||
* | | .close() | closed | | ||
* | +-------------> | | | ||
* +----+----+ +--------+ | ||
* | .release() | ||
* | | ||
* | | ||
* v | ||
* +----------+ | ||
* | | | ||
* | released | | ||
* | | | ||
* +----------+ | ||
* | ||
*/ | ||
function CueBallClaimHandle(options) { | ||
mod_assert.object(options, 'options'); | ||
mod_assert.number(options.claimTimeout, 'options.claimTimeout'); | ||
this.ch_claimTimeout = options.claimTimeout; | ||
mod_assert.object(options.pool, 'options.pool'); | ||
this.ch_pool = options.pool; | ||
mod_assert.string(options.claimStack, 'options.claimStack'); | ||
this.ch_claimStack = options.claimStack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
/* The user callback provided to Pool#claim() */ | ||
mod_assert.func(options.callback, 'options.callback'); | ||
this.ch_callback = options.callback; | ||
mod_assert.object(options.log, 'options.log'); | ||
this.ch_log = options.log.child({ | ||
component: 'CueBallClaimHandle' | ||
}); | ||
this.ch_slot = undefined; | ||
this.ch_releaseStack = undefined; | ||
this.ch_connection = undefined; | ||
this.ch_preListeners = {}; | ||
this.ch_cancelled = false; | ||
this.ch_lastError = undefined; | ||
FSM.call(this, 'waiting'); | ||
} | ||
mod_util.inherits(CueBallClaimHandle, FSM); | ||
CueBallClaimHandle.prototype.try = function (slot) { | ||
mod_assert.ok(this.isInState('waiting'), 'ClaimHandle#try may only ' + | ||
'be called in state "waiting" (is in "' + this.getState() + '")'); | ||
mod_assert.ok(slot.isInState('idle'), 'ClaimHandle#try may only ' + | ||
'be called on a slot in state "idle" (is in "' + slot.getState() + | ||
'")'); | ||
this.ch_slot = slot; | ||
this.emit('tryAsserted'); | ||
}; | ||
CueBallClaimHandle.prototype.accept = function (connection) { | ||
mod_assert.ok(this.isInState('claiming')); | ||
this.ch_connection = connection; | ||
this.emit('accepted'); | ||
}; | ||
CueBallClaimHandle.prototype.reject = function () { | ||
mod_assert.ok(this.isInState('claiming')); | ||
this.emit('rejected'); | ||
}; | ||
CueBallClaimHandle.prototype.cancel = function () { | ||
if (this.isInState('claimed')) { | ||
this.release(); | ||
} else { | ||
log.warn(this.cf_lastError, 'failed to connect to backend, ' + | ||
'retries exhausted'); | ||
this.cf_pool._incrCounter('retries-exhausted'); | ||
S.gotoState('closed'); | ||
this.ch_cancelled = true; | ||
this.emit('cancelled'); | ||
} | ||
}; | ||
ConnectionFSM.prototype.state_delay = function (S) { | ||
S.validTransitions(['connect', 'closed']); | ||
var delay = this.cf_delay; | ||
CueBallClaimHandle.prototype.fail = function (err) { | ||
this.emit('error', err); | ||
}; | ||
this.cf_delay *= 2; | ||
this.cf_timeout *= 2; | ||
if (this.cf_timeout > this.cf_maxTimeout) | ||
this.cf_timeout = this.cf_maxTimeout; | ||
if (this.cf_delay > this.cf_maxDelay) | ||
this.cf_delay = this.cf_maxDelay; | ||
CueBallClaimHandle.prototype.relinquish = function (event) { | ||
if (!this.isInState('claimed')) { | ||
if (this.isInState('released') || this.isInState('closed')) { | ||
var err = new Error('Connection not claimed by ' + | ||
'this handle, released by ' + | ||
this.ch_releaseStack[2]); | ||
throw (err); | ||
} | ||
throw (new Error('ClaimHandle#release() called while in ' + | ||
'state "' + this.getState() + '"')); | ||
} | ||
var e = mod_utils.maybeCaptureStackTrace(); | ||
this.ch_releaseStack = e.stack.split('\n').slice(1). | ||
map(function (l) { return (l.replace(/^[ ]*at /, '')); }); | ||
this.emit(event); | ||
}; | ||
var t = S.timeout(delay, function () { | ||
S.gotoState('connect'); | ||
}); | ||
t.unref(); | ||
S.on(this, 'closeAsserted', function () { | ||
S.gotoState('closed'); | ||
}); | ||
CueBallClaimHandle.prototype.release = function () { | ||
this.relinquish('releaseAsserted'); | ||
}; | ||
ConnectionFSM.prototype.state_idle = function (S) { | ||
S.validTransitions(['busy', 'error', 'closed']); | ||
CueBallClaimHandle.prototype.close = function () { | ||
this.relinquish('closeAsserted'); | ||
}; | ||
CueBallClaimHandle.prototype.state_waiting = function (S) { | ||
S.validTransitions(['claiming', 'cancelled', 'failed']); | ||
var self = this; | ||
this.cf_claimed = false; | ||
this.cf_claimStack = []; | ||
this.ch_slot = undefined; | ||
if (typeof (self.cf_conn.localPort) === 'number') { | ||
this.cf_log = this.cf_log.child({ | ||
localPort: self.cf_conn.localPort | ||
S.on(this, 'tryAsserted', function () { | ||
S.gotoState('claiming'); | ||
}); | ||
if (isFinite(this.ch_claimTimeout)) { | ||
S.timeout(this.ch_claimTimeout, function () { | ||
self.ch_lastError = new mod_errors.ClaimTimeoutError( | ||
self.ch_pool); | ||
S.gotoState('failed'); | ||
}); | ||
} | ||
this.cf_log.trace('connected, idling'); | ||
S.on(this, 'error', function (err) { | ||
self.ch_lastError = err; | ||
S.gotoState('failed'); | ||
}); | ||
if (this.cf_shadow) { | ||
this.cf_shadow.sh_claimed = false; | ||
this.cf_shadow.sh_releaseStack = this.cf_releaseStack; | ||
this.cf_shadow = undefined; | ||
S.on(this, 'cancelled', function () { | ||
S.gotoState('cancelled'); | ||
}); | ||
}; | ||
CueBallClaimHandle.prototype.state_claiming = function (S) { | ||
S.validTransitions(['claimed', 'waiting']); | ||
var self = this; | ||
S.on(this, 'accepted', function () { | ||
S.gotoState('claimed'); | ||
}); | ||
S.on(this, 'rejected', function () { | ||
if (self.ch_cancelled) { | ||
S.gotoState('cancelled'); | ||
} else { | ||
S.gotoState('waiting'); | ||
} | ||
}); | ||
this.ch_slot.claim(this); | ||
}; | ||
CueBallClaimHandle.prototype.state_claimed = function (S) { | ||
var self = this; | ||
S.validTransitions(['released', 'closed']); | ||
S.on(this, 'releaseAsserted', function () { | ||
S.gotoState('released'); | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
S.gotoState('closed'); | ||
}); | ||
if (this.ch_cancelled) { | ||
S.gotoState('released'); | ||
return; | ||
} | ||
this.ch_preListeners = {}; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var newCount = countListeners(self.cf_conn, evt); | ||
var oldCount = self.cf_oldListeners[evt]; | ||
var count = countListeners(self.ch_connection, evt); | ||
self.ch_preListeners[evt] = count; | ||
}); | ||
S.on(this.ch_connection, 'error', function clHandleErrorListener(err) { | ||
var count = countListeners(self.ch_connection, 'error'); | ||
if (count === 0) { | ||
/* | ||
* Our end-user never set up an 'error' event listener | ||
* and the socket emitted 'error'. We want to act like | ||
* nothing is listening for it at all and throw. | ||
*/ | ||
throw (err); | ||
} | ||
}); | ||
this.ch_log = this.ch_slot.makeChildLogger({ | ||
component: 'CueBallClaimHandle' | ||
}); | ||
this.ch_callback(null, this, this.ch_connection); | ||
}; | ||
CueBallClaimHandle.prototype.state_released = function (S) { | ||
S.validTransitions([]); | ||
var conn = this.ch_connection; | ||
var self = this; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var newCount = countListeners(conn, evt); | ||
var oldCount = self.ch_preListeners[evt]; | ||
if (oldCount !== undefined && newCount > oldCount) { | ||
var info = {}; | ||
info.stack = self.cf_releaseStack; | ||
info.stack = self.ch_stack; | ||
info.countBeforeClaim = oldCount; | ||
info.countAfterRelease = newCount; | ||
info.handlers = self.cf_conn.listeners(evt).map( | ||
info.handlers = conn.listeners(evt).map( | ||
function (f) { | ||
@@ -420,163 +688,29 @@ /* | ||
info.event = evt; | ||
self.cf_log.warn(info, 'connection claimer looks ' + | ||
self.ch_log.warn(info, 'connection claimer looks ' + | ||
'like it leaked event handlers'); | ||
} | ||
}); | ||
}; | ||
CueBallClaimHandle.prototype.state_closed = function (S) { | ||
S.validTransitions([]); | ||
/* | ||
* Reset retries and retry delay to their defaults since we are now | ||
* connected. | ||
* Don't bother to check for leaked event handlers here, as the | ||
* connection is being closed anyway. | ||
*/ | ||
this.cf_retries = this.cf_connectRecov.retries; | ||
this.cf_retriesLeft = this.cf_connectRecov.retries; | ||
this.cf_minDelay = this.cf_connectRecov.delay; | ||
this.cf_delay = this.cf_connectRecov.delay; | ||
this.cf_maxDelay = this.cf_connectRecov.maxDelay || Infinity; | ||
this.cf_timeout = this.cf_connectRecov.timeout; | ||
this.cf_maxTimeout = this.cf_connectRecov.maxTimeout || Infinity; | ||
if (this.cf_closeAfter === true) { | ||
this.cf_closeAfter = false; | ||
this.cf_lastError = undefined; | ||
S.on(this, 'closeAsserted', function () { }); | ||
S.gotoState('closed'); | ||
return; | ||
} | ||
if (this.cf_doRef) | ||
this.cf_conn.unref(); | ||
S.on(this, 'claimAsserted', function () { | ||
S.gotoState('busy'); | ||
}); | ||
S.on(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-idle'); | ||
}); | ||
S.on(this.cf_conn, 'close', function () { | ||
/* | ||
* If we receive 'close' while idle with our closeAfter flag | ||
* set, we were going to be removed anyway. Just go to closed. | ||
* Going to error would give us a chance to learn about a dead | ||
* backend, but a clean 'close' with no 'error' is probably | ||
* not dead. | ||
* | ||
* This is particularly important with ConnectionSets, where | ||
* this is the normal path that's taken for the Set's consumer | ||
* to notify it that this connection has drained and closed. | ||
*/ | ||
if (self.cf_closeAfter === true) { | ||
S.gotoState('closed'); | ||
} else { | ||
self.cf_lastError = | ||
new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-idle'); | ||
} | ||
}); | ||
S.on(this.cf_conn, 'end', function () { | ||
if (self.cf_closeAfter === true) { | ||
S.gotoState('closed'); | ||
} else { | ||
self.cf_lastError = new | ||
mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('end-during-idle'); | ||
} | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
S.gotoState('closed'); | ||
}); | ||
if (this.cf_checkTimeout !== undefined && | ||
this.cf_checkTimeout !== null) { | ||
var now = new Date(); | ||
var sinceLast = (now - this.cf_lastCheck); | ||
var delay; | ||
if (sinceLast > this.cf_checkTimeout) { | ||
delay = 1000; | ||
} else { | ||
delay = this.cf_checkTimeout - sinceLast; | ||
if (delay < 1000) | ||
delay = 1000; | ||
} | ||
var t = S.timeout(delay, function () { | ||
S.gotoState('ping'); | ||
}); | ||
t.unref(); | ||
} | ||
}; | ||
ConnectionFSM.prototype.state_ping = function (S) { | ||
S.validTransitions(['error', 'closed', 'idle']); | ||
this.cf_lastCheck = new Date(); | ||
this.cf_claimStack = [ | ||
'ConnectionFSM.prototype.state_ping', | ||
'(periodic_health_check)']; | ||
this.cf_claimed = true; | ||
var self = this; | ||
if (this.cf_doRef) | ||
this.cf_conn.ref(); | ||
this.cf_releaseStack = []; | ||
this.cf_log.trace('doing health check'); | ||
CueBallClaimHandle.prototype.state_cancelled = function (S) { | ||
S.validTransitions([]); | ||
/* | ||
* Write down the count of event handlers on the backing object so that | ||
* we can spot if the client leaked any common ones in release(). | ||
* The public API requires that the callback function not be called at | ||
* all when .cancel() has been used. So we do nothing here. | ||
*/ | ||
this.cf_oldListeners = {}; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var count = countListeners(self.cf_conn, evt); | ||
self.cf_oldListeners[evt] = count; | ||
}); | ||
}; | ||
/* | ||
* The ConnectionHandle is a one-time use object that proxies calls to | ||
* our release() and close() functions. We use it so that we can assert | ||
* that this particular client only releases us once. If we only | ||
* asserted on our current state, there could be a race where we get | ||
* claimed by a different client in the meantime. | ||
*/ | ||
this.cf_shadow = new ConnectionHandle(this); | ||
S.on(this, 'releaseAsserted', function () { | ||
if (self.cf_closeAfter === true) { | ||
S.gotoState('closed'); | ||
} else { | ||
S.gotoState('idle'); | ||
} | ||
}); | ||
S.on(this.cf_conn, 'error', function (err) { | ||
self.cf_lastError = err; | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-ping'); | ||
}); | ||
S.on(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-ping'); | ||
}); | ||
S.on(this.cf_conn, 'end', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('end-during-ping'); | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
}); | ||
var t = S.timeout(this.cf_checkTimeout, function () { | ||
var info = {}; | ||
info.stack = self.cf_claimStack; | ||
self.cf_log.warn(info, 'health check is taking too ' + | ||
'long to run (has been more than %d ms)', | ||
self.cf_checkTimeout); | ||
}); | ||
t.unref(); | ||
CueBallClaimHandle.prototype.state_failed = function (S) { | ||
S.validTransitions([]); | ||
var self = this; | ||
S.immediate(function () { | ||
self.cf_checker.call(null, self.cf_shadow, self.cf_conn); | ||
self.ch_callback(self.ch_lastError); | ||
}); | ||
@@ -599,2 +733,7 @@ }; | ||
} | ||
/* Don't count the error listeners set up by cueball. */ | ||
if (h.name === 'socketMgrErrorListener' || | ||
h.name === 'clHandleErrorListener') { | ||
return (false); | ||
} | ||
return (true); | ||
@@ -605,104 +744,398 @@ }); | ||
ConnectionFSM.prototype.state_busy = function (S) { | ||
S.validTransitions(['error', 'closed', 'idle']); | ||
var self = this; | ||
if (this.cf_doRef) | ||
this.cf_conn.ref(); | ||
/* | ||
* ConnectionSlotFSM is a large part of the guts of cueball connection | ||
* management. Its primary job is to construct and manage a SocketMgrFSM, | ||
* making decisions about when to retry and what to do when it's time to | ||
* give up. | ||
* | ||
* It also handles reporting to the Pool or Set that hosts this slot, as its | ||
* state graph represents only the transitions that the Pool or Set actually | ||
* cares about. | ||
* | ||
* SlotFSMs have two key flags passed in by the Pool or Set: "wanted" and | ||
* "monitor". "Monitor" indicates that this connection is a monitor connection | ||
* (i.e. the backend that it is connecting to is expected to be dead, and this | ||
* FSM's job is to monitor it for when it comes back). The "wanted" flag is | ||
* always true at construction but may be set to false by the Pool or Set. When | ||
* so set, it indicates that this slot is no longer "wanted" and should cease | ||
* operation as soon as possible (by moving to "failed" or "stopped"). | ||
* | ||
* init (startup) | ||
* | +--------+ | ||
* | +-----------> | failed | <-----------+ | ||
* | | +--------+ | | ||
* | | | | ||
* | | | +---------+ | ||
* | smgr | smgr | | | | ||
* | failed | failed | | | | ||
* | | | v smgr | | ||
* | +--------+-------+ +-------+--------+ error | | ||
* +--> | connecting | smgr error | retrying +-------+ | ||
* (A)---> | +------------------> | | | ||
* | smgr.connect() | | smgr.retry() | <-+ | ||
* +-------+--------+ +-------+--------+ | | ||
* | ^ ^ | | | ||
* smgr | | | | smgr | | ||
* connected | | | | connected | | ||
* | | smgr smgr | | | | ||
* | | closed +--------+ error | | | | ||
* | +-----------+ +----------+ | | | ||
* | | | | | | ||
* +------------> | idle | <-----------+ | | ||
* | | | | ||
* +----------------------+ | <-----------+ | | ||
* | !wanted && smgr +---+----+ | | | ||
* | connected | .claim() | | | ||
* v | | | | ||
* +--------------+ | | | | ||
* | stopping | v | | | ||
* | | +--------+ | | | ||
* | smgr.close() | <------------+ +-------------+ | | ||
* +------+-------+ hdl rel. && | | hdl released && | | ||
* | smgr conn && | | smgr connected && | | ||
* smgr | !wanted | | wanted | | ||
* closed | | | | | ||
* | | busy +----------------->(A) | | ||
* v | | hdl released && | | ||
* +---------+ | | smgr closed | | ||
* | stopped | <---------------+ | | | ||
* +---------+ hdl rel. && | +--------------------------+ | ||
* !(smgr conn) && | | hdl closed && ^ | ||
* !wanted +-----+--+ !(smgr connected) | | ||
* | | | ||
* | hdl closed && | | ||
* | smgr connected | | ||
* | | | ||
* v | | ||
* +--------------+ | | ||
* | killing +----------------+ | ||
* | | smgr closed | ||
* | smgr.close() | | ||
* +--------------+ | ||
*/ | ||
function ConnectionSlotFSM(options) { | ||
mod_assert.object(options, 'options'); | ||
mod_assert.object(options.pool, 'options.pool'); | ||
mod_assert.func(options.constructor, 'options.constructor'); | ||
mod_assert.object(options.backend, 'options.backend'); | ||
mod_assert.object(options.log, 'options.log'); | ||
mod_assert.object(options.recovery, 'options.recovery'); | ||
mod_assert.bool(options.monitor, 'options.monitor'); | ||
this.cf_releaseStack = []; | ||
this.cf_log.trace('busy, claimed by %s', | ||
this.cf_claimStack[1].split(' ')[0]); | ||
mod_assert.optionalFunc(options.checker, 'options.checker'); | ||
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout'); | ||
/* | ||
* Write down the count of event handlers on the backing object so that | ||
* we can spot if the client leaked any common ones in release(). | ||
*/ | ||
this.cf_oldListeners = {}; | ||
['close', 'error', 'readable', 'data'].forEach(function (evt) { | ||
var count = countListeners(self.cf_conn, evt); | ||
self.cf_oldListeners[evt] = count; | ||
this.csf_pool = options.pool; | ||
this.csf_backend = options.backend; | ||
this.csf_wanted = true; | ||
this.csf_handle = undefined; | ||
this.csf_monitor = options.monitor; | ||
this.csf_checker = options.checker; | ||
this.csf_checkTimeout = options.checkTimeout; | ||
this.csf_log = options.log.child({ | ||
component: 'CueBallConnectionSlotFSM', | ||
backend: this.csf_backend.key, | ||
address: this.csf_backend.address, | ||
port: this.csf_backend.port | ||
}); | ||
/* | ||
* The ConnectionHandle is a one-time use object that proxies calls to | ||
* our release() and close() functions. We use it so that we can assert | ||
* that this particular client only releases us once. If we only | ||
* asserted on our current state, there could be a race where we get | ||
* claimed by a different client in the meantime. | ||
*/ | ||
this.cf_shadow = new ConnectionHandle(this); | ||
var smgrOpts = { | ||
pool: options.pool, | ||
constructor: options.constructor, | ||
backend: options.backend, | ||
log: options.log, | ||
recovery: options.recovery, | ||
monitor: options.monitor, | ||
slot: this | ||
}; | ||
this.csf_smgr = new SocketMgrFSM(smgrOpts); | ||
S.on(this, 'releaseAsserted', function () { | ||
if (self.cf_closeAfter === true) { | ||
S.gotoState('closed'); | ||
} else { | ||
FSM.call(this, 'init'); | ||
} | ||
mod_util.inherits(ConnectionSlotFSM, FSM); | ||
ConnectionSlotFSM.prototype.setUnwanted = function () { | ||
if (this.csf_wanted === false) | ||
return; | ||
this.csf_wanted = false; | ||
this.emit('unwanted'); | ||
}; | ||
ConnectionSlotFSM.prototype.start = function () { | ||
mod_assert.ok(this.isInState('init')); | ||
this.emit('startAsserted'); | ||
}; | ||
ConnectionSlotFSM.prototype.claim = function (handle) { | ||
mod_assert.ok(this.isInState('idle')); | ||
mod_assert.strictEqual(this.csf_handle, undefined); | ||
this.csf_handle = handle; | ||
this.emit('claimAsserted'); | ||
}; | ||
ConnectionSlotFSM.prototype.makeChildLogger = function (args) { | ||
return (this.csf_log.child(args)); | ||
}; | ||
ConnectionSlotFSM.prototype.getSocketMgr = function () { | ||
return (this.csf_smgr); | ||
}; | ||
ConnectionSlotFSM.prototype.state_init = function (S) { | ||
S.on(this, 'startAsserted', function () { | ||
S.gotoState('connecting'); | ||
}); | ||
}; | ||
ConnectionSlotFSM.prototype.state_connecting = function (S) { | ||
S.validTransitions(['failed', 'retrying', 'idle']); | ||
var smgr = this.csf_smgr; | ||
S.on(smgr, 'stateChanged', function (st) { | ||
switch (st) { | ||
case 'init': | ||
case 'connecting': | ||
break; | ||
case 'failed': | ||
S.gotoState('failed'); | ||
break; | ||
case 'error': | ||
S.gotoState('retrying'); | ||
break; | ||
case 'connected': | ||
S.gotoState('idle'); | ||
break; | ||
default: | ||
throw (new Error('Unhandled smgr state transition: ' + | ||
'.connect() => "' + st + '"')); | ||
} | ||
}); | ||
S.on(this.cf_conn, 'error', function (err) { | ||
self.cf_log.error(err, 'connection emitted "error" while ' + | ||
'busy (claimed)'); | ||
self.cf_lastError = err; | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('error-during-busy'); | ||
smgr.connect(); | ||
}; | ||
ConnectionSlotFSM.prototype.state_failed = function (S) { | ||
S.validTransitions([]); | ||
mod_assert.ok(this.csf_smgr.isInState('failed'), | ||
'smgr must be failed'); | ||
}; | ||
ConnectionSlotFSM.prototype.state_retrying = function (S) { | ||
S.validTransitions(['idle', 'failed', 'retrying', 'stopped', | ||
'stopping']); | ||
var self = this; | ||
var smgr = this.csf_smgr; | ||
S.on(smgr, 'stateChanged', function (st) { | ||
switch (st) { | ||
case 'backoff': | ||
case 'connecting': | ||
break; | ||
case 'failed': | ||
S.gotoState('failed'); | ||
break; | ||
case 'error': | ||
if (self.csf_monitor && !self.csf_wanted) { | ||
S.gotoState('stopped'); | ||
} else { | ||
S.gotoState('retrying'); | ||
} | ||
break; | ||
case 'connected': | ||
S.gotoState('idle'); | ||
break; | ||
default: | ||
throw (new Error('Unhandled smgr state transition: ' + | ||
'.retry() => "' + st + '"')); | ||
} | ||
}); | ||
S.on(this.cf_conn, 'end', function () { | ||
self.cf_closeAfter = true; | ||
self.cf_pool._incrCounter('end-during-busy'); | ||
S.on(this, 'unwanted', function () { | ||
if (self.csf_monitor && smgr.isInState('backoff')) { | ||
S.gotoState('stopping'); | ||
} | ||
}); | ||
S.on(this.cf_conn, 'close', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
self.cf_pool._incrCounter('close-during-busy'); | ||
smgr.retry(); | ||
}; | ||
ConnectionSlotFSM.prototype.state_idle = function (S) { | ||
var self = this; | ||
var smgr = this.csf_smgr; | ||
this.csf_handle = undefined; | ||
if (smgr.isInState('connected')) { | ||
var sock = smgr.getSocket(); | ||
/* | ||
* If we can, unref the socket while idle, so that a bunch of | ||
* spare connections doing nothing don't hold the process open. | ||
*/ | ||
if (typeof (sock.unref) === 'function') | ||
sock.unref(); | ||
} | ||
/* Monitor successfully connected: make it into a normal slot now. */ | ||
if (this.csf_monitor === true) { | ||
this.csf_monitor = false; | ||
smgr.setMonitor(false); | ||
} | ||
if (!this.csf_wanted) { | ||
onUnwanted(); | ||
return; | ||
} | ||
S.on(this, 'unwanted', onUnwanted); | ||
function onUnwanted() { | ||
if (smgr.isInState('connected')) { | ||
S.gotoState('stopping'); | ||
} | ||
} | ||
S.on(smgr, 'stateChanged', function (st) { | ||
switch (st) { | ||
case 'error': | ||
S.gotoState('retrying'); | ||
break; | ||
case 'closed': | ||
if (!self.csf_wanted) { | ||
S.gotoState('stopped'); | ||
} else { | ||
S.gotoState('connecting'); | ||
} | ||
break; | ||
default: | ||
throw (new Error('Unhandled smgr state transition: ' + | ||
'connected => "' + st + '"')); | ||
} | ||
}); | ||
S.on(this, 'closeAsserted', function () { | ||
self.cf_lastError = new mod_errors.ConnectionClosedError(self); | ||
S.gotoState('error'); | ||
S.on(this, 'claimAsserted', function () { | ||
S.gotoState('busy'); | ||
}); | ||
if (this.cf_checkTimeout !== undefined && | ||
this.cf_checkTimeout !== null) { | ||
var t = S.timeout(this.cf_checkTimeout, function () { | ||
var info = {}; | ||
info.stack = self.cf_claimStack; | ||
self.cf_log.warn(info, 'connection held for longer ' + | ||
'than checkTimeout (%d ms), may have been leaked', | ||
self.cf_checkTimeout); | ||
if (this.csf_checkTimeout !== undefined && | ||
this.csf_checker !== undefined) { | ||
S.timeout(this.csf_checkTimeout, function () { | ||
doPingCheck(self, self.csf_checker); | ||
}); | ||
t.unref(); | ||
} | ||
}; | ||
function ConnectionHandle(cf) { | ||
this.sh_cf = cf; | ||
this.sh_claimed = true; | ||
this.sh_error = false; | ||
this.sh_releaseStack = []; | ||
function doPingCheck(fsm, checker) { | ||
var hdlOpts = { | ||
pool: fsm.csf_pool, | ||
claimStack: 'Error\n' + | ||
'at claim\n' + | ||
'at cueball.doPingCheck\n' + | ||
'at cueball.doPingCheck\n', | ||
callback: checker, | ||
log: fsm.csf_log, | ||
claimTimeout: Infinity | ||
}; | ||
var handle = new CueBallClaimHandle(hdlOpts); | ||
/* | ||
* Don't bother handling a return to "waiting" state here: if we | ||
* fail, it's fine, just let go of this handle entirely. | ||
*/ | ||
handle.try(fsm); | ||
} | ||
ConnectionHandle.prototype.close = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
ConnectionSlotFSM.prototype.state_busy = function (S) { | ||
S.validTransitions(['idle', 'stopping', 'stopped', 'retrying', | ||
'killing', 'connecting']); | ||
var self = this; | ||
var smgr = this.csf_smgr; | ||
var hdl = this.csf_handle; | ||
function onRelease() { | ||
if (smgr.isInState('connected')) { | ||
if (self.csf_wanted) { | ||
S.gotoState('idle'); | ||
} else { | ||
S.gotoState('stopping'); | ||
} | ||
} else if (smgr.isInState('closed')) { | ||
if (self.csf_wanted) { | ||
S.gotoState('connecting'); | ||
} else { | ||
S.gotoState('stopped'); | ||
} | ||
} else if (smgr.isInState('error')) { | ||
S.gotoState('retrying'); | ||
} else { | ||
throw (new Error('Handle released while smgr was ' + | ||
'in unhandled state "' + smgr.getState() + '"')); | ||
} | ||
} | ||
return (this.sh_cf.close.apply(this.sh_cf, arguments)); | ||
}; | ||
ConnectionHandle.prototype.release = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
function onClose() { | ||
if (smgr.isInState('connected')) { | ||
S.gotoState('killing'); | ||
} else { | ||
S.gotoState('retrying'); | ||
} | ||
} | ||
return (this.sh_cf.release.apply(this.sh_cf, arguments)); | ||
}; | ||
ConnectionHandle.prototype.closeAfterRelease = function () { | ||
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' + | ||
'this handle, released by ' + this.sh_releaseStack[2]); | ||
if (this.sh_error) { | ||
this.sh_claimed = false; | ||
return (undefined); | ||
S.on(hdl, 'stateChanged', function (st) { | ||
switch (st) { | ||
case 'released': | ||
S.immediate(onRelease); | ||
break; | ||
case 'closed': | ||
S.immediate(onClose); | ||
break; | ||
default: | ||
break; | ||
} | ||
}); | ||
/* | ||
* It's possible that the smgr has already moved out of 'connected' | ||
* by the time we get here. | ||
* | ||
* If we lose the race, treat it like our handle was released. | ||
*/ | ||
if (smgr.isInState('connected')) { | ||
var sock = smgr.getSocket(); | ||
if (typeof (sock.ref) === 'function') | ||
sock.ref(); | ||
hdl.accept(sock); | ||
} else { | ||
hdl.reject(); | ||
this.csf_handle = undefined; | ||
S.immediate(onRelease); | ||
} | ||
return (this.sh_cf.closeAfterRelease.apply(this.sh_cf, | ||
arguments)); | ||
}; | ||
ConnectionSlotFSM.prototype.state_killing = function (S) { | ||
S.validTransitions(['retrying']); | ||
var smgr = this.csf_smgr; | ||
S.on(smgr, 'stateChanged', function (st) { | ||
if (st === 'closed') { | ||
S.gotoState('retrying'); | ||
} | ||
}); | ||
smgr.close(); | ||
}; | ||
ConnectionSlotFSM.prototype.state_stopping = function (S) { | ||
S.validTransitions(['stopped']); | ||
var smgr = this.csf_smgr; | ||
S.on(smgr, 'stateChanged', function (st) { | ||
if (st === 'closed') { | ||
S.gotoState('stopped'); | ||
} | ||
}); | ||
smgr.close(); | ||
}; | ||
ConnectionSlotFSM.prototype.state_stopped = function (S) { | ||
S.validTransitions([]); | ||
var smgr = this.csf_smgr; | ||
mod_assert.ok(smgr.isInState('closed') || smgr.isInState('error') || | ||
smgr.isInState('failed'), 'smgr must be stopped'); | ||
}; |
@@ -122,3 +122,3 @@ /* | ||
var ks = cset.cs_keys.slice(); | ||
Object.keys(cset.cs_fsms).forEach(function (k) { | ||
Object.keys(cset.cs_fsm).forEach(function (k) { | ||
if (ks.indexOf(k) === -1) | ||
@@ -128,10 +128,8 @@ ks.push(k); | ||
ks.forEach(function (k) { | ||
var conns = cset.cs_fsms[k] || []; | ||
var fsm = cset.cs_fsm[k]; | ||
obj.fsms[k] = {}; | ||
conns.forEach(function (fsm) { | ||
var s = fsm.getState(); | ||
if (obj.fsms[k][s] === undefined) | ||
obj.fsms[k][s] = 0; | ||
++obj.fsms[k][s]; | ||
}); | ||
var s = fsm.getState(); | ||
if (obj.fsms[k][s] === undefined) | ||
obj.fsms[k][s] = 0; | ||
++obj.fsms[k][s]; | ||
}); | ||
@@ -138,0 +136,0 @@ obj.dead_backends = Object.keys(cset.cs_dead); |
251
lib/pool.js
@@ -31,3 +31,5 @@ /* | ||
const Queue = require('./queue'); | ||
const ConnectionFSM = require('./connection-fsm'); | ||
const mod_connfsm = require('./connection-fsm'); | ||
const ConnectionSlotFSM = mod_connfsm.ConnectionSlotFSM; | ||
const CueBallClaimHandle = mod_connfsm.CueBallClaimHandle; | ||
@@ -261,14 +263,27 @@ /* | ||
var idx = this.p_keys.indexOf(k); | ||
if (idx !== -1) | ||
this.p_keys.splice(idx, 1); | ||
mod_assert.notStrictEqual(idx, -1, 'resolver key ' + k + ' not found'); | ||
this.p_keys.splice(idx, 1); | ||
delete (this.p_backends[k]); | ||
(this.p_connections[k] || []).forEach(function (fsm) { | ||
if (fsm.isInState('busy')) | ||
fsm.closeAfterRelease(); | ||
else | ||
fsm.close(); | ||
delete (this.p_dead[k]); | ||
var self = this; | ||
mod_vasync.forEachParallel({ | ||
func: closeBackend, | ||
inputs: (this.p_connections[k] || []) | ||
}, function () { | ||
mod_assert.strictEqual(self.p_connections[k].length, 0); | ||
delete (self.p_connections[k]); | ||
self.rebalance(); | ||
}); | ||
delete (this.p_connections[k]); | ||
delete (this.p_dead[k]); | ||
this.rebalance(); | ||
function closeBackend(fsm, cb) { | ||
fsm.setUnwanted(); | ||
if (fsm.isInState('stopped') || fsm.isInState('failed')) { | ||
cb(); | ||
} else { | ||
fsm.on('stateChanged', function (st) { | ||
if (st === 'stopped' || st === 'failed') | ||
cb(); | ||
}); | ||
} | ||
} | ||
}; | ||
@@ -352,2 +367,9 @@ | ||
this._incrCounter('failed-state'); | ||
/* Fail all outstanding claims that are waiting for a connection. */ | ||
while (!this.p_waiters.isEmpty()) { | ||
var hdl = this.p_waiters.shift(); | ||
if (hdl.isInState('waiting')) | ||
hdl.fail(new mod_errors.PoolFailedError(self)); | ||
} | ||
}; | ||
@@ -413,10 +435,10 @@ | ||
function closeBackend(fsm, cb) { | ||
if (fsm.isInState('busy')) { | ||
fsm.closeAfterRelease(); | ||
fsm.setUnwanted(); | ||
if (fsm.isInState('stopped') || fsm.isInState('failed')) { | ||
cb(); | ||
} else { | ||
fsm.on('stateChanged', function (st) { | ||
if (st === 'closed') | ||
if (st === 'stopped' || st === 'failed') | ||
cb(); | ||
}); | ||
} else { | ||
fsm.close(cb); | ||
} | ||
@@ -533,27 +555,11 @@ } | ||
plan.remove.forEach(function (fsm) { | ||
/* This slot is no longer wanted. */ | ||
fsm.setUnwanted(); | ||
/* | ||
* Only tell the FSM to quit *right now* if either: | ||
* 1. it's idle | ||
* 2. there are other FSMs for this backend | ||
* 2. it is connected to a backend that has been | ||
* removed from the resolver | ||
* Otherwise get it to quit gracefully once it's done | ||
* doing whatever it's doing (using closeAfterRelease). | ||
* This way we when we have a failing backend that we | ||
* want to mark as "dead" ASAP, we don't give up early | ||
* and never figure out if it's actually dead or not. | ||
* We may have changed to stopped or failed synchronously after | ||
* setting unwanted. If we have, don't count this as a socket | ||
* against our cap (it's been destroyed). | ||
*/ | ||
var fsmIdx = self.p_connections[fsm.cf_backend.key]. | ||
indexOf(fsm); | ||
if (fsm.isInState('idle')) { | ||
fsm.close(); | ||
if (fsm.isInState('stopped') || fsm.isInState('failed')) { | ||
--total; | ||
} else if (fsm.isInState('busy')) { | ||
fsm.closeAfterRelease(); | ||
} else if (fsmIdx > 0 || | ||
self.p_keys.indexOf(fsm.cf_backend.key) === -1) { | ||
fsm.close(); | ||
--total; | ||
} else { | ||
fsm.closeAfterRelease(); | ||
} | ||
@@ -579,3 +585,3 @@ }); | ||
var fsm = new ConnectionFSM({ | ||
var fsm = new ConnectionSlotFSM({ | ||
constructor: this.p_constructor, | ||
@@ -587,3 +593,4 @@ backend: backend, | ||
checkTimeout: this.p_checkTimeout, | ||
recovery: this.p_recovery | ||
recovery: this.p_recovery, | ||
monitor: (this.p_dead[key] === true) | ||
}); | ||
@@ -600,4 +607,4 @@ if (this.p_connections[key] === undefined) | ||
/* These transitions mean we're still starting up. */ | ||
if (newState === 'init' || newState === 'delay' || | ||
newState === 'error' || newState === 'connect') | ||
if (newState === 'init' || newState === 'connecting' || | ||
newState === 'retrying') | ||
return; | ||
@@ -631,3 +638,3 @@ /* | ||
if (self.p_backends[key] === undefined) { | ||
fsm.close(); | ||
fsm.setUnwanted(); | ||
return; | ||
@@ -640,6 +647,8 @@ } | ||
*/ | ||
if (self.p_waiters.length > 0) { | ||
var cb = self.p_waiters.shift(); | ||
fsm.claim(cb.stack, cb); | ||
return; | ||
while (self.p_waiters.length > 0) { | ||
var hdl = self.p_waiters.shift(); | ||
if (hdl.isInState('waiting')) { | ||
hdl.try(fsm); | ||
return; | ||
} | ||
} | ||
@@ -661,3 +670,7 @@ | ||
if (newState === 'closed') { | ||
if (newState === 'failed') { | ||
self.p_dead[key] = true; | ||
} | ||
if (newState === 'stopped' || newState === 'failed') { | ||
if (self.p_connections[key]) { | ||
@@ -667,5 +680,2 @@ var idx = self.p_connections[key].indexOf(fsm); | ||
} | ||
if (fsm.retriesExhausted()) { | ||
self.p_dead[key] = true; | ||
} | ||
self.emit('closedBackend', key, fsm); | ||
@@ -691,5 +701,28 @@ self.rebalance(); | ||
CueBallConnectionPool.prototype.printConnections = function () { | ||
var self = this; | ||
var obj = { connections: {} }; | ||
var ks = self.p_keys.slice(); | ||
Object.keys(self.p_connections).forEach(function (k) { | ||
if (ks.indexOf(k) === -1) | ||
ks.push(k); | ||
}); | ||
ks.forEach(function (k) { | ||
var conns = self.p_connections[k] || []; | ||
obj.connections[k] = {}; | ||
conns.forEach(function (fsm) { | ||
var s = fsm.getState(); | ||
if (obj.connections[k][s] === undefined) | ||
obj.connections[k][s] = 0; | ||
++obj.connections[k][s]; | ||
}); | ||
}); | ||
console.log('live:', obj.connections); | ||
console.log('dead:', self.p_dead); | ||
}; | ||
CueBallConnectionPool.prototype.claim = function (options, cb) { | ||
var self = this; | ||
var done = false; | ||
var handle; | ||
@@ -733,84 +766,56 @@ if (typeof (options) === 'function' && cb === undefined) { | ||
/* If there are idle connections sitting around, take one. */ | ||
while (this.p_idleq.length > 0) { | ||
var fsm = this.p_idleq.shift(); | ||
delete (fsm.p_idleq_node); | ||
/* | ||
* Since 'stateChanged' is emitted async from mooremachine, | ||
* things may be on the idle queue still but not actually idle. | ||
* If we find one, just rip it off the queue (which we've | ||
* already done) and try the next thing. The state mgmt | ||
* callback from addConnection will cope. | ||
*/ | ||
if (!fsm.isInState('idle')) | ||
continue; | ||
handle = new CueBallClaimHandle({ | ||
pool: this, | ||
claimStack: e.stack, | ||
callback: cb, | ||
log: this.p_log, | ||
claimTimeout: timeout | ||
}); | ||
fsm.claim(e.stack, function (err, hdl, conn) { | ||
if (err) { | ||
if (!done) | ||
cb(err); | ||
done = true; | ||
return; | ||
} | ||
if (done) { | ||
hdl.release(); | ||
return; | ||
} | ||
done = true; | ||
cb(err, hdl, conn); | ||
}); | ||
return ({ | ||
cancel: function () { done = true; } | ||
}); | ||
function waitingListener(st) { | ||
if (st === 'waiting') { | ||
tryNext(); | ||
} | ||
} | ||
handle.on('stateChanged', waitingListener); | ||
if (errOnEmpty && this.p_resolver.count() < 1) { | ||
setImmediate(function () { | ||
if (!done) | ||
cb(new mod_errors.NoBackendsError(self)); | ||
done = true; | ||
}); | ||
return ({ | ||
cancel: function () { done = true; } | ||
}); | ||
} | ||
function tryNext() { | ||
if (!handle.isInState('waiting')) | ||
return; | ||
/* Otherwise add an entry on the "waiter" queue. */ | ||
var timer; | ||
var waiter = function () { | ||
if (timer !== undefined) | ||
clearTimeout(timer); | ||
timer = undefined; | ||
done = true; | ||
cb.apply(this, arguments); | ||
}; | ||
waiter.stack = e.stack; | ||
var qnode = this.p_waiters.push(waiter); | ||
/* If there are idle connections sitting around, take one. */ | ||
while (self.p_idleq.length > 0) { | ||
var fsm = self.p_idleq.shift(); | ||
delete (fsm.p_idleq_node); | ||
/* | ||
* Since 'stateChanged' is emitted async from | ||
* mooremachine, things may be on the idle queue still | ||
* but not actually idle. If we find one, just rip it | ||
* off the queue (which we've already done) and try the | ||
* next thing. The state mgmt callback from | ||
* addConnection will cope. | ||
*/ | ||
if (!fsm.isInState('idle')) | ||
continue; | ||
this._hwmCounter('max-claim-queue', this.p_waiters.length); | ||
this._incrCounter('queued-claim'); | ||
handle.try(fsm); | ||
if (timeout !== Infinity) { | ||
timer = setTimeout(function () { | ||
if (timer === undefined) | ||
return; | ||
return; | ||
} | ||
qnode.remove(); | ||
done = true; | ||
cb(new mod_errors.ClaimTimeoutError(self)); | ||
}, timeout); | ||
if (errOnEmpty && self.p_resolver.count() < 1) { | ||
var err = new mod_errors.NoBackendsError(self); | ||
handle.fail(err); | ||
} | ||
/* Otherwise add an entry on the "waiter" queue. */ | ||
self.p_waiters.push(handle); | ||
self._hwmCounter('max-claim-queue', self.p_waiters.length); | ||
self._incrCounter('queued-claim'); | ||
self.rebalance(); | ||
} | ||
this.rebalance(); | ||
var handle = {}; | ||
handle.cancel = function () { | ||
mod_assert.ok(done === false, 'callback was already called ' + | ||
'for this waiter handle'); | ||
if (timer !== undefined) | ||
clearTimeout(timer); | ||
timer = undefined; | ||
qnode.remove(); | ||
done = true; | ||
}; | ||
return (handle); | ||
}; |
270
lib/set.js
@@ -30,3 +30,5 @@ /* | ||
const Queue = require('./queue'); | ||
const ConnectionFSM = require('./connection-fsm'); | ||
const mod_cfsm = require('./connection-fsm'); | ||
const ConnectionSlotFSM = mod_cfsm.ConnectionSlotFSM; | ||
const CueBallClaimHandle = mod_cfsm.CueBallClaimHandle; | ||
@@ -68,6 +70,8 @@ function CueBallConnectionSet(options) { | ||
this.cs_backends = {}; | ||
/* Map of backend key => array of ConnectionFSM instances. */ | ||
this.cs_fsms = {}; | ||
/* Map of backend key => ConnectionSlotFSM instance. */ | ||
this.cs_fsm = {}; | ||
/* Map of backend key => bool, if true the backend is declared dead. */ | ||
this.cs_dead = {}; | ||
/* Map of backend key => claim handle. */ | ||
this.cs_handles = {}; | ||
@@ -85,3 +89,8 @@ /* | ||
this.cs_connections = {}; | ||
/* Map of backend key => Array of connection keys. */ | ||
this.cs_connectionKeys = {}; | ||
/* Map of connection key => bool, if true 'removed' has been emitted. */ | ||
this.cs_emitted = {}; | ||
/* For debugging, track when we last rebalanced. */ | ||
@@ -144,19 +153,15 @@ this.cs_lastRebalance = undefined; | ||
var cks = Object.keys(this.cs_connections).filter(function (ck) { | ||
return (ck.indexOf(k + '.') === 0); | ||
}); | ||
var fsm = self.cs_fsm[k]; | ||
if (fsm !== undefined) | ||
fsm.setUnwanted(); | ||
var fsms = self.cs_fsms[k] || []; | ||
fsms.forEach(function (fsm) { | ||
if (cks.length > 0 || fsm.isInState('idle')) { | ||
fsm.closeAfterRelease(); | ||
} else { | ||
fsm.close(); | ||
var cks = this.cs_connectionKeys[k]; | ||
(cks || []).forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
var hdl = self.cs_handles[ck]; | ||
if (self.cs_emitted[ck] !== true) { | ||
self.cs_emitted[ck] = true; | ||
self.assertEmit('removed', ck, conn, hdl); | ||
} | ||
}); | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
delete (self.cs_connections[ck]); | ||
self.assertEmit('removed', ck, conn); | ||
}); | ||
}; | ||
@@ -269,3 +274,3 @@ | ||
S.validTransitions(['stopped']); | ||
var conns = this.cs_fsms; | ||
var conns = this.cs_fsm; | ||
var fsms = []; | ||
@@ -275,5 +280,3 @@ var self = this; | ||
Object.keys(conns).forEach(function (k) { | ||
conns[k].forEach(function (fsm) { | ||
fsms.push(fsm); | ||
}); | ||
fsms.push(conns[k]); | ||
}); | ||
@@ -292,21 +295,17 @@ mod_vasync.forEachParallel({ | ||
var k = fsm.cf_backend.key; | ||
var cks = Object.keys(self.cs_connections).filter( | ||
function (ck) { | ||
return (ck.indexOf(k + '.') === 0); | ||
}); | ||
var k = fsm.csf_backend.key; | ||
var cks = self.cs_connectionKeys[k]; | ||
fsm.on('stateChanged', function (s) { | ||
if (s === 'closed') | ||
if (s === 'stopped' || s === 'failed') | ||
cb(); | ||
}); | ||
if (cks.length === 0 && !fsm.isInState('idle')) { | ||
fsm.close(); | ||
} else { | ||
fsm.closeAfterRelease(); | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
delete (self.cs_connections[ck]); | ||
self.assertEmit('removed', ck, conn); | ||
}); | ||
} | ||
fsm.setUnwanted(); | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
var hdl = self.cs_handles[ck]; | ||
if (self.cs_emitted[ck] !== true) { | ||
self.cs_emitted[ck] = true; | ||
self.assertEmit('removed', ck, conn, hdl); | ||
} | ||
}); | ||
} | ||
@@ -319,3 +318,3 @@ }; | ||
this.cs_keys = []; | ||
this.cs_fsms = {}; | ||
this.cs_fsm = {}; | ||
this.cs_connections = {}; | ||
@@ -386,3 +385,5 @@ this.cs_backends = {}; | ||
this.cs_keys.forEach(function (k) { | ||
conns[k] = (self.cs_fsms[k] || []).slice(); | ||
conns[k] = []; | ||
if (self.cs_fsm[k] !== undefined) | ||
conns[k].push(self.cs_fsm[k]); | ||
total += conns[k].length; | ||
@@ -409,3 +410,3 @@ }); | ||
var k = fsm.cf_backend.key; | ||
var k = fsm.csf_backend.key; | ||
/* | ||
@@ -420,28 +421,14 @@ * Find any advertised connections from this FSM, and (after | ||
}); | ||
/* | ||
* We can close the FSM immediately if we aren't advertising | ||
* any connections for it, and we aren't waiting on our consumer | ||
* to close any -- i.e., the FSM is in an error state, probably | ||
* delay or connect. | ||
* | ||
* Still want to avoid doing .close() on the *last* one for a | ||
* given backend, so it has a chance to run out of retries if | ||
* the backend is in fact dead. So we do .closeAfterRelease() | ||
* instead for those. | ||
*/ | ||
if (cks.length === 0 && !fsm.isInState('idle')) { | ||
var fsmIdx = self.cs_fsms[k].indexOf(fsm); | ||
if (fsmIdx > 0 || self.cs_keys.indexOf(k) === -1) { | ||
fsm.close(); | ||
--total; | ||
} else { | ||
fsm.closeAfterRelease(); | ||
} | ||
} else { | ||
fsm.closeAfterRelease(); | ||
fsm.setUnwanted(); | ||
if (fsm.isInState('stopped') || fsm.isInState('failed')) { | ||
delete (self.cs_fsm[k]); | ||
--total; | ||
} | ||
cks.forEach(function (ck) { | ||
var conn = self.cs_connections[ck]; | ||
delete (self.cs_connections[ck]); | ||
self.assertEmit('removed', ck, conn); | ||
var hdl = self.cs_handles[ck]; | ||
if (self.cs_emitted[ck] !== true) { | ||
self.cs_emitted[ck] = true; | ||
self.assertEmit('removed', ck, conn, hdl); | ||
} | ||
}); | ||
@@ -453,2 +440,6 @@ }); | ||
return; | ||
/* Never make more than one slot for the same backend. */ | ||
if (self.cs_fsm[k] !== undefined) | ||
return; | ||
self.addConnection(k); | ||
@@ -471,2 +462,25 @@ }); | ||
function forceClaim(handle, fsm) { | ||
handle.on('stateChanged', hdlStateListener); | ||
function hdlStateListener(st) { | ||
if (st === 'waiting' && handle.isInState('waiting')) { | ||
if (fsm.isInState('idle')) { | ||
handle.try(fsm); | ||
} else { | ||
fsm.on('stateChanged', fsmStateListener); | ||
} | ||
} | ||
} | ||
function fsmStateListener(st) { | ||
if (st === 'idle' && fsm.isInState('idle')) { | ||
fsm.removeListener('stateChanged', fsmStateListener); | ||
if (handle.isInState('waiting')) { | ||
handle.try(fsm); | ||
} | ||
} | ||
} | ||
} | ||
CueBallConnectionSet.prototype.addConnection = function (key) { | ||
@@ -479,3 +493,3 @@ if (this.isInState('stopping') || this.isInState('stopped')) | ||
var fsm = new ConnectionFSM({ | ||
var fsm = new ConnectionSlotFSM({ | ||
constructor: this.cs_constructor, | ||
@@ -486,14 +500,47 @@ backend: backend, | ||
recovery: this.cs_recovery, | ||
doRef: false | ||
monitor: (this.cs_dead[key] === true) | ||
}); | ||
if (this.cs_fsms[key] === undefined) | ||
this.cs_fsms[key] = []; | ||
if (this.cs_serials[key] === undefined) | ||
this.cs_serials[key] = 1; | ||
this.cs_fsms[key].push(fsm); | ||
if (this.cs_connectionKeys[key] === undefined) | ||
this.cs_connectionKeys[key] = []; | ||
mod_assert.strictEqual(this.cs_fsm[key], undefined); | ||
this.cs_fsm[key] = fsm; | ||
var serial; | ||
var ckey; | ||
var smgr = fsm.getSocketMgr(); | ||
var self = this; | ||
fsm.on('stateChanged', function (newState) { | ||
if (newState === 'busy' && fsm.isInState('busy')) { | ||
mod_assert.notStrictEqual(ckey, undefined); | ||
mod_assert.notStrictEqual(self.cs_connections[ckey], | ||
undefined); | ||
return; | ||
} | ||
/* | ||
* If we already have a ckey set for this FSM, and it exists in | ||
* cs_connections, then we previously got a connection and | ||
* claimed it. Any state transition (other than to "busy", | ||
* which was excluded above) now means this connection's | ||
* claim handle has been released/closed and we must clean up | ||
* the associated entries. | ||
*/ | ||
if (ckey !== undefined && | ||
self.cs_connections[ckey] !== undefined) { | ||
mod_assert.ok(self.cs_emitted[ckey]); | ||
delete (self.cs_connections[ckey]); | ||
delete (self.cs_emitted[ckey]); | ||
delete (self.cs_handles[ckey]); | ||
var cks = self.cs_connectionKeys[key]; | ||
var ckIdx = cks.indexOf(ckey); | ||
mod_assert.notStrictEqual(ckIdx, -1); | ||
cks.splice(ckIdx, 1); | ||
ckey = undefined; | ||
} | ||
if (newState === 'idle' && fsm.isInState('idle')) { | ||
@@ -505,3 +552,3 @@ /* | ||
if (self.cs_backends[key] === undefined) { | ||
fsm.close(); | ||
fsm.setUnwanted(); | ||
return; | ||
@@ -513,34 +560,43 @@ } | ||
} | ||
if (ckey !== undefined && self.cs_connections[ckey]) { | ||
var conn = self.cs_connections[ckey]; | ||
delete (self.cs_connections[ckey]); | ||
self.assertEmit('removed', ckey, conn); | ||
/* We got a connection, so we're not dead. */ | ||
if (self.cs_dead[key] !== undefined) { | ||
delete (self.cs_dead[key]); | ||
} | ||
conn = fsm.getConnection(); | ||
serial = self.cs_serials[key]++; | ||
ckey = key + '.' + serial; | ||
conn.cs_serial = serial; | ||
fsm.cs_serial = serial; | ||
self.cs_connections[ckey] = conn; | ||
self.assertEmit('added', ckey, conn); | ||
var hdlOpts = { | ||
pool: self, | ||
claimStack: 'Error\n' + | ||
' at claim\n' + | ||
' at CueBallConnectionSet.addConnection\n' + | ||
' at CueBallConnectionSet.addConnection', | ||
callback: afterClaim, | ||
log: self.cs_log, | ||
claimTimeout: Infinity | ||
}; | ||
var handle = new CueBallClaimHandle(hdlOpts); | ||
self.rebalance(); | ||
return; | ||
} | ||
self.cs_handles[ckey] = handle; | ||
forceClaim(handle, fsm); | ||
if (newState !== 'idle') { | ||
if (self.cs_connections[ckey]) { | ||
conn = self.cs_connections[ckey]; | ||
delete (self.cs_connections[ckey]); | ||
self.assertEmit('removed', ckey, conn); | ||
function afterClaim(err, hdl, conn) { | ||
mod_assert.ok(!err); | ||
conn.cs_serial = serial; | ||
conn.cs_backendKey = key; | ||
self.cs_connections[ckey] = conn; | ||
self.cs_connectionKeys[key].push(ckey); | ||
self.assertEmit('added', ckey, conn, hdl); | ||
self.rebalance(); | ||
} | ||
return; | ||
} | ||
if (newState === 'closed') { | ||
if (self.cs_fsms[key]) { | ||
var idx = self.cs_fsms[key].indexOf(fsm); | ||
self.cs_fsms[key].splice(idx, 1); | ||
} | ||
if (newState === 'failed') { | ||
/* | ||
@@ -550,6 +606,9 @@ * Set the dead flag, but not on a backend that's no | ||
*/ | ||
if (fsm.retriesExhausted() && | ||
self.cs_backends[key] !== undefined) { | ||
if (self.cs_backends[key] !== undefined) { | ||
self.cs_dead[key] = true; | ||
} | ||
} | ||
if (newState === 'stopped' || newState === 'failed') { | ||
delete (self.cs_fsm[key]); | ||
self.emit('closedBackend', fsm); | ||
@@ -561,2 +620,20 @@ self.rebalance(); | ||
smgr.on('stateChanged', function (newState) { | ||
if (!fsm.isInState('busy')) | ||
return; | ||
if (newState === 'connected') | ||
return; | ||
/* | ||
* A transition out of 'connected' while the slot is still | ||
* 'busy' indicates that we lost the connection. We should | ||
* emit 'removed' for our clients. | ||
*/ | ||
mod_assert.string(ckey); | ||
if (self.cs_emitted[ckey] !== true) { | ||
self.cs_emitted[ckey] = true; | ||
self.assertEmit('removed', ckey, | ||
self.cs_connections[ckey], self.cs_handles[ckey]); | ||
} | ||
}); | ||
fsm.start(); | ||
@@ -567,4 +644,9 @@ }; | ||
var self = this; | ||
return (Object.keys(this.cs_connections).map(function (k) { | ||
return (self.cs_connections[k]); | ||
var conns = []; | ||
return (Object.keys(this.cs_connections).forEach(function (k) { | ||
var c = self.cs_connections[k]; | ||
var fsm = self.cs_fsm[c.cs_backendKey]; | ||
var h = self.cs_handles[k]; | ||
if (fsm.isInState('busy') && h.isInState('claimed')) | ||
conns.push(c); | ||
})); | ||
@@ -571,0 +653,0 @@ }; |
{ | ||
"name": "cueball", | ||
"version": "1.3.2", | ||
"version": "2.0.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -73,7 +73,7 @@ cueball | ||
- `recovery` -- Object, a recovery spec (see below) | ||
- `spares` -- Number, number of spares wanted in the pool per host | ||
- `maximum` -- Number, maximum number of connections per host | ||
- `resolvers` -- optional Array of String, either containing IP addresses to | ||
use as nameservers, or a single string for Dynamic Resolver mode | ||
- `log` -- optional Object, a `bunyan`-style logger to use | ||
- `spares` -- optional Number, number of spares wanted in the pool per host | ||
- `maximum` -- optional Number, maximum number of connections per host | ||
- `initialDomains` -- optional Array of String, initial domains to create | ||
@@ -119,2 +119,4 @@ connections to at startup (to pre-seed the Agent for quick user later) | ||
- `recovery` -- Object, a recovery spec (see below) | ||
- `spares` -- Number, number of spares wanted in the pool per host | ||
- `maximum` -- Number, maximum number of connections per host | ||
- `service` -- optional String, name of SRV service (e.g. `_http._tcp`) | ||
@@ -126,4 +128,2 @@ - `defaultPort` -- optional Number, port to use for plain A/AAAA records | ||
- `log` -- optional Object, a `bunyan`-style logger to use | ||
- `spares` -- optional Number, number of spares wanted in the pool per host | ||
- `maximum` -- optional Number, maximum number of connections per host | ||
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue | ||
@@ -676,8 +676,8 @@ at once (default 5) | ||
- `recovery` -- Object, a recovery spec (see below) | ||
- `log` -- optional Object, a `bunyan`-style logger to use | ||
- `target` -- optional Number, target number of connections to be made | ||
- `target` -- Number, target number of connections to be made | ||
available in the entire set | ||
- `maximum` -- optional Number, maximum number of sockets opened by the set. | ||
- `maximum` -- Number, maximum number of sockets opened by the set. | ||
Note that this number may temporarily be exceeded by 1 socket | ||
to allow the set to re-balance. | ||
- `log` -- optional Object, a `bunyan`-style logger to use | ||
@@ -689,5 +689,17 @@ ### Event `'added'` | ||
The `handle` that is given as the third argument to this event has two methods | ||
`.release()` and `.close()`, like a Pool handle. As with Pool handles, it can | ||
be used to indicate the failure of a connection (e.g. due to a protocol error | ||
making safe use of the connection impossible) at any time, but unlike a Pool | ||
handle, it is an error to call `.release()` until after a `'removed'` event | ||
has been emitted. | ||
The user of the ConnectionSet should store both the `connection` and `handle` | ||
in such a way as to be able to retrieve them using the `key`. | ||
Parameters | ||
- `key` -- String, a unique key to identify this connection | ||
- `connection` -- Object, the connection as returned by the constructor | ||
- `handle` -- Object, a handle to be used in response to a 'removed' event | ||
about this connection | ||
@@ -698,5 +710,4 @@ ### Event `removed` | ||
*must* have a handler on it at all times. The handler is obligated to take all | ||
necessary actions to drain the connection of outstanding requests and then close | ||
it. The emission of this event must cause the connection object to emit | ||
`'close'` as soon as possible. | ||
necessary actions to drain the connection of outstanding requests and then | ||
call the `.release()` method on the relevant handle. | ||
@@ -703,0 +714,0 @@ Parameters |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
185198
16
4466
773