node-nats-streaming
Advanced tools
Comparing version 0.0.52 to 0.1.0
205
lib/stan.js
@@ -35,3 +35,3 @@ /* | ||
*/ | ||
var VERSION = '0.0.52', | ||
var VERSION = '0.1.0', | ||
DEFAULT_PORT = 4222, | ||
@@ -59,4 +59,10 @@ DEFAULT_PRE = 'nats://localhost:', | ||
SUB_REQ_TIMEOUT = 'stan: subscribe request timeout', | ||
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout'; | ||
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout', | ||
PROTOCOL_ONE = 1, | ||
DEFAULT_PING_INTERVAL = 5 * 1000, | ||
DEFAULT_PING_MAXOUT = 3, | ||
MAX_PINGS_EXCEEDED = 'stan: connection lost due to PING failure'; | ||
/** | ||
@@ -130,3 +136,6 @@ * Library Version | ||
maxPubAcksInflight: DEFAULT_MAX_IN_FLIGHT, | ||
stanEncoding: 'utf8' | ||
stanEncoding: 'utf8', | ||
pingInterval: DEFAULT_PING_INTERVAL, | ||
pingMaxOut: DEFAULT_PING_MAXOUT, | ||
maxReconnectAttempts: -1 | ||
}; | ||
@@ -152,2 +161,4 @@ | ||
this.assignOption(opts, 'stanEncoding'); | ||
this.assignOption(opts, 'pingInterval'); | ||
this.assignOption(opts, 'pingMaxOut'); | ||
@@ -287,2 +298,15 @@ // node-nats does takes a bunch of other options | ||
that.pingInbox = nats.createInbox(); | ||
that.pingSubscription = that.nc.subscribe(that.pingInbox, function(msg) { | ||
if (msg) { | ||
var pingResponse = proto.PingResponse.deserializeBinary(Buffer.from(msg, 'binary').toByteArray()); | ||
var err = pingResponse.getError(); | ||
if (err) { | ||
that.closeWithError('connection_lost', err); | ||
return; | ||
} | ||
} | ||
that.pingOut = 0; | ||
}); | ||
that.ackSubscription = that.nc.subscribe(that.ackSubject, that.processAck()); | ||
@@ -292,23 +316,17 @@ | ||
//noinspection JSUnresolvedFunction | ||
that.connId = Buffer.from(nuid.next(), "utf8"); | ||
var req = new proto.ConnectRequest(); | ||
req.setClientId(that.clientID); | ||
req.setHeartbeatInbox(hbInbox); | ||
req.setProtocol(PROTOCOL_ONE); | ||
req.setConnId(that.connId); | ||
req.setPingInterval(Math.ceil(that.options.pingInterval / 1000)); | ||
req.setPingMaxOut(that.options.pingMaxOut); | ||
// cleanup the connection | ||
function cleanupConnection(error) { | ||
that.emit('error', error); | ||
var nc = that.nc; | ||
delete that.nc; | ||
nc.flush(function(){ | ||
if (that.ncOwned) { | ||
nc.close(); | ||
that.emit('close'); | ||
} | ||
}); | ||
} | ||
// fixme: hardcoded timeout | ||
that.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), 2*1000, function(msg) { | ||
if(msg instanceof nats.NatsError) { | ||
cleanupConnection(msg); | ||
that.closeWithError('error', msg); | ||
return; | ||
@@ -319,3 +337,3 @@ } | ||
if (cr.getError() !== "") { | ||
cleanupConnection(cr.getError()); | ||
that.closeWithError('error', cr.getError()); | ||
return; | ||
@@ -328,2 +346,33 @@ } | ||
that.closeRequests = cr.getCloseRequests(); | ||
var unsubPingSub = true; | ||
if (cr.getProtocol() >= PROTOCOL_ONE) { | ||
if (cr.getPingInterval() !== 0) { | ||
unsubPingSub = false; | ||
that.pingRequests = cr.getPingRequests(); | ||
that.pingInterval = cr.getPingInterval() * 1000; | ||
that.pingMaxOut = cr.getPingMaxOut(); | ||
var ping = new proto.Ping(); | ||
ping.setConnId(that.connId); | ||
that.pingBytes = Buffer.from(ping.serializeBinary()); | ||
that.pingOut = 0; | ||
that.pingTimer = setTimeout(function pingFun() { | ||
that.pingOut++; | ||
if (that.pingOut > that.pingMaxOut) { | ||
that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED)); | ||
return; | ||
} | ||
that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox); | ||
that.pingTimer = setTimeout(pingFun, that.pingInterval); | ||
}, that.pingInterval); | ||
} | ||
} | ||
if (unsubPingSub) { | ||
that.nc.unsubscribe(that.pingSubscription); | ||
that.pingSubscription = null; | ||
} | ||
that.emit('connect', that); | ||
@@ -356,2 +405,62 @@ }); | ||
/** | ||
* Close stan invoking the event notification with the | ||
* specified error, followed by a close notification. | ||
* @param event | ||
* @param error | ||
* @private | ||
*/ | ||
Stan.prototype.closeWithError = function(event, error) { | ||
if (this.nc === undefined || this.clientID === undefined) { | ||
return; | ||
} | ||
this.cleanupOnClose(error); | ||
if (this.ncOwned) { | ||
this.nc.close(); | ||
} | ||
this.emit(event, error); | ||
this.emit('close'); | ||
}; | ||
/** | ||
* Cleanup stan protocol subscriptions, pings and pending acks | ||
* @param err | ||
* @private | ||
*/ | ||
Stan.prototype.cleanupOnClose = function(err) { | ||
// remove the ping timer | ||
if (this.pingTimer) { | ||
timers.clearTimeout(this.pingTimer); | ||
delete this.pingTimer; | ||
} | ||
// if we don't own the connection, we unsub to insure | ||
// that a subsequent reconnect will properly clean up. | ||
// Otherwise the close() will take care of it. | ||
if(!this.ncOwned && this.nc) { | ||
if (this.ackSubscription) { | ||
this.nc.unsubscribe(this.ackSubscription); | ||
this.ackSubscription = null; | ||
} | ||
if(this.pingSubscription) { | ||
this.nc.unsubscribe(this.pingSubscription); | ||
this.pingSubscription = null; | ||
} | ||
if(this.hbSubscription) { | ||
this.nc.unsubscribe(this.hbSubscription); | ||
this.hbSubscription = null; | ||
} | ||
} | ||
for (var guid in this.pubAckMap) { | ||
if(!this.pubAckMap.hasOwnProperty(guid)) { | ||
continue; | ||
} | ||
var a = this.removeAck(guid); | ||
if (a && a.ah && typeof a.ah === 'function') { | ||
a.ah(err, guid); | ||
} | ||
} | ||
}; | ||
/** | ||
* Closes the NATS streaming server connection, or returns if already closed. | ||
@@ -365,31 +474,40 @@ * @fire Stan.close, Stan.error | ||
} | ||
if (this.ackSubscription !== null) { | ||
this.nc.unsubscribe(this.ackSubscription); | ||
} | ||
this.cleanupOnClose(new Error(CONN_CLOSED)); | ||
var that = this; | ||
//noinspection JSUnresolvedFunction | ||
var req = new proto.CloseRequest(); | ||
req.setClientId(this.clientID); | ||
this.nc.requestOne(this.closeRequests, Buffer.from(req.serializeBinary()), {}, that.options.connectTimeout, function(msgOrError) { | ||
var nc = that.nc; | ||
delete that.nc; | ||
//noinspection JSUnresolvedVariable | ||
if(msgOrError instanceof nats.NatsError) { | ||
that.emit('error', msgOrError); | ||
} else { | ||
if(this.nc && this.closeRequests) { | ||
var req = new proto.CloseRequest(); | ||
req.setClientId(this.clientID); | ||
this.nc.requestOne(this.closeRequests, Buffer.from(req.serializeBinary()), {}, that.options.connectTimeout, function (msgOrError) { | ||
var nc = that.nc; | ||
delete that.nc; | ||
var closeError = null; | ||
//noinspection JSUnresolvedVariable | ||
if (msgOrError instanceof nats.NatsError) { | ||
// if we get an error here, we simply show it in the close notification as there's not much we can do here. | ||
closeError = msgOrError; | ||
} else { | ||
var cr = proto.CloseResponse.deserializeBinary(Buffer.from(msgOrError, 'binary').toByteArray()); | ||
var err = cr.getError(); | ||
if (err && err.length > 0) { | ||
that.emit('error', new Error(err)); | ||
// if the protocol returned an error there's nothing for us to handle, pass it as an arg to close notification. | ||
closeError = new Error(err); | ||
} | ||
} | ||
// go closes always | ||
if (nc && that.ncOwned) { | ||
nc.close(); | ||
} | ||
that.emit('close', closeError); | ||
}); | ||
} else { | ||
if (this.nc && this.ncOwned) { | ||
this.nc.close(); | ||
delete that.nc; | ||
} | ||
// go closes always | ||
if (that.ncOwned) { | ||
nc.close(); | ||
that.emit('close'); | ||
} | ||
}); | ||
that.emit('close'); | ||
} | ||
}; | ||
/** | ||
@@ -467,2 +585,3 @@ * @return {Function} for processing acks associated with the protocol | ||
pe.setClientId(this.clientID); | ||
pe.setConnId(this.connId); | ||
pe.setGuid(peGUID); | ||
@@ -879,3 +998,15 @@ pe.setSubject(subject); | ||
/** | ||
* | ||
* @returns {!(string|Uint8Array)} | ||
*/ | ||
Message.prototype.getClientID = function() { | ||
return this.msg.getConnId(); | ||
}; | ||
Message.prototype.getConnectionID = function() { | ||
return this.msg.getClientId(); | ||
}; | ||
/** | ||
@@ -882,0 +1013,0 @@ * Returns an object with various constants for StartPosition (NEW_ONLY, |
{ | ||
"name": "node-nats-streaming", | ||
"version": "0.0.52", | ||
"version": "0.1.0", | ||
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -200,3 +200,2 @@ # NATS Streaming - Node.js Client | ||
### Rate limiting/matching | ||
@@ -219,2 +218,37 @@ | ||
### Connection Status | ||
The fact that the NATS Streaming server and clients are not directly connected poses a challenge when it comes to know if a client is still valid. When a client disconnects, the streaming server is not notified, hence the importance of calling `stan#close()`. The server sends heartbeats to the client's private inbox and if it misses a certain number of responses, it will consider the client's connection lost and remove it from its state. | ||
Before version `0.1.0`, the client library was not sending PINGs to the streaming server to detect connection failure. This was problematic especially if an application was never sending data (had only subscriptions for instance). Picture the case where a client connects to a NATS Server which has a route to a NATS Streaming server (either connecting to a standalone NATS Server or the server it embeds). If the connection between the streaming server and the client's NATS Server is broken, the client's NATS connection would still be ok, yet, no communication with the streaming server is possible. | ||
Starting version `0.1.0` of this library and server `0.10.0`, the client library will now send PINGs at regular intervals (default is `5000` milliseconds) and will close the streaming connection after a certain number of PINGs have been sent without any response (default is `3`). When that happens, a callback - if one is registered - will be invoked to notify the user that the connection is permanently lost, and the reason for the failure. | ||
Here is how you would specify your own PING values and the callback: | ||
```javascript | ||
var STAN = require('node-nats-streaming'); | ||
sc.connect('test-cluster', 'test', { | ||
maxPingOut: 3, | ||
pingInterval: 1000 | ||
}); | ||
sc.on('connect', function () { | ||
sc.on('connection_lost', function(error) { | ||
console.log('disconnected from stan', error); | ||
}); | ||
... | ||
``` | ||
Note that the only way to be notified is to set the callback. If the callback is not set, PINGs are still sent and the connection will be closed if needed, but the application won't know if it has only subscriptions. | ||
When the connection is lost, your application would have to re-create it and all subscriptions if any. | ||
When no NATS connection is provided via the connection options, the library creates its own NATS connection and will now set the reconnect attempts (`maxReconnectAttempts`) to "infinite" (`-1`), which was not the case before. It should therefore be possible for the library to always reconnect, but this does not mean that the streaming connection will not be closed, even if you set a very high threshold for the PINGs max out value. Keep in mind that while the client is disconnected, the server is sending heartbeats to the clients too, and when not getting any response, it will remove that client from its state. When the communication is restored, the PINGs sent to the server will allow to detect this condition and report to the client that the connection is now closed. | ||
Also, while a client is "disconnected" from the server, another application with connectivity to the streaming server may connect and uses the same client ID. The server, when detecting the duplicate client ID, will try to contact the first client to know if it should reject the connect request of the second client. Since the communication between the server and the first client is broken, the server will not get a response and therefore will replace the first client with the second one. | ||
Prior to client `0.1.0` and server `0.10.0`, if the communication between the first client and server were to be restored, and the application would send messages, the server would accept those because the published messages client ID would be valid, although the client is not. With client at `0.1.0`+ and server `0.10.0`+, additional information is sent with each message to allow the server to reject messages from a client that has been replaced by another client. | ||
## Supported Node Versions | ||
@@ -221,0 +255,0 @@ |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
160454
3921
263