dns-endpoint-pool
Advanced tools
Comparing version 0.1.0 to 1.0.0
138
index.js
var EndpointPool, | ||
_ = require('underscore'), | ||
dns = require('dns'), | ||
Events = require('events'), | ||
util = require('util'), | ||
_ = require('underscore'), | ||
dns = require('dns'), | ||
Events = require('events'), | ||
PoolManager = require('./pool-manager'), | ||
util = require('util'), | ||
DNS_LOOKUP_TIMEOUT = 1000, | ||
DNS_LOOKUP_TIMEOUT = 1000; | ||
CLOSED = 0, // closed circuit: endpoint is good to use | ||
HALF_OPEN_READY = 1, // endpoint is in recovery state: offer it for use once | ||
HALF_OPEN_PENDING = 2, // endpoint recovery is in process | ||
OPEN = 3; // open circuit: endpoint is no good | ||
/** | ||
* @param {String} discoveryName The name of the service discovery host | ||
* @param {Number} ttl How long the endpoints are valid for. The service discovery endpoint will be checked on | ||
* this interval. | ||
* @param {Number} maxFailures Number of failures allowed before the endpoint circuit breaker is tripped. | ||
* @param {Number} failureWindow Size of the sliding window of time in which the failures are counted. | ||
* @param {Number} resetTimeout Amount of time before putting the circuit back into half open state. | ||
* @param {Function} onReady Callback to execute when endpoints have been primed (updated for the first time) | ||
* @param {String} discoveryName The name of the service discovery host | ||
* @param {Number} ttl How long the endpoints are valid for. The service discovery endpoint will be checked on | ||
* this interval. | ||
* @param {{maxFailures: Number, failureWindow: Number, resetTimeout: Number}} | ||
* ejectOnErrorConfig How to handle endpoint errors. If specified, the following options must be defined: | ||
* - maxFailures: Number of failures allowed before the endpoint circuit breaker is tripped. | ||
* - failureWindow: Size of the sliding window of time in which the failures are counted. | ||
* - resetTimeout: Amount of time before putting the circuit back into half open state. | ||
* @param {Function=} onReady Callback to execute when endpoints have been primed (updated for the first time) | ||
*/ | ||
module.exports = EndpointPool = function (discoveryName, ttl, maxFailures, failureWindow, resetTimeout, onReady) { | ||
if (!discoveryName || !ttl || !maxFailures || !resetTimeout) { | ||
module.exports = EndpointPool = function (discoveryName, ttl, ejectOnErrorConfig, onReady) { | ||
if (!discoveryName || !ttl) { | ||
throw new Error('Must supply all arguments'); | ||
} | ||
if (ejectOnErrorConfig) { | ||
this.poolManager = PoolManager.ejectOnErrorPoolManager(ejectOnErrorConfig); | ||
} else { | ||
this.poolManager = PoolManager.defaultPoolManager(); | ||
} | ||
Events.EventEmitter.call(this); | ||
@@ -31,8 +36,4 @@ | ||
this.ttl = ttl; | ||
this.endpoints = []; | ||
this._endpointOffset = 0; | ||
this._updateTimeout = null; | ||
this.maxFailures = maxFailures; | ||
this.failureWindow = failureWindow; | ||
this.resetTimeout = resetTimeout; | ||
this.lastUpdate = Date.now(); | ||
@@ -68,39 +69,14 @@ this.update(onReady); | ||
getEndpoint: function () { | ||
var endpoint, i, l, offset; | ||
for (i = 0, l = this.endpoints.length; i < l; ++i) { | ||
offset = (this._endpointOffset + i) % l; | ||
endpoint = this.endpoints[offset]; | ||
var endpoint = this.poolManager.getNextEndpoint(); | ||
switch (endpoint.state) { | ||
case HALF_OPEN_READY: | ||
endpoint.state = HALF_OPEN_PENDING; // let one through, then turn it off again | ||
/* falls through */ | ||
case CLOSED: | ||
this._endpointOffset = offset + 1; | ||
return endpoint; | ||
// case OPEN: case HALF_OPEN_PENDING: // continue to the next one | ||
} | ||
if (endpoint) { | ||
return endpoint; | ||
} else { | ||
this.emit('noEndpoints'); | ||
return null; | ||
} | ||
this.emit('noEndpoints'); | ||
return null; | ||
}, | ||
setEndpoints: function (endpoints) { | ||
var newEndpoints, i, matchingEndpoint; | ||
newEndpoints = endpoints.map(function (info) { | ||
return new Endpoint(info, this.maxFailures, this.failureWindow, this.resetTimeout); | ||
}, this); | ||
for (i = this.endpoints.length; i--;) { | ||
matchingEndpoint = _.findWhere(newEndpoints, { url: this.endpoints[i].url }); | ||
if (matchingEndpoint) { // found a match, remove it from `newEndpoints`, since it's not new | ||
newEndpoints = _.without(newEndpoints, matchingEndpoint); | ||
} else { // didn't find a match in endpoints, so kill that endpoint | ||
this.endpoints.splice(i, 1); | ||
} | ||
} | ||
// push all the actually-new endpoints in | ||
this.endpoints.push.apply(this.endpoints, newEndpoints); | ||
this.poolManager.updateEndpoints(endpoints); | ||
}, | ||
@@ -112,51 +88,1 @@ | ||
}); | ||
function Endpoint(info, maxFailures, failureWindow, resetTimeout) { | ||
this.name = info.name; | ||
this.port = info.port; | ||
this.url = info.name + ':' + info.port; | ||
this.state = CLOSED; | ||
this.failCount = 0; | ||
this.callback = endpointCallback.bind(this); | ||
this.disable = disableEndpoint.bind(this); | ||
this.resetTimeout = resetTimeout; | ||
this.failureWindow = failureWindow; | ||
// A ring buffer, holding the timestamp of each error. As we loop around the ring, the timestamp in the slot we're | ||
// about to fill will tell us the error rate. That is, `maxFailure` number of requests in how many milliseconds? | ||
this.buffer = new Array(maxFailures - 1); | ||
this.bufferPointer = 0; | ||
} | ||
function endpointCallback(err) { | ||
if (err) { | ||
var oldestErrorTime, now; | ||
if (this.state === OPEN) { | ||
return; | ||
} | ||
if (this.buffer.length === 0) { | ||
this.disable(); | ||
return; | ||
} | ||
oldestErrorTime = this.buffer[this.bufferPointer]; | ||
now = Date.now(); | ||
this.buffer[this.bufferPointer] = now; | ||
this.bufferPointer++; | ||
this.bufferPointer %= this.buffer.length; | ||
if (this.state === HALF_OPEN_PENDING || (oldestErrorTime != null && now - oldestErrorTime <= this.failureWindow)) { | ||
this.disable(); | ||
} | ||
} else if (this.state === HALF_OPEN_PENDING) { | ||
this.state = CLOSED; | ||
} | ||
} | ||
function disableEndpoint() { | ||
this.state = OPEN; | ||
clearInterval(this._reopenTimeout); | ||
this._reopenTimeout = setTimeout(function () { | ||
this.state = HALF_OPEN_READY; | ||
}.bind(this), this.resetTimeout); | ||
} |
{ | ||
"name": "dns-endpoint-pool", | ||
"version": "0.1.0", | ||
"version": "1.0.0", | ||
"description": "Manage and load-balance a pool of service endpoints retrieved from a DNS lookup for a service discovery name.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -43,3 +43,3 @@ ## DNS Endpoint Pool | ||
### `new DNSEndpointPool(serviceDiscoveryName, ttl, maxFailures, failureWindow, resetTimeout)` | ||
### `new DNSEndpointPool(serviceDiscoveryName, ttl, maxFailures, failureWindow, resetTimeout, onReady)` | ||
@@ -46,0 +46,0 @@ Creates a new pool object. |
171
test.js
@@ -28,5 +28,3 @@ /*globals it, describe, beforeEach, afterEach */ | ||
function () { return new DEP(); }, | ||
function () { return new DEP('foo.localhost'); }, | ||
function () { return new DEP('foo.localhost', 3); }, | ||
function () { return new DEP('foo.localhost', 3, 5); } | ||
function () { return new DEP('foo.localhost'); } | ||
].forEach(function (fn) { | ||
@@ -39,3 +37,3 @@ expect(fn).to.throwError('Must supply all arguments'); | ||
var stub = autoRestore(Sinon.stub(DEP.prototype, 'update')), | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
dep = new DEP('foo.localhost', 5000); | ||
Sinon.assert.calledOnce(stub); | ||
@@ -50,3 +48,3 @@ }); | ||
resolve.callsArgWith(0, null, []); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000, function () { | ||
dep = new DEP('foo.localhost', 5000, null, function () { | ||
called = true; | ||
@@ -65,3 +63,3 @@ }); | ||
resolve.callsArgWith(0, null, []); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
dep = new DEP('foo.localhost', 5000); | ||
@@ -87,3 +85,3 @@ Sinon.assert.calledOnce(resolve); | ||
]); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
dep = new DEP('foo.localhost', 5000); | ||
@@ -107,3 +105,3 @@ expect(dep.getEndpoint().url).to.be('bar.localhost:8000'); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
dep = new DEP('foo.localhost', 5000); | ||
@@ -137,3 +135,3 @@ expect(dep.getEndpoint().url).to.be('bar.localhost:8000'); | ||
]); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
dep = new DEP('foo.localhost', 5000); | ||
@@ -154,91 +152,110 @@ dep.getEndpoint(); // bar | ||
it('will remove endpoints from the pool if they fail', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
barEndpoint, | ||
bazEndpoint, | ||
dep; | ||
describe('with eject-on-error pool management', function () { | ||
resolve.callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
var ejectOnErrorConfig = { | ||
maxFailures: 2, | ||
failureWindow: 10000, | ||
resetTimeout: 10000 | ||
}; | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
it('enforces that config object has proper shape', function () { | ||
autoRestore(Sinon.stub(DEP.prototype, 'update')); | ||
[ | ||
function () { return new DEP('foo.localhost', 5000, { maxFailures: 2 }); }, | ||
function () { return new DEP('foo.localhost', 5000, { maxFailures: 2, failureWindow: 10000 }); } | ||
].forEach(function (fn) { | ||
expect(fn).to.throwError('Must supply all arguments to ejectOnErrorPoolManager'); | ||
}); | ||
}); | ||
barEndpoint = dep.getEndpoint(); | ||
barEndpoint.callback(true); | ||
it('will remove endpoints from the pool if they fail', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
barEndpoint, | ||
bazEndpoint, | ||
dep; | ||
bazEndpoint = dep.getEndpoint(); | ||
resolve.callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
expect(dep.getEndpoint()).to.be(barEndpoint); // still in the pool | ||
barEndpoint.callback(true); | ||
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); // bar is removed | ||
dep.stopUpdating(); | ||
}); | ||
barEndpoint = dep.getEndpoint(); | ||
barEndpoint.callback(true); | ||
it('will reinstate endpoints for a single request after a timeout', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
barEndpoint, | ||
bazEndpoint, | ||
dep; | ||
bazEndpoint = dep.getEndpoint(); | ||
resolve.callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
expect(dep.getEndpoint()).to.be(barEndpoint); // still in the pool | ||
barEndpoint.callback(true); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); // bar is removed | ||
dep.stopUpdating(); | ||
}); | ||
barEndpoint = dep.getEndpoint(); | ||
barEndpoint.callback(true); | ||
barEndpoint.callback(true); // removed from pool. | ||
it('will reinstate endpoints for a single request after a timeout', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
barEndpoint, | ||
bazEndpoint, | ||
dep; | ||
bazEndpoint = dep.getEndpoint(); | ||
resolve.callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
clock.tick(10000); | ||
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig); | ||
expect(dep.getEndpoint()).to.be(barEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); // only return barEndpoint once | ||
barEndpoint = dep.getEndpoint(); | ||
barEndpoint.callback(true); | ||
barEndpoint.callback(true); // removed from pool. | ||
barEndpoint.callback(null); // denotes success | ||
expect(dep.getEndpoint()).to.be(barEndpoint); // it's back in the game | ||
dep.stopUpdating(); | ||
}); | ||
bazEndpoint = dep.getEndpoint(); | ||
it('reports the age of the endpoints when updates fail', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
errorHandler = Sinon.spy(), | ||
errObj = { error: true }, | ||
dep; | ||
clock.tick(10000); | ||
// works on the first and fourth calls, fails every other time | ||
resolve | ||
.callsArgWith(0, errObj) | ||
.onFirstCall().callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]) | ||
.onCall(3).callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
expect(dep.getEndpoint()).to.be(barEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); | ||
expect(dep.getEndpoint()).to.be(bazEndpoint); // only return barEndpoint once | ||
barEndpoint.callback(null); // denotes success | ||
expect(dep.getEndpoint()).to.be(barEndpoint); // it's back in the game | ||
dep.stopUpdating(); | ||
}); | ||
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); // call 1 | ||
dep.on('updateError', errorHandler); | ||
clock.tick(5000); // call 2 | ||
clock.tick(5000); // call 3 | ||
clock.tick(5000); // call 4, should reset the timer | ||
clock.tick(5000); // call 5 | ||
it('reports the age of the endpoints when updates fail', function () { | ||
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')), | ||
errorHandler = Sinon.spy(), | ||
errObj = { error: true }, | ||
dep; | ||
Sinon.assert.calledThrice(errorHandler); | ||
// works on the first and fourth calls, fails every other time | ||
resolve | ||
.callsArgWith(0, errObj) | ||
.onFirstCall().callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]) | ||
.onCall(3).callsArgWith(0, null, [ | ||
{ name: 'bar.localhost', port: 8000 }, | ||
{ name: 'baz.localhost', port: 8001 } | ||
]); | ||
expect(errorHandler.firstCall.calledWithExactly(errObj, 5000)).to.be(true); | ||
expect(errorHandler.secondCall.calledWithExactly(errObj, 10000)).to.be(true); | ||
expect(errorHandler.thirdCall.calledWithExactly(errObj, 5000)).to.be(true); | ||
dep.stopUpdating(); | ||
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig); // call 1 | ||
dep.on('updateError', errorHandler); | ||
clock.tick(5000); // call 2 | ||
clock.tick(5000); // call 3 | ||
clock.tick(5000); // call 4, should reset the timer | ||
clock.tick(5000); // call 5 | ||
Sinon.assert.calledThrice(errorHandler); | ||
expect(errorHandler.firstCall.calledWithExactly(errObj, 5000)).to.be(true); | ||
expect(errorHandler.secondCall.calledWithExactly(errObj, 10000)).to.be(true); | ||
expect(errorHandler.thirdCall.calledWithExactly(errObj, 5000)).to.be(true); | ||
dep.stopUpdating(); | ||
}); | ||
}); | ||
}); |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
22545
7
381
1