node-nats-streaming
Advanced tools
Comparing version 0.0.14 to 0.0.22
@@ -80,5 +80,19 @@ | ||
/** | ||
* Returns true if the subscription has been closed or unsubscribed from. | ||
*/ | ||
isClosed():boolean; | ||
/** | ||
* Unregisters the subscription from the streaming server. | ||
*/ | ||
unsubscribe(); | ||
/** | ||
* Close removes the subscriber from the server, but unlike the Subscription#unsubscribe(), | ||
* the durable interest is not removed. If the client has connected to a server | ||
* for which this feature is not available, Subscription#Close() will emit a | ||
* Subscription#error(NO_SERVER_SUPPORT) error. Note that this affects durable clients only. | ||
* If called on a non-durable subscriber, this is equivalent to Subscription#close() | ||
*/ | ||
close(); | ||
} | ||
@@ -85,0 +99,0 @@ |
@@ -18,3 +18,3 @@ /*! | ||
events = require('events'), | ||
nuid = require('./nuid'), | ||
nuid = require('nuid'), | ||
proto = require('./pb'); | ||
@@ -25,3 +25,3 @@ | ||
*/ | ||
var VERSION = '0.0.14', | ||
var VERSION = '0.0.22', | ||
DEFAULT_PORT = 4222, | ||
@@ -40,6 +40,12 @@ DEFAULT_PRE = 'nats://localhost:', | ||
BAD_CLIENT_ID = 'stan: client ID must be supplied', | ||
ACK_TIMEOUT = 'stan: publish ack timeout', | ||
MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.', | ||
CONN_CLOSED = 'stan: Connection closed', | ||
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.'; | ||
BAD_SUBSCRIPTION = 'stan: invalid subscription', | ||
BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.', | ||
NO_SERVER_SUPPORT = 'stan: not supported by server', | ||
ACK_TIMEOUT = 'stan: publish ack timeout', | ||
CONNECT_REQ_TIMEOUT = 'stan: connect request timeout', | ||
CLOSE_REQ_TIMEOUT = 'stan: close request timeout', | ||
SUB_REQ_TIMEOUT = 'stan: subscribe request timeout', | ||
UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout'; | ||
@@ -69,2 +75,3 @@ /** | ||
this.unsubRequests = null; // subject for unsubscribe requests | ||
this.subCloseRequests = null; // subject for subscription close requests | ||
this.closeRequests = null; // subject for close requests | ||
@@ -276,2 +283,3 @@ | ||
that.unsubRequests = cr.getUnsubRequests(); | ||
that.subCloseRequests = cr.getSubCloseRequests(); | ||
that.closeRequests = cr.getCloseRequests(); | ||
@@ -287,3 +295,3 @@ that.emit('connect', that); | ||
this.nc.on('reconnect', function() { | ||
that.emit('reconnect', this); | ||
that.emit('reconnect', that); | ||
}); | ||
@@ -511,4 +519,3 @@ | ||
// FIXME: go code has a 2 second timeout | ||
this.nc.request(this.subRequests, new Buffer(sr.serializeBinary()), {'max': 1}, function(msg) { | ||
var rsid = this.nc.request(this.subRequests, new Buffer(sr.serializeBinary()), {'max': 1}, function(msg) { | ||
//noinspection JSUnresolvedVariable | ||
@@ -526,2 +533,8 @@ var r = proto.SubscriptionResponse.deserializeBinary(new Buffer(msg, 'binary').toByteArray()); | ||
}); | ||
// FIXME: hardcoded timeout | ||
that.nc.timeout(rsid, 2 * 1000, 1, function () { | ||
that.emit('timeout', new Error(SUB_REQ_TIMEOUT)); | ||
}); | ||
return retVal; | ||
@@ -587,3 +600,12 @@ }; | ||
util.inherits(Subscription, events.EventEmitter); | ||
/** | ||
* Returns true if the subscription has been closed or unsubscribed from. | ||
* @returns {boolean} | ||
*/ | ||
Subscription.prototype.isClosed = function() { | ||
return this.stanConnection === undefined; | ||
}; | ||
/** | ||
* Unregisters the subscription from the streaming server. You cannot unsubscribe | ||
@@ -594,18 +616,50 @@ * from the server unless the Subscription#ready notification has already fired. | ||
Subscription.prototype.unsubscribe = function() { | ||
var sc = this.stanConnection; | ||
delete sc.subMap[this.inbox]; | ||
closeOrUnsubscribe(this, false); | ||
}; | ||
/** | ||
* Close removes the subscriber from the server, but unlike the Subscription#unsubscribe(), | ||
* the durable interest is not removed. If the client has connected to a server | ||
* for which this feature is not available, Subscription#Close() will emit a | ||
* Subscription#error(NO_SERVER_SUPPORT) error. Note that this affects durable clients only. | ||
* If called on a non-durable subscriber, this is equivalent to Subscription#close() | ||
* | ||
* @fires Subscription#error({Error}, Subscription#closed | ||
*/ | ||
Subscription.prototype.close = function() { | ||
closeOrUnsubscribe(this, true); | ||
}; | ||
function closeOrUnsubscribe(that, doClose) { | ||
if(that.isClosed()) { | ||
that.emit('error', new Error(BAD_SUBSCRIPTION)); | ||
} | ||
var sc = that.stanConnection; | ||
delete that.stanConnection; | ||
delete sc.subMap[that.inbox]; | ||
if(sc.isClosed()) { | ||
this.emit('error', new Error(CONN_CLOSED)); | ||
that.emit('error', new Error(CONN_CLOSED)); | ||
return; | ||
} | ||
sc.nc.unsubscribe(this.inboxSub); | ||
var reqSubject = sc.unsubRequests; | ||
if(doClose) { | ||
reqSubject = sc.subCloseRequests; | ||
if(!reqSubject) { | ||
that.emit('error', new Error(NO_SERVER_SUPPORT)); | ||
} | ||
} | ||
sc.nc.unsubscribe(that.inboxSub); | ||
//noinspection JSUnresolvedFunction | ||
var ur = new proto.UnsubscribeRequest(); | ||
ur.setClientId(sc.clientID); | ||
ur.setSubject(this.subject); | ||
ur.setInbox(this.ackInbox); | ||
ur.setSubject(that.subject); | ||
ur.setInbox(that.ackInbox); | ||
var that = this; | ||
var sid = sc.nc.request(sc.unsubRequests, new Buffer(ur.serializeBinary()), {'max': 1}, function (msg) { | ||
var sid = sc.nc.request(reqSubject, new Buffer(ur.serializeBinary()), {'max': 1}, function (msg) { | ||
//noinspection JSUnresolvedVariable | ||
@@ -617,11 +671,13 @@ var r = proto.SubscriptionResponse.deserializeBinary(new Buffer(msg, 'binary').toByteArray()); | ||
} else { | ||
that.emit('unsubscribed'); | ||
that.emit(doClose ? 'closed' : 'unsubscribed'); | ||
} | ||
}); | ||
// FIXME: this is hardcoding the timeout, likely needs to be read from opts.ConnectTimeout | ||
sc.nc.timeout(sid, 2 * 1000, 1, function () { | ||
that.emit('timeout', new Error(ACK_TIMEOUT)); | ||
that.emit('timeout', new Error(doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT)); | ||
}); | ||
}; | ||
} | ||
/** | ||
@@ -755,3 +811,3 @@ * Internal function to process in-bound messages. | ||
Message.prototype.ack = function() { | ||
if(!this.stanClient.isClosed()) { | ||
if(!this.subscription.isClosed()) { | ||
var ack = new proto.Ack(); | ||
@@ -758,0 +814,0 @@ ack.setSubject(this.getSubject()); |
{ | ||
"name": "node-nats-streaming", | ||
"version": "0.0.14", | ||
"version": "0.0.22", | ||
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system", | ||
@@ -24,2 +24,3 @@ "keywords": [ | ||
}, | ||
"license": "MIT", | ||
"private": false, | ||
@@ -33,16 +34,19 @@ "author": { | ||
"scripts": { | ||
"lint": "jshint --reporter node_modules/jshint-stylish lib/*.js test/*.js test/support/*.js examples/*.js", | ||
"cover": "istanbul cover _mocha", | ||
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls", | ||
"depcheck": "dependency-check . lib/* lib/pb/*", | ||
"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*.js", | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage", | ||
"gen": "protoc --js_out=import_style=commonjs,library=lib/pb/protocol_pb,binary:. lib/pb/protocol.proto", | ||
"lint": "jshint --reporter node_modules/jshint-stylish lib/*.js test/*.js test/support/*.js examples/*.js", | ||
"npm-publish": "npm publish https://github.com/nats-io/node-nats-streaming.git --access public", | ||
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:unit", | ||
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls", | ||
"cover": "istanbul cover _mocha" | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage" | ||
}, | ||
"engines": { | ||
"node": ">= .10.x" | ||
"node": ">= 0.10.x" | ||
}, | ||
"dependencies": { | ||
"google-protobuf": "^3.0.0-alpha.6.2", | ||
"nats": ">= 0.6.4" | ||
"nats": ">= 0.7.10", | ||
"nuid": ">=0.6.8" | ||
}, | ||
@@ -49,0 +53,0 @@ "devDependencies": { |
@@ -5,3 +5,3 @@ # [Work-in-progress] node-nats-streaming - Node.js NATS Streaming Client | ||
[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT) | ||
[![License MIT](https://img.shields.io/badge/License-MIT-blue.svg)](http://opensource.org/licenses/MIT) | ||
[![Build Status](https://travis-ci.org/nats-io/node-nats-streaming.svg?branch=master)](http://travis-ci.org/nats-io/node-nats-streaming) [![npm version](https://badge.fury.io/js/node-nats-streaming.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://coveralls.io/repos/github/nats-io/node-nats-streaming/badge.svg?branch=master)](https://coveralls.io/github/nats-io/node-nats-streaming?branch=master) | ||
@@ -120,2 +120,16 @@ | ||
}); | ||
//... | ||
// client suspends durable subscription | ||
// | ||
durableSub.close(); | ||
//... | ||
// client resumes durable subscription | ||
// | ||
durableSub = stan.subscribe('foo', opts); | ||
durableSub.on('message', function(msg) { | ||
console.log('Received a message: ' + msg.getData()); | ||
}); | ||
// ... | ||
@@ -122,0 +136,0 @@ // client receives message sequence 1-40, and disconnects |
@@ -8,3 +8,3 @@ /* jshint node: true */ | ||
ssc = require('./support/stan_server_control'), | ||
nuid = require('../lib/nuid'), | ||
nuid = require('nuid'), | ||
should = require('should'), | ||
@@ -137,3 +137,3 @@ timers = require('timers'); | ||
var stan2 = STAN.connect(cluster, id, PORT); | ||
stan2.on('error', function() { | ||
stan2.on('error', function () { | ||
wantTwo--; | ||
@@ -144,3 +144,3 @@ if (wantTwo === 0) { | ||
}); | ||
stan2.on('close', function() { | ||
stan2.on('close', function () { | ||
wantTwo--; | ||
@@ -174,3 +174,2 @@ if (wantTwo === 0) { | ||
it('should include the correct reply in the callback', function (done) { | ||
@@ -383,3 +382,3 @@ var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
it('subscribe twice is invalid', function (done) { | ||
it('unsubscribe twice is invalid', function (done) { | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
@@ -402,2 +401,18 @@ stan.on('connect', function () { | ||
it('unsubscribe marks it closed', function (done) { | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
stan.on('connect', function () { | ||
var sub = stan.subscribe(nuid.next()); | ||
sub.on('ready', function () { | ||
sub.unsubscribe(); | ||
if(! sub.isClosed()) { | ||
done("Subscription should have been closed"); | ||
} | ||
}); | ||
sub.on('unsubscribed', function () { | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('subscribe starting on second', function (done) { | ||
@@ -502,3 +517,3 @@ var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
stan.publish(subj, 'third', waitForSix); | ||
setTimeout(function() { | ||
setTimeout(function () { | ||
stan.publish(subj, 'fourth', waitForSix); | ||
@@ -512,3 +527,2 @@ stan.publish(subj, 'fifth', waitForSix); | ||
it('subscribe after a specific time on last received', function (done) { | ||
@@ -547,3 +561,3 @@ this.timeout(6000); | ||
stan.publish(subj, 'third', waitForSix); | ||
setTimeout(function() { | ||
setTimeout(function () { | ||
stan.publish(subj, 'fourth', waitForSix); | ||
@@ -697,3 +711,3 @@ stan.publish(subj, 'fifth', waitForSix); | ||
sub1.on('ready', function () { | ||
for(var i=0; i < 2; i++) { | ||
for (var i = 0; i < 2; i++) { | ||
stan.publish(subj); | ||
@@ -706,7 +720,7 @@ } | ||
count++; | ||
if(count < 2) { | ||
if (count < 2) { | ||
msg.ack(); | ||
} | ||
if(count === 2) { | ||
setTimeout(function() { | ||
if (count === 2) { | ||
setTimeout(function () { | ||
stan.close(); | ||
@@ -718,9 +732,9 @@ }, 100); | ||
stan.on('close', function() { | ||
stan.on('close', function () { | ||
var stan2 = STAN.connect(cluster, clientID, PORT); | ||
stan2.on('connect', function() { | ||
stan2.on('connect', function () { | ||
var sub2 = stan2.subscribe(subj, opts); | ||
var second = false; | ||
sub2.on('message', function(msg) { | ||
if(!second) { | ||
sub2.on('message', function (msg) { | ||
if (!second) { | ||
second = true; | ||
@@ -735,2 +749,128 @@ msg.getSequence().should.be.equal(2); | ||
}); | ||
it('sub close should stop getting messages', function (done) { | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
stan.on('connect', function () { | ||
// server needs to support close requests | ||
if(!stan.subCloseRequests || stan.subCloseRequests.length === 0) { | ||
done("Server doesn't support close"); | ||
} | ||
// fail the test if error | ||
function errorHandler(err) { | ||
done(err); | ||
} | ||
// store the sent messages keyed | ||
var counter = {before: 0, after: 0}; | ||
// issue a close after the first message | ||
function msgHandler(sub, key) { | ||
var k = key; | ||
return function (m) { | ||
counter[k]++; | ||
if(key === 'before') { | ||
// ack before the close | ||
m.ack(); | ||
sub.close(); | ||
} | ||
}; | ||
} | ||
var subject = nuid.next(); | ||
function setupHandlers(sub, key) { | ||
sub.on('message', msgHandler(sub, key)); | ||
sub.on('error', errorHandler); | ||
} | ||
var opts = stan.subscriptionOptions(); | ||
opts.setDeliverAllAvailable(); | ||
var sub = stan.subscribe(subject, '', opts); | ||
setupHandlers(sub, "before"); | ||
// Fire one, flush, close, on close fire another, reconnect | ||
sub.on('closed', function () { | ||
counter.should.have.property('before', 1); | ||
stan.publish(subject); | ||
setTimeout(function() { | ||
counter.should.have.property('before', 1); | ||
done(); | ||
}, 250); | ||
}); | ||
sub.on('ready', function () { | ||
stan.publish(subject); | ||
}); | ||
}); | ||
}); | ||
it('durable close should pause messages', function (done) { | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
stan.on('connect', function () { | ||
// server needs to support close requests | ||
if(!stan.subCloseRequests || stan.subCloseRequests.length === 0) { | ||
done("Server doesn't support close"); | ||
} | ||
// fail the test if error | ||
function errorHandler(err) { | ||
done(err); | ||
} | ||
// store the sent messages keyed | ||
var counter = {before: 0, after: 0}; | ||
// issue a close after the first message | ||
function msgHandler(sub, key) { | ||
var k = key; | ||
return function(m) { | ||
counter[k]++; | ||
if(key === 'before') { | ||
// this message has to be manually ack'ed or the ack won't be sent | ||
m.ack(); | ||
sub.close(); | ||
} | ||
}; | ||
} | ||
function setupHandlers(sub, key) { | ||
sub.on('message', msgHandler(sub, key)); | ||
sub.on('error', errorHandler); | ||
} | ||
var subject = nuid.next(); | ||
var opts = stan.subscriptionOptions(); | ||
opts.setDeliverAllAvailable(); | ||
opts.setDurableName("dur"); | ||
var sub = stan.subscribe(subject, '', opts); | ||
setupHandlers(sub, "before"); | ||
// Fire one, flush, close, on close fire another, reconnect | ||
sub.on('closed', function () { | ||
counter.should.have.property('before', 1); | ||
stan.publish(subject); | ||
setTimeout(function () { | ||
counter.should.have.property('before', 1); | ||
restart(); | ||
}, 250); | ||
}); | ||
sub.on('ready', function () { | ||
stan.publish(subject); | ||
}); | ||
function restart() { | ||
var opts = stan.subscriptionOptions(); | ||
opts.setDeliverAllAvailable(); | ||
opts.setDurableName("dur"); | ||
var sub = stan.subscribe(subject, '', opts); | ||
setupHandlers(sub, "after"); | ||
sub.on('ready', function() { | ||
stan.publish(subject); | ||
setTimeout(function() { | ||
counter.should.have.property('after', 2); | ||
done(); | ||
}, 250); | ||
}); | ||
} | ||
}); | ||
}); | ||
}); | ||
@@ -6,6 +6,7 @@ /* jslint node: true */ | ||
var STAN = require ('../lib/stan.js'), | ||
nuid = require('../lib/nuid'), | ||
nuid = require('nuid'), | ||
ssc = require('./support/stan_server_control'), | ||
should = require('should'), | ||
timers = require('timers'); | ||
timers = require('timers'), | ||
net = require('net'); | ||
@@ -228,2 +229,27 @@ describe('Basic Connectivity', function() { | ||
}); | ||
it('reconnect should provide stan connection', function (done) { | ||
this.timeout(15000); | ||
var stan = STAN.connect(cluster, nuid.next(), {'url':'nats://localhost:' + PORT, 'reconnectTimeWait':1000}); | ||
var reconnected = false; | ||
stan.on('connect', function (sc) { | ||
should(stan).equal(sc, 'stan connect did not pass stan connection'); | ||
process.nextTick(function () { | ||
ssc.stop_server(server); | ||
}); | ||
}); | ||
stan.on('reconnecting', function () { | ||
if (!reconnected) { | ||
reconnected = true; | ||
server = ssc.start_server(PORT, function () { | ||
}); | ||
} | ||
}); | ||
stan.on('reconnect', function (sc) { | ||
should(stan).equal(sc, 'stan reconnect did not pass stan connection'); | ||
stan.close(); | ||
done(); | ||
}); | ||
}); | ||
}); |
@@ -7,3 +7,3 @@ /* jslint node: true */ | ||
NATS = require('nats'), | ||
nuid = require('../lib/nuid'), | ||
nuid = require('nuid'), | ||
ssc = require('./support/stan_server_control'), | ||
@@ -10,0 +10,0 @@ should = require('should'), |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
172284
4884
237
3
26
4
+ Addednuid@>=0.6.8
+ Addednuid@2.0.0(transitive)
Updatednats@>= 0.7.10