node-nats-streaming-buffered-client
Advanced tools
Comparing version 1.4.1 to 1.5.0
@@ -69,5 +69,6 @@ "use strict"; | ||
* @param {number} [reconnectDelay=5000] If reconnect fails retry after this amount of time. Default is 5 seconds | ||
* @param {number} [batchSize=10] Amount of items to publish in 1 tick | ||
* @memberof NatsBufferedClient | ||
*/ | ||
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger, reconnectDelay) { | ||
function NatsBufferedClient(bufferSize, waitForInitialConnection, logger, reconnectDelay, batchSize) { | ||
if (bufferSize === void 0) { bufferSize = 10; } | ||
@@ -77,2 +78,3 @@ if (waitForInitialConnection === void 0) { waitForInitialConnection = false; } | ||
if (reconnectDelay === void 0) { reconnectDelay = 5000; } | ||
if (batchSize === void 0) { batchSize = 10; } | ||
var _this = | ||
@@ -82,4 +84,6 @@ // Initialise the event emitter super class | ||
_super.call(this) || this; | ||
_this.bufferSize = bufferSize; | ||
_this.waitForInitialConnection = waitForInitialConnection; | ||
_this.reconnectDelay = reconnectDelay; | ||
_this.batchSize = batchSize; | ||
/** | ||
@@ -372,56 +376,83 @@ * Indicates if we're processing the buffer | ||
if (this.stan) { | ||
var pub_1 = this.buffer.shift(); | ||
if (pub_1) { | ||
this.stan.publish(pub_1.subject, pub_1.data, function (error) { | ||
if (error) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', error); | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Error type', typeof error); | ||
var batchItems = this.buffer.slice(0, this.bufferSize); | ||
if (!batchItems.length) { | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep'); | ||
this.ticking = false; | ||
} | ||
else { | ||
// Take the batch out of the buffer | ||
// We will push back error items if needed | ||
// | ||
this.buffer = this.buffer.slice(this.bufferSize); | ||
// Collect publish promises for the entire batch | ||
// | ||
var publishBatch_1 = []; | ||
// Use a for each to create a function scope so can remember which | ||
// pub item fails | ||
// | ||
batchItems.forEach(function (pub) { | ||
var publishItem = _this.stanPublish(_this.stan, pub); | ||
publishBatch_1.push(publishItem); | ||
publishItem | ||
.then(function () { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub); | ||
}) | ||
.catch(function (error) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish failed', pub, error); | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Publish error', error); | ||
// Push the item back onto the buffer | ||
// | ||
_this.buffer.unshift(pub_1); | ||
// Try to retrieve the actual error message | ||
// Errors thrown in the client are normal JS Error objects | ||
// Errors returned from the server appear to be strings | ||
// | ||
var errorMessage = void 0; | ||
try { | ||
errorMessage = typeof error === 'string' ? error : error.message; | ||
} | ||
catch (unknownErrorTypeError) { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Failed to interpret error type', error); | ||
} | ||
if (errorMessage === 'stan: publish ack timeout') { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Publish time-out detected', error); | ||
} | ||
// A long term disconnect can trigger a bigger issue | ||
// The NATS connection might reconnect/resume but the streaming server | ||
// may have lost your client id or considers it dead due to missing heartbeats | ||
// An error called 'stan: invalid publish request' will occur in this case | ||
// If that happens we will manually reconnect to try and restore communication | ||
// | ||
// NOTE: Be cautious about reconnecting. If subscriptions are not closed you | ||
// end up with a client id already registered error | ||
// | ||
if (errorMessage === 'stan: invalid publish request') { | ||
_this.reconnect() | ||
.then(function () { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Completed forced reconnect due to client de-sync', error); | ||
}) | ||
.catch(function (reconnectError) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Reconnect failed', reconnectError); | ||
}); | ||
} | ||
_this.buffer.unshift(pub); | ||
}); | ||
}); | ||
Promise.all(publishBatch_1) | ||
.then(function () { | ||
_this.logger.log("[NATS-BUFFERED-CLIENT] Buffer utilitisation " + Math.round(_this.buffer.length * 100 / _this.bufferSize) + "%", _this.buffer.length); | ||
// Next buffer item batch | ||
// | ||
_this.tick(); | ||
}) | ||
.catch(function (error) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Error type', typeof error); | ||
// Try to retrieve the actual error message | ||
// Errors thrown in the client are normal JS Error objects | ||
// Errors returned from the server appear to be strings | ||
// | ||
var errorMessage; | ||
try { | ||
errorMessage = typeof error === 'string' ? error : error.message; | ||
} | ||
catch (unknownErrorTypeError) { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Failed to interpret error type', error); | ||
} | ||
if (errorMessage === 'stan: publish ack timeout') { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Publish time-out detected', error); | ||
} | ||
// A long term disconnect can trigger a bigger issue | ||
// The NATS connection might reconnect/resume but the streaming server | ||
// may have lost your client id or considers it dead due to missing heartbeats | ||
// An error called 'stan: invalid publish request' will occur in this case | ||
// If that happens we will manually reconnect to try and restore communication | ||
// | ||
// NOTE: Be cautious about reconnecting. If subscriptions are not closed you | ||
// end up with a client id already registered error | ||
// | ||
if (errorMessage === 'stan: invalid publish request') { | ||
_this.reconnect() | ||
.then(function () { | ||
_this.logger.warn('[NATS-BUFFERED-CLIENT] Completed forced reconnect due to client de-sync', error); | ||
_this.tick(); | ||
}) | ||
.catch(function (reconnectError) { | ||
_this.logger.error('[NATS-BUFFERED-CLIENT] Reconnect failed', reconnectError); | ||
_this.tick(); | ||
}); | ||
} | ||
else { | ||
_this.logger.log('[NATS-BUFFERED-CLIENT] Publish done', pub_1); | ||
// Next buffer item or retry | ||
// | ||
_this.tick(); | ||
} | ||
// Next buffer item or retry that last one | ||
// | ||
_this.tick(); | ||
}); | ||
} | ||
else { | ||
this.logger.log('[NATS-BUFFERED-CLIENT] Buffer is empty. Going to sleep'); | ||
this.ticking = false; | ||
} | ||
} | ||
@@ -433,2 +464,23 @@ else { | ||
}; | ||
/** | ||
* Publish item to stan using a promise | ||
* | ||
* @private | ||
* @param {nats.Stan} stan | ||
* @param {IBufferItem} pub | ||
* @returns {Promise<string>} | ||
* @memberof NatsBufferedClient | ||
*/ | ||
NatsBufferedClient.prototype.stanPublish = function (stan, pub) { | ||
return new Promise(function (resolve, reject) { | ||
stan.publish(pub.subject, pub.data, function (error) { | ||
if (error) { | ||
reject(error); | ||
} | ||
else { | ||
resolve(pub); | ||
} | ||
}); | ||
}); | ||
}; | ||
return NatsBufferedClient; | ||
@@ -435,0 +487,0 @@ }(events_1.EventEmitter)); |
@@ -11,4 +11,6 @@ /// <reference types="node" /> | ||
export declare class NatsBufferedClient extends EventEmitter { | ||
private bufferSize; | ||
private waitForInitialConnection; | ||
private reconnectDelay; | ||
private batchSize; | ||
/** | ||
@@ -82,5 +84,6 @@ * The connection to the NATS server | ||
* @param {number} [reconnectDelay=5000] If reconnect fails retry after this amount of time. Default is 5 seconds | ||
* @param {number} [batchSize=10] Amount of items to publish in 1 tick | ||
* @memberof NatsBufferedClient | ||
*/ | ||
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console, reconnectDelay?: number); | ||
constructor(bufferSize?: number, waitForInitialConnection?: boolean, logger?: Console, reconnectDelay?: number, batchSize?: number); | ||
/** | ||
@@ -137,2 +140,12 @@ * Connect to the NATS server | ||
protected tick(): void; | ||
/** | ||
* Publish item to stan using a promise | ||
* | ||
* @private | ||
* @param {nats.Stan} stan | ||
* @param {IBufferItem} pub | ||
* @returns {Promise<string>} | ||
* @memberof NatsBufferedClient | ||
*/ | ||
private stanPublish; | ||
} |
{ | ||
"name": "node-nats-streaming-buffered-client", | ||
"version": "1.4.1", | ||
"version": "1.5.0", | ||
"description": "", | ||
@@ -66,30 +66,30 @@ "keywords": [], | ||
"devDependencies": { | ||
"@commitlint/cli": "^7.2.1", | ||
"@commitlint/config-angular": "^7.1.2", | ||
"@types/jest": "^23.3.10", | ||
"@types/node": "^10.12.12", | ||
"@commitlint/cli": "^7.5.2", | ||
"@commitlint/config-angular": "^7.5.0", | ||
"@types/jest": "^24.0.11", | ||
"@types/node": "^11.11.0", | ||
"colors": "^1.3.3", | ||
"coveralls": "^3.0.2", | ||
"coveralls": "^3.0.3", | ||
"cross-env": "^5.2.0", | ||
"husky": "^1.2.0", | ||
"jest": "^23.6.0", | ||
"lint-staged": "^8.1.0", | ||
"husky": "^1.3.1", | ||
"jest": "^24.3.1", | ||
"lint-staged": "^8.1.5", | ||
"lodash.camelcase": "^4.3.0", | ||
"node-nats-streaming": "^0.0.51", | ||
"node-nats-streaming": "0.0.52", | ||
"prompt": "^1.0.0", | ||
"replace-in-file": "^3.4.2", | ||
"rimraf": "^2.6.2", | ||
"rollup": "^0.67.4", | ||
"rollup-plugin-commonjs": "^9.2.0", | ||
"replace-in-file": "^3.4.4", | ||
"rimraf": "^2.6.3", | ||
"rollup": "^1.6.0", | ||
"rollup-plugin-commonjs": "^9.2.1", | ||
"rollup-plugin-json": "^3.1.0", | ||
"rollup-plugin-node-resolve": "^4.0.0", | ||
"rollup-plugin-node-resolve": "^4.0.1", | ||
"rollup-plugin-sourcemaps": "^0.4.2", | ||
"rollup-plugin-typescript2": "^0.18.0", | ||
"semantic-release": "^15.12.4", | ||
"ts-jest": "^23.10.5", | ||
"ts-node": "^7.0.1", | ||
"tslint": "^5.11.0", | ||
"rollup-plugin-typescript2": "^0.19.3", | ||
"semantic-release": "^15.13.3", | ||
"ts-jest": "^24.0.0", | ||
"ts-node": "^8.0.3", | ||
"tslint": "^5.13.1", | ||
"tslint-config-standard": "^8.0.1", | ||
"typedoc": "^0.13.0", | ||
"typescript": "^3.2.2", | ||
"typedoc": "^0.14.2", | ||
"typescript": "^3.3.3333", | ||
"validate-commit-msg": "^2.14.0" | ||
@@ -96,0 +96,0 @@ }, |
Sorry, the diff of this file is not supported yet
42911
631