limitd-client
Advanced tools
Comparing version 2.4.0 to 2.4.1
131
client.js
@@ -20,2 +20,7 @@ const url = require('url'); | ||
function QueuedRequest(callback) { | ||
this.start = new Date(); | ||
this.callback = callback; | ||
} | ||
function LimitdClient (options, done) { | ||
@@ -37,7 +42,12 @@ if (options && options.shard) { | ||
}; | ||
} else if(typeof options === 'object' && 'host' in options) { | ||
const host = options.host; | ||
options = _.extend(_.omit(options, 'host'), { hosts: [host] }); | ||
} else if(typeof options === 'object') { | ||
if ('host' in options) { | ||
const host = options.host; | ||
options = _.extend(_.omit(options, 'host'), { hosts: [host] }); | ||
} else { | ||
//clone the graph but keep stream. | ||
options = _.extend(_.cloneDeep(options), { stream: options.stream}); | ||
} | ||
} else { | ||
options = options && _.cloneDeep(options) || { hosts: [] }; | ||
options = { hosts: [] }; | ||
} | ||
@@ -63,3 +73,3 @@ | ||
this.pending_requests = new Map(); | ||
this.pending_requests = Object.create(null); | ||
@@ -74,3 +84,6 @@ this.connect(done); | ||
this._request = disyuntor(this._request.bind(this), _.extend({ | ||
// this._request = this._directRequest; | ||
this._request = disyuntor((request, callback) => { | ||
this._directRequest(request, callback); | ||
}, _.extend({ | ||
name: 'limitd.request', | ||
@@ -110,2 +123,6 @@ monitor: details => { | ||
if (options.stream) { | ||
return this._onNewStream(options.stream); | ||
} | ||
if (options.hosts.length > 1) { | ||
@@ -169,4 +186,2 @@ this._connectUsingFailover(done); | ||
LimitdClient.prototype._onNewStream = function (stream) { | ||
var self = this; | ||
stream | ||
@@ -180,15 +195,13 @@ .pipe(lps.decode()) | ||
})) | ||
.on('data', function (response) { | ||
var response_handler = self.pending_requests.get(response.request_id); | ||
if (response_handler) { | ||
response_handler(response); | ||
} | ||
.on('data', (response) => { | ||
const queuedRequest = this.pending_requests[response.request_id]; | ||
this._responseHandler(response, queuedRequest); | ||
}) | ||
.on('error', function (err) { | ||
self.emit('error', err); | ||
.on('error', (err) => { | ||
this.emit('error', err); | ||
}); | ||
self.stream = stream; | ||
this.stream = stream; | ||
self.emit('ready'); | ||
this.emit('ready'); | ||
}; | ||
@@ -206,27 +219,26 @@ | ||
LimitdClient.prototype._responseHandler = function(requestID, callback) { | ||
const start = Date.now(); | ||
LimitdClient.prototype._responseHandler = function(response, queuedRequest) { | ||
delete this.pending_requests[response.request_id]; | ||
return (response) => { | ||
this.pending_requests.delete(requestID); | ||
if (!queuedRequest) { return; } | ||
if (response.error && | ||
response.error.type === 'UNKNOWN_BUCKET_TYPE') { | ||
return callback(new Error('Invalid bucket type')); | ||
} | ||
if (response.error && | ||
response.error.type === 'UNKNOWN_BUCKET_TYPE') { | ||
return queuedRequest.callback(new Error('Invalid bucket type')); | ||
} | ||
const resp = response[response.body]; | ||
const resp = response[response.body]; | ||
if (resp) { | ||
resp.took = Date.now() - queuedRequest.start; | ||
if (typeof resp.protocol_version !== 'undefined') { | ||
this.protocol_version = resp.protocol_version; | ||
} | ||
} | ||
if (resp) { | ||
resp.took = Date.now() - start; | ||
} | ||
this.emit('response', resp); | ||
this.emit('response', resp); | ||
callback(null, resp); | ||
}; | ||
queuedRequest.callback(null, resp); | ||
}; | ||
@@ -243,3 +255,3 @@ | ||
LimitdClient.prototype._request = function (request, type, callback) { | ||
LimitdClient.prototype._directRequest = function (request, callback) { | ||
if (!this.stream || !this.stream.writable) { | ||
@@ -252,18 +264,14 @@ const err = new Error(`Unable to send ${request.method} to limitd. The socket is closed.`); | ||
this.pending_requests.set(request.id, this._responseHandler(request.id, callback)); | ||
this.pending_requests[request.id] = new QueuedRequest(callback); | ||
}; | ||
LimitdClient.prototype._takeOrWait = function (method, type, key, count, done) { | ||
if (typeof count === 'undefined' && typeof done === 'undefined') { | ||
done = _.noop; | ||
count = 1; | ||
} | ||
if (typeof count === 'function') { | ||
done = count; | ||
count = 1; | ||
} | ||
if (typeof done !== 'function') { | ||
} else if (typeof count === 'undefined' && typeof done === 'undefined') { | ||
done = _.noop; | ||
count = 1; | ||
} else if (typeof done !== 'function') { | ||
done = _.noop; | ||
} | ||
@@ -278,7 +286,11 @@ | ||
'method': method, | ||
'all': takeAll || null, | ||
'count': !takeAll ? count : undefined | ||
}; | ||
return this._request(request, type, done); | ||
if (takeAll) { | ||
request.all = true; | ||
} else { | ||
request.count = count; | ||
} | ||
return this._request(request, done); | ||
}; | ||
@@ -296,12 +308,11 @@ | ||
LimitdClient.prototype.put = function (type, key, count, done) { | ||
if (typeof count === 'undefined' && typeof done === 'undefined') { | ||
done = undefined; | ||
count = 'all'; | ||
} | ||
if (typeof count === 'function') { | ||
done = count; | ||
count = 'all'; | ||
} else if (typeof count === 'undefined' && typeof done === 'undefined') { | ||
done = undefined; | ||
count = 'all'; | ||
} | ||
const reset_all = count === 'all'; | ||
@@ -316,7 +327,11 @@ | ||
'method': 'PUT', | ||
'all': reset_all ? true : null, | ||
'count': !reset_all ? count : undefined, | ||
'skipResponse': fireAndForget | ||
}; | ||
if (reset_all) { | ||
request.all = true; | ||
} else { | ||
request.count = count; | ||
} | ||
if (fireAndForget) { | ||
@@ -326,7 +341,7 @@ return this._fireAndForgetRequest(request); | ||
return this._request(request, type, done); | ||
return this._request(request, done); | ||
}; | ||
LimitdClient.prototype.status = function (type, key, done) { | ||
var request = { | ||
const request = { | ||
'id': this.nextId(), | ||
@@ -338,7 +353,7 @@ 'type': type, | ||
return this._request(request, type, done); | ||
return this._request(request, done); | ||
}; | ||
LimitdClient.prototype.ping = function (done) { | ||
var request = { | ||
const request = { | ||
'id': this.nextId(), | ||
@@ -350,5 +365,5 @@ 'type': '', | ||
return this._request(request, '', done); | ||
return this._request(request, done); | ||
}; | ||
module.exports = LimitdClient; |
{ | ||
"name": "limitd-client", | ||
"version": "2.4.0", | ||
"version": "2.4.1", | ||
"description": "limitd client for node.js", | ||
@@ -12,3 +12,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"disyuntor": "^1.2.2", | ||
"disyuntor": "^1.2.3", | ||
"length-prefixed-message": "^3.0.3", | ||
@@ -15,0 +15,0 @@ "length-prefixed-stream": "~1.4.0", |
@@ -25,3 +25,3 @@ const LimitdClient = require('../'); | ||
it('should be able to parse the response of TAKE', function (done) { | ||
it('should use the protocol from the pong response', function (done) { | ||
@@ -28,0 +28,0 @@ server.on('request', function (request, reply) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
37713
18
971
Updateddisyuntor@^1.2.3