New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

dns-endpoint-pool

Package Overview
Dependencies
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dns-endpoint-pool - npm Package Compare versions

Comparing version 0.1.0 to 1.0.0

pool-manager.js

138

index.js
var EndpointPool,
_ = require('underscore'),
dns = require('dns'),
Events = require('events'),
util = require('util'),
_ = require('underscore'),
dns = require('dns'),
Events = require('events'),
PoolManager = require('./pool-manager'),
util = require('util'),
DNS_LOOKUP_TIMEOUT = 1000,
DNS_LOOKUP_TIMEOUT = 1000;
CLOSED = 0, // closed circuit: endpoint is good to use
HALF_OPEN_READY = 1, // endpoint is in recovery state: offer it for use once
HALF_OPEN_PENDING = 2, // endpoint recovery is in process
OPEN = 3; // open circuit: endpoint is no good
/**
* @param {String} discoveryName The name of the service discovery host
* @param {Number} ttl How long the endpoints are valid for. The service discovery endpoint will be checked on
* this interval.
* @param {Number} maxFailures Number of failures allowed before the endpoint circuit breaker is tripped.
* @param {Number} failureWindow Size of the sliding window of time in which the failures are counted.
* @param {Number} resetTimeout Amount of time before putting the circuit back into half open state.
* @param {Function} onReady Callback to execute when endpoints have been primed (updated for the first time)
* @param {String} discoveryName The name of the service discovery host
* @param {Number} ttl How long the endpoints are valid for. The service discovery endpoint will be checked on
* this interval.
* @param {{maxFailures: Number, failureWindow: Number, resetTimeout: Number}}
* ejectOnErrorConfig How to handle endpoint errors. If specified, the following options must be defined:
* - maxFailures: Number of failures allowed before the endpoint circuit breaker is tripped.
* - failureWindow: Size of the sliding window of time in which the failures are counted.
* - resetTimeout: Amount of time before putting the circuit back into half open state.
* @param {Function=} onReady Callback to execute when endpoints have been primed (updated for the first time)
*/
module.exports = EndpointPool = function (discoveryName, ttl, maxFailures, failureWindow, resetTimeout, onReady) {
if (!discoveryName || !ttl || !maxFailures || !resetTimeout) {
module.exports = EndpointPool = function (discoveryName, ttl, ejectOnErrorConfig, onReady) {
if (!discoveryName || !ttl) {
throw new Error('Must supply all arguments');
}
if (ejectOnErrorConfig) {
this.poolManager = PoolManager.ejectOnErrorPoolManager(ejectOnErrorConfig);
} else {
this.poolManager = PoolManager.defaultPoolManager();
}
Events.EventEmitter.call(this);

@@ -31,8 +36,4 @@

this.ttl = ttl;
this.endpoints = [];
this._endpointOffset = 0;
this._updateTimeout = null;
this.maxFailures = maxFailures;
this.failureWindow = failureWindow;
this.resetTimeout = resetTimeout;
this.lastUpdate = Date.now();

@@ -68,39 +69,14 @@ this.update(onReady);

getEndpoint: function () {
var endpoint, i, l, offset;
for (i = 0, l = this.endpoints.length; i < l; ++i) {
offset = (this._endpointOffset + i) % l;
endpoint = this.endpoints[offset];
var endpoint = this.poolManager.getNextEndpoint();
switch (endpoint.state) {
case HALF_OPEN_READY:
endpoint.state = HALF_OPEN_PENDING; // let one through, then turn it off again
/* falls through */
case CLOSED:
this._endpointOffset = offset + 1;
return endpoint;
// case OPEN: case HALF_OPEN_PENDING: // continue to the next one
}
if (endpoint) {
return endpoint;
} else {
this.emit('noEndpoints');
return null;
}
this.emit('noEndpoints');
return null;
},
setEndpoints: function (endpoints) {
var newEndpoints, i, matchingEndpoint;
newEndpoints = endpoints.map(function (info) {
return new Endpoint(info, this.maxFailures, this.failureWindow, this.resetTimeout);
}, this);
for (i = this.endpoints.length; i--;) {
matchingEndpoint = _.findWhere(newEndpoints, { url: this.endpoints[i].url });
if (matchingEndpoint) { // found a match, remove it from `newEndpoints`, since it's not new
newEndpoints = _.without(newEndpoints, matchingEndpoint);
} else { // didn't find a match in endpoints, so kill that endpoint
this.endpoints.splice(i, 1);
}
}
// push all the actually-new endpoints in
this.endpoints.push.apply(this.endpoints, newEndpoints);
this.poolManager.updateEndpoints(endpoints);
},

@@ -112,51 +88,1 @@

});
function Endpoint(info, maxFailures, failureWindow, resetTimeout) {
this.name = info.name;
this.port = info.port;
this.url = info.name + ':' + info.port;
this.state = CLOSED;
this.failCount = 0;
this.callback = endpointCallback.bind(this);
this.disable = disableEndpoint.bind(this);
this.resetTimeout = resetTimeout;
this.failureWindow = failureWindow;
// A ring buffer, holding the timestamp of each error. As we loop around the ring, the timestamp in the slot we're
// about to fill will tell us the error rate. That is, `maxFailure` number of requests in how many milliseconds?
this.buffer = new Array(maxFailures - 1);
this.bufferPointer = 0;
}
function endpointCallback(err) {
if (err) {
var oldestErrorTime, now;
if (this.state === OPEN) {
return;
}
if (this.buffer.length === 0) {
this.disable();
return;
}
oldestErrorTime = this.buffer[this.bufferPointer];
now = Date.now();
this.buffer[this.bufferPointer] = now;
this.bufferPointer++;
this.bufferPointer %= this.buffer.length;
if (this.state === HALF_OPEN_PENDING || (oldestErrorTime != null && now - oldestErrorTime <= this.failureWindow)) {
this.disable();
}
} else if (this.state === HALF_OPEN_PENDING) {
this.state = CLOSED;
}
}
function disableEndpoint() {
this.state = OPEN;
clearInterval(this._reopenTimeout);
this._reopenTimeout = setTimeout(function () {
this.state = HALF_OPEN_READY;
}.bind(this), this.resetTimeout);
}
{
"name": "dns-endpoint-pool",
"version": "0.1.0",
"version": "1.0.0",
"description": "Manage and load-balance a pool of service endpoints retrieved from a DNS lookup for a service discovery name.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -43,3 +43,3 @@ ## DNS Endpoint Pool

### `new DNSEndpointPool(serviceDiscoveryName, ttl, maxFailures, failureWindow, resetTimeout)`
### `new DNSEndpointPool(serviceDiscoveryName, ttl, maxFailures, failureWindow, resetTimeout, onReady)`

@@ -46,0 +46,0 @@ Creates a new pool object.

@@ -28,5 +28,3 @@ /*globals it, describe, beforeEach, afterEach */

function () { return new DEP(); },
function () { return new DEP('foo.localhost'); },
function () { return new DEP('foo.localhost', 3); },
function () { return new DEP('foo.localhost', 3, 5); }
function () { return new DEP('foo.localhost'); }
].forEach(function (fn) {

@@ -39,3 +37,3 @@ expect(fn).to.throwError('Must supply all arguments');

var stub = autoRestore(Sinon.stub(DEP.prototype, 'update')),
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
dep = new DEP('foo.localhost', 5000);
Sinon.assert.calledOnce(stub);

@@ -50,3 +48,3 @@ });

resolve.callsArgWith(0, null, []);
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000, function () {
dep = new DEP('foo.localhost', 5000, null, function () {
called = true;

@@ -65,3 +63,3 @@ });

resolve.callsArgWith(0, null, []);
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
dep = new DEP('foo.localhost', 5000);

@@ -87,3 +85,3 @@ Sinon.assert.calledOnce(resolve);

]);
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
dep = new DEP('foo.localhost', 5000);

@@ -107,3 +105,3 @@ expect(dep.getEndpoint().url).to.be('bar.localhost:8000');

dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
dep = new DEP('foo.localhost', 5000);

@@ -137,3 +135,3 @@ expect(dep.getEndpoint().url).to.be('bar.localhost:8000');

]);
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
dep = new DEP('foo.localhost', 5000);

@@ -154,91 +152,110 @@ dep.getEndpoint(); // bar

it('will remove endpoints from the pool if they fail', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
barEndpoint,
bazEndpoint,
dep;
describe('with eject-on-error pool management', function () {
resolve.callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
var ejectOnErrorConfig = {
maxFailures: 2,
failureWindow: 10000,
resetTimeout: 10000
};
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
it('enforces that config object has proper shape', function () {
autoRestore(Sinon.stub(DEP.prototype, 'update'));
[
function () { return new DEP('foo.localhost', 5000, { maxFailures: 2 }); },
function () { return new DEP('foo.localhost', 5000, { maxFailures: 2, failureWindow: 10000 }); }
].forEach(function (fn) {
expect(fn).to.throwError('Must supply all arguments to ejectOnErrorPoolManager');
});
});
barEndpoint = dep.getEndpoint();
barEndpoint.callback(true);
it('will remove endpoints from the pool if they fail', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
barEndpoint,
bazEndpoint,
dep;
bazEndpoint = dep.getEndpoint();
resolve.callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
expect(dep.getEndpoint()).to.be(barEndpoint); // still in the pool
barEndpoint.callback(true);
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig);
expect(dep.getEndpoint()).to.be(bazEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint); // bar is removed
dep.stopUpdating();
});
barEndpoint = dep.getEndpoint();
barEndpoint.callback(true);
it('will reinstate endpoints for a single request after a timeout', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
barEndpoint,
bazEndpoint,
dep;
bazEndpoint = dep.getEndpoint();
resolve.callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
expect(dep.getEndpoint()).to.be(barEndpoint); // still in the pool
barEndpoint.callback(true);
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000);
expect(dep.getEndpoint()).to.be(bazEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint); // bar is removed
dep.stopUpdating();
});
barEndpoint = dep.getEndpoint();
barEndpoint.callback(true);
barEndpoint.callback(true); // removed from pool.
it('will reinstate endpoints for a single request after a timeout', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
barEndpoint,
bazEndpoint,
dep;
bazEndpoint = dep.getEndpoint();
resolve.callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
clock.tick(10000);
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig);
expect(dep.getEndpoint()).to.be(barEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint); // only return barEndpoint once
barEndpoint = dep.getEndpoint();
barEndpoint.callback(true);
barEndpoint.callback(true); // removed from pool.
barEndpoint.callback(null); // denotes success
expect(dep.getEndpoint()).to.be(barEndpoint); // it's back in the game
dep.stopUpdating();
});
bazEndpoint = dep.getEndpoint();
it('reports the age of the endpoints when updates fail', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
errorHandler = Sinon.spy(),
errObj = { error: true },
dep;
clock.tick(10000);
// works on the first and fourth calls, fails every other time
resolve
.callsArgWith(0, errObj)
.onFirstCall().callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
])
.onCall(3).callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
expect(dep.getEndpoint()).to.be(barEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint);
expect(dep.getEndpoint()).to.be(bazEndpoint); // only return barEndpoint once
barEndpoint.callback(null); // denotes success
expect(dep.getEndpoint()).to.be(barEndpoint); // it's back in the game
dep.stopUpdating();
});
dep = new DEP('foo.localhost', 5000, 2, 10000, 10000); // call 1
dep.on('updateError', errorHandler);
clock.tick(5000); // call 2
clock.tick(5000); // call 3
clock.tick(5000); // call 4, should reset the timer
clock.tick(5000); // call 5
it('reports the age of the endpoints when updates fail', function () {
var resolve = autoRestore(Sinon.stub(DEP.prototype, 'resolve')),
errorHandler = Sinon.spy(),
errObj = { error: true },
dep;
Sinon.assert.calledThrice(errorHandler);
// works on the first and fourth calls, fails every other time
resolve
.callsArgWith(0, errObj)
.onFirstCall().callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
])
.onCall(3).callsArgWith(0, null, [
{ name: 'bar.localhost', port: 8000 },
{ name: 'baz.localhost', port: 8001 }
]);
expect(errorHandler.firstCall.calledWithExactly(errObj, 5000)).to.be(true);
expect(errorHandler.secondCall.calledWithExactly(errObj, 10000)).to.be(true);
expect(errorHandler.thirdCall.calledWithExactly(errObj, 5000)).to.be(true);
dep.stopUpdating();
dep = new DEP('foo.localhost', 5000, ejectOnErrorConfig); // call 1
dep.on('updateError', errorHandler);
clock.tick(5000); // call 2
clock.tick(5000); // call 3
clock.tick(5000); // call 4, should reset the timer
clock.tick(5000); // call 5
Sinon.assert.calledThrice(errorHandler);
expect(errorHandler.firstCall.calledWithExactly(errObj, 5000)).to.be(true);
expect(errorHandler.secondCall.calledWithExactly(errObj, 10000)).to.be(true);
expect(errorHandler.thirdCall.calledWithExactly(errObj, 5000)).to.be(true);
dep.stopUpdating();
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc