Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

poolee

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

poolee - npm Package Compare versions

Comparing version 0.1.0 to 0.1.1

benchmark/benchmark1.js

144

lib/endpoint.js
module.exports = function (inherits, EventEmitter) {
var MAX_COUNT = Math.pow(2, 31) // largest smi value
var clock = Date.now()
setInterval(function () { clock = Date.now() }, 10)
function noop() {}
function noop() { return false }

@@ -13,2 +14,3 @@ //

// method: ping method (GET)
// maxPending: number of requests pending before returning an error (500)
// checkInterval: ping interval in ms (0 = no checks)

@@ -37,2 +39,9 @@ // maxSockets: max concurrent open sockets (20)

this.requestCount = 0
this.requestsLastCheck = 0
this.requestRate = 0
this.pending = 0
this.successes = 0
this.failures = 0
this.maxPending = options.maxPending || 500
this.timeout = options.timeout || (60 * 1000)

@@ -52,2 +61,4 @@ this.resolution = options.resolution || 1000

)
self.requestRate = self.requestCount - self.requestsLastCheck
self.requestsLastCheck = self.requestCount
}

@@ -67,10 +78,37 @@ setInterval(this.timeoutCheck, this.resolution)

}
inherits(Endpoint, EventEmitter)
Endpoint.prototype.resetCounters = function () {
this.requestsLastCheck = this.requestRate - this.pending
this.requestCount = this.pending
this.successes = 0
this.failures = 0
}
Endpoint.prototype.setPending = function () {
this.pending = this.requestCount - (this.successes + this.failures)
if (this.requestCount === MAX_COUNT) {
this.resetCounters()
}
}
Endpoint.prototype.complete = function (error, request, response, body) {
this.deleteRequest(request.id)
this.setPending()
request.callback(error, response, body)
request.callback = null
}
Endpoint.prototype.succeeded = function (request, response, body) {
this.successes++
this.complete(null, request, response, body)
}
Endpoint.prototype.failed = function (error, request) {
this.failures++
this.complete(error, request)
}
Endpoint.prototype.busyness = function () {
return (
(this.agent.sockets[this.name] || []).length +
((this.agent.requests[this.name] || []).length * 2)
)
return this.pending
}

@@ -83,22 +121,41 @@

// retryFilter:
// encoding: response body encoding (utf8)
// data: string or buffer
// }
// data:
// callback: function (err, result) {}
Endpoint.prototype.request = function (options, data, callback) {
// callback: function (error, response, body) {}
Endpoint.prototype.request = function (options, callback) {
if (this.pending >= this.maxPending) {
return callback({
reason: 'full',
message: 'too many pending requests ' + this.pending + '/' + this.maxPending
})
}
options.host = this.ip
options.port = this.port
options.retryFilter = options.retryFilter || noop
if (options.agent !== false) {
options.agent = this.agent
}
if (options.encoding !== null) {
options.encoding = options.encoding || 'utf8'
}
var data = options.data
if (data) {
options.headers = options.headers || {}
options.headers["Content-Length"] =
Buffer.isBuffer(data) ? data.length : Buffer.byteLength(data)
}
var req = this.http.request(options)
req.node = this
req.options = options
req.id = this.requestCount++
req.lastTouched = clock
req.callback = callback || noop
req.on('response', gotResponse)
req.on('error', gotError)
req.end(data)
req.id = this.requestCount++
req.lastTouched = clock
this.setPending()
this.requests[req.id] = req
this.callback = callback
}

@@ -117,8 +174,8 @@

function gotPingResponse(err, response, body) {
this.setHealthy(!err && response.statusCode === 200)
function gotPingResponse(error, response, body) {
this.node.setHealthy(!error && response.statusCode === 200)
}
Endpoint.prototype.ping = function (options) {
this.request(options, null, gotPingResponse)
this.request(options, gotPingResponse)
}

@@ -138,3 +195,4 @@

function gotResponse(response) {
response.body = ''
response.bodyChunks = []
response.bodyLength = 0
response.request = this

@@ -147,10 +205,9 @@ response.on('data', gotData)

// this = request
function gotError(err) {
this.node.deleteRequest(this.id)
function gotError(error) {
this.node.failed({
reason: error.message,
message: this.node.ip + ':' + this.node.port + ' error: ' + error.message
},
this)
this.node.setHealthy(false)
this.node.callback({ //fail
reason: err.message,
message: this.node.ip + ':' + this.node.port + ' error: ' + err.message
})
this.node.callback = noop
}

@@ -161,3 +218,4 @@

this.request.lastTouched = clock
this.body += chunk
this.bodyChunks.push(chunk)
this.bodyLength += chunk.length
}

@@ -171,16 +229,25 @@

if (req.callback === null) { return }
node.setHealthy(true)
node.deleteRequest(req.id)
if (typeof opt.retryFilter === 'function') {
var delay = opt.retryFilter(opt, this, this.body)
if (delay !== false) {
return node.callback({ //fail
delay: delay,
reason: 'filter',
message: node.ip + ':' + node.port + ' error: rejected by filter'
})
}
var buffer = new Buffer(this.bodyLength)
var offset = 0
for (var i = 0; i < this.bodyChunks.length; i++) {
var chunk = this.bodyChunks[i]
chunk.copy(buffer, offset, 0, chunk.length)
offset += chunk.length
}
node.callback(null, this, this.body) //success
var body = (opt.encoding !== null) ? buffer.toString(opt.encoding) : buffer
var delay = opt.retryFilter(opt, this, body)
if (delay !== false) { // delay may be 0
return node.failed({
delay: delay,
reason: 'filter',
message: node.ip + ':' + node.port + ' error: rejected by filter'
},
req)
}
node.succeeded(req, this, body)
}

@@ -190,8 +257,7 @@

function gotAborted() {
this.request.node.deleteRequest(this.request.id)
this.request.node.callback({ //fail
this.request.node.failed({
reason: 'aborted',
message: this.request.node.ip + ':' + this.request.node.port + ' error: connection aborted'
})
this.request.node.callback = noop
},
this.request)
}

@@ -198,0 +264,0 @@

@@ -11,4 +11,5 @@ module.exports = function (inherits, EventEmitter, Endpoint, RequestSet) {

// {
// maxPending: number of pending requests allowed per endpoint (500)
// checkInterval: number (milliseconds) health check poll interval, default null (no check)
// path: string querystring for the health check (default = '/ping?type=lb_check')
// path: string querystring for the health check (default = '/ping')
// method: string heath check http method (default = 'GET')

@@ -21,6 +22,16 @@ // retryFilter: function (response) { return true to reject response and retry }

options = options || {}
if (!http || !http.request) {
if (!http || !http.request || !http.Agent) {
throw new Error('invalid http module')
}
options.checkInterval = options.checkInterval || options.check_interval
options.retryFilter = options.retryFilter || options.retry_filter
options.retryDelay = options.retryDelay || options.retry_delay
if (!options.retryDelay && options.retryDelay !== 0) {
options.retryDelay = 20
}
this.name = options.name
this.options = options
this.nodes = []

@@ -33,3 +44,3 @@ if (Array.isArray(nodes)) {

if (port > 0 && port < 65536) {
var node = new Endpoint(http, ip, port)
var node = new Endpoint(http, ip, port, options)
node.on('health', node_health_changed.bind(this))

@@ -45,12 +56,2 @@ node.on('timeout', node_timed_out.bind(this))

}
options.checkInterval = options.checkInterval || options.check_interval
options.retryFilter = options.retryFilter || options.retry_filter
options.retryDelay = options.retryDelay || options.retry_delay
if (!options.retryDelay && options.retryDelay !== 0) {
options.retryDelay = 20
}
this.name = options.name
this.options = options
}

@@ -82,6 +83,10 @@ inherits(Pool, EventEmitter)

function optionsFu(options) {
return (typeof options === 'string') ? { path: options } : (options || {})
}
// options:
// {
// path: string
// method: ['POST', 'GET', 'PUT', 'DELETE', 'HEAD']
// method: ['POST', 'GET', 'PUT', 'DELETE', 'HEAD'] (GET)
// retryFilter: function (response) { return true to reject response and retry }

@@ -92,3 +97,3 @@ // attempts: number (optional, default = nodes.length)

//
// data: string
// data: string or buffer (optional)
//

@@ -99,4 +104,13 @@ // callback:

var self = this
options = options || {}
options = optionsFu(options)
if (!options.data && (typeof data === 'string' || Buffer.isBuffer(data))) {
options.data = data
}
else if (typeof data === 'function') {
callback = data
}
options.method = options.method || 'GET'
options.retryDelay = options.retryDelay || options.retry_delay

@@ -113,3 +127,3 @@ if (!options.retryDelay && options.retryDelay !== 0) {

var started = Date.now()
RequestSet.request(this, options, data, function (err, res, body) {
RequestSet.request(this, options, function (err, res, body) {
options.success = !err

@@ -121,8 +135,6 @@ self.emit('timing', Date.now() - started, options)

Pool.prototype.get = function (options, callback) {
options.method = 'GET'
this.request(options, null, callback)
}
Pool.prototype.get = Pool.prototype.request
Pool.prototype.put = function (options, data, callback) {
options = optionsFu(options)
options.method = 'PUT'

@@ -133,2 +145,3 @@ return this.request(options, data, callback)

Pool.prototype.post = function (options, data, callback) {
options = optionsFu(options)
options.method = 'POST'

@@ -139,6 +152,6 @@ return this.request(options, data, callback)

Pool.prototype.del = function (options, callback) {
options = optionsFu(options)
options.method = 'DELETE'
options.agent = false // XXX
options.headers['Content-Length'] = 0
return this.request(options, null, callback)
return this.request(options, callback)
}

@@ -145,0 +158,0 @@

@@ -10,7 +10,5 @@ // An object to track server requests and handle retries

// }
// data: string or buffer to send
// callback: function (err, response, body) {}
function RequestSet(pool, options, data, callback) {
function RequestSet(pool, options, callback) {
this.options = options || {}
this.data = data
this.pool = pool

@@ -77,6 +75,5 @@ this.callback = callback

// }
// data: string or buffer to send
// callback: function (err, response, body) {}
RequestSet.request = function (pool, options, data, callback) {
var set = new RequestSet(pool, options, data, callback)
RequestSet.request = function (pool, options, callback) {
var set = new RequestSet(pool, options, callback)
set.doRequest()

@@ -91,3 +88,3 @@ }

else {
node.request(this.options, this.data, handleResponse.bind(this))
node.request(this.options, handleResponse.bind(this))
}

@@ -94,0 +91,0 @@ }

{
"name": "poolee",
"version": "0.1.0",
"description": "HTTP Pool",
"version": "0.1.1",
"description": "HTTP pool and load balancer",
"homepage": "http://github.com/dannycoates/poolee",
"author": "Danny Coates <dannycoates@gmail.com>",
"keywords": ["pool", "http", "retry", "health"],
"keywords": ["pool", "http", "retry", "health", "load balancer"],
"repository": {

@@ -15,4 +15,6 @@ "type": "git",

"devDependencies": {
"mocha": "*"
"mocha": "*",
"optimist": "*",
"async": "*"
}
}

@@ -33,3 +33,3 @@ var assert = require("assert")

var error
e.request({path:'/foo', method: 'GET'}, null, function (err, response, body) {
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
error = err

@@ -63,3 +63,3 @@ })

var error
e.request({path:'/foo', method: 'GET'}, null, function (err, response, body) {
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
error = err

@@ -89,3 +89,3 @@ })

})
e.request({path:'/foo', method: 'GET'}, null, noop)
e.request({path:'/foo', method: 'GET'}, noop)

@@ -110,3 +110,3 @@ setTimeout(function () {

var error
e.request({path:'/foo', method: 'GET'}, null, function (err, response, body) {
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
error = err

@@ -136,3 +136,3 @@ })

var error
e.request({path:'/foo', method: 'GET'}, null, function (err, response, body) {
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
error = err

@@ -160,3 +160,3 @@ })

var error
e.request({path:'/foo', method: 'GET'}, null, function (err, response, body) {
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
error = err

@@ -174,3 +174,105 @@ })

})
it("returns the whole body to the callback", function (done) {
var s = http.createServer(function (req, res) {
res.write("foo")
setTimeout(function () {
res.end("bar")
}, 10)
})
s.on('listening', function () {
var e = new Endpoint(http, '127.0.0.1', 6969, {timeout: 200, resolution: 10})
var body
e.request({path:'/foo', method: 'GET'}, function (err, response, b) {
body = b
})
setTimeout(function () {
s.close()
assert.equal(body, "foobar")
done()
}, 400)
})
s.listen(6969)
})
it("returns an error to the callback when pending > maxPending", function (done) {
var s = http.createServer(function (req, res) {
setTimeout(function () {
res.end("foo")
}, 100)
})
s.on('listening', function () {
var e = new Endpoint(http, '127.0.0.1', 6969, {timeout: 200, resolution: 10, maxPending: 1})
e.request({path:'/foo', method: 'GET'}, noop)
e.request({path:'/foo', method: 'GET'}, function (err, response, body) {
assert.equal(err.reason, 'full')
})
setTimeout(function () {
s.close()
assert.equal(Object.keys(e.requests).length, 0)
done()
}, 400)
})
s.listen(6969)
})
it("sends Content-Length when data is a string", function (done) {
var s = http.createServer(function (req, res) {
assert.equal(req.headers["content-length"], 4)
res.end("foo")
})
s.on('listening', function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.request({path:'/foo', method: 'PUT', data: "ƒoo"}, noop)
setTimeout(function () {
s.close()
done()
}, 10)
})
s.listen(6969)
})
it("sends Content-Length when data is a buffer", function (done) {
var s = http.createServer(function (req, res) {
assert.equal(req.headers["content-length"], 4)
res.end("foo")
})
s.on('listening', function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.request({path:'/foo', method: 'PUT', data: Buffer("ƒoo")}, noop)
setTimeout(function () {
s.close()
done()
}, 10)
})
s.listen(6969)
})
})
describe("setPending()", function () {
it("maintains the correct pending count when requestCount 'overflows'", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.successes = (Math.pow(2, 31) / 2) - 250
e.failures = (Math.pow(2, 31) / 2) - 250
e.requestCount = Math.pow(2, 31)
e.setPending()
assert.equal(e.pending, 500)
assert.equal(e.requestCount, 500)
})
it("maintains the correct pending count when requestCount 'overflows'", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.pending = 500
e.requestRate = 500
e.requestCount = Math.pow(2, 31)
e.requestsLastCheck = e.requestCount - 500
e.resetCounters()
assert.equal(e.requestCount - e.requestsLastCheck, e.requestRate)
})
})
})

@@ -8,3 +8,4 @@ var assert = require("assert")

var http = {
request: function () {}
request: noop,
Agent: noop
}

@@ -18,7 +19,7 @@

function succeeding_request(pool, options, data, cb) {
function succeeding_request(pool, options, cb) {
return cb(null, {}, "foo")
}
function failing_request(pool, options, data, cb) {
function failing_request(pool, options, cb) {
return cb({

@@ -109,3 +110,81 @@ message: "crap",

it("allows the data parameter to be optional", function (done) {
FakeRequestSet.request = succeeding_request
pool.request({}, function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
it("allows the options parameter to be a path string", function (done) {
FakeRequestSet.request = function (pool, options, cb) {
assert.equal(options.path, "/foo")
return cb(null, {}, "foo")
}
pool.request("/foo", function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
it("defaults method to GET", function (done) {
FakeRequestSet.request = function (pool, options, cb) {
assert.equal(options.method, "GET")
return cb(null, {}, "foo")
}
pool.request("/foo", function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
})
describe("get()", function () {
it("is an alias to request()", function () {
assert.equal(pool.get, pool.request)
})
})
describe("put()", function () {
it("sets the options.method to PUT", function (done) {
FakeRequestSet.request = function (pool, options, cb) {
assert.equal(options.method, "PUT")
return cb(null, {}, "foo")
}
pool.put("/foo", "bar", function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
})
describe("post()", function () {
it("sets the options.method to POST", function (done) {
FakeRequestSet.request = function (pool, options, cb) {
assert.equal(options.method, "POST")
return cb(null, {}, "foo")
}
pool.post("/foo", "bar", function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
})
describe("del()", function () {
it("sets the options.method to DELETE", function (done) {
FakeRequestSet.request = function (pool, options, cb) {
assert.equal(options.method, "DELETE")
return cb(null, {}, "foo")
}
pool.del("/foo", function (e, r, b) {
assert.equal(b, "foo")
done()
})
})
})
})

@@ -8,7 +8,7 @@ var assert = require("assert")

function succeeding_request(options, data, cb) {
function succeeding_request(options, cb) {
return cb(null, {}, "foo")
}
function failing_request(options, data, cb) {
function failing_request(options, cb) {
return cb({

@@ -20,3 +20,3 @@ message: "crap",

function hangup_request(options, data, cb) {
function hangup_request(options, cb) {
return cb({

@@ -28,3 +28,3 @@ message: "hang up",

function aborted_request(options, data, cb) {
function aborted_request(options, cb) {
return cb({

@@ -46,3 +46,3 @@ message: "aborted",

it("defaults attempt count to at least 2", function () {
var r = new RequestSet({nodes: [1]}, {}, null, null)
var r = new RequestSet({nodes: [1]}, {}, null)
assert.equal(r.attempts, 2)

@@ -52,3 +52,3 @@ })

it("defaults attempt count to at most 5", function () {
var r = new RequestSet({nodes: [1,2,3,4,5,6,7,8,9]}, {}, null, null)
var r = new RequestSet({nodes: [1,2,3,4,5,6,7,8,9]}, {}, null)
assert.equal(r.attempts, 5)

@@ -58,3 +58,3 @@ })

it("defaults attempt count to pool.nodes.length", function () {
var r = new RequestSet({nodes: [1,2,3,4]}, {}, null, null)
var r = new RequestSet({nodes: [1,2,3,4]}, {}, null)
assert.equal(r.attempts, 4)

@@ -67,3 +67,3 @@ })

node.request = succeeding_request
RequestSet.request(pool, {}, null, function (err, res, body) {
RequestSet.request(pool, {}, function (err, res, body) {
assert.equal(err, null)

@@ -77,3 +77,3 @@ assert.equal(body, "foo")

node.request = failing_request
RequestSet.request(pool, {}, null, function (err, res, body) {
RequestSet.request(pool, {}, function (err, res, body) {
assert.equal(err.message, "crap")

@@ -89,3 +89,3 @@ done()

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err.message, "no nodes")

@@ -102,3 +102,3 @@ done()

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err, null)

@@ -116,3 +116,3 @@ assert.equal(body, "foo")

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err.reason, "socket hang up")

@@ -129,3 +129,3 @@ done()

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err, null)

@@ -143,3 +143,3 @@ assert.equal(body, "foo")

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err.reason, "aborted")

@@ -156,3 +156,3 @@ done()

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err.reason, "aborted")

@@ -169,3 +169,3 @@ done()

}
RequestSet.request(p, {}, null, function (err, res, body) {
RequestSet.request(p, {}, function (err, res, body) {
assert.equal(err, null)

@@ -172,0 +172,0 @@ assert.equal(body, "foo")

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc