Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cueball

Package Overview
Dependencies
Maintainers
1
Versions
71
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cueball - npm Package Compare versions

Comparing version 1.3.2 to 2.0.0

CHANGES.adoc

13

lib/agent.js

@@ -291,2 +291,3 @@ /*

socket.once('free', onFree);
socket.once('close', onClose);
socket.once('agentRemove', onAgentRemove);

@@ -296,2 +297,11 @@ req.onSocket(socket);

function onClose() {
sock.removeListener('free', onFree);
sock.removeListener('agentRemove', onAgentRemove);
req.removeListener('abort', onAbort);
conn.release();
conn = undefined;
sock = undefined;
}
function onAbort() {

@@ -303,2 +313,3 @@ if (waiter !== undefined) {

if (conn !== undefined) {
sock.removeListener('close', onClose);
sock.removeListener('free', onFree);

@@ -312,2 +323,3 @@ sock.removeListener('agentRemove', onAgentRemove);

function onFree() {
sock.removeListener('close', onClose);
sock.removeListener('agentRemove', onAgentRemove);

@@ -320,2 +332,3 @@ req.removeListener('abort', onAbort);

function onAgentRemove() {
sock.removeListener('close', onClose);
sock.removeListener('free', onFree);

@@ -322,0 +335,0 @@ req.removeListener('abort', onAbort);

1427

lib/connection-fsm.js

@@ -9,3 +9,6 @@ /*

module.exports = ConnectionFSM;
module.exports = {
ConnectionSlotFSM: ConnectionSlotFSM,
CueBallClaimHandle: CueBallClaimHandle
};

@@ -33,42 +36,96 @@ const mod_events = require('events');

/*
* ConnectionFSM is the state machine for a "connection" -- an abstract entity
* that is managed by a ConnectionPool. ConnectionFSMs are associated with a
* particular 'backend', and hold a backing object cf_conn. Typically this
* backing object is a TCP socket, but may be any EventEmitter that emits
* 'close' and 'error'.
* This file contains 3 key FSMs used to manage connections:
* - SocketMgrFSM
* - ConnectionSlotFSM
* - CueBallClaimHandle (also an FSM, despite the name)
*
* Only the ConnectionSlotFSM and CueBallClaimHandle are exported and used
* elsewhere in cueball. The SocketMgrFSM is an implementation detail of the
* ConnectionSlotFSM (which is the only thing that constructs and manages it).
*
* Pools and Sets construct one ConnectionSlotFSM for each "slot" they have
* (a "slot" being a position in their list of connections, sort of like a
* vnode versus pnode in consistent hashing terminology).
*
* Pools and Sets also can construct ClaimHandles, which attempt to "claim" a
* slot. Once claimed, the ClaimHandle will call a callback and then hold that
* slot until a release() or close() method on it is called. It is used to
* manage the process of "giving out" connections to downstream users of
* cueball (this process involves multiple steps and possible state-change
* races, so it is encapsulated in an FSM).
*
* Note on state transition diagrams in this file:
*
* In the condition notation on edges:
* 'connect' -- means "when the event 'connect' is emitted"
* .connect() -- means "when the signal function .connect() is called"
* unwanted -- means "if the flag 'unwanted' is true" or
* "when the flag 'unwanted' changes to true"
*
* Generally the first condition on an edge is the trigger condition -- we will
* only transition when it changes. The subsequent conditions joined by && are
* evaluated only after a change happens in the first condition.
*/
function ConnectionFSM(options) {
/*
* The "Socket Manager FSM" (SocketMgrFSM) is an abstraction for a "socket"/
* connection, which handles retry and backoff, as well as de-duplication of
* error and closure related events.
*
* A single SocketMgrFSM may construct multiple sockets over its lifetime --
* it creates them by calling the "constructor" function passed in from the
* public-facing API (e.g. the ConnectionPool options). Every time we enter the
* "connecting" state we will construct a new socket, and we destroy it in
* "error" or "closed".
*
* A SocketMgrFSM is constructed and managed only by a ConnectionSlotFSM
* (there is a 1:1 relationship between the two). Many points in the SocketMgr
* state graph stop and wait for signals from the SlotFSM on how to proceed.
*
* These signals are "signal functions" in the mooremachine style -- see
* SocketMgrFSM.prototype.connect() and .retry().
*
* +------------+
* | |
* | failed |
* +------+ | |
* | init | +------------+
* +------+ ^
* | |
* | |retries
* v |exhausted
* +------------+ +--+---------+
* | | timeout| |
* +-----> | connecting | <--------+ backoff | <--+
* | | | | | |
* | +--+----+----+ +------------+ |
* | | | ^ |
* | 'connect'| | timeout/'error' | |
* | | +---------+ | |
* | v | | .retry() |
* | +------------+ | +--+---------+ |
* | | | +---> | | |
* | | connected +--------> | error | |
* | | | 'error' | | |
* | +--+---------+ +------------+ |
* | 'close'| |
* | .close()| |
* | | |
* | v |
* | +------------+ |
* | | | |
* +-------+ closed +----------------------------+
* .connect()| | .retry()
* +------------+
*/
function SocketMgrFSM(options) {
mod_assert.object(options, 'options');
mod_assert.object(options.pool, 'options.pool');
mod_assert.func(options.constructor, 'options.constructor');
mod_assert.object(options.backend, 'options.backend');
mod_assert.object(options.log, 'options.log');
mod_assert.optionalFunc(options.checker, 'options.checker');
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout');
mod_assert.object(options.slot, 'options.slot');
mod_assert.bool(options.monitor, 'options.monitor');
mod_assert.object(options.pool, 'options.pool');
this.cf_pool = options.pool;
this.cf_constructor = options.constructor;
this.cf_backend = options.backend;
this.cf_claimed = false;
this.cf_claimStack = [];
this.cf_releaseStack = [];
this.cf_lastError = undefined;
this.cf_conn = undefined;
this.cf_shadow = undefined;
this.cf_closeAfter = false;
mod_assert.optionalBool(options.doRef, 'options.doRef');
this.cf_doRef = options.doRef;
if (this.cf_doRef === undefined && this.cf_doRef !== null)
this.cf_doRef = true;
this.cf_oldListeners = {};
this.cf_checkTimeout = options.checkTimeout;
this.cf_checker = options.checker;
this.cf_lastCheck = new Date();
this.cf_log = options.log.child({
component: 'CueBallConnectionFSM',
backend: this.cf_backend.key,
address: this.cf_backend.address,
port: this.cf_backend.port
});
mod_assert.object(options.recovery, 'options.recovery');

@@ -87,187 +144,177 @@

this.cf_initialRecov = initialRecov;
this.cf_connectRecov = connectRecov;
this.sm_initialRecov = initialRecov;
this.sm_connectRecov = connectRecov;
this.cf_retries = initialRecov.retries;
this.cf_retriesLeft = initialRecov.retries;
this.cf_minDelay = initialRecov.delay;
this.cf_delay = initialRecov.delay;
this.cf_maxDelay = initialRecov.maxDelay || Infinity;
this.cf_timeout = initialRecov.timeout;
this.cf_maxTimeout = initialRecov.maxTimeout || Infinity;
this.sm_pool = options.pool;
this.sm_backend = options.backend;
this.sm_constructor = options.constructor;
this.sm_slot = options.slot;
this.sm_log = options.log.child({
component: 'CueBallSocketMgrFSM',
backend: this.sm_backend.key,
address: this.sm_backend.address,
port: this.sm_backend.port
});
this.sm_lastError = undefined;
this.sm_socket = undefined;
/*
* If our parent pool thinks this backend is dead, resume connection
* attempts with the maximum delay and timeout. Something is going
* wrong, let's not retry too aggressively and make it worse.
* We start off in the "init" state, waiting for the SlotFSM to call
* connect().
*/
if (this.cf_pool.isDeclaredDead(this.cf_backend.key)) {
/*
* We might be given an infinite maxDelay or maxTimeout. If
* we are, then multiply it by 2^(retries) to get to what the
* value would have been before.
*/
var mult = 1 << this.cf_retries;
this.cf_delay = this.cf_maxDelay;
if (!isFinite(this.cf_delay))
this.cf_delay = initialRecov.delay * mult;
this.cf_timeout = this.cf_maxTimeout;
if (!isFinite(this.cf_timeout))
this.cf_timeout = initialRecov.timeout * mult;
/* Keep retrying a failed backend forever */
this.cf_retries = Infinity;
this.cf_retriesLeft = Infinity;
}
FSM.call(this, 'init');
this.allStateEvent('closeAsserted');
FSM.call(this, 'init');
this.sm_monitor = undefined;
this.setMonitor(options.monitor);
}
mod_util.inherits(ConnectionFSM, FSM);
mod_util.inherits(SocketMgrFSM, FSM);
/*
* Return true if this connection was closed due to retry exhaustion.
* Sets whether we are in "monitor" mode or not. In "monitor" mode the
* SocketMgrFSM has infinite retries and does not use exponential back-off
* (timeout and delay are fixed at their max values).
*/
ConnectionFSM.prototype.retriesExhausted = function () {
return (this.isInState('closed') && this.cf_retriesLeft <= 0);
SocketMgrFSM.prototype.setMonitor = function (value) {
mod_assert.ok(this.isInState('init') || this.isInState('connected'));
if (value === this.sm_monitor)
return;
this.sm_monitor = value;
this.resetBackoff();
};
ConnectionFSM.prototype.getConnection = function () {
mod_assert.ok(this.isInState('idle'));
return (this.cf_conn);
};
SocketMgrFSM.prototype.resetBackoff = function () {
var initialRecov = this.sm_initialRecov;
/*
* Mark this Connection as "claimed"; in use by a particular client of the
* pool.
*
* Normally this will be called by the pool itself, which will give the 'stack'
* argument as a copy of the stack trace from its caller.
*
* We keep track of the stack trace of our last claimer and releaser to aid
* in debugging.
*/
ConnectionFSM.prototype.claim = function (stack, cb) {
mod_assert.ok(this.cf_claimed === false);
mod_assert.strictEqual(this.getState(), 'idle');
if (typeof (stack) === 'function') {
cb = stack;
stack = undefined;
this.sm_retries = initialRecov.retries;
this.sm_retriesLeft = initialRecov.retries;
this.sm_minDelay = initialRecov.delay;
this.sm_delay = initialRecov.delay;
this.sm_maxDelay = initialRecov.maxDelay || Infinity;
this.sm_timeout = initialRecov.timeout;
this.sm_maxTimeout = initialRecov.maxTimeout || Infinity;
if (this.sm_monitor === true) {
var mult = 1 << this.sm_retries;
this.sm_delay = this.sm_maxDelay;
if (!isFinite(this.sm_delay))
this.sm_delay = initialRecov.delay * mult;
this.sm_timeout = this.sm_maxTimeout;
if (!isFinite(this.sm_timeout))
this.sm_timeout = initialRecov.timeout * mult;
/* Keep retrying a failed backend forever */
this.sm_retries = Infinity;
this.sm_retriesLeft = Infinity;
}
mod_assert.func(cb, 'callback');
if (stack === undefined) {
var e = mod_utils.maybeCaptureStackTrace();
stack = e.stack;
}
this.cf_claimStack = stack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
this.cf_claimed = true;
var self = this;
this.on('stateChanged', onStateChanged);
function onStateChanged(st) {
if (st === 'busy' && self.isInState('busy')) {
self.removeListener('stateChanged', onStateChanged);
cb(null, self.cf_shadow, self.cf_conn);
} else if (st === 'error' && self.cf_lastError !== undefined) {
self.removeListener('stateChanged', onStateChanged);
var err = new mod_verror.VError(self.cf_lastError,
'Connection error during claim on backend %s:%d',
self.cf_backend.address, self.cf_backend.port);
cb(err);
} else if (st !== 'busy') {
self.removeListener('stateChanged', onStateChanged);
cb(new mod_verror.VError('Claimed connection entered ' +
'state "%s" during claim, instead of "busy"', st));
}
}
this.emit('claimAsserted');
};
/*
* Mark this Connection as "free" and ready to be re-used. This is normally
* called via the ConnectionHandle.
*/
ConnectionFSM.prototype.release = function (cb) {
mod_assert.ok(this.cf_claimed === true);
mod_assert.ok(['busy', 'ping'].indexOf(this.getState()) !== -1,
'connection is not held');
SocketMgrFSM.prototype.connect = function () {
mod_assert.ok(this.isInState('init') || this.isInState('closed'),
'SocketMgrFSM#connect may only be called in state "init" or ' +
'"closed" (is in "' + this.getState() + '")');
this.emit('connectAsserted');
};
var e = mod_utils.maybeCaptureStackTrace();
this.cf_releaseStack = e.stack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
this.once('stateChanged', function (st) {
mod_assert.notStrictEqual(st, 'busy');
if (cb)
cb(null);
});
this.emit('releaseAsserted');
SocketMgrFSM.prototype.retry = function () {
mod_assert.ok(this.isInState('closed') || this.isInState('error'),
'SocketMgrFSM#retry may only be called in state "closed" or ' +
'"error" (is in "' + this.getState() + '")');
this.emit('retryAsserted');
};
ConnectionFSM.prototype.close = function (cb) {
if (cb) {
this.on('stateChanged', function (st) {
if (st === 'closed')
cb();
});
}
SocketMgrFSM.prototype.close = function () {
mod_assert.ok(this.isInState('connected') || this.isInState('backoff'),
'SocketMgrFSM#close may only be called in state "connected" or ' +
'"backoff" (is in "' + this.getState() + '")');
this.emit('closeAsserted');
};
ConnectionFSM.prototype.start = function () {
this.emit('startAsserted');
SocketMgrFSM.prototype.getLastError = function () {
return (this.sm_lastError);
};
ConnectionFSM.prototype.closeAfterRelease = function () {
this.cf_closeAfter = true;
SocketMgrFSM.prototype.getSocket = function () {
mod_assert.ok(this.isInState('connected'), 'Sockets may only be ' +
'retrieved from SocketMgrFSMs in "connected" state (is in ' +
'state "' + this.getState() + '")');
return (this.sm_socket);
};
ConnectionFSM.prototype.state_init = function (S) {
S.validTransitions(['connect', 'closed']);
S.on(this, 'startAsserted', function () {
S.gotoState('connect');
SocketMgrFSM.prototype.state_init = function (S) {
S.validTransitions(['connecting']);
S.on(this, 'connectAsserted', function () {
S.gotoState('connecting');
});
S.on(this, 'closeAsserted', function () {
S.gotoState('closed');
});
};
ConnectionFSM.prototype.state_connect = function (S) {
S.validTransitions(['error', 'idle', 'closed']);
SocketMgrFSM.prototype.state_connecting = function (S) {
S.validTransitions(['connected', 'error']);
var self = this;
S.timeout(this.cf_timeout, function () {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
S.timeout(this.sm_timeout, function () {
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self);
S.gotoState('error');
self.sm_pool._incrCounter('timeout-during-connect');
});
this.cf_log.trace('calling constructor to open new connection');
this.cf_conn = this.cf_constructor(this.cf_backend);
mod_assert.object(this.cf_conn, 'constructor return value');
this.cf_conn.cf_fsm = this;
S.on(this.cf_conn, 'connect', function () {
S.gotoState('idle');
this.sm_log.trace('calling constructor to open new connection');
this.sm_socket = this.sm_constructor(this.sm_backend);
mod_assert.object(this.sm_socket, 'constructor return value');
this.sm_socket.sm_fsm = this;
S.on(this.sm_socket, 'connect', function () {
S.gotoState('connected');
});
S.on(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
S.on(this.sm_socket, 'error', function socketMgrErrorListener(err) {
self.sm_lastError = err;
S.gotoState('error');
self.cf_pool._incrCounter('error-during-connect');
self.sm_pool._incrCounter('error-during-connect');
});
S.on(this.cf_conn, 'connectError', function (err) {
self.cf_lastError = err;
S.on(this.sm_socket, 'connectError', function (err) {
self.sm_lastError = err;
S.gotoState('error');
self.cf_pool._incrCounter('error-during-connect');
self.sm_pool._incrCounter('error-during-connect');
});
S.on(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.on(this.sm_socket, 'close', function () {
self.sm_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('close-during-connect');
self.sm_pool._incrCounter('close-during-connect');
});
S.on(this.cf_conn, 'timeout', function () {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
S.on(this.sm_socket, 'timeout', function () {
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self);
S.gotoState('error');
self.cf_pool._incrCounter('timeout-during-connect');
self.sm_pool._incrCounter('timeout-during-connect');
});
S.on(this.cf_conn, 'connectTimeout', function (err) {
self.cf_lastError = new mod_errors.ConnectionTimeoutError(self);
S.on(this.sm_socket, 'connectTimeout', function () {
self.sm_lastError = new mod_errors.ConnectionTimeoutError(self);
S.gotoState('error');
self.cf_pool._incrCounter('timeout-during-connect');
self.sm_pool._incrCounter('timeout-during-connect');
});
};
SocketMgrFSM.prototype.state_connected = function (S) {
S.validTransitions(['error', 'closed']);
var self = this;
if (typeof (self.sm_socket.localPort) === 'number') {
this.sm_log = this.sm_log.child({
localPort: this.sm_socket.localPort
});
}
this.sm_log.trace('connected');
this.resetBackoff();
S.on(this.sm_socket, 'error', function socketMgrErrorListener(err) {
self.sm_lastError = err;
S.gotoState('error');
self.sm_pool._incrCounter('error-while-connected');
});
S.on(this.sm_socket, 'close', function () {
S.gotoState('closed');
});
S.on(this, 'closeAsserted', function () {

@@ -278,128 +325,349 @@ S.gotoState('closed');

ConnectionFSM.prototype.state_closed = function (S) {
S.validTransitions([]);
this.cf_log.trace('closed');
if (this.cf_conn) {
this.cf_conn.destroy();
this.cf_log = this.cf_log.child({ localPort: null });
SocketMgrFSM.prototype.state_error = function (S) {
S.validTransitions(['backoff']);
var log = this.sm_log;
if (this.sm_socket) {
this.sm_socket.destroy();
this.sm_log = this.sm_log.child({ localPort: null });
}
this.cf_conn = undefined;
S.on(this, 'closeAsserted', function () { });
};
this.sm_socket = undefined;
ConnectionFSM.prototype.state_error = function (S) {
S.validTransitions(['delay', 'closed']);
log.trace(this.sm_lastError, 'got error from connection');
S.on(this, 'closeAsserted', function () { });
S.on(this, 'retryAsserted', function () {
S.gotoState('backoff');
});
};
var log = this.cf_log;
if (this.cf_conn) {
this.cf_conn.destroy();
this.cf_log = this.cf_log.child({ localPort: null });
}
this.cf_conn = undefined;
SocketMgrFSM.prototype.state_backoff = function (S) {
S.validTransitions(['failed', 'connecting']);
if (this.cf_shadow) {
this.cf_shadow.sh_error = true;
this.cf_shadow = undefined;
}
/*
* If the closeAfter flag is set, and this is a connection to a "dead"
* backend (i.e., a "monitor" watching to see when it comes back), then
* exit now. For an ordinary backend, we don't want to do this,
* because we want to give ourselves the opportunity to run out of
* retries.
*
* Otherwise, in a situation where we have two connections that were
* created at the same time, one to a failing backend that's already
* declared dead, and one to a different failing backend not yet
* declared, we may never learn that the second backend is down and
* declare it dead. The already declared dead backend may exit first
* during a pool reshuffle and cause this one to exit prematurely
* (there's a race in who exits first and causes the planner to engage)
* Unfortunately, "retries" actually means "attempts" in the cueball
* API. To preserve compatibility we have to compare to 1 here instead
* of 0.
*/
if (this.cf_retries === Infinity && this.cf_closeAfter) {
this.cf_retriesLeft = 0;
log.trace('backoff monitor shut down');
S.gotoState('closed');
if (this.sm_retriesLeft !== Infinity && this.sm_retriesLeft <= 1) {
S.gotoState('failed');
return;
}
var delay = this.sm_delay;
if (this.sm_retries !== Infinity) {
--this.sm_retriesLeft;
this.sm_delay *= 2;
this.sm_timeout *= 2;
if (this.sm_timeout > this.sm_maxTimeout)
this.sm_timeout = this.sm_maxTimeout;
if (this.sm_delay > this.sm_maxDelay)
this.sm_delay = this.sm_maxDelay;
}
var t = S.timeout(delay, function () {
S.gotoState('connecting');
});
/*
* If this backend has been removed from the Resolver, we should not
* attempt any kind of reconnection. Exit now.
* If we're in monitor mode, unref the backoff timer. That way we
* don't block a commandline app from exiting that wants to exit as
* soon as a pool enters 'failed'.
*/
if (!this.cf_pool.shouldRetryBackend(this.cf_backend.key)) {
log.trace('pool no longer wants us, closing');
if (this.sm_monitor)
t.unref();
S.on(this, 'closeAsserted', function () {
S.gotoState('closed');
return;
});
};
SocketMgrFSM.prototype.state_closed = function (S) {
S.validTransitions(['backoff', 'connecting']);
var log = this.sm_log;
if (this.sm_socket) {
this.sm_socket.destroy();
this.sm_log = this.sm_log.child({ localPort: null });
}
this.sm_socket = undefined;
if (this.cf_retries !== Infinity)
--this.cf_retriesLeft;
log.trace('connection closed');
if (this.cf_retries === Infinity || this.cf_retriesLeft > 0) {
log.trace(this.cf_lastError, 'failed to connect to ' +
'backend, %d retries left: delaying before retry',
this.cf_retriesLeft);
S.gotoState('delay');
S.on(this, 'retryAsserted', function () {
S.gotoState('backoff');
});
S.on(this, 'connectAsserted', function () {
S.gotoState('connecting');
});
};
SocketMgrFSM.prototype.state_failed = function (S) {
S.validTransitions([]);
this.sm_log.warn(this.sm_lastError, 'failed to connect to ' +
'backend, retries exhausted');
this.sm_pool._incrCounter('retries-exhausted');
};
/*
* ClaimHandle is a state machine representing a "claim handle". We give
* these out to clients of the Cueball pool, so that they can call either
* release() or close() on them when their use of the connection is complete.
*
* They also handle the process of finding a Slot in the pool to fulfill the
* claim and getting the connection successfully returned to the user.
*
* Some ancilliary responsibilities relate to the user interface features we
* support, like detecting leaked event handlers on re-useable connections and
* cancelling the process mid-way.
*
* A ClaimHandle is created immediately when .claim() is called on a pool.
* The pool then chooses slots that it will attempt to use to fulfill the claim.
* Each time the pool tries another slot, it calls .try() on the ClaimHandle.
* The SlotFSM will then either call .accept() or .reject() on the handle. If
* .reject() is called, we try another slot. If .accept() is called, we proceed
* to state "claimed". From "claimed", the end-user of the cueball pool calls
* either one of .release() or .close() to relinquish their claim.
*
* timeout +--------+
* +--------------------> | |
* | | |
* | | failed |
* +----+----+ .fail() | |
* | +---------------> | |
* | | +--------+
* init -> | waiting |
* | | .cancel()
* | +---------------------+
* | | |
* +-+-------+ |
* .try() | ^ |
* | | |
* | | v
* | | +-----------+
* | | | |
* | | | cancelled |
* | | | |
* | | .reject() +-----------+
* v | && !cancel ^
* +------+---+ |
* | | |
* | claiming +--------------------+
* | | .reject() && ^
* +----+-----+ cancel |
* .accept() | |
* | +------------+
* | |
* v |
* +---------+ cancel |
* | +--------+ +--------+
* | claimed | | |
* | | .close() | closed |
* | +-------------> | |
* +----+----+ +--------+
* | .release()
* |
* |
* v
* +----------+
* | |
* | released |
* | |
* +----------+
*
*/
function CueBallClaimHandle(options) {
mod_assert.object(options, 'options');
mod_assert.number(options.claimTimeout, 'options.claimTimeout');
this.ch_claimTimeout = options.claimTimeout;
mod_assert.object(options.pool, 'options.pool');
this.ch_pool = options.pool;
mod_assert.string(options.claimStack, 'options.claimStack');
this.ch_claimStack = options.claimStack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
/* The user callback provided to Pool#claim() */
mod_assert.func(options.callback, 'options.callback');
this.ch_callback = options.callback;
mod_assert.object(options.log, 'options.log');
this.ch_log = options.log.child({
component: 'CueBallClaimHandle'
});
this.ch_slot = undefined;
this.ch_releaseStack = undefined;
this.ch_connection = undefined;
this.ch_preListeners = {};
this.ch_cancelled = false;
this.ch_lastError = undefined;
FSM.call(this, 'waiting');
}
mod_util.inherits(CueBallClaimHandle, FSM);
CueBallClaimHandle.prototype.try = function (slot) {
mod_assert.ok(this.isInState('waiting'), 'ClaimHandle#try may only ' +
'be called in state "waiting" (is in "' + this.getState() + '")');
mod_assert.ok(slot.isInState('idle'), 'ClaimHandle#try may only ' +
'be called on a slot in state "idle" (is in "' + slot.getState() +
'")');
this.ch_slot = slot;
this.emit('tryAsserted');
};
CueBallClaimHandle.prototype.accept = function (connection) {
mod_assert.ok(this.isInState('claiming'));
this.ch_connection = connection;
this.emit('accepted');
};
CueBallClaimHandle.prototype.reject = function () {
mod_assert.ok(this.isInState('claiming'));
this.emit('rejected');
};
CueBallClaimHandle.prototype.cancel = function () {
if (this.isInState('claimed')) {
this.release();
} else {
log.warn(this.cf_lastError, 'failed to connect to backend, ' +
'retries exhausted');
this.cf_pool._incrCounter('retries-exhausted');
S.gotoState('closed');
this.ch_cancelled = true;
this.emit('cancelled');
}
};
ConnectionFSM.prototype.state_delay = function (S) {
S.validTransitions(['connect', 'closed']);
var delay = this.cf_delay;
CueBallClaimHandle.prototype.fail = function (err) {
this.emit('error', err);
};
this.cf_delay *= 2;
this.cf_timeout *= 2;
if (this.cf_timeout > this.cf_maxTimeout)
this.cf_timeout = this.cf_maxTimeout;
if (this.cf_delay > this.cf_maxDelay)
this.cf_delay = this.cf_maxDelay;
CueBallClaimHandle.prototype.relinquish = function (event) {
if (!this.isInState('claimed')) {
if (this.isInState('released') || this.isInState('closed')) {
var err = new Error('Connection not claimed by ' +
'this handle, released by ' +
this.ch_releaseStack[2]);
throw (err);
}
throw (new Error('ClaimHandle#release() called while in ' +
'state "' + this.getState() + '"'));
}
var e = mod_utils.maybeCaptureStackTrace();
this.ch_releaseStack = e.stack.split('\n').slice(1).
map(function (l) { return (l.replace(/^[ ]*at /, '')); });
this.emit(event);
};
var t = S.timeout(delay, function () {
S.gotoState('connect');
});
t.unref();
S.on(this, 'closeAsserted', function () {
S.gotoState('closed');
});
CueBallClaimHandle.prototype.release = function () {
this.relinquish('releaseAsserted');
};
ConnectionFSM.prototype.state_idle = function (S) {
S.validTransitions(['busy', 'error', 'closed']);
CueBallClaimHandle.prototype.close = function () {
this.relinquish('closeAsserted');
};
CueBallClaimHandle.prototype.state_waiting = function (S) {
S.validTransitions(['claiming', 'cancelled', 'failed']);
var self = this;
this.cf_claimed = false;
this.cf_claimStack = [];
this.ch_slot = undefined;
if (typeof (self.cf_conn.localPort) === 'number') {
this.cf_log = this.cf_log.child({
localPort: self.cf_conn.localPort
S.on(this, 'tryAsserted', function () {
S.gotoState('claiming');
});
if (isFinite(this.ch_claimTimeout)) {
S.timeout(this.ch_claimTimeout, function () {
self.ch_lastError = new mod_errors.ClaimTimeoutError(
self.ch_pool);
S.gotoState('failed');
});
}
this.cf_log.trace('connected, idling');
S.on(this, 'error', function (err) {
self.ch_lastError = err;
S.gotoState('failed');
});
if (this.cf_shadow) {
this.cf_shadow.sh_claimed = false;
this.cf_shadow.sh_releaseStack = this.cf_releaseStack;
this.cf_shadow = undefined;
S.on(this, 'cancelled', function () {
S.gotoState('cancelled');
});
};
CueBallClaimHandle.prototype.state_claiming = function (S) {
S.validTransitions(['claimed', 'waiting']);
var self = this;
S.on(this, 'accepted', function () {
S.gotoState('claimed');
});
S.on(this, 'rejected', function () {
if (self.ch_cancelled) {
S.gotoState('cancelled');
} else {
S.gotoState('waiting');
}
});
this.ch_slot.claim(this);
};
CueBallClaimHandle.prototype.state_claimed = function (S) {
var self = this;
S.validTransitions(['released', 'closed']);
S.on(this, 'releaseAsserted', function () {
S.gotoState('released');
});
S.on(this, 'closeAsserted', function () {
S.gotoState('closed');
});
if (this.ch_cancelled) {
S.gotoState('released');
return;
}
this.ch_preListeners = {};
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var newCount = countListeners(self.cf_conn, evt);
var oldCount = self.cf_oldListeners[evt];
var count = countListeners(self.ch_connection, evt);
self.ch_preListeners[evt] = count;
});
S.on(this.ch_connection, 'error', function clHandleErrorListener(err) {
var count = countListeners(self.ch_connection, 'error');
if (count === 0) {
/*
* Our end-user never set up an 'error' event listener
* and the socket emitted 'error'. We want to act like
* nothing is listening for it at all and throw.
*/
throw (err);
}
});
this.ch_log = this.ch_slot.makeChildLogger({
component: 'CueBallClaimHandle'
});
this.ch_callback(null, this, this.ch_connection);
};
CueBallClaimHandle.prototype.state_released = function (S) {
S.validTransitions([]);
var conn = this.ch_connection;
var self = this;
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var newCount = countListeners(conn, evt);
var oldCount = self.ch_preListeners[evt];
if (oldCount !== undefined && newCount > oldCount) {
var info = {};
info.stack = self.cf_releaseStack;
info.stack = self.ch_stack;
info.countBeforeClaim = oldCount;
info.countAfterRelease = newCount;
info.handlers = self.cf_conn.listeners(evt).map(
info.handlers = conn.listeners(evt).map(
function (f) {

@@ -420,163 +688,29 @@ /*

info.event = evt;
self.cf_log.warn(info, 'connection claimer looks ' +
self.ch_log.warn(info, 'connection claimer looks ' +
'like it leaked event handlers');
}
});
};
CueBallClaimHandle.prototype.state_closed = function (S) {
S.validTransitions([]);
/*
* Reset retries and retry delay to their defaults since we are now
* connected.
* Don't bother to check for leaked event handlers here, as the
* connection is being closed anyway.
*/
this.cf_retries = this.cf_connectRecov.retries;
this.cf_retriesLeft = this.cf_connectRecov.retries;
this.cf_minDelay = this.cf_connectRecov.delay;
this.cf_delay = this.cf_connectRecov.delay;
this.cf_maxDelay = this.cf_connectRecov.maxDelay || Infinity;
this.cf_timeout = this.cf_connectRecov.timeout;
this.cf_maxTimeout = this.cf_connectRecov.maxTimeout || Infinity;
if (this.cf_closeAfter === true) {
this.cf_closeAfter = false;
this.cf_lastError = undefined;
S.on(this, 'closeAsserted', function () { });
S.gotoState('closed');
return;
}
if (this.cf_doRef)
this.cf_conn.unref();
S.on(this, 'claimAsserted', function () {
S.gotoState('busy');
});
S.on(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
S.gotoState('error');
self.cf_pool._incrCounter('error-during-idle');
});
S.on(this.cf_conn, 'close', function () {
/*
* If we receive 'close' while idle with our closeAfter flag
* set, we were going to be removed anyway. Just go to closed.
* Going to error would give us a chance to learn about a dead
* backend, but a clean 'close' with no 'error' is probably
* not dead.
*
* This is particularly important with ConnectionSets, where
* this is the normal path that's taken for the Set's consumer
* to notify it that this connection has drained and closed.
*/
if (self.cf_closeAfter === true) {
S.gotoState('closed');
} else {
self.cf_lastError =
new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('close-during-idle');
}
});
S.on(this.cf_conn, 'end', function () {
if (self.cf_closeAfter === true) {
S.gotoState('closed');
} else {
self.cf_lastError = new
mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('end-during-idle');
}
});
S.on(this, 'closeAsserted', function () {
S.gotoState('closed');
});
if (this.cf_checkTimeout !== undefined &&
this.cf_checkTimeout !== null) {
var now = new Date();
var sinceLast = (now - this.cf_lastCheck);
var delay;
if (sinceLast > this.cf_checkTimeout) {
delay = 1000;
} else {
delay = this.cf_checkTimeout - sinceLast;
if (delay < 1000)
delay = 1000;
}
var t = S.timeout(delay, function () {
S.gotoState('ping');
});
t.unref();
}
};
ConnectionFSM.prototype.state_ping = function (S) {
S.validTransitions(['error', 'closed', 'idle']);
this.cf_lastCheck = new Date();
this.cf_claimStack = [
'ConnectionFSM.prototype.state_ping',
'(periodic_health_check)'];
this.cf_claimed = true;
var self = this;
if (this.cf_doRef)
this.cf_conn.ref();
this.cf_releaseStack = [];
this.cf_log.trace('doing health check');
CueBallClaimHandle.prototype.state_cancelled = function (S) {
S.validTransitions([]);
/*
* Write down the count of event handlers on the backing object so that
* we can spot if the client leaked any common ones in release().
* The public API requires that the callback function not be called at
* all when .cancel() has been used. So we do nothing here.
*/
this.cf_oldListeners = {};
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var count = countListeners(self.cf_conn, evt);
self.cf_oldListeners[evt] = count;
});
};
/*
* The ConnectionHandle is a one-time use object that proxies calls to
* our release() and close() functions. We use it so that we can assert
* that this particular client only releases us once. If we only
* asserted on our current state, there could be a race where we get
* claimed by a different client in the meantime.
*/
this.cf_shadow = new ConnectionHandle(this);
S.on(this, 'releaseAsserted', function () {
if (self.cf_closeAfter === true) {
S.gotoState('closed');
} else {
S.gotoState('idle');
}
});
S.on(this.cf_conn, 'error', function (err) {
self.cf_lastError = err;
S.gotoState('error');
self.cf_pool._incrCounter('error-during-ping');
});
S.on(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('close-during-ping');
});
S.on(this.cf_conn, 'end', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('end-during-ping');
});
S.on(this, 'closeAsserted', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
});
var t = S.timeout(this.cf_checkTimeout, function () {
var info = {};
info.stack = self.cf_claimStack;
self.cf_log.warn(info, 'health check is taking too ' +
'long to run (has been more than %d ms)',
self.cf_checkTimeout);
});
t.unref();
CueBallClaimHandle.prototype.state_failed = function (S) {
S.validTransitions([]);
var self = this;
S.immediate(function () {
self.cf_checker.call(null, self.cf_shadow, self.cf_conn);
self.ch_callback(self.ch_lastError);
});

@@ -599,2 +733,7 @@ };

}
/* Don't count the error listeners set up by cueball. */
if (h.name === 'socketMgrErrorListener' ||
h.name === 'clHandleErrorListener') {
return (false);
}
return (true);

@@ -605,104 +744,398 @@ });

ConnectionFSM.prototype.state_busy = function (S) {
S.validTransitions(['error', 'closed', 'idle']);
var self = this;
if (this.cf_doRef)
this.cf_conn.ref();
/*
* ConnectionSlotFSM is a large part of the guts of cueball connection
* management. Its primary job is to construct and manage a SocketMgrFSM,
* making decisions about when to retry and what to do when it's time to
* give up.
*
* It also handles reporting to the Pool or Set that hosts this slot, as its
* state graph represents only the transitions that the Pool or Set actually
* cares about.
*
* SlotFSMs have two key flags passed in by the Pool or Set: "wanted" and
* "monitor". "Monitor" indicates that this connection is a monitor connection
* (i.e. the backend that it is connecting to is expected to be dead, and this
* FSM's job is to monitor it for when it comes back). The "wanted" flag is
* always true at construction but may be set to false by the Pool or Set. When
* so set, it indicates that this slot is no longer "wanted" and should cease
* operation as soon as possible (by moving to "failed" or "stopped").
*
* init (startup)
* | +--------+
* | +-----------> | failed | <-----------+
* | | +--------+ |
* | | |
* | | | +---------+
* | smgr | smgr | | |
* | failed | failed | | |
* | | | v smgr |
* | +--------+-------+ +-------+--------+ error |
* +--> | connecting | smgr error | retrying +-------+
* (A)---> | +------------------> | |
* | smgr.connect() | | smgr.retry() | <-+
* +-------+--------+ +-------+--------+ |
* | ^ ^ | |
* smgr | | | | smgr |
* connected | | | | connected |
* | | smgr smgr | | |
* | | closed +--------+ error | | |
* | +-----------+ +----------+ | |
* | | | | |
* +------------> | idle | <-----------+ |
* | | |
* +----------------------+ | <-----------+ |
* | !wanted && smgr +---+----+ | |
* | connected | .claim() | |
* v | | |
* +--------------+ | | |
* | stopping | v | |
* | | +--------+ | |
* | smgr.close() | <------------+ +-------------+ |
* +------+-------+ hdl rel. && | | hdl released && |
* | smgr conn && | | smgr connected && |
* smgr | !wanted | | wanted |
* closed | | | |
* | | busy +----------------->(A) |
* v | | hdl released && |
* +---------+ | | smgr closed |
* | stopped | <---------------+ | |
* +---------+ hdl rel. && | +--------------------------+
* !(smgr conn) && | | hdl closed && ^
* !wanted +-----+--+ !(smgr connected) |
* | |
* | hdl closed && |
* | smgr connected |
* | |
* v |
* +--------------+ |
* | killing +----------------+
* | | smgr closed
* | smgr.close() |
* +--------------+
*/
function ConnectionSlotFSM(options) {
mod_assert.object(options, 'options');
mod_assert.object(options.pool, 'options.pool');
mod_assert.func(options.constructor, 'options.constructor');
mod_assert.object(options.backend, 'options.backend');
mod_assert.object(options.log, 'options.log');
mod_assert.object(options.recovery, 'options.recovery');
mod_assert.bool(options.monitor, 'options.monitor');
this.cf_releaseStack = [];
this.cf_log.trace('busy, claimed by %s',
this.cf_claimStack[1].split(' ')[0]);
mod_assert.optionalFunc(options.checker, 'options.checker');
mod_assert.optionalNumber(options.checkTimeout, 'options.checkTimeout');
/*
* Write down the count of event handlers on the backing object so that
* we can spot if the client leaked any common ones in release().
*/
this.cf_oldListeners = {};
['close', 'error', 'readable', 'data'].forEach(function (evt) {
var count = countListeners(self.cf_conn, evt);
self.cf_oldListeners[evt] = count;
this.csf_pool = options.pool;
this.csf_backend = options.backend;
this.csf_wanted = true;
this.csf_handle = undefined;
this.csf_monitor = options.monitor;
this.csf_checker = options.checker;
this.csf_checkTimeout = options.checkTimeout;
this.csf_log = options.log.child({
component: 'CueBallConnectionSlotFSM',
backend: this.csf_backend.key,
address: this.csf_backend.address,
port: this.csf_backend.port
});
/*
* The ConnectionHandle is a one-time use object that proxies calls to
* our release() and close() functions. We use it so that we can assert
* that this particular client only releases us once. If we only
* asserted on our current state, there could be a race where we get
* claimed by a different client in the meantime.
*/
this.cf_shadow = new ConnectionHandle(this);
var smgrOpts = {
pool: options.pool,
constructor: options.constructor,
backend: options.backend,
log: options.log,
recovery: options.recovery,
monitor: options.monitor,
slot: this
};
this.csf_smgr = new SocketMgrFSM(smgrOpts);
S.on(this, 'releaseAsserted', function () {
if (self.cf_closeAfter === true) {
S.gotoState('closed');
} else {
FSM.call(this, 'init');
}
mod_util.inherits(ConnectionSlotFSM, FSM);
ConnectionSlotFSM.prototype.setUnwanted = function () {
if (this.csf_wanted === false)
return;
this.csf_wanted = false;
this.emit('unwanted');
};
ConnectionSlotFSM.prototype.start = function () {
mod_assert.ok(this.isInState('init'));
this.emit('startAsserted');
};
ConnectionSlotFSM.prototype.claim = function (handle) {
mod_assert.ok(this.isInState('idle'));
mod_assert.strictEqual(this.csf_handle, undefined);
this.csf_handle = handle;
this.emit('claimAsserted');
};
ConnectionSlotFSM.prototype.makeChildLogger = function (args) {
return (this.csf_log.child(args));
};
ConnectionSlotFSM.prototype.getSocketMgr = function () {
return (this.csf_smgr);
};
ConnectionSlotFSM.prototype.state_init = function (S) {
S.on(this, 'startAsserted', function () {
S.gotoState('connecting');
});
};
ConnectionSlotFSM.prototype.state_connecting = function (S) {
S.validTransitions(['failed', 'retrying', 'idle']);
var smgr = this.csf_smgr;
S.on(smgr, 'stateChanged', function (st) {
switch (st) {
case 'init':
case 'connecting':
break;
case 'failed':
S.gotoState('failed');
break;
case 'error':
S.gotoState('retrying');
break;
case 'connected':
S.gotoState('idle');
break;
default:
throw (new Error('Unhandled smgr state transition: ' +
'.connect() => "' + st + '"'));
}
});
S.on(this.cf_conn, 'error', function (err) {
self.cf_log.error(err, 'connection emitted "error" while ' +
'busy (claimed)');
self.cf_lastError = err;
S.gotoState('error');
self.cf_pool._incrCounter('error-during-busy');
smgr.connect();
};
ConnectionSlotFSM.prototype.state_failed = function (S) {
S.validTransitions([]);
mod_assert.ok(this.csf_smgr.isInState('failed'),
'smgr must be failed');
};
ConnectionSlotFSM.prototype.state_retrying = function (S) {
S.validTransitions(['idle', 'failed', 'retrying', 'stopped',
'stopping']);
var self = this;
var smgr = this.csf_smgr;
S.on(smgr, 'stateChanged', function (st) {
switch (st) {
case 'backoff':
case 'connecting':
break;
case 'failed':
S.gotoState('failed');
break;
case 'error':
if (self.csf_monitor && !self.csf_wanted) {
S.gotoState('stopped');
} else {
S.gotoState('retrying');
}
break;
case 'connected':
S.gotoState('idle');
break;
default:
throw (new Error('Unhandled smgr state transition: ' +
'.retry() => "' + st + '"'));
}
});
S.on(this.cf_conn, 'end', function () {
self.cf_closeAfter = true;
self.cf_pool._incrCounter('end-during-busy');
S.on(this, 'unwanted', function () {
if (self.csf_monitor && smgr.isInState('backoff')) {
S.gotoState('stopping');
}
});
S.on(this.cf_conn, 'close', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
self.cf_pool._incrCounter('close-during-busy');
smgr.retry();
};
ConnectionSlotFSM.prototype.state_idle = function (S) {
var self = this;
var smgr = this.csf_smgr;
this.csf_handle = undefined;
if (smgr.isInState('connected')) {
var sock = smgr.getSocket();
/*
* If we can, unref the socket while idle, so that a bunch of
* spare connections doing nothing don't hold the process open.
*/
if (typeof (sock.unref) === 'function')
sock.unref();
}
/* Monitor successfully connected: make it into a normal slot now. */
if (this.csf_monitor === true) {
this.csf_monitor = false;
smgr.setMonitor(false);
}
if (!this.csf_wanted) {
onUnwanted();
return;
}
S.on(this, 'unwanted', onUnwanted);
function onUnwanted() {
if (smgr.isInState('connected')) {
S.gotoState('stopping');
}
}
S.on(smgr, 'stateChanged', function (st) {
switch (st) {
case 'error':
S.gotoState('retrying');
break;
case 'closed':
if (!self.csf_wanted) {
S.gotoState('stopped');
} else {
S.gotoState('connecting');
}
break;
default:
throw (new Error('Unhandled smgr state transition: ' +
'connected => "' + st + '"'));
}
});
S.on(this, 'closeAsserted', function () {
self.cf_lastError = new mod_errors.ConnectionClosedError(self);
S.gotoState('error');
S.on(this, 'claimAsserted', function () {
S.gotoState('busy');
});
if (this.cf_checkTimeout !== undefined &&
this.cf_checkTimeout !== null) {
var t = S.timeout(this.cf_checkTimeout, function () {
var info = {};
info.stack = self.cf_claimStack;
self.cf_log.warn(info, 'connection held for longer ' +
'than checkTimeout (%d ms), may have been leaked',
self.cf_checkTimeout);
if (this.csf_checkTimeout !== undefined &&
this.csf_checker !== undefined) {
S.timeout(this.csf_checkTimeout, function () {
doPingCheck(self, self.csf_checker);
});
t.unref();
}
};
function ConnectionHandle(cf) {
this.sh_cf = cf;
this.sh_claimed = true;
this.sh_error = false;
this.sh_releaseStack = [];
function doPingCheck(fsm, checker) {
var hdlOpts = {
pool: fsm.csf_pool,
claimStack: 'Error\n' +
'at claim\n' +
'at cueball.doPingCheck\n' +
'at cueball.doPingCheck\n',
callback: checker,
log: fsm.csf_log,
claimTimeout: Infinity
};
var handle = new CueBallClaimHandle(hdlOpts);
/*
* Don't bother handling a return to "waiting" state here: if we
* fail, it's fine, just let go of this handle entirely.
*/
handle.try(fsm);
}
ConnectionHandle.prototype.close = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
ConnectionSlotFSM.prototype.state_busy = function (S) {
S.validTransitions(['idle', 'stopping', 'stopped', 'retrying',
'killing', 'connecting']);
var self = this;
var smgr = this.csf_smgr;
var hdl = this.csf_handle;
function onRelease() {
if (smgr.isInState('connected')) {
if (self.csf_wanted) {
S.gotoState('idle');
} else {
S.gotoState('stopping');
}
} else if (smgr.isInState('closed')) {
if (self.csf_wanted) {
S.gotoState('connecting');
} else {
S.gotoState('stopped');
}
} else if (smgr.isInState('error')) {
S.gotoState('retrying');
} else {
throw (new Error('Handle released while smgr was ' +
'in unhandled state "' + smgr.getState() + '"'));
}
}
return (this.sh_cf.close.apply(this.sh_cf, arguments));
};
ConnectionHandle.prototype.release = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
function onClose() {
if (smgr.isInState('connected')) {
S.gotoState('killing');
} else {
S.gotoState('retrying');
}
}
return (this.sh_cf.release.apply(this.sh_cf, arguments));
};
ConnectionHandle.prototype.closeAfterRelease = function () {
mod_assert.ok(this.sh_claimed, 'Connection not claimed by ' +
'this handle, released by ' + this.sh_releaseStack[2]);
if (this.sh_error) {
this.sh_claimed = false;
return (undefined);
S.on(hdl, 'stateChanged', function (st) {
switch (st) {
case 'released':
S.immediate(onRelease);
break;
case 'closed':
S.immediate(onClose);
break;
default:
break;
}
});
/*
* It's possible that the smgr has already moved out of 'connected'
* by the time we get here.
*
* If we lose the race, treat it like our handle was released.
*/
if (smgr.isInState('connected')) {
var sock = smgr.getSocket();
if (typeof (sock.ref) === 'function')
sock.ref();
hdl.accept(sock);
} else {
hdl.reject();
this.csf_handle = undefined;
S.immediate(onRelease);
}
return (this.sh_cf.closeAfterRelease.apply(this.sh_cf,
arguments));
};
ConnectionSlotFSM.prototype.state_killing = function (S) {
S.validTransitions(['retrying']);
var smgr = this.csf_smgr;
S.on(smgr, 'stateChanged', function (st) {
if (st === 'closed') {
S.gotoState('retrying');
}
});
smgr.close();
};
ConnectionSlotFSM.prototype.state_stopping = function (S) {
S.validTransitions(['stopped']);
var smgr = this.csf_smgr;
S.on(smgr, 'stateChanged', function (st) {
if (st === 'closed') {
S.gotoState('stopped');
}
});
smgr.close();
};
ConnectionSlotFSM.prototype.state_stopped = function (S) {
S.validTransitions([]);
var smgr = this.csf_smgr;
mod_assert.ok(smgr.isInState('closed') || smgr.isInState('error') ||
smgr.isInState('failed'), 'smgr must be stopped');
};

@@ -122,3 +122,3 @@ /*

var ks = cset.cs_keys.slice();
Object.keys(cset.cs_fsms).forEach(function (k) {
Object.keys(cset.cs_fsm).forEach(function (k) {
if (ks.indexOf(k) === -1)

@@ -128,10 +128,8 @@ ks.push(k);

ks.forEach(function (k) {
var conns = cset.cs_fsms[k] || [];
var fsm = cset.cs_fsm[k];
obj.fsms[k] = {};
conns.forEach(function (fsm) {
var s = fsm.getState();
if (obj.fsms[k][s] === undefined)
obj.fsms[k][s] = 0;
++obj.fsms[k][s];
});
var s = fsm.getState();
if (obj.fsms[k][s] === undefined)
obj.fsms[k][s] = 0;
++obj.fsms[k][s];
});

@@ -138,0 +136,0 @@ obj.dead_backends = Object.keys(cset.cs_dead);

@@ -31,3 +31,5 @@ /*

const Queue = require('./queue');
const ConnectionFSM = require('./connection-fsm');
const mod_connfsm = require('./connection-fsm');
const ConnectionSlotFSM = mod_connfsm.ConnectionSlotFSM;
const CueBallClaimHandle = mod_connfsm.CueBallClaimHandle;

@@ -261,14 +263,27 @@ /*

var idx = this.p_keys.indexOf(k);
if (idx !== -1)
this.p_keys.splice(idx, 1);
mod_assert.notStrictEqual(idx, -1, 'resolver key ' + k + ' not found');
this.p_keys.splice(idx, 1);
delete (this.p_backends[k]);
(this.p_connections[k] || []).forEach(function (fsm) {
if (fsm.isInState('busy'))
fsm.closeAfterRelease();
else
fsm.close();
delete (this.p_dead[k]);
var self = this;
mod_vasync.forEachParallel({
func: closeBackend,
inputs: (this.p_connections[k] || [])
}, function () {
mod_assert.strictEqual(self.p_connections[k].length, 0);
delete (self.p_connections[k]);
self.rebalance();
});
delete (this.p_connections[k]);
delete (this.p_dead[k]);
this.rebalance();
function closeBackend(fsm, cb) {
fsm.setUnwanted();
if (fsm.isInState('stopped') || fsm.isInState('failed')) {
cb();
} else {
fsm.on('stateChanged', function (st) {
if (st === 'stopped' || st === 'failed')
cb();
});
}
}
};

@@ -352,2 +367,9 @@

this._incrCounter('failed-state');
/* Fail all outstanding claims that are waiting for a connection. */
while (!this.p_waiters.isEmpty()) {
var hdl = this.p_waiters.shift();
if (hdl.isInState('waiting'))
hdl.fail(new mod_errors.PoolFailedError(self));
}
};

@@ -413,10 +435,10 @@

function closeBackend(fsm, cb) {
if (fsm.isInState('busy')) {
fsm.closeAfterRelease();
fsm.setUnwanted();
if (fsm.isInState('stopped') || fsm.isInState('failed')) {
cb();
} else {
fsm.on('stateChanged', function (st) {
if (st === 'closed')
if (st === 'stopped' || st === 'failed')
cb();
});
} else {
fsm.close(cb);
}

@@ -533,27 +555,11 @@ }

plan.remove.forEach(function (fsm) {
/* This slot is no longer wanted. */
fsm.setUnwanted();
/*
* Only tell the FSM to quit *right now* if either:
* 1. it's idle
* 2. there are other FSMs for this backend
* 2. it is connected to a backend that has been
* removed from the resolver
* Otherwise get it to quit gracefully once it's done
* doing whatever it's doing (using closeAfterRelease).
* This way we when we have a failing backend that we
* want to mark as "dead" ASAP, we don't give up early
* and never figure out if it's actually dead or not.
* We may have changed to stopped or failed synchronously after
* setting unwanted. If we have, don't count this as a socket
* against our cap (it's been destroyed).
*/
var fsmIdx = self.p_connections[fsm.cf_backend.key].
indexOf(fsm);
if (fsm.isInState('idle')) {
fsm.close();
if (fsm.isInState('stopped') || fsm.isInState('failed')) {
--total;
} else if (fsm.isInState('busy')) {
fsm.closeAfterRelease();
} else if (fsmIdx > 0 ||
self.p_keys.indexOf(fsm.cf_backend.key) === -1) {
fsm.close();
--total;
} else {
fsm.closeAfterRelease();
}

@@ -579,3 +585,3 @@ });

var fsm = new ConnectionFSM({
var fsm = new ConnectionSlotFSM({
constructor: this.p_constructor,

@@ -587,3 +593,4 @@ backend: backend,

checkTimeout: this.p_checkTimeout,
recovery: this.p_recovery
recovery: this.p_recovery,
monitor: (this.p_dead[key] === true)
});

@@ -600,4 +607,4 @@ if (this.p_connections[key] === undefined)

/* These transitions mean we're still starting up. */
if (newState === 'init' || newState === 'delay' ||
newState === 'error' || newState === 'connect')
if (newState === 'init' || newState === 'connecting' ||
newState === 'retrying')
return;

@@ -631,3 +638,3 @@ /*

if (self.p_backends[key] === undefined) {
fsm.close();
fsm.setUnwanted();
return;

@@ -640,6 +647,8 @@ }

*/
if (self.p_waiters.length > 0) {
var cb = self.p_waiters.shift();
fsm.claim(cb.stack, cb);
return;
while (self.p_waiters.length > 0) {
var hdl = self.p_waiters.shift();
if (hdl.isInState('waiting')) {
hdl.try(fsm);
return;
}
}

@@ -661,3 +670,7 @@

if (newState === 'closed') {
if (newState === 'failed') {
self.p_dead[key] = true;
}
if (newState === 'stopped' || newState === 'failed') {
if (self.p_connections[key]) {

@@ -667,5 +680,2 @@ var idx = self.p_connections[key].indexOf(fsm);

}
if (fsm.retriesExhausted()) {
self.p_dead[key] = true;
}
self.emit('closedBackend', key, fsm);

@@ -691,5 +701,28 @@ self.rebalance();

CueBallConnectionPool.prototype.printConnections = function () {
var self = this;
var obj = { connections: {} };
var ks = self.p_keys.slice();
Object.keys(self.p_connections).forEach(function (k) {
if (ks.indexOf(k) === -1)
ks.push(k);
});
ks.forEach(function (k) {
var conns = self.p_connections[k] || [];
obj.connections[k] = {};
conns.forEach(function (fsm) {
var s = fsm.getState();
if (obj.connections[k][s] === undefined)
obj.connections[k][s] = 0;
++obj.connections[k][s];
});
});
console.log('live:', obj.connections);
console.log('dead:', self.p_dead);
};
CueBallConnectionPool.prototype.claim = function (options, cb) {
var self = this;
var done = false;
var handle;

@@ -733,84 +766,56 @@ if (typeof (options) === 'function' && cb === undefined) {

/* If there are idle connections sitting around, take one. */
while (this.p_idleq.length > 0) {
var fsm = this.p_idleq.shift();
delete (fsm.p_idleq_node);
/*
* Since 'stateChanged' is emitted async from mooremachine,
* things may be on the idle queue still but not actually idle.
* If we find one, just rip it off the queue (which we've
* already done) and try the next thing. The state mgmt
* callback from addConnection will cope.
*/
if (!fsm.isInState('idle'))
continue;
handle = new CueBallClaimHandle({
pool: this,
claimStack: e.stack,
callback: cb,
log: this.p_log,
claimTimeout: timeout
});
fsm.claim(e.stack, function (err, hdl, conn) {
if (err) {
if (!done)
cb(err);
done = true;
return;
}
if (done) {
hdl.release();
return;
}
done = true;
cb(err, hdl, conn);
});
return ({
cancel: function () { done = true; }
});
function waitingListener(st) {
if (st === 'waiting') {
tryNext();
}
}
handle.on('stateChanged', waitingListener);
if (errOnEmpty && this.p_resolver.count() < 1) {
setImmediate(function () {
if (!done)
cb(new mod_errors.NoBackendsError(self));
done = true;
});
return ({
cancel: function () { done = true; }
});
}
function tryNext() {
if (!handle.isInState('waiting'))
return;
/* Otherwise add an entry on the "waiter" queue. */
var timer;
var waiter = function () {
if (timer !== undefined)
clearTimeout(timer);
timer = undefined;
done = true;
cb.apply(this, arguments);
};
waiter.stack = e.stack;
var qnode = this.p_waiters.push(waiter);
/* If there are idle connections sitting around, take one. */
while (self.p_idleq.length > 0) {
var fsm = self.p_idleq.shift();
delete (fsm.p_idleq_node);
/*
* Since 'stateChanged' is emitted async from
* mooremachine, things may be on the idle queue still
* but not actually idle. If we find one, just rip it
* off the queue (which we've already done) and try the
* next thing. The state mgmt callback from
* addConnection will cope.
*/
if (!fsm.isInState('idle'))
continue;
this._hwmCounter('max-claim-queue', this.p_waiters.length);
this._incrCounter('queued-claim');
handle.try(fsm);
if (timeout !== Infinity) {
timer = setTimeout(function () {
if (timer === undefined)
return;
return;
}
qnode.remove();
done = true;
cb(new mod_errors.ClaimTimeoutError(self));
}, timeout);
if (errOnEmpty && self.p_resolver.count() < 1) {
var err = new mod_errors.NoBackendsError(self);
handle.fail(err);
}
/* Otherwise add an entry on the "waiter" queue. */
self.p_waiters.push(handle);
self._hwmCounter('max-claim-queue', self.p_waiters.length);
self._incrCounter('queued-claim');
self.rebalance();
}
this.rebalance();
var handle = {};
handle.cancel = function () {
mod_assert.ok(done === false, 'callback was already called ' +
'for this waiter handle');
if (timer !== undefined)
clearTimeout(timer);
timer = undefined;
qnode.remove();
done = true;
};
return (handle);
};

@@ -30,3 +30,5 @@ /*

const Queue = require('./queue');
const ConnectionFSM = require('./connection-fsm');
const mod_cfsm = require('./connection-fsm');
const ConnectionSlotFSM = mod_cfsm.ConnectionSlotFSM;
const CueBallClaimHandle = mod_cfsm.CueBallClaimHandle;

@@ -68,6 +70,8 @@ function CueBallConnectionSet(options) {

this.cs_backends = {};
/* Map of backend key => array of ConnectionFSM instances. */
this.cs_fsms = {};
/* Map of backend key => ConnectionSlotFSM instance. */
this.cs_fsm = {};
/* Map of backend key => bool, if true the backend is declared dead. */
this.cs_dead = {};
/* Map of backend key => claim handle. */
this.cs_handles = {};

@@ -85,3 +89,8 @@ /*

this.cs_connections = {};
/* Map of backend key => Array of connection keys. */
this.cs_connectionKeys = {};
/* Map of connection key => bool, if true 'removed' has been emitted. */
this.cs_emitted = {};
/* For debugging, track when we last rebalanced. */

@@ -144,19 +153,15 @@ this.cs_lastRebalance = undefined;

var cks = Object.keys(this.cs_connections).filter(function (ck) {
return (ck.indexOf(k + '.') === 0);
});
var fsm = self.cs_fsm[k];
if (fsm !== undefined)
fsm.setUnwanted();
var fsms = self.cs_fsms[k] || [];
fsms.forEach(function (fsm) {
if (cks.length > 0 || fsm.isInState('idle')) {
fsm.closeAfterRelease();
} else {
fsm.close();
var cks = this.cs_connectionKeys[k];
(cks || []).forEach(function (ck) {
var conn = self.cs_connections[ck];
var hdl = self.cs_handles[ck];
if (self.cs_emitted[ck] !== true) {
self.cs_emitted[ck] = true;
self.assertEmit('removed', ck, conn, hdl);
}
});
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
delete (self.cs_connections[ck]);
self.assertEmit('removed', ck, conn);
});
};

@@ -269,3 +274,3 @@

S.validTransitions(['stopped']);
var conns = this.cs_fsms;
var conns = this.cs_fsm;
var fsms = [];

@@ -275,5 +280,3 @@ var self = this;

Object.keys(conns).forEach(function (k) {
conns[k].forEach(function (fsm) {
fsms.push(fsm);
});
fsms.push(conns[k]);
});

@@ -292,21 +295,17 @@ mod_vasync.forEachParallel({

var k = fsm.cf_backend.key;
var cks = Object.keys(self.cs_connections).filter(
function (ck) {
return (ck.indexOf(k + '.') === 0);
});
var k = fsm.csf_backend.key;
var cks = self.cs_connectionKeys[k];
fsm.on('stateChanged', function (s) {
if (s === 'closed')
if (s === 'stopped' || s === 'failed')
cb();
});
if (cks.length === 0 && !fsm.isInState('idle')) {
fsm.close();
} else {
fsm.closeAfterRelease();
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
delete (self.cs_connections[ck]);
self.assertEmit('removed', ck, conn);
});
}
fsm.setUnwanted();
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
var hdl = self.cs_handles[ck];
if (self.cs_emitted[ck] !== true) {
self.cs_emitted[ck] = true;
self.assertEmit('removed', ck, conn, hdl);
}
});
}

@@ -319,3 +318,3 @@ };

this.cs_keys = [];
this.cs_fsms = {};
this.cs_fsm = {};
this.cs_connections = {};

@@ -386,3 +385,5 @@ this.cs_backends = {};

this.cs_keys.forEach(function (k) {
conns[k] = (self.cs_fsms[k] || []).slice();
conns[k] = [];
if (self.cs_fsm[k] !== undefined)
conns[k].push(self.cs_fsm[k]);
total += conns[k].length;

@@ -409,3 +410,3 @@ });

var k = fsm.cf_backend.key;
var k = fsm.csf_backend.key;
/*

@@ -420,28 +421,14 @@ * Find any advertised connections from this FSM, and (after

});
/*
* We can close the FSM immediately if we aren't advertising
* any connections for it, and we aren't waiting on our consumer
* to close any -- i.e., the FSM is in an error state, probably
* delay or connect.
*
* Still want to avoid doing .close() on the *last* one for a
* given backend, so it has a chance to run out of retries if
* the backend is in fact dead. So we do .closeAfterRelease()
* instead for those.
*/
if (cks.length === 0 && !fsm.isInState('idle')) {
var fsmIdx = self.cs_fsms[k].indexOf(fsm);
if (fsmIdx > 0 || self.cs_keys.indexOf(k) === -1) {
fsm.close();
--total;
} else {
fsm.closeAfterRelease();
}
} else {
fsm.closeAfterRelease();
fsm.setUnwanted();
if (fsm.isInState('stopped') || fsm.isInState('failed')) {
delete (self.cs_fsm[k]);
--total;
}
cks.forEach(function (ck) {
var conn = self.cs_connections[ck];
delete (self.cs_connections[ck]);
self.assertEmit('removed', ck, conn);
var hdl = self.cs_handles[ck];
if (self.cs_emitted[ck] !== true) {
self.cs_emitted[ck] = true;
self.assertEmit('removed', ck, conn, hdl);
}
});

@@ -453,2 +440,6 @@ });

return;
/* Never make more than one slot for the same backend. */
if (self.cs_fsm[k] !== undefined)
return;
self.addConnection(k);

@@ -471,2 +462,25 @@ });

function forceClaim(handle, fsm) {
handle.on('stateChanged', hdlStateListener);
function hdlStateListener(st) {
if (st === 'waiting' && handle.isInState('waiting')) {
if (fsm.isInState('idle')) {
handle.try(fsm);
} else {
fsm.on('stateChanged', fsmStateListener);
}
}
}
function fsmStateListener(st) {
if (st === 'idle' && fsm.isInState('idle')) {
fsm.removeListener('stateChanged', fsmStateListener);
if (handle.isInState('waiting')) {
handle.try(fsm);
}
}
}
}
CueBallConnectionSet.prototype.addConnection = function (key) {

@@ -479,3 +493,3 @@ if (this.isInState('stopping') || this.isInState('stopped'))

var fsm = new ConnectionFSM({
var fsm = new ConnectionSlotFSM({
constructor: this.cs_constructor,

@@ -486,14 +500,47 @@ backend: backend,

recovery: this.cs_recovery,
doRef: false
monitor: (this.cs_dead[key] === true)
});
if (this.cs_fsms[key] === undefined)
this.cs_fsms[key] = [];
if (this.cs_serials[key] === undefined)
this.cs_serials[key] = 1;
this.cs_fsms[key].push(fsm);
if (this.cs_connectionKeys[key] === undefined)
this.cs_connectionKeys[key] = [];
mod_assert.strictEqual(this.cs_fsm[key], undefined);
this.cs_fsm[key] = fsm;
var serial;
var ckey;
var smgr = fsm.getSocketMgr();
var self = this;
fsm.on('stateChanged', function (newState) {
if (newState === 'busy' && fsm.isInState('busy')) {
mod_assert.notStrictEqual(ckey, undefined);
mod_assert.notStrictEqual(self.cs_connections[ckey],
undefined);
return;
}
/*
* If we already have a ckey set for this FSM, and it exists in
* cs_connections, then we previously got a connection and
* claimed it. Any state transition (other than to "busy",
* which was excluded above) now means this connection's
* claim handle has been released/closed and we must clean up
* the associated entries.
*/
if (ckey !== undefined &&
self.cs_connections[ckey] !== undefined) {
mod_assert.ok(self.cs_emitted[ckey]);
delete (self.cs_connections[ckey]);
delete (self.cs_emitted[ckey]);
delete (self.cs_handles[ckey]);
var cks = self.cs_connectionKeys[key];
var ckIdx = cks.indexOf(ckey);
mod_assert.notStrictEqual(ckIdx, -1);
cks.splice(ckIdx, 1);
ckey = undefined;
}
if (newState === 'idle' && fsm.isInState('idle')) {

@@ -505,3 +552,3 @@ /*

if (self.cs_backends[key] === undefined) {
fsm.close();
fsm.setUnwanted();
return;

@@ -513,34 +560,43 @@ }

}
if (ckey !== undefined && self.cs_connections[ckey]) {
var conn = self.cs_connections[ckey];
delete (self.cs_connections[ckey]);
self.assertEmit('removed', ckey, conn);
/* We got a connection, so we're not dead. */
if (self.cs_dead[key] !== undefined) {
delete (self.cs_dead[key]);
}
conn = fsm.getConnection();
serial = self.cs_serials[key]++;
ckey = key + '.' + serial;
conn.cs_serial = serial;
fsm.cs_serial = serial;
self.cs_connections[ckey] = conn;
self.assertEmit('added', ckey, conn);
var hdlOpts = {
pool: self,
claimStack: 'Error\n' +
' at claim\n' +
' at CueBallConnectionSet.addConnection\n' +
' at CueBallConnectionSet.addConnection',
callback: afterClaim,
log: self.cs_log,
claimTimeout: Infinity
};
var handle = new CueBallClaimHandle(hdlOpts);
self.rebalance();
return;
}
self.cs_handles[ckey] = handle;
forceClaim(handle, fsm);
if (newState !== 'idle') {
if (self.cs_connections[ckey]) {
conn = self.cs_connections[ckey];
delete (self.cs_connections[ckey]);
self.assertEmit('removed', ckey, conn);
function afterClaim(err, hdl, conn) {
mod_assert.ok(!err);
conn.cs_serial = serial;
conn.cs_backendKey = key;
self.cs_connections[ckey] = conn;
self.cs_connectionKeys[key].push(ckey);
self.assertEmit('added', ckey, conn, hdl);
self.rebalance();
}
return;
}
if (newState === 'closed') {
if (self.cs_fsms[key]) {
var idx = self.cs_fsms[key].indexOf(fsm);
self.cs_fsms[key].splice(idx, 1);
}
if (newState === 'failed') {
/*

@@ -550,6 +606,9 @@ * Set the dead flag, but not on a backend that's no

*/
if (fsm.retriesExhausted() &&
self.cs_backends[key] !== undefined) {
if (self.cs_backends[key] !== undefined) {
self.cs_dead[key] = true;
}
}
if (newState === 'stopped' || newState === 'failed') {
delete (self.cs_fsm[key]);
self.emit('closedBackend', fsm);

@@ -561,2 +620,20 @@ self.rebalance();

smgr.on('stateChanged', function (newState) {
if (!fsm.isInState('busy'))
return;
if (newState === 'connected')
return;
/*
* A transition out of 'connected' while the slot is still
* 'busy' indicates that we lost the connection. We should
* emit 'removed' for our clients.
*/
mod_assert.string(ckey);
if (self.cs_emitted[ckey] !== true) {
self.cs_emitted[ckey] = true;
self.assertEmit('removed', ckey,
self.cs_connections[ckey], self.cs_handles[ckey]);
}
});
fsm.start();

@@ -567,4 +644,9 @@ };

var self = this;
return (Object.keys(this.cs_connections).map(function (k) {
return (self.cs_connections[k]);
var conns = [];
return (Object.keys(this.cs_connections).forEach(function (k) {
var c = self.cs_connections[k];
var fsm = self.cs_fsm[c.cs_backendKey];
var h = self.cs_handles[k];
if (fsm.isInState('busy') && h.isInState('claimed'))
conns.push(c);
}));

@@ -571,0 +653,0 @@ };

{
"name": "cueball",
"version": "1.3.2",
"version": "2.0.0",
"description": "",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -73,7 +73,7 @@ cueball

- `recovery` -- Object, a recovery spec (see below)
- `spares` -- Number, number of spares wanted in the pool per host
- `maximum` -- Number, maximum number of connections per host
- `resolvers` -- optional Array of String, either containing IP addresses to
use as nameservers, or a single string for Dynamic Resolver mode
- `log` -- optional Object, a `bunyan`-style logger to use
- `spares` -- optional Number, number of spares wanted in the pool per host
- `maximum` -- optional Number, maximum number of connections per host
- `initialDomains` -- optional Array of String, initial domains to create

@@ -119,2 +119,4 @@ connections to at startup (to pre-seed the Agent for quick user later)

- `recovery` -- Object, a recovery spec (see below)
- `spares` -- Number, number of spares wanted in the pool per host
- `maximum` -- Number, maximum number of connections per host
- `service` -- optional String, name of SRV service (e.g. `_http._tcp`)

@@ -126,4 +128,2 @@ - `defaultPort` -- optional Number, port to use for plain A/AAAA records

- `log` -- optional Object, a `bunyan`-style logger to use
- `spares` -- optional Number, number of spares wanted in the pool per host
- `maximum` -- optional Number, maximum number of connections per host
- `maxDNSConcurrency` -- optional Number, max number of DNS queries to issue

@@ -676,8 +676,8 @@ at once (default 5)

- `recovery` -- Object, a recovery spec (see below)
- `log` -- optional Object, a `bunyan`-style logger to use
- `target` -- optional Number, target number of connections to be made
- `target` -- Number, target number of connections to be made
available in the entire set
- `maximum` -- optional Number, maximum number of sockets opened by the set.
- `maximum` -- Number, maximum number of sockets opened by the set.
Note that this number may temporarily be exceeded by 1 socket
to allow the set to re-balance.
- `log` -- optional Object, a `bunyan`-style logger to use

@@ -689,5 +689,17 @@ ### Event `'added'`

The `handle` that is given as the third argument to this event has two methods
`.release()` and `.close()`, like a Pool handle. As with Pool handles, it can
be used to indicate the failure of a connection (e.g. due to a protocol error
making safe use of the connection impossible) at any time, but unlike a Pool
handle, it is an error to call `.release()` until after a `'removed'` event
has been emitted.
The user of the ConnectionSet should store both the `connection` and `handle`
in such a way as to be able to retrieve them using the `key`.
Parameters
- `key` -- String, a unique key to identify this connection
- `connection` -- Object, the connection as returned by the constructor
- `handle` -- Object, a handle to be used in response to a 'removed' event
about this connection

@@ -698,5 +710,4 @@ ### Event `removed`

*must* have a handler on it at all times. The handler is obligated to take all
necessary actions to drain the connection of outstanding requests and then close
it. The emission of this event must cause the connection object to emit
`'close'` as soon as possible.
necessary actions to drain the connection of outstanding requests and then
call the `.release()` method on the relevant handle.

@@ -703,0 +714,0 @@ Parameters

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc