node-nats-streaming
Advanced tools
Comparing version 0.0.8 to 0.0.10
@@ -24,3 +24,3 @@ /*! | ||
*/ | ||
var VERSION = '0.0.8', | ||
var VERSION = '0.0.10', | ||
DEFAULT_PORT = 4222, | ||
@@ -256,3 +256,2 @@ DEFAULT_PRE = 'nats://localhost:', | ||
that.nc.request(discoverSubject, new Buffer(req.serializeBinary()), {max:1}, function(msg) { | ||
//noinspection JSUnresolvedVariable | ||
var cr = proto.ConnectResponse.deserializeBinary(new Buffer(msg, 'binary').toByteArray()); | ||
@@ -476,3 +475,2 @@ that.pubPrefix = cr.getPubPrefix(); | ||
retVal.inboxSub = this.nc.subscribe(retVal.inbox, this.processMsg()); | ||
//noinspection JSUnresolvedFunction | ||
var sr = new proto.SubscriptionRequest(); | ||
@@ -490,3 +488,3 @@ sr.setClientId(this.clientID); | ||
case proto.StartPosition.TIME_DELTA_START: | ||
sr.setStartAtTimeDelta(retVal.opts.startTime); | ||
sr.setStartTimeDelta(retVal.opts.startTime); | ||
break; | ||
@@ -811,3 +809,3 @@ case proto.StartPosition.SEQUENCE_START: | ||
// server expects values in ns | ||
this.startTime = date.getTime() * 1000000; | ||
this.startTime = (Date.now() - date.valueOf()) * 1000000; | ||
return this; | ||
@@ -823,5 +821,4 @@ }; | ||
//noinspection JSUnresolvedFunction | ||
var now = new Date().getTime(); | ||
// server expects values in ns | ||
this.startTime = (now - millis) * 1000000; | ||
this.startTime = millis * 1000000; | ||
return this; | ||
@@ -828,0 +825,0 @@ }; |
{ | ||
"name": "node-nats-streaming", | ||
"version": "0.0.8", | ||
"version": "0.0.10", | ||
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -150,2 +150,3 @@ /* jshint node: true */ | ||
it('should include the correct reply in the callback', function (done) { | ||
@@ -443,2 +444,85 @@ var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
it('subscribe after 500ms on last received', function (done) { | ||
this.timeout(5000); | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
var subj = nuid.next(); | ||
var count = 0; | ||
function subscribe() { | ||
var gotFirst = false; | ||
var opts = stan.subscriptionOptions(); | ||
opts.setStartAtTimeDelta(1000); | ||
var sub = stan.subscribe(subj, opts); | ||
sub.on('message', function (msg) { | ||
if (!gotFirst) { | ||
gotFirst = true; | ||
should(msg.getData()).equal('fourth', 'message was not the one expected'); | ||
done(); | ||
} | ||
}); | ||
} | ||
var waitForSix = function () { | ||
count++; | ||
if (count === 6) { | ||
process.nextTick(subscribe); | ||
} | ||
}; | ||
stan.on('connect', function () { | ||
stan.publishAsync(subj, 'first', waitForSix); | ||
stan.publishAsync(subj, 'second', waitForSix); | ||
stan.publishAsync(subj, 'third', waitForSix); | ||
setTimeout(function() { | ||
stan.publishAsync(subj, 'fourth', waitForSix); | ||
stan.publishAsync(subj, 'fifth', waitForSix); | ||
stan.publishAsync(subj, 'sixth', waitForSix); | ||
}, 1500); | ||
}); | ||
}); | ||
it('subscribe after a specific time on last received', function (done) { | ||
this.timeout(6000); | ||
var stan = STAN.connect(cluster, nuid.next(), PORT); | ||
var subj = nuid.next(); | ||
var count = 0; | ||
function subscribe() { | ||
var gotFirst = false; | ||
var opts = stan.subscriptionOptions(); | ||
opts.setStartTime(new Date(Date.now() - 1000)); | ||
var sub = stan.subscribe(subj, opts); | ||
sub.on('message', function (msg) { | ||
if (!gotFirst) { | ||
gotFirst = true; | ||
// node will be spurious since we are in a single thread | ||
var ok = msg.getData() === 'fourth' || msg.getData() === 'fifth' || msg.getData() === 'sixth'; | ||
should(ok).equal(true, 'message was not the one expected'); | ||
done(); | ||
} | ||
}); | ||
} | ||
var waitForSix = function () { | ||
count++; | ||
if (count === 6) { | ||
process.nextTick(subscribe); | ||
} | ||
}; | ||
stan.on('connect', function () { | ||
stan.publishAsync(subj, 'first', waitForSix); | ||
stan.publishAsync(subj, 'second', waitForSix); | ||
stan.publishAsync(subj, 'third', waitForSix); | ||
setTimeout(function() { | ||
stan.publishAsync(subj, 'fourth', waitForSix); | ||
stan.publishAsync(subj, 'fifth', waitForSix); | ||
stan.publishAsync(subj, 'sixth', waitForSix); | ||
}, 1500); | ||
}); | ||
}); | ||
it('subscribe starting on new', function (done) { | ||
@@ -445,0 +529,0 @@ var stan = STAN.connect(cluster, nuid.next(), PORT); |
163732
4692