Comparing version 2.0.0-16 to 2.0.0-17
@@ -216,3 +216,2 @@ /* | ||
export interface Sub { | ||
sid: number; | ||
/** | ||
@@ -284,2 +283,7 @@ * Unsubscribe with optional max number of messages before unsubscribing. | ||
isDraining(): boolean; | ||
/** | ||
* @return the id of the subscription | ||
*/ | ||
getID(): number | ||
} | ||
@@ -286,0 +290,0 @@ |
409
lib/nats.js
@@ -199,7 +199,15 @@ /* | ||
function Client (opts) { | ||
this.subs = new Subs(this) | ||
this.reqs = new Reqs(this) | ||
this.servers = new Servers(this) | ||
this.reconnects = 0 | ||
this.connected = false | ||
this.wasConnected = false | ||
this.reconnecting = false | ||
this.pending = [] | ||
this.pout = 0 | ||
events.EventEmitter.call(this) | ||
this.parseOptions(opts) | ||
this.initState() | ||
// Select a server to connect to. | ||
this.selectServer() | ||
this.servers.selectServer() | ||
this.createConnection() | ||
@@ -254,12 +262,2 @@ } | ||
function shuffle (array) { | ||
for (let i = array.length - 1; i > 0; i--) { | ||
const j = Math.floor(Math.random() * (i + 1)) | ||
const temp = array[i] | ||
array[i] = array[j] | ||
array[j] = temp | ||
} | ||
return array | ||
} | ||
/** | ||
@@ -354,33 +352,8 @@ * Parse the constructor/connect options. | ||
} | ||
// For cluster support | ||
this.servers = [] | ||
this.servers.init() | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach((server) => { | ||
this.servers.push(new Server(new url.URL(server))) | ||
}) | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(this.servers) | ||
} | ||
// if they gave an URL we should add it if different | ||
if (options.url !== undefined && this.servers.indexOf(options.url) === -1) { | ||
// Make url first element so it is attempted first | ||
this.servers.unshift(new Server(new url.URL(options.url))) | ||
} | ||
} else { | ||
if (undefined === options.url) { | ||
options.url = DEFAULT_URI | ||
} | ||
this.servers.push(new Server(new url.URL(options.url))) | ||
} | ||
// If we are not setup for tls, but were handed a url with a tls:// prefix | ||
// then upgrade to tls. | ||
if (options.tls === false) { | ||
this.servers.forEach((server) => { | ||
if (server.url.protocol === 'tls' || server.url.protocol === 'tls:') { | ||
options.tls = true | ||
} | ||
}) | ||
options.tls = this.servers.hasTLS() !== undefined | ||
} | ||
@@ -424,43 +397,2 @@ | ||
/** | ||
* Properly select the next server. | ||
* We rotate the server list as we go, | ||
* we also pull auth from urls as needed, or | ||
* if they were set in options use that as override. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.selectServer = function () { | ||
const server = this.servers.shift() | ||
// Place in client context. | ||
this.currentServer = server | ||
this.url = server.url | ||
const un = server.url.username || '' | ||
const pw = server.url.password || '' | ||
let up = '' | ||
if (un !== '' && pw !== '') { | ||
up = un + ':' + pw | ||
} else if (un !== '') { | ||
up = un | ||
} | ||
if (up !== '') { | ||
const auth = up.split(':') | ||
if (auth.length !== 1) { | ||
if (this.options.user === undefined) { | ||
this.user = auth[0] | ||
} | ||
if (this.options.pass === undefined) { | ||
this.pass = auth[1] | ||
} | ||
} else { | ||
if (this.options.token === undefined) { | ||
this.token = auth[0] | ||
} | ||
} | ||
} | ||
this.servers.push(server) | ||
return server | ||
} | ||
Client.prototype.checkNoEchoMismatch = function () { | ||
@@ -572,3 +504,3 @@ if ((this.info.proto === undefined || this.info.proto < 1) && this.options.noEcho) { | ||
Client.prototype.checkNkeyMismatch = function () { | ||
if (undefined === this.info.nonce) { | ||
if (this.info.nonce === undefined) { | ||
return false | ||
@@ -580,4 +512,11 @@ } | ||
// Treat this as a filename. | ||
// For now we will not capture an error on file not found etc. | ||
const contents = fs.readFileSync(this.options.userCreds).toString() | ||
let contents = null | ||
try { | ||
contents = fs.readFileSync(this.options.userCreds).toString() | ||
} catch (err) { | ||
this.emit(Events.Error, new NatsError(BAD_CREDS_MSG, ErrorCode.BAD_CREDS, err)) | ||
this.closeStream() | ||
return true | ||
} | ||
if (CREDS.exec(contents) === null) { | ||
@@ -599,3 +538,3 @@ this.emit(Events.Error, new NatsError(BAD_CREDS_MSG, ErrorCode.BAD_CREDS)) | ||
if (undefined === this.options.nonceSigner) { | ||
if (this.options.nonceSigner === undefined) { | ||
this.emit(Events.Error, new NatsError(SIGNATURE_REQUIRED_MSG, ErrorCode.SIGNATURE_REQUIRED)) | ||
@@ -605,3 +544,3 @@ this.closeStream() | ||
} | ||
if (typeof (this.options.nonceSigner) !== 'function') { | ||
if (typeof this.options.nonceSigner !== 'function') { | ||
this.emit(Events.Error, new NatsError(SIGCB_NOTFUNC_MSG, ErrorCode.SIGCB_NOTFUNC)) | ||
@@ -611,3 +550,3 @@ this.closeStream() | ||
} | ||
if (undefined === this.options.nkey && undefined === this.options.userJWT) { | ||
if (this.options.nkey === undefined && this.options.userJWT === undefined) { | ||
this.emit(Events.Error, new NatsError(NKEY_OR_JWT_REQ_MSG, ErrorCode.NKEY_OR_JWT_REQ)) | ||
@@ -631,3 +570,3 @@ this.closeStream() | ||
this.wasConnected = true | ||
this.currentServer.didConnect = true | ||
this.servers.getCurrent().didConnect = true | ||
@@ -706,3 +645,3 @@ this.emit(event, this) | ||
stream.on('close', () => { | ||
const done = (this.closed === true || this.options.reconnect === false || this.servers.length === 0) | ||
const done = (this.closed === true || this.options.reconnect === false || this.servers.isEmpty()) | ||
// if connected, it resets everything as partial buffers may have been sent | ||
@@ -743,3 +682,3 @@ // this will also reset the heartbeats, but not other timers on requests or subscriptions | ||
// If we were connected just return, close event will process | ||
if (this.wasConnected === true && this.currentServer.didConnect === true) { | ||
if (this.wasConnected === true && this.servers.getCurrent().didConnect === true) { | ||
return | ||
@@ -751,3 +690,3 @@ } | ||
// this list. Unless overridden | ||
if (this.wasConnected === false && this.currentServer.didConnect === false) { | ||
if (this.wasConnected === false && this.servers.getCurrent().didConnect === false) { | ||
// We can override this behavior with waitOnFirstConnect, which will | ||
@@ -757,5 +696,5 @@ // treat it like a reconnect scenario. | ||
// Pretend to move us into a reconnect state. | ||
this.currentServer.didConnect = true | ||
this.servers.getCurrent().didConnect = true | ||
} else { | ||
this.servers.splice(this.servers.length - 1, 1) | ||
this.servers.remove(this.servers.getCurrent()) | ||
} | ||
@@ -766,3 +705,3 @@ } | ||
// to the server and we only have one, and close | ||
if (this.wasConnected === false && this.servers.length === 0) { | ||
if (this.wasConnected === false && this.servers.isEmpty()) { | ||
this.emit(Events.Error, new NatsError(CONN_ERR_MSG_PREFIX + exception, ErrorCode.CONN_ERR, exception)) | ||
@@ -873,3 +812,3 @@ this.close() | ||
// Create the stream | ||
this.stream = net.createConnection(this.url.port, this.url.hostname) | ||
this.stream = net.createConnection(this.servers.getCurrent().url.port, this.servers.getCurrent().url.hostname) | ||
// this change makes it a bit faster on Linux, slightly worse on OS X | ||
@@ -882,19 +821,2 @@ this.stream.setNoDelay(true) | ||
/** | ||
* Initialize client state. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.initState = function () { | ||
this.ssid = 0 | ||
this.subs = new Subs(this) | ||
this.reqs = new Reqs(this) | ||
this.reconnects = 0 | ||
this.connected = false | ||
this.wasConnected = false | ||
this.reconnecting = false | ||
this.pending = [] | ||
this.pout = 0 | ||
} | ||
/** | ||
* Close the connection to the server. | ||
@@ -1129,3 +1051,3 @@ * | ||
// Always try to read the connect_urls from info | ||
this.processServerUpdate() | ||
this.servers.processServerUpdate() | ||
@@ -1186,3 +1108,3 @@ // Process first INFO | ||
// reset the reconnects for this server | ||
this.currentServer.reconnects = 0 | ||
this.servers.getCurrent().reconnects = 0 | ||
// invoke the callback | ||
@@ -1282,52 +1204,2 @@ this.connectCB() | ||
/** | ||
* Process server updates in info object | ||
* @api internal | ||
*/ | ||
Client.prototype.processServerUpdate = function () { | ||
// noinspection JSUnresolvedVariable | ||
if (this.info.connect_urls && this.info.connect_urls.length > 0) { | ||
// parse the infos | ||
const tmp = {} | ||
this.info.connect_urls.forEach((server) => { | ||
const u = 'nats://' + server | ||
const s = new Server(new url.URL(u)) | ||
// implicit servers are ones added via the info connect_urls | ||
s.implicit = true | ||
tmp[s.url.href] = s | ||
}) | ||
// remove implicit servers that are no longer reported | ||
const toDelete = [] | ||
this.servers.forEach((s, index) => { | ||
const u = s.url.href | ||
if (s.implicit && this.currentServer.url.href !== u && tmp[u] === undefined) { | ||
// server was removed | ||
toDelete.push(index) | ||
} | ||
// remove this entry from reported | ||
delete tmp[u] | ||
}) | ||
// perform the deletion | ||
toDelete.reverse() | ||
toDelete.forEach((index) => { | ||
this.servers.splice(index, 1) | ||
}) | ||
// remaining servers are new | ||
const newURLs = [] | ||
for (const k in tmp) { | ||
if (Object.hasOwnProperty.call(tmp, k)) { | ||
this.servers.push(tmp[k]) | ||
newURLs.push(k) | ||
} | ||
} | ||
if (newURLs.length) { | ||
this.emit(InternalEvents.Servers, newURLs) | ||
} | ||
} | ||
} | ||
/** | ||
* Process a delivered message and deliver to appropriate subscriber. | ||
@@ -1431,16 +1303,2 @@ * | ||
/** | ||
* Push a new cluster server. | ||
* | ||
* @param {String} uri | ||
* @api public | ||
*/ | ||
Client.prototype.addServer = function (uri) { | ||
this.servers.push(new Server(new url.URL(uri))) | ||
if (this.options.noRandomize !== true) { | ||
shuffle(this.servers) | ||
} | ||
} | ||
/** | ||
* Flush outbound queue to server and call optional callback when server has processed | ||
@@ -1489,4 +1347,6 @@ * all data. | ||
subs.forEach((sub) => { | ||
sub.drain((err) => { | ||
errs.push(err) | ||
sub.doDrain((err) => { | ||
if (err) { | ||
errs.push(err) | ||
} | ||
drains.push(sub) | ||
@@ -1496,2 +1356,3 @@ if (drains.length === subs.length) { | ||
this.flush(() => { | ||
this.emit(Events.Close) | ||
this.close() | ||
@@ -1512,6 +1373,6 @@ if (typeof callback === 'function') { | ||
}) | ||
// no subscriptions | ||
if (subs.length === 0) { | ||
this.noMorePublishing = true | ||
this.emit(Events.Close) | ||
this.close() | ||
@@ -1749,9 +1610,9 @@ if (typeof callback === 'function') { | ||
} | ||
this.currentServer.reconnects += 1 | ||
this.servers.getCurrent().reconnects += 1 | ||
this.reconnects += 1 | ||
this.createConnection() | ||
if (this.currentServer.didConnect === true) { | ||
if (this.servers.getCurrent().didConnect === true) { | ||
this.emit(Events.Reconnecting) | ||
} | ||
this.currentServer.lastConnect = Date.now() | ||
this.servers.getCurrent().lastConnect = Date.now() | ||
} | ||
@@ -1766,3 +1627,3 @@ | ||
// Just return if no more servers | ||
if (this.servers.length === 0) { | ||
if (this.servers.isEmpty()) { | ||
return | ||
@@ -1777,3 +1638,3 @@ } | ||
let wait = 0 | ||
if (this.servers[0].didConnect === true) { | ||
if (this.servers.peek().didConnect === true) { | ||
wait = this.options.reconnectTimeWait | ||
@@ -1785,7 +1646,7 @@ } | ||
let maxWait = wait | ||
for (let i = 0; i < this.servers.length; i++) { | ||
const srv = this.selectServer() | ||
for (let i = 0; i < this.servers.length(); i++) { | ||
const srv = this.servers.selectServer() | ||
if (srv.reconnects >= this.options.maxReconnectAttempts && this.options.maxReconnectAttempts !== -1) { | ||
// remove the server - we already tried connecting max number of times | ||
this.servers.pop() | ||
this.servers.remove(srv) | ||
continue | ||
@@ -1811,3 +1672,3 @@ } | ||
if (this.servers.length === 0) { | ||
if (this.servers.isEmpty()) { | ||
// we have no more servers | ||
@@ -1918,2 +1779,8 @@ this.cleanupTimers() | ||
} | ||
this.doDrain(callback) | ||
} | ||
// internal sub drain doesn't check if the connection | ||
// is draining as this is the method used to drain the connection | ||
Sub.prototype.doDrain = function (callback) { | ||
if (this.closed) { | ||
@@ -1971,6 +1838,3 @@ if (typeof callback === 'function') { | ||
Sub.prototype.getReceived = function () { | ||
if (!this.closed) { | ||
return this.received | ||
} | ||
return -1 | ||
return this.received | ||
} | ||
@@ -1993,2 +1857,6 @@ | ||
Sub.prototype.getID = function () { | ||
return this.sid | ||
} | ||
Sub.prototype.subCmd = function () { | ||
@@ -2146,1 +2014,156 @@ if (this.queue) { | ||
} | ||
function Servers (nc) { | ||
this.nc = nc | ||
this.pool = [] | ||
this.current = null | ||
} | ||
Servers.prototype.init = function () { | ||
if (Array.isArray(this.nc.options.servers)) { | ||
this.nc.options.servers.forEach((u) => { | ||
this.pool.push(new Server(new url.URL(u))) | ||
}) | ||
// Randomize if needed | ||
if (this.nc.options.noRandomize !== true) { | ||
this.shuffle() | ||
} | ||
// if they gave an URL we should add it if different | ||
if (this.nc.options.url !== undefined && this.pool.indexOf(this.nc.options.url) === -1) { | ||
// Make url first element so it is attempted first | ||
this.pool.unshift(new Server(new url.URL(this.nc.options.url))) | ||
} | ||
} else { | ||
if (this.nc.options.url === undefined) { | ||
this.nc.options.url = DEFAULT_URI | ||
} | ||
this.pool.push(new Server(new url.URL(this.nc.options.url))) | ||
} | ||
} | ||
Servers.prototype.selectServer = function () { | ||
const server = this.pool.shift() | ||
// Place in client context. | ||
this.current = server | ||
const un = server.url.username || '' | ||
const pw = server.url.password || '' | ||
let up = '' | ||
if (un !== '' && pw !== '') { | ||
up = un + ':' + pw | ||
} else if (un !== '') { | ||
up = un | ||
} | ||
if (up !== '') { | ||
const auth = up.split(':') | ||
if (auth.length !== 1) { | ||
if (this.nc.options.user === undefined) { | ||
this.nc.user = auth[0] | ||
} | ||
if (this.nc.options.pass === undefined) { | ||
this.nc.pass = auth[1] | ||
} | ||
} else { | ||
if (this.nc.options.token === undefined) { | ||
this.nc.token = auth[0] | ||
} | ||
} | ||
} | ||
this.pool.push(server) | ||
return server | ||
} | ||
Servers.prototype.processServerUpdate = function () { | ||
// noinspection JSUnresolvedVariable | ||
if (this.nc.info.connect_urls && this.nc.info.connect_urls.length > 0) { | ||
// parse the infos | ||
const tmp = {} | ||
this.nc.info.connect_urls.forEach((server) => { | ||
const u = 'nats://' + server | ||
const s = new Server(new url.URL(u)) | ||
// implicit servers are ones added via the info connect_urls | ||
s.implicit = true | ||
tmp[s.url.href] = s | ||
}) | ||
// remove implicit servers that are no longer reported | ||
const toDelete = [] | ||
this.pool.forEach((s, index) => { | ||
const u = s.url.href | ||
if (s.implicit && this.current.url.href !== u && tmp[u] === undefined) { | ||
// server was removed | ||
toDelete.push(index) | ||
} | ||
// remove this entry from reported | ||
delete tmp[u] | ||
}) | ||
// perform the deletion | ||
toDelete.reverse() | ||
toDelete.forEach((index) => { | ||
this.pool.splice(index, 1) | ||
}) | ||
// remaining servers are new | ||
const newURLs = [] | ||
for (const k in tmp) { | ||
if (Object.hasOwnProperty.call(tmp, k)) { | ||
this.pool.push(tmp[k]) | ||
newURLs.push(k) | ||
} | ||
} | ||
if (newURLs.length) { | ||
this.nc.emit(InternalEvents.Servers, newURLs) | ||
} | ||
} | ||
} | ||
Servers.prototype.add = function (uri) { | ||
this.pool.push(new Server(new url.URL(uri))) | ||
if (this.nc.options.noRandomize !== true) { | ||
this.shuffle() | ||
} | ||
} | ||
Servers.prototype.length = function () { | ||
return this.pool.length | ||
} | ||
Servers.prototype.peek = function () { | ||
return this.pool[0] | ||
} | ||
Servers.prototype.hasTLS = function () { | ||
this.pool.find((s) => { | ||
return (s.url.protocol === 'tls' || s.url.protocol === 'tls:') | ||
}) | ||
} | ||
Servers.prototype.remove = function (s) { | ||
this.pool = this.pool.filter((t) => { | ||
return t !== s | ||
}) | ||
} | ||
Servers.prototype.getAll = function () { | ||
return this.pool | ||
} | ||
Servers.prototype.isEmpty = function () { | ||
return this.pool.length === 0 | ||
} | ||
Servers.prototype.getCurrent = function () { | ||
return this.current | ||
} | ||
Servers.prototype.shuffle = function () { | ||
for (let i = this.pool.length - 1; i > 0; i--) { | ||
const j = Math.floor(Math.random() * (i + 1)) | ||
const temp = this.pool[i] | ||
this.pool[i] = this.pool[j] | ||
this.pool[j] = temp | ||
} | ||
} |
{ | ||
"name": "nats", | ||
"version": "2.0.0-16", | ||
"version": "2.0.0-17", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -426,3 +426,3 @@ # NATS.js - Node.js Client | ||
nc.on('connect', (nc) => { | ||
console.log(`connect to ${nc.currentServer.url.host}`) | ||
console.log(`connect to ${nc.servers.getCurrent().url.host}`) | ||
}) | ||
@@ -443,3 +443,3 @@ | ||
nc.on('reconnect', (nc) => { | ||
console.log(`reconnect to ${nc.currentServer.url.host}`) | ||
console.log(`reconnect to ${nc.servers.getCurrent().url.host}`) | ||
}) | ||
@@ -446,0 +446,0 @@ |
@@ -6,2 +6,3 @@ - [ ] try to bind async server errors to matching callbacks | ||
- [X] m.respond() - to reply to requests | ||
- [X] Request object needs `cancel()` | ||
- [X] Request object needs `cancel()` | ||
- [ ] Implement payload (string, json, binary) |
116594
2340