New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-nats-streaming-buffered-client

Package Overview
Dependencies
Maintainers
1
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-nats-streaming-buffered-client - npm Package Compare versions

Comparing version 1.4.1 to 1.5.0

148

dist/node-nats-streaming-buffered-client.js

@@ -69,5 +69,6 @@ "use strict";

* @param {number} [reconnectDelay=5000] If reconnect fails retry after this amount of time. Default is 5 seconds
* @param {number} [batchSize=10] Amount of items to publish in 1 tick
* @memberof NatsBufferedClient
*/
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger, reconnectDelay) {
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger, reconnectDelay, batchSize) {
if (bufferSize === void 0) { bufferSize = 10; }

@@ -77,2 +78,3 @@ if (waitForInitialConnection === void 0) { waitForInitialConnection = false; }

if (reconnectDelay === void 0) { reconnectDelay = 5000; }
if (batchSize === void 0) { batchSize = 10; }
var _this =

@@ -82,4 +84,6 @@ // Initialise the event emitter super class

_super.call(this) || this;
_this.bufferSize = bufferSize;
_this.waitForInitialConnection = waitForInitialConnection;
_this.reconnectDelay = reconnectDelay;
_this.batchSize = batchSize;
/**

@@ -372,56 +376,83 @@ * Indicates if we're processing the buffer

if (this.stan) {
var pub_1 = this.buffer.shift();
if (pub_1) {
this.stan.publish(pub_1.subject, pub_1.data, function (error) {
if (error) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', error);
_this.logger.error('[NATS-BUFFERED-CLIENT] Error type', typeof error);
var batchItems = this.buffer.slice(0, this.bufferSize);
if (!batchItems.length) {
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep');
this.ticking = false;
}
else {
// Take the batch out of the buffer
// We will push back error items if needed
//
this.buffer = this.buffer.slice(this.bufferSize);
// Collect publish promises for the entire batch
//
var publishBatch_1 = [];
// Use a for each to create a function scope so can remember which
// pub item fails
//
batchItems.forEach(function (pub) {
var publishItem = _this.stanPublish(_this.stan, pub);
publishBatch_1.push(publishItem);
publishItem
.then(function () {
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub);
})
.catch(function (error) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', pub, error);
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish error', error);
// Push the item back onto the buffer
//
_this.buffer.unshift(pub_1);
// Try to retrieve the actual error message
// Errors thrown in the client are normal JS Error objects
// Errors returned from the server appear to be strings
//
var errorMessage = void 0;
try {
errorMessage = typeof error === 'string' ? error : error.message;
}
catch (unknownErrorTypeError) {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Failed to interpret error type', error);
}
if (errorMessage === 'stan: publish ack timeout') {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Publish time-out detected', error);
}
// A long term disconnect can trigger a bigger issue
// The NATS connection might reconnect/resume but the streaming server
// may have lost your client id or considers it dead due to missing heartbeats
// An error called 'stan: invalid publish request' will occur in this case
// If that happens we will manually reconnect to try and restore communication
//
// NOTE: Be cautious about reconnecting. If subscriptions are not closed you
// end up with a client id already registered error
//
if (errorMessage === 'stan: invalid publish request') {
_this.reconnect()
.then(function () {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Completed forced reconnect due to client de-sync', error);
})
.catch(function (reconnectError) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Reconnect failed', reconnectError);
});
}
_this.buffer.unshift(pub);
});
});
Promise.all(publishBatch_1)
.then(function () {
_this.logger.log("[NATS-BUFFERED-CLIENT] Buffer utilitisation " + Math.round(_this.buffer.length * 100 / _this.bufferSize) + "%", _this.buffer.length);
// Next buffer item batch
//
_this.tick();
})
.catch(function (error) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Error type', typeof error);
// Try to retrieve the actual error message
// Errors thrown in the client are normal JS Error objects
// Errors returned from the server appear to be strings
//
var errorMessage;
try {
errorMessage = typeof error === 'string' ? error : error.message;
}
catch (unknownErrorTypeError) {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Failed to interpret error type', error);
}
if (errorMessage === 'stan: publish ack timeout') {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Publish time-out detected', error);
}
// A long term disconnect can trigger a bigger issue
// The NATS connection might reconnect/resume but the streaming server
// may have lost your client id or considers it dead due to missing heartbeats
// An error called 'stan: invalid publish request' will occur in this case
// If that happens we will manually reconnect to try and restore communication
//
// NOTE: Be cautious about reconnecting. If subscriptions are not closed you
// end up with a client id already registered error
//
if (errorMessage === 'stan: invalid publish request') {
_this.reconnect()
.then(function () {
_this.logger.warn('[NATS-BUFFERED-CLIENT] Completed forced reconnect due to client de-sync', error);
_this.tick();
})
.catch(function (reconnectError) {
_this.logger.error('[NATS-BUFFERED-CLIENT] Reconnect failed', reconnectError);
_this.tick();
});
}
else {
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub_1);
// Next buffer item or retry
//
_this.tick();
}
// Next buffer item or retry that last one
//
_this.tick();
});
}
else {
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep');
this.ticking = false;
}
}

@@ -433,2 +464,23 @@ else {

};
/**
* Publish item to stan using a promise
*
* @private
* @param {nats.Stan} stan
* @param {IBufferItem} pub
* @returns {Promise<string>}
* @memberof NatsBufferedClient
*/
NatsBufferedClient.prototype.stanPublish = function (stan, pub) {
return new Promise(function (resolve, reject) {
stan.publish(pub.subject, pub.data, function (error) {
if (error) {
reject(error);
}
else {
resolve(pub);
}
});
});
};
return NatsBufferedClient;

@@ -435,0 +487,0 @@ }(events_1.EventEmitter));

@@ -11,4 +11,6 @@ /// <reference types="node" />

export declare class NatsBufferedClient extends EventEmitter {
private bufferSize;
private waitForInitialConnection;
private reconnectDelay;
private batchSize;
/**

@@ -82,5 +84,6 @@ * The connection to the NATS server

* @param {number} [reconnectDelay=5000] If reconnect fails retry after this amount of time. Default is 5 seconds
* @param {number} [batchSize=10] Amount of items to publish in 1 tick
* @memberof NatsBufferedClient
*/
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console, reconnectDelay?: number);
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console, reconnectDelay?: number, batchSize?: number);
/**

@@ -137,2 +140,12 @@ * Connect to the NATS server

protected tick(): void;
/**
* Publish item to stan using a promise
*
* @private
* @param {nats.Stan} stan
* @param {IBufferItem} pub
* @returns {Promise<string>}
* @memberof NatsBufferedClient
*/
private stanPublish;
}
{
"name": "node-nats-streaming-buffered-client",
"version": "1.4.1",
"version": "1.5.0",
"description": "",

@@ -66,30 +66,30 @@ "keywords": [],

"devDependencies": {
"@commitlint/cli": "^7.2.1",
"@commitlint/config-angular": "^7.1.2",
"@types/jest": "^23.3.10",
"@types/node": "^10.12.12",
"@commitlint/cli": "^7.5.2",
"@commitlint/config-angular": "^7.5.0",
"@types/jest": "^24.0.11",
"@types/node": "^11.11.0",
"colors": "^1.3.3",
"coveralls": "^3.0.2",
"coveralls": "^3.0.3",
"cross-env": "^5.2.0",
"husky": "^1.2.0",
"jest": "^23.6.0",
"lint-staged": "^8.1.0",
"husky": "^1.3.1",
"jest": "^24.3.1",
"lint-staged": "^8.1.5",
"lodash.camelcase": "^4.3.0",
"node-nats-streaming": "^0.0.51",
"node-nats-streaming": "0.0.52",
"prompt": "^1.0.0",
"replace-in-file": "^3.4.2",
"rimraf": "^2.6.2",
"rollup": "^0.67.4",
"rollup-plugin-commonjs": "^9.2.0",
"replace-in-file": "^3.4.4",
"rimraf": "^2.6.3",
"rollup": "^1.6.0",
"rollup-plugin-commonjs": "^9.2.1",
"rollup-plugin-json": "^3.1.0",
"rollup-plugin-node-resolve": "^4.0.0",
"rollup-plugin-node-resolve": "^4.0.1",
"rollup-plugin-sourcemaps": "^0.4.2",
"rollup-plugin-typescript2": "^0.18.0",
"semantic-release": "^15.12.4",
"ts-jest": "^23.10.5",
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"rollup-plugin-typescript2": "^0.19.3",
"semantic-release": "^15.13.3",
"ts-jest": "^24.0.0",
"ts-node": "^8.0.3",
"tslint": "^5.13.1",
"tslint-config-standard": "^8.0.1",
"typedoc": "^0.13.0",
"typescript": "^3.2.2",
"typedoc": "^0.14.2",
"typescript": "^3.3.3333",
"validate-commit-msg": "^2.14.0"

@@ -96,0 +96,0 @@ },

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc