New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-nats-streaming-buffered-client

Package Overview
Dependencies
Maintainers
1
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-nats-streaming-buffered-client - npm Package Compare versions

Comparing version 1.2.6 to 1.3.0

282

dist/node-nats-streaming-buffered-client.js

@@ -50,14 +50,12 @@ "use strict";

*
* @param {Stan} stan The NATS connection
* @param {number} [bufferSize=10] The ring buffer size
* @param {boolean} [waitForInitialConnect=false] Indicates the client can publish even before connecting. Will also change behaviour of the connect call
* @param {number} [bufferSize=10] Size of our publish buffer
* @param {boolean} [waitForInitialConnection=false] Allows publishing to the buffer before initial connect
* @param {*} [logger=console] The console logger to use
* @memberof NatsBufferedClient
*/
function NatsBufferedClient(bufferSize, reconnectTimeout, waitForInitialConnection, logger) {
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger) {
if (bufferSize === void 0) { bufferSize = 10; }
if (reconnectTimeout === void 0) { reconnectTimeout = 30000; }
if (waitForInitialConnection === void 0) { waitForInitialConnection = false; }
if (logger === void 0) { logger = console; }
var _this = this;
this.reconnectTimeout = reconnectTimeout;
this.waitForInitialConnection = waitForInitialConnection;

@@ -73,9 +71,2 @@ this.logger = logger;

/**
* Indicator we've reached a connected state with the current stan instance
*
* @private
* @memberof NatsBufferedClient
*/
this.clientConnected = false;
/**
* Indicator we've reached a connected state at least once with the current stan instance

@@ -87,2 +78,3 @@ *

this.initialConnected = false;
this.logger.log('[NATS-BUFFERED-CLIENT] Constructing...');
// Initialize our ring buffer with the requested size

@@ -95,21 +87,5 @@ //

process.on('exit', function () { return __awaiter(_this, void 0, void 0, function () {
var error_1;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
this.logger.debug('[NATS-BUFFERED-CLIENT] EXIT encountered');
_a.label = 1;
case 1:
_a.trys.push([1, 3, , 4]);
return [4 /*yield*/, this.disconnect()];
case 2:
_a.sent();
this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected due to EXIT');
return [3 /*break*/, 4];
case 3:
error_1 = _a.sent();
this.logger.error('[NATS-BUFFERED-CLIENT] Error during EXIT disconnect', error_1);
return [3 /*break*/, 4];
case 4: return [2 /*return*/];
}
this.logger.log('[NATS-BUFFERED-CLIENT] EXIT encountered');
return [2 /*return*/];
});

@@ -120,66 +96,24 @@ }); });

process.on('SIGINT', function () { return __awaiter(_this, void 0, void 0, function () {
var error_2;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
this.logger.debug('[NATS-BUFFERED-CLIENT] SIGINT encountered');
// Stop any pending reconnect timers
//
clearInterval(this.reconnectTimer);
_a.label = 1;
case 1:
_a.trys.push([1, 3, , 4]);
return [4 /*yield*/, this.disconnect()];
case 2:
_a.sent();
this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected due to SIGINT');
process.exit();
return [3 /*break*/, 4];
case 3:
error_2 = _a.sent();
this.logger.error('[NATS-BUFFERED-CLIENT] Error during SIGINT disconnect', error_2);
process.exit();
return [3 /*break*/, 4];
case 4: return [2 /*return*/];
this.logger.log('[NATS-BUFFERED-CLIENT] SIGINT encountered');
try {
if (this.stan) {
this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected due to SIGINT');
this.disconnect();
}
}
catch (error) {
this.logger.error('[NATS-BUFFERED-CLIENT] Error during SIGINT disconnect', error);
}
return [2 /*return*/];
});
}); });
}
Object.defineProperty(NatsBufferedClient.prototype, "connected", {
/**
* The getter for the client connection state
*
* @readonly
* @memberof NatsBufferedClient
*/
get: function () {
return this.clientConnected;
},
/**
* The setter for the client connection state
* Will stop or reset the client reconnect timer
*
* @memberof NatsBufferedClient
*/
set: function (newConnectedState) {
var _this = this;
this.clientConnected = newConnectedState;
this.logger.debug('[NATS-BUFFERED-CLIENT] Client connected status', this.clientConnected);
// This timer will try to reconnect to the server on prolonged absence
//
clearInterval(this.reconnectTimer);
if (!this.clientConnected) {
this.logger.debug('[NATS-BUFFERED-CLIENT] Starting reconnect timer', this.reconnectTimeout);
this.reconnectTimer = setInterval(function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Timer triggered reconnect...');
_this.reconnect();
}, this.reconnectTimeout);
}
},
enumerable: true,
configurable: true
});
/**
* Connect to the NATS server
*
* @param {string} clusterId The name of the nats cluster
* @param {string} clientId The client identifier to use
* @param {nats.StanOptions} [options] Streaming connection options. Will be amended with out defaults
* @returns {Promise<boolean>}
* @memberof NatsBufferedClient

@@ -189,71 +123,59 @@ */

var _this = this;
return new Promise(function (resolve, reject) { return __awaiter(_this, void 0, void 0, function () {
var error_3;
var _this = this;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
_a.trys.push([0, 2, , 3]);
return [4 /*yield*/, this.disconnect()];
case 1:
_a.sent();
return [3 /*break*/, 3];
case 2:
error_3 = _a.sent();
this.logger.error('[NATS-BUFFERED-CLIENT] Error during disconnect', error_3);
return [3 /*break*/, 3];
case 3:
// Reset connected state
//
this.connected = false;
this.stan = undefined;
// Connect to NATS server
//
this.logger.debug('[NATS-BUFFERED-CLIENT] Connecting...');
this.stan = nats.connect(clusterId, clientId, options);
// Store connection parameters
//
this.clusterId = clusterId;
this.clientId = clientId;
this.clientOptions = options;
// Listen for connect events
//
this.stan.on('connect', function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Connected');
_this.connected = true;
_this.initialConnected = true;
// Start processing the buffer
//
_this.tick();
// Resolve initial connection promise
//
resolve(true);
});
this.stan.on('error', function (error) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Server error', error);
// Reject initial connection promise
//
if (!_this.initialConnected) {
reject(error);
}
});
this.stan.on('reconnecting', function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnecting');
});
this.stan.on('reconnect', function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnected');
});
this.stan.on('disconnect', function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Disconnected');
});
this.stan.on('close', function () {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Closed connection');
});
this.stan.on('permission_error', function (error) {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Permission error', error);
});
return [2 /*return*/];
return new Promise(function (resolve, reject) {
// Store connection parameters
// Apply default options for recommended reconnect logic
//
_this.clusterId = clusterId;
_this.clientId = clientId;
var defaultOptions = {
maxReconnectAttempts: -1,
reconnect: true,
waitOnFirstConnect: true,
};
_this.clientOptions = Object.assign(defaultOptions, options);
// Connect to NATS server
//
_this.logger.log('[NATS-BUFFERED-CLIENT] Connecting...', clusterId, clientId, _this.clientOptions);
_this.stan = nats.connect(clusterId, clientId, _this.clientOptions);
// Listen for connect events
//
_this.stan.on('connect', function () {
_this.logger.log('[NATS-BUFFERED-CLIENT] Connected');
_this.initialConnected = true;
// Start processing the buffer
//
_this.logger.log('[NATS-BUFFERED-CLIENT] Start buffer processing...');
_this.tick();
// Resolve initial connection promise
//
resolve(true);
});
_this.stan.on('error', function (error) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Server error', error);
// Reject initial connection promise
//
if (!_this.initialConnected) {
reject(error);
}
});
}); });
_this.stan.on('reconnecting', function () {
_this.logger.log('[NATS-BUFFERED-CLIENT] Reconnecting');
});
_this.stan.on('reconnect', function (stan) {
_this.logger.log('[NATS-BUFFERED-CLIENT] Reconnected', stan === _this.stan);
// Resume processing the buffer
//
// this.logger.log( '[NATS-BUFFERED-CLIENT] Resuming buffer processing...' );
// this.tick();
});
_this.stan.on('disconnect', function () {
_this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected');
});
_this.stan.on('close', function () {
_this.logger.log('[NATS-BUFFERED-CLIENT] Closed connection');
});
_this.stan.on('permission_error', function (error) {
_this.logger.log('[NATS-BUFFERED-CLIENT] Permission error', error);
});
});
};

@@ -273,3 +195,3 @@ /**

//
if (_this.stan && _this.connected) {
if (_this.stan) {
var currentConnection = _this.stan;

@@ -279,3 +201,2 @@ currentConnection.on('disconnect', resolve);

currentConnection.close();
_this.connected = false;
_this.stan = undefined;

@@ -286,3 +207,3 @@ }

//
_this.logger.debug('[NATS-BUFFERED-CLIENT] Not connected so no need to disconnect', _this.connected, _this.stan === undefined);
_this.logger.debug('[NATS-BUFFERED-CLIENT] Not connected so no need to disconnect', _this.stan === undefined);
resolve();

@@ -298,4 +219,23 @@ }

NatsBufferedClient.prototype.reconnect = function () {
this.logger.debug('[NATS-BUFFERED-CLIENT] Reconnecting...');
this.connect(this.clusterId, this.clientId, this.clientOptions);
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
// First close existing connection if still available
//
this.logger.log('[NATS-BUFFERED-CLIENT] Reconnect requested. Disconnecting...');
return [4 /*yield*/, this.disconnect()];
case 1:
_a.sent();
// Create a new connection
//
this.logger.log('[NATS-BUFFERED-CLIENT] Disconnected trying to connect again...');
return [4 /*yield*/, this.connect(this.clusterId, this.clientId, this.clientOptions)];
case 2:
_a.sent();
this.logger.log('[NATS-BUFFERED-CLIENT] Reconnect completed');
return [2 /*return*/];
}
});
});
};

@@ -319,3 +259,3 @@ /**

this.buffer.push({ subject: subject, data: data });
this.logger.debug('[NATS-BUFFERED-CLIENT] Added message to buffer', subject, data);
this.logger.log('[NATS-BUFFERED-CLIENT] Added message to buffer', subject, data);
// Resume buffer processing if needed

@@ -340,3 +280,3 @@ //

//
this.logger.debug('[NATS-BUFFERED-CLIENT] Buffer is full. Dropping data:', data);
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is full. Dropping data:', data);
};

@@ -360,21 +300,19 @@ /**

_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', error);
// Push the item back onto the buffer
//
_this.buffer.unshift(pub_1);
// Reconnect to the server on publish error
// Trigger a reconnect to reset the connection and begin processing again
//
_this.logger.warn('[NATS-BUFFERED-CLIENT] Reconnect to server due to publish error');
_this.reconnect();
// this.reconnect();
}
else {
_this.logger.debug('[NATS-BUFFERED-CLIENT] Publish done', pub_1);
// If we can publish we are connected
//
_this.connected = true;
// Next!
//
_this.tick();
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub_1);
}
// Next buffer item or retry that last one
//
_this.tick();
});
}
else {
this.logger.debug('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep');
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep');
this.ticking = false;

@@ -381,0 +319,0 @@ }

@@ -10,3 +10,2 @@ /// <reference types="node" />

export declare class NatsBufferedClient {
private reconnectTimeout;
private waitForInitialConnection;

@@ -62,9 +61,2 @@ private logger;

/**
* Indicator we've reached a connected state with the current stan instance
*
* @private
* @memberof NatsBufferedClient
*/
private clientConnected;
/**
* Indicator we've reached a connected state at least once with the current stan instance

@@ -77,34 +69,17 @@ *

/**
* The reconnect timer
*
* @private
* @type {NodeJS.Timer}
* @memberof NatsBufferedClient
*/
private reconnectTimer;
/**
* The getter for the client connection state
*
* @readonly
* @memberof NatsBufferedClient
*/
/**
* The setter for the client connection state
* Will stop or reset the client reconnect timer
*
* @memberof NatsBufferedClient
*/
connected: boolean;
/**
* Creates an instance of NatsBufferedClient
*
* @param {Stan} stan The NATS connection
* @param {number} [bufferSize=10] The ring buffer size
* @param {boolean} [waitForInitialConnect=false] Indicates the client can publish even before connecting. Will also change behaviour of the connect call
* @param {number} [bufferSize=10] Size of our publish buffer
* @param {boolean} [waitForInitialConnection=false] Allows publishing to the buffer before initial connect
* @param {*} [logger=console] The console logger to use
* @memberof NatsBufferedClient
*/
constructor(bufferSize?: number, reconnectTimeout?: number, waitForInitialConnection?: boolean, logger?: Console);
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console);
/**
* Connect to the NATS server
*
* @param {string} clusterId The name of the nats cluster
* @param {string} clientId The client identifier to use
* @param {nats.StanOptions} [options] Streaming connection options. Will be amended with out defaults
* @returns {Promise<boolean>}
* @memberof NatsBufferedClient

@@ -125,3 +100,3 @@ */

*/
reconnect(): void;
reconnect(): Promise<void>;
/**

@@ -128,0 +103,0 @@ * Push an item into the buffer to publish it

{
"name": "node-nats-streaming-buffered-client",
"version": "1.2.6",
"version": "1.3.0",
"description": "",

@@ -5,0 +5,0 @@ "keywords": [],

# NATS Streaming Buffered Client
[![npm version](https://badge.fury.io/js/node-nats-streaming-buffered-client.svg)](https://badge.fury.io/js/node-nats-streaming-buffered-client)
[![Travis](https://travis-ci.com/SpringTree/node-nats-streaming-buffered-client.svg?branch=master)](https://travis-ci.com/SpringTree/node-nats-streaming-buffered-client)

@@ -35,7 +36,9 @@

const bufferSize = 2000;
const reconnectTimeout = 30000;
const waitForInitialConnect = false;
const logger = console;
const client = new NatsBufferedClient( bufferSize, reconnectTimeout, waitForInitialConnect, logger );
// NOTE: constructor parameters have changed in v0.3.0
//
const client = new NatsBufferedClient( bufferSize, waitForInitialConnect, logger );
// Connect to the NATS server

@@ -51,7 +54,23 @@ // NATS connect options: https://github.com/nats-io/node-nats#connect-options

// Access to NATS client instance is available
// Access to NATS Streaming client instance is available
//
const subsription = client.stan.subscribe( 'topic', ... );
const subscription = client.stan.subscribe( 'topic', ... );
```
There is a more complete test client [here](test/client-demo.js)
### A note on nats connect options
The reconnect logic from the nats streaming client relies on these 3 options:
```javascript
const defaultOptions = {
maxReconnectAttempts: -1,
reconnect: true,
waitOnFirstConnect: true,
};
```
Be very careful when supplying your own connect options to not change these unless you know what you're doing.
## NPM scripts

@@ -58,0 +77,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc