New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-nats-streaming

Package Overview
Dependencies
Maintainers
1
Versions
35
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-nats-streaming - npm Package Compare versions

Comparing version 0.0.14 to 0.0.22

14

index.d.ts

@@ -80,5 +80,19 @@

/**
* Returns true if the subscription has been closed or unsubscribed from.
*/
isClosed():boolean;
/**
* Unregisters the subscription from the streaming server.
*/
unsubscribe();
/**
* Close removes the subscriber from the server, but unlike the Subscription#unsubscribe(),
* the durable interest is not removed. If the client has connected to a server
* for which this feature is not available, Subscription#Close() will emit a
* Subscription#error(NO_SERVER_SUPPORT) error. Note that this affects durable clients only.
* If called on a non-durable subscriber, this is equivalent to Subscription#close()
*/
close();
}

@@ -85,0 +99,0 @@

94

lib/stan.js

@@ -18,3 +18,3 @@ /*!

events = require('events'),
nuid = require('./nuid'),
nuid = require('nuid'),
proto = require('./pb');

@@ -25,3 +25,3 @@

*/
var VERSION = '0.0.14',
var VERSION = '0.0.22',
DEFAULT_PORT = 4222,

@@ -40,6 +40,12 @@ DEFAULT_PRE = 'nats://localhost:',

BAD_CLIENT_ID = 'stan: client ID must be supplied',
ACK_TIMEOUT = 'stan: publish ack timeout',
MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.',
CONN_CLOSED = 'stan: Connection closed',
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.';
BAD_SUBSCRIPTION = 'stan: invalid subscription',
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.',
NO_SERVER_SUPPORT = 'stan: not supported by server',
ACK_TIMEOUT = 'stan: publish ack timeout',
CONNECT_REQ_TIMEOUT = 'stan: connect request timeout',
CLOSE_REQ_TIMEOUT = 'stan: close request timeout',
SUB_REQ_TIMEOUT = 'stan: subscribe request timeout',
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout';

@@ -69,2 +75,3 @@ /**

this.unsubRequests = null; // subject for unsubscribe requests
this.subCloseRequests = null; // subject for subscription close requests
this.closeRequests = null; // subject for close requests

@@ -276,2 +283,3 @@

that.unsubRequests = cr.getUnsubRequests();
that.subCloseRequests = cr.getSubCloseRequests();
that.closeRequests = cr.getCloseRequests();

@@ -287,3 +295,3 @@ that.emit('connect', that);

this.nc.on('reconnect', function() {
that.emit('reconnect', this);
that.emit('reconnect', that);
});

@@ -511,4 +519,3 @@

// FIXME: go code has a 2 second timeout
this.nc.request(this.subRequests, new Buffer(sr.serializeBinary()), {'max': 1}, function(msg) {
var rsid = this.nc.request(this.subRequests, new Buffer(sr.serializeBinary()), {'max': 1}, function(msg) {
//noinspection JSUnresolvedVariable

@@ -526,2 +533,8 @@ var r = proto.SubscriptionResponse.deserializeBinary(new Buffer(msg, 'binary').toByteArray());

});
// FIXME: hardcoded timeout
that.nc.timeout(rsid, 2 * 1000, 1, function () {
that.emit('timeout', new Error(SUB_REQ_TIMEOUT));
});
return retVal;

@@ -587,3 +600,12 @@ };

util.inherits(Subscription, events.EventEmitter);
/**
* Returns true if the subscription has been closed or unsubscribed from.
* @returns {boolean}
*/
Subscription.prototype.isClosed = function() {
return this.stanConnection === undefined;
};
/**
* Unregisters the subscription from the streaming server. You cannot unsubscribe

@@ -594,18 +616,50 @@ * from the server unless the Subscription#ready notification has already fired.

Subscription.prototype.unsubscribe = function() {
var sc = this.stanConnection;
delete sc.subMap[this.inbox];
closeOrUnsubscribe(this, false);
};
/**
* Close removes the subscriber from the server, but unlike the Subscription#unsubscribe(),
* the durable interest is not removed. If the client has connected to a server
* for which this feature is not available, Subscription#Close() will emit a
* Subscription#error(NO_SERVER_SUPPORT) error. Note that this affects durable clients only.
* If called on a non-durable subscriber, this is equivalent to Subscription#close()
*
* @fires Subscription#error({Error}, Subscription#closed
*/
Subscription.prototype.close = function() {
closeOrUnsubscribe(this, true);
};
function closeOrUnsubscribe(that, doClose) {
if(that.isClosed()) {
that.emit('error', new Error(BAD_SUBSCRIPTION));
}
var sc = that.stanConnection;
delete that.stanConnection;
delete sc.subMap[that.inbox];
if(sc.isClosed()) {
this.emit('error', new Error(CONN_CLOSED));
that.emit('error', new Error(CONN_CLOSED));
return;
}
sc.nc.unsubscribe(this.inboxSub);
var reqSubject = sc.unsubRequests;
if(doClose) {
reqSubject = sc.subCloseRequests;
if(!reqSubject) {
that.emit('error', new Error(NO_SERVER_SUPPORT));
}
}
sc.nc.unsubscribe(that.inboxSub);
//noinspection JSUnresolvedFunction
var ur = new proto.UnsubscribeRequest();
ur.setClientId(sc.clientID);
ur.setSubject(this.subject);
ur.setInbox(this.ackInbox);
ur.setSubject(that.subject);
ur.setInbox(that.ackInbox);
var that = this;
var sid = sc.nc.request(sc.unsubRequests, new Buffer(ur.serializeBinary()), {'max': 1}, function (msg) {
var sid = sc.nc.request(reqSubject, new Buffer(ur.serializeBinary()), {'max': 1}, function (msg) {
//noinspection JSUnresolvedVariable

@@ -617,11 +671,13 @@ var r = proto.SubscriptionResponse.deserializeBinary(new Buffer(msg, 'binary').toByteArray());

} else {
that.emit('unsubscribed');
that.emit(doClose ? 'closed' : 'unsubscribed');
}
});
// FIXME: this is hardcoding the timeout, likely needs to be read from opts.ConnectTimeout
sc.nc.timeout(sid, 2 * 1000, 1, function () {
that.emit('timeout', new Error(ACK_TIMEOUT));
that.emit('timeout', new Error(doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT));
});
};
}
/**

@@ -755,3 +811,3 @@ * Internal function to process in-bound messages.

Message.prototype.ack = function() {
if(!this.stanClient.isClosed()) {
if(!this.subscription.isClosed()) {
var ack = new proto.Ack();

@@ -758,0 +814,0 @@ ack.setSubject(this.getSubject());

{
"name": "node-nats-streaming",
"version": "0.0.14",
"version": "0.0.22",
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system",

@@ -24,2 +24,3 @@ "keywords": [

},
"license": "MIT",
"private": false,

@@ -33,16 +34,19 @@ "author": {

"scripts": {
"lint": "jshint --reporter node_modules/jshint-stylish lib/*.js test/*.js test/support/*.js examples/*.js",
"cover": "istanbul cover _mocha",
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls",
"depcheck": "dependency-check . lib/* lib/pb/*",
"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*.js",
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage",
"gen": "protoc --js_out=import_style=commonjs,library=lib/pb/protocol_pb,binary:. lib/pb/protocol.proto",
"lint": "jshint --reporter node_modules/jshint-stylish lib/*.js test/*.js test/support/*.js examples/*.js",
"npm-publish": "npm publish https://github.com/nats-io/node-nats-streaming.git --access public",
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:unit",
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls",
"cover": "istanbul cover _mocha"
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage"
},
"engines": {
"node": ">= .10.x"
"node": ">= 0.10.x"
},
"dependencies": {
"google-protobuf": "^3.0.0-alpha.6.2",
"nats": ">= 0.6.4"
"nats": ">= 0.7.10",
"nuid": ">=0.6.8"
},

@@ -49,0 +53,0 @@ "devDependencies": {

@@ -5,3 +5,3 @@ # [Work-in-progress] node-nats-streaming - Node.js NATS Streaming Client

[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT)
[![License MIT](https://img.shields.io/badge/License-MIT-blue.svg)](http://opensource.org/licenses/MIT)
[![Build Status](https://travis-ci.org/nats-io/node-nats-streaming.svg?branch=master)](http://travis-ci.org/nats-io/node-nats-streaming) [![npm version](https://badge.fury.io/js/node-nats-streaming.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://coveralls.io/repos/github/nats-io/node-nats-streaming/badge.svg?branch=master)](https://coveralls.io/github/nats-io/node-nats-streaming?branch=master)

@@ -120,2 +120,16 @@

});
//...
// client suspends durable subscription
//
durableSub.close();
//...
// client resumes durable subscription
//
durableSub = stan.subscribe('foo', opts);
durableSub.on('message', function(msg) {
console.log('Received a message: ' + msg.getData());
});
// ...

@@ -122,0 +136,0 @@ // client receives message sequence 1-40, and disconnects

@@ -8,3 +8,3 @@ /* jshint node: true */

ssc = require('./support/stan_server_control'),
nuid = require('../lib/nuid'),
nuid = require('nuid'),
should = require('should'),

@@ -137,3 +137,3 @@ timers = require('timers');

var stan2 = STAN.connect(cluster, id, PORT);
stan2.on('error', function() {
stan2.on('error', function () {
wantTwo--;

@@ -144,3 +144,3 @@ if (wantTwo === 0) {

});
stan2.on('close', function() {
stan2.on('close', function () {
wantTwo--;

@@ -174,3 +174,2 @@ if (wantTwo === 0) {

it('should include the correct reply in the callback', function (done) {

@@ -383,3 +382,3 @@ var stan = STAN.connect(cluster, nuid.next(), PORT);

it('subscribe twice is invalid', function (done) {
it('unsubscribe twice is invalid', function (done) {
var stan = STAN.connect(cluster, nuid.next(), PORT);

@@ -402,2 +401,18 @@ stan.on('connect', function () {

it('unsubscribe marks it closed', function (done) {
var stan = STAN.connect(cluster, nuid.next(), PORT);
stan.on('connect', function () {
var sub = stan.subscribe(nuid.next());
sub.on('ready', function () {
sub.unsubscribe();
if(! sub.isClosed()) {
done("Subscription should have been closed");
}
});
sub.on('unsubscribed', function () {
done();
});
});
});
it('subscribe starting on second', function (done) {

@@ -502,3 +517,3 @@ var stan = STAN.connect(cluster, nuid.next(), PORT);

stan.publish(subj, 'third', waitForSix);
setTimeout(function() {
setTimeout(function () {
stan.publish(subj, 'fourth', waitForSix);

@@ -512,3 +527,2 @@ stan.publish(subj, 'fifth', waitForSix);

it('subscribe after a specific time on last received', function (done) {

@@ -547,3 +561,3 @@ this.timeout(6000);

stan.publish(subj, 'third', waitForSix);
setTimeout(function() {
setTimeout(function () {
stan.publish(subj, 'fourth', waitForSix);

@@ -697,3 +711,3 @@ stan.publish(subj, 'fifth', waitForSix);

sub1.on('ready', function () {
for(var i=0; i < 2; i++) {
for (var i = 0; i < 2; i++) {
stan.publish(subj);

@@ -706,7 +720,7 @@ }

count++;
if(count < 2) {
if (count < 2) {
msg.ack();
}
if(count === 2) {
setTimeout(function() {
if (count === 2) {
setTimeout(function () {
stan.close();

@@ -718,9 +732,9 @@ }, 100);

stan.on('close', function() {
stan.on('close', function () {
var stan2 = STAN.connect(cluster, clientID, PORT);
stan2.on('connect', function() {
stan2.on('connect', function () {
var sub2 = stan2.subscribe(subj, opts);
var second = false;
sub2.on('message', function(msg) {
if(!second) {
sub2.on('message', function (msg) {
if (!second) {
second = true;

@@ -735,2 +749,128 @@ msg.getSequence().should.be.equal(2);

});
it('sub close should stop getting messages', function (done) {
var stan = STAN.connect(cluster, nuid.next(), PORT);
stan.on('connect', function () {
// server needs to support close requests
if(!stan.subCloseRequests || stan.subCloseRequests.length === 0) {
done("Server doesn't support close");
}
// fail the test if error
function errorHandler(err) {
done(err);
}
// store the sent messages keyed
var counter = {before: 0, after: 0};
// issue a close after the first message
function msgHandler(sub, key) {
var k = key;
return function (m) {
counter[k]++;
if(key === 'before') {
// ack before the close
m.ack();
sub.close();
}
};
}
var subject = nuid.next();
function setupHandlers(sub, key) {
sub.on('message', msgHandler(sub, key));
sub.on('error', errorHandler);
}
var opts = stan.subscriptionOptions();
opts.setDeliverAllAvailable();
var sub = stan.subscribe(subject, '', opts);
setupHandlers(sub, "before");
// Fire one, flush, close, on close fire another, reconnect
sub.on('closed', function () {
counter.should.have.property('before', 1);
stan.publish(subject);
setTimeout(function() {
counter.should.have.property('before', 1);
done();
}, 250);
});
sub.on('ready', function () {
stan.publish(subject);
});
});
});
it('durable close should pause messages', function (done) {
var stan = STAN.connect(cluster, nuid.next(), PORT);
stan.on('connect', function () {
// server needs to support close requests
if(!stan.subCloseRequests || stan.subCloseRequests.length === 0) {
done("Server doesn't support close");
}
// fail the test if error
function errorHandler(err) {
done(err);
}
// store the sent messages keyed
var counter = {before: 0, after: 0};
// issue a close after the first message
function msgHandler(sub, key) {
var k = key;
return function(m) {
counter[k]++;
if(key === 'before') {
// this message has to be manually ack'ed or the ack won't be sent
m.ack();
sub.close();
}
};
}
function setupHandlers(sub, key) {
sub.on('message', msgHandler(sub, key));
sub.on('error', errorHandler);
}
var subject = nuid.next();
var opts = stan.subscriptionOptions();
opts.setDeliverAllAvailable();
opts.setDurableName("dur");
var sub = stan.subscribe(subject, '', opts);
setupHandlers(sub, "before");
// Fire one, flush, close, on close fire another, reconnect
sub.on('closed', function () {
counter.should.have.property('before', 1);
stan.publish(subject);
setTimeout(function () {
counter.should.have.property('before', 1);
restart();
}, 250);
});
sub.on('ready', function () {
stan.publish(subject);
});
function restart() {
var opts = stan.subscriptionOptions();
opts.setDeliverAllAvailable();
opts.setDurableName("dur");
var sub = stan.subscribe(subject, '', opts);
setupHandlers(sub, "after");
sub.on('ready', function() {
stan.publish(subject);
setTimeout(function() {
counter.should.have.property('after', 2);
done();
}, 250);
});
}
});
});
});

@@ -6,6 +6,7 @@ /* jslint node: true */

var STAN = require ('../lib/stan.js'),
nuid = require('../lib/nuid'),
nuid = require('nuid'),
ssc = require('./support/stan_server_control'),
should = require('should'),
timers = require('timers');
timers = require('timers'),
net = require('net');

@@ -228,2 +229,27 @@ describe('Basic Connectivity', function() {

});
it('reconnect should provide stan connection', function (done) {
this.timeout(15000);
var stan = STAN.connect(cluster, nuid.next(), {'url':'nats://localhost:' + PORT, 'reconnectTimeWait':1000});
var reconnected = false;
stan.on('connect', function (sc) {
should(stan).equal(sc, 'stan connect did not pass stan connection');
process.nextTick(function () {
ssc.stop_server(server);
});
});
stan.on('reconnecting', function () {
if (!reconnected) {
reconnected = true;
server = ssc.start_server(PORT, function () {
});
}
});
stan.on('reconnect', function (sc) {
should(stan).equal(sc, 'stan reconnect did not pass stan connection');
stan.close();
done();
});
});
});

@@ -7,3 +7,3 @@ /* jslint node: true */

NATS = require('nats'),
nuid = require('../lib/nuid'),
nuid = require('nuid'),
ssc = require('./support/stan_server_control'),

@@ -10,0 +10,0 @@ should = require('should'),

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc