@nats-io/jetstream
Advanced tools
Comparing version 3.0.0-21 to 3.0.0-22
@@ -92,4 +92,10 @@ "use strict"; | ||
this.monitor?.work(); | ||
const isProtocol = msg.subject === this.inbox; | ||
const isProtocol = this.consumer.ordered | ||
? msg.subject.indexOf(this?.inboxPrefix) === 0 | ||
: msg.subject === this.inbox; | ||
if (isProtocol) { | ||
if (msg.subject !== this.sub.subject) { | ||
// this is a stale message - was not sent to the current inbox | ||
return; | ||
} | ||
const status = new jserrors_1.JetStreamStatus(msg); | ||
@@ -103,3 +109,2 @@ if (status.isIdleHeartbeat()) { | ||
const { msgsLeft, bytesLeft } = status.parseDiscard(); | ||
console.log("pending", msgsLeft, bytesLeft); | ||
if ((msgsLeft && msgsLeft > 0) || (bytesLeft && bytesLeft > 0)) { | ||
@@ -662,3 +667,3 @@ this.pending.msgs -= msgsLeft; | ||
} | ||
info(cached = false) { | ||
async info(cached = false) { | ||
if (cached) { | ||
@@ -668,7 +673,4 @@ return Promise.resolve(this._info); | ||
const { stream_name, name } = this._info; | ||
return this.api.info(stream_name, name) | ||
.then((ci) => { | ||
this._info = ci; | ||
return this._info; | ||
}); | ||
this._info = await this.api.info(stream_name, name); | ||
return this._info; | ||
} | ||
@@ -675,0 +677,0 @@ } |
@@ -75,4 +75,4 @@ "use strict"; | ||
const re = err instanceof internal_1.RequestError ? err : null; | ||
if (err instanceof internal_1.errors.TimeoutError || | ||
re?.isNoResponders() && i + 1 < retries) { | ||
if ((err instanceof internal_1.errors.TimeoutError || re?.isNoResponders()) && | ||
i + 1 < retries) { | ||
await (0, internal_1.delay)(bo.backoff(i)); | ||
@@ -79,0 +79,0 @@ } |
@@ -59,3 +59,5 @@ "use strict"; | ||
console.log({ | ||
message: this.description, | ||
subject: this.msg.subject, | ||
reply: this.msg.reply, | ||
description: this.description, | ||
status: this.code, | ||
@@ -62,0 +64,0 @@ headers: this.msg.headers, |
@@ -227,5 +227,19 @@ "use strict"; | ||
this.monitor?.work(); | ||
const isProtocol = msg.subject === subject; | ||
// need to make sure to catch all protocol messages even | ||
const isProtocol = this.ordered | ||
? msg.subject.indexOf(this?.deliverPrefix) === 0 | ||
: msg.subject === subject; | ||
if (isProtocol) { | ||
if (msg.subject !== this.sub.subject) { | ||
// this is a stale message - was not sent to the current inbox | ||
return; | ||
} | ||
const status = new jserrors_1.JetStreamStatus(msg); | ||
if (status.isFlowControlRequest()) { | ||
this._push(() => { | ||
msg.respond(); | ||
this.notify(types_1.ConsumerDebugEvents.FlowControl, null); | ||
}); | ||
return; | ||
} | ||
if (status.isIdleHeartbeat()) { | ||
@@ -240,10 +254,2 @@ const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer"); | ||
} | ||
if (status.isFlowControlRequest()) { | ||
status.debug(); | ||
this._push(() => { | ||
msg.respond(); | ||
this.notify(types_1.ConsumerDebugEvents.FlowControl, null); | ||
}); | ||
return; | ||
} | ||
const code = status.code; | ||
@@ -250,0 +256,0 @@ const description = status.description; |
{ | ||
"name": "@nats-io/jetstream", | ||
"version": "3.0.0-21", | ||
"version": "3.0.0-22", | ||
"files": [ | ||
@@ -45,2 +45,2 @@ "lib/", | ||
} | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
332726
5554