node-nats-streaming-buffered-client
Advanced tools
Comparing version 1.2.6 to 1.3.0
@@ -50,14 +50,12 @@ "use strict"; | ||
* | ||
* @param {Stan} stan The NATS connection | ||
* @param {number} [bufferSize=10] The ring buffer size | ||
* @param {boolean} [waitForInitialConnect=false] Indicates the client can publish even before connecting. Will also change behaviour of the connect call | ||
* @param {number} [bufferSize=10] Size of our publish buffer | ||
* @param {boolean} [waitForInitialConnection=false] Allows publishing to the buffer before initial connect | ||
* @param {*} [logger=console] The console logger to use | ||
* @memberof NatsBufferedClient | ||
*/ | ||
function NatsBufferedClient(bufferSize, reconnectTimeout, waitForInitialConnection, logger) { | ||
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger) { | ||
if (bufferSize === void 0) { bufferSize = 10; } | ||
if (reconnectTimeout === void 0) { reconnectTimeout = 30000; } | ||
if (waitForInitialConnection === void 0) { waitForInitialConnection = false; } | ||
if (logger === void 0) { logger = console; } | ||
var _this = this; | ||
this.reconnectTimeout = reconnectTimeout; | ||
this.waitForInitialConnection = waitForInitialConnection; | ||
@@ -73,9 +71,2 @@ this.logger = logger; | ||
/** | ||
* Indicator we've reached a connected state with the current stan instance | ||
* | ||
* @private | ||
* @memberof NatsBufferedClient | ||
*/ | ||
this.clientConnected = false; | ||
/** | ||
* Indicator we've reached a connected state at least once with the current stan instance | ||
@@ -87,2 +78,3 @@ * | ||
this.initialConnected = false; | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Constructing...'); | ||
// Initialize our ring buffer with the requested size | ||
@@ -95,21 +87,5 @@ // | ||
process.on('exit', function () { return __awaiter(_this, void 0, void 0, function () { | ||
var error_1; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] EXIT encountered'); | ||
_a.label = 1; | ||
case 1: | ||
_a.trys.push([1, 3, , 4]); | ||
return [4 /*yield*/, this.disconnect()]; | ||
case 2: | ||
_a.sent(); | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected due to EXIT'); | ||
return [3 /*break*/, 4]; | ||
case 3: | ||
error_1 = _a.sent(); | ||
this.logger.error('[NATS-BUFFERED-CLIENT] Error during EXIT disconnect', error_1); | ||
return [3 /*break*/, 4]; | ||
case 4: return [2 /*return*/]; | ||
} | ||
this.logger.log('[NATS-BUFFERED-CLIENT] EXIT encountered'); | ||
return [2 /*return*/]; | ||
}); | ||
@@ -120,66 +96,24 @@ }); }); | ||
process.on('SIGINT', function () { return __awaiter(_this, void 0, void 0, function () { | ||
var error_2; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] SIGINT encountered'); | ||
// Stop any pending reconnect timers | ||
// | ||
clearInterval(this.reconnectTimer); | ||
_a.label = 1; | ||
case 1: | ||
_a.trys.push([1, 3, , 4]); | ||
return [4 /*yield*/, this.disconnect()]; | ||
case 2: | ||
_a.sent(); | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected due to SIGINT'); | ||
process.exit(); | ||
return [3 /*break*/, 4]; | ||
case 3: | ||
error_2 = _a.sent(); | ||
this.logger.error('[NATS-BUFFERED-CLIENT] Error during SIGINT disconnect', error_2); | ||
process.exit(); | ||
return [3 /*break*/, 4]; | ||
case 4: return [2 /*return*/]; | ||
this.logger.log('[NATS-BUFFERED-CLIENT] SIGINT encountered'); | ||
try { | ||
if (this.stan) { | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected due to SIGINT'); | ||
this.disconnect(); | ||
} | ||
} | ||
catch (error) { | ||
this.logger.error('[NATS-BUFFERED-CLIENT] Error during SIGINT disconnect', error); | ||
} | ||
return [2 /*return*/]; | ||
}); | ||
}); }); | ||
} | ||
Object.defineProperty(NatsBufferedClient.prototype, "connected", { | ||
/** | ||
* The getter for the client connection state | ||
* | ||
* @readonly | ||
* @memberof NatsBufferedClient | ||
*/ | ||
get: function () { | ||
return this.clientConnected; | ||
}, | ||
/** | ||
* The setter for the client connection state | ||
* Will stop or reset the client reconnect timer | ||
* | ||
* @memberof NatsBufferedClient | ||
*/ | ||
set: function (newConnectedState) { | ||
var _this = this; | ||
this.clientConnected = newConnectedState; | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Client connected status', this.clientConnected); | ||
// This timer will try to reconnect to the server on prolonged absence | ||
// | ||
clearInterval(this.reconnectTimer); | ||
if (!this.clientConnected) { | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Starting reconnect timer', this.reconnectTimeout); | ||
this.reconnectTimer = setInterval(function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Timer triggered reconnect...'); | ||
_this.reconnect(); | ||
}, this.reconnectTimeout); | ||
} | ||
}, | ||
enumerable: true, | ||
configurable: true | ||
}); | ||
/** | ||
* Connect to the NATS server | ||
* | ||
* @param {string} clusterId The name of the nats cluster | ||
* @param {string} clientId The client identifier to use | ||
* @param {nats.StanOptions} [options] Streaming connection options. Will be amended with out defaults | ||
* @returns {Promise<boolean>} | ||
* @memberof NatsBufferedClient | ||
@@ -189,71 +123,59 @@ */ | ||
var _this = this; | ||
return new Promise(function (resolve, reject) { return __awaiter(_this, void 0, void 0, function () { | ||
var error_3; | ||
var _this = this; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
_a.trys.push([0, 2, , 3]); | ||
return [4 /*yield*/, this.disconnect()]; | ||
case 1: | ||
_a.sent(); | ||
return [3 /*break*/, 3]; | ||
case 2: | ||
error_3 = _a.sent(); | ||
this.logger.error('[NATS-BUFFERED-CLIENT] Error during disconnect', error_3); | ||
return [3 /*break*/, 3]; | ||
case 3: | ||
// Reset connected state | ||
// | ||
this.connected = false; | ||
this.stan = undefined; | ||
// Connect to NATS server | ||
// | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Connecting...'); | ||
this.stan = nats.connect(clusterId, clientId, options); | ||
// Store connection parameters | ||
// | ||
this.clusterId = clusterId; | ||
this.clientId = clientId; | ||
this.clientOptions = options; | ||
// Listen for connect events | ||
// | ||
this.stan.on('connect', function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Connected'); | ||
_this.connected = true; | ||
_this.initialConnected = true; | ||
// Start processing the buffer | ||
// | ||
_this.tick(); | ||
// Resolve initial connection promise | ||
// | ||
resolve(true); | ||
}); | ||
this.stan.on('error', function (error) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Server error', error); | ||
// Reject initial connection promise | ||
// | ||
if (!_this.initialConnected) { | ||
reject(error); | ||
} | ||
}); | ||
this.stan.on('reconnecting', function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnecting'); | ||
}); | ||
this.stan.on('reconnect', function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnected'); | ||
}); | ||
this.stan.on('disconnect', function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected'); | ||
}); | ||
this.stan.on('close', function () { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Closed connection'); | ||
}); | ||
this.stan.on('permission_error', function (error) { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Permission error', error); | ||
}); | ||
return [2 /*return*/]; | ||
return new Promise(function (resolve, reject) { | ||
// Store connection parameters | ||
// Apply default options for recommended reconnect logic | ||
// | ||
_this.clusterId = clusterId; | ||
_this.clientId = clientId; | ||
var defaultOptions = { | ||
maxReconnectAttempts: -1, | ||
reconnect: true, | ||
waitOnFirstConnect: true, | ||
}; | ||
_this.clientOptions = Object.assign(defaultOptions, options); | ||
// Connect to NATS server | ||
// | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Connecting...', clusterId, clientId, _this.clientOptions); | ||
_this.stan = nats.connect(clusterId, clientId, _this.clientOptions); | ||
// Listen for connect events | ||
// | ||
_this.stan.on('connect', function () { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Connected'); | ||
_this.initialConnected = true; | ||
// Start processing the buffer | ||
// | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Start buffer processing...'); | ||
_this.tick(); | ||
// Resolve initial connection promise | ||
// | ||
resolve(true); | ||
}); | ||
_this.stan.on('error', function (error) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Server error', error); | ||
// Reject initial connection promise | ||
// | ||
if (!_this.initialConnected) { | ||
reject(error); | ||
} | ||
}); | ||
}); }); | ||
_this.stan.on('reconnecting', function () { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Reconnecting'); | ||
}); | ||
_this.stan.on('reconnect', function (stan) { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Reconnected', stan === _this.stan); | ||
// Resume processing the buffer | ||
// | ||
// this.logger.log( '[NATS-BUFFERED-CLIENT] Resuming buffer processing...' ); | ||
// this.tick(); | ||
}); | ||
_this.stan.on('disconnect', function () { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected'); | ||
}); | ||
_this.stan.on('close', function () { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Closed connection'); | ||
}); | ||
_this.stan.on('permission_error', function (error) { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Permission error', error); | ||
}); | ||
}); | ||
}; | ||
@@ -273,3 +195,3 @@ /** | ||
// | ||
if (_this.stan && _this.connected) { | ||
if (_this.stan) { | ||
var currentConnection = _this.stan; | ||
@@ -279,3 +201,2 @@ currentConnection.on('disconnect', resolve); | ||
currentConnection.close(); | ||
_this.connected = false; | ||
_this.stan = undefined; | ||
@@ -286,3 +207,3 @@ } | ||
// | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Not connected so no need to disconnect', _this.connected, _this.stan === undefined); | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Not connected so no need to disconnect', _this.stan === undefined); | ||
resolve(); | ||
@@ -298,4 +219,23 @@ } | ||
NatsBufferedClient.prototype.reconnect = function () { | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnecting...'); | ||
this.connect(this.clusterId, this.clientId, this.clientOptions); | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
// First close existing connection if still available | ||
// | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Reconnect requested. Disconnecting...'); | ||
return [4 /*yield*/, this.disconnect()]; | ||
case 1: | ||
_a.sent(); | ||
// Create a new connection | ||
// | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected trying to connect again...'); | ||
return [4 /*yield*/, this.connect(this.clusterId, this.clientId, this.clientOptions)]; | ||
case 2: | ||
_a.sent(); | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Reconnect completed'); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
@@ -319,3 +259,3 @@ /** | ||
this.buffer.push({ subject: subject, data: data }); | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Added message to buffer', subject, data); | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Added message to buffer', subject, data); | ||
// Resume buffer processing if needed | ||
@@ -340,3 +280,3 @@ // | ||
// | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Buffer is full. Dropping data:', data); | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is full. Dropping data:', data); | ||
}; | ||
@@ -360,21 +300,19 @@ /** | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', error); | ||
// Push the item back onto the buffer | ||
// | ||
_this.buffer.unshift(pub_1); | ||
// Reconnect to the server on publish error | ||
// Trigger a reconnect to reset the connection and begin processing again | ||
// | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Reconnect to server due to publish error'); | ||
_this.reconnect(); | ||
// this.reconnect(); | ||
} | ||
else { | ||
_this.logger.debug('[NATS-BUFFERED-CLIENT] Publish done', pub_1); | ||
// If we can publish we are connected | ||
// | ||
_this.connected = true; | ||
// Next! | ||
// | ||
_this.tick(); | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub_1); | ||
} | ||
// Next buffer item or retry that last one | ||
// | ||
_this.tick(); | ||
}); | ||
} | ||
else { | ||
this.logger.debug('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep'); | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep'); | ||
this.ticking = false; | ||
@@ -381,0 +319,0 @@ } |
@@ -10,3 +10,2 @@ /// <reference types="node" /> | ||
export declare class NatsBufferedClient { | ||
private reconnectTimeout; | ||
private waitForInitialConnection; | ||
@@ -62,9 +61,2 @@ private logger; | ||
/** | ||
* Indicator we've reached a connected state with the current stan instance | ||
* | ||
* @private | ||
* @memberof NatsBufferedClient | ||
*/ | ||
private clientConnected; | ||
/** | ||
* Indicator we've reached a connected state at least once with the current stan instance | ||
@@ -77,34 +69,17 @@ * | ||
/** | ||
* The reconnect timer | ||
* | ||
* @private | ||
* @type {NodeJS.Timer} | ||
* @memberof NatsBufferedClient | ||
*/ | ||
private reconnectTimer; | ||
/** | ||
* The getter for the client connection state | ||
* | ||
* @readonly | ||
* @memberof NatsBufferedClient | ||
*/ | ||
/** | ||
* The setter for the client connection state | ||
* Will stop or reset the client reconnect timer | ||
* | ||
* @memberof NatsBufferedClient | ||
*/ | ||
connected: boolean; | ||
/** | ||
* Creates an instance of NatsBufferedClient | ||
* | ||
* @param {Stan} stan The NATS connection | ||
* @param {number} [bufferSize=10] The ring buffer size | ||
* @param {boolean} [waitForInitialConnect=false] Indicates the client can publish even before connecting. Will also change behaviour of the connect call | ||
* @param {number} [bufferSize=10] Size of our publish buffer | ||
* @param {boolean} [waitForInitialConnection=false] Allows publishing to the buffer before initial connect | ||
* @param {*} [logger=console] The console logger to use | ||
* @memberof NatsBufferedClient | ||
*/ | ||
constructor(bufferSize?: number, reconnectTimeout?: number, waitForInitialConnection?: boolean, logger?: Console); | ||
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console); | ||
/** | ||
* Connect to the NATS server | ||
* | ||
* @param {string} clusterId The name of the nats cluster | ||
* @param {string} clientId The client identifier to use | ||
* @param {nats.StanOptions} [options] Streaming connection options. Will be amended with out defaults | ||
* @returns {Promise<boolean>} | ||
* @memberof NatsBufferedClient | ||
@@ -125,3 +100,3 @@ */ | ||
*/ | ||
reconnect(): void; | ||
reconnect(): Promise<void>; | ||
/** | ||
@@ -128,0 +103,0 @@ * Push an item into the buffer to publish it |
{ | ||
"name": "node-nats-streaming-buffered-client", | ||
"version": "1.2.6", | ||
"version": "1.3.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "keywords": [], |
# NATS Streaming Buffered Client | ||
[![npm version](https://badge.fury.io/js/node-nats-streaming-buffered-client.svg)](https://badge.fury.io/js/node-nats-streaming-buffered-client) | ||
[![Travis](https://travis-ci.com/SpringTree/node-nats-streaming-buffered-client.svg?branch=master)](https://travis-ci.com/SpringTree/node-nats-streaming-buffered-client) | ||
@@ -35,7 +36,9 @@ | ||
const bufferSize = 2000; | ||
const reconnectTimeout = 30000; | ||
const waitForInitialConnect = false; | ||
const logger = console; | ||
const client = new NatsBufferedClient( bufferSize, reconnectTimeout, waitForInitialConnect, logger ); | ||
// NOTE: constructor parameters have changed in v0.3.0 | ||
// | ||
const client = new NatsBufferedClient( bufferSize, waitForInitialConnect, logger ); | ||
// Connect to the NATS server | ||
@@ -51,7 +54,23 @@ // NATS connect options: https://github.com/nats-io/node-nats#connect-options | ||
// Access to NATS client instance is available | ||
// Access to NATS Streaming client instance is available | ||
// | ||
const subsription = client.stan.subscribe( 'topic', ... ); | ||
const subscription = client.stan.subscribe( 'topic', ... ); | ||
``` | ||
There is a more complete test client [here](test/client-demo.js) | ||
### A note on nats connect options | ||
The reconnect logic from the nats streaming client relies on these 3 options: | ||
```javascript | ||
const defaultOptions = { | ||
maxReconnectAttempts: -1, | ||
reconnect: true, | ||
waitOnFirstConnect: true, | ||
}; | ||
``` | ||
Be very careful when supplying your own connect options to not change these unless you know what you're doing. | ||
## NPM scripts | ||
@@ -58,0 +77,0 @@ |
Sorry, the diff of this file is not supported yet
78
29109
438