Comparing version 2.25.0 to 2.26.0
@@ -163,2 +163,6 @@ import { Timeout } from "../nats-base-client/util"; | ||
/** | ||
* Notifies that the current consumer will be reset | ||
*/ | ||
Reset = "reset", | ||
/** | ||
* Notifies whenever there's a request for additional messages from the server. | ||
@@ -292,3 +296,7 @@ * This notification telegraphs the request options, which should be treated as | ||
internalHandler(serial: number): (m: JsMsg) => void; | ||
reset(opts?: ConsumeOptions | FetchOptions, fromFetch?: boolean): Promise<ConsumerMessages>; | ||
reset(opts?: ConsumeOptions | FetchOptions, info?: Partial<{ | ||
fromFetch: boolean; | ||
orderedReset: boolean; | ||
}>): Promise<void>; | ||
notifyOrderedResetAndReset(): void; | ||
consume(opts?: ConsumeOptions): Promise<ConsumerMessages>; | ||
@@ -295,0 +303,0 @@ fetch(opts?: FetchOptions): Promise<ConsumerMessages>; |
@@ -112,2 +112,6 @@ "use strict"; | ||
/** | ||
* Notifies that the current consumer will be reset | ||
*/ | ||
ConsumerDebugEvents["Reset"] = "reset"; | ||
/** | ||
* Notifies whenever there's a request for additional messages from the server. | ||
@@ -858,3 +862,3 @@ * This notification telegraphs the request options, which should be treated as | ||
if (dseq !== this.cursor.deliver_seq + 1) { | ||
this.reset(this.opts); | ||
this.notifyOrderedResetAndReset(); | ||
return; | ||
@@ -876,5 +880,23 @@ } | ||
expires: 30000, | ||
}, fromFetch = false) { | ||
this.currentConsumer = yield this.resetConsumer(this.cursor.stream_seq + 1); | ||
if (this.iter === null) { | ||
}, info) { | ||
var _a, _b; | ||
info = info || {}; | ||
// this is known to be directly related to a pull | ||
const fromFetch = info.fromFetch || false; | ||
// a sequence order caused the reset | ||
const orderedReset = info.orderedReset || false; | ||
if (this.type === PullConsumerType.Fetch && orderedReset) { | ||
// the fetch pull simply needs to end the iterator | ||
(_a = this.iter) === null || _a === void 0 ? void 0 : _a.src.stop(); | ||
yield ((_b = this.iter) === null || _b === void 0 ? void 0 : _b.closed()); | ||
this.currentConsumer = null; | ||
return; | ||
} | ||
if (this.currentConsumer === null || orderedReset) { | ||
this.currentConsumer = yield this.resetConsumer(this.cursor.stream_seq + 1); | ||
} | ||
// if we don't have an iterator, or it is a fetch request | ||
// we create the iterator - otherwise this is a reset that is happening | ||
// while the OC is active, so simply bind the new OC to current iterator. | ||
if (this.iter === null || fromFetch) { | ||
this.iter = new OrderedConsumerMessages(); | ||
@@ -893,59 +915,65 @@ } | ||
} | ||
else { | ||
return Promise.reject("reset called with unset consumer type"); | ||
} | ||
const msgsImpl = msgs; | ||
msgsImpl.forOrderedConsumer = true; | ||
msgsImpl.resetHandler = () => { | ||
this.reset(this.opts); | ||
this.notifyOrderedResetAndReset(); | ||
}; | ||
this.iter.setSource(msgsImpl); | ||
}); | ||
} | ||
notifyOrderedResetAndReset() { | ||
var _a; | ||
(_a = this.iter) === null || _a === void 0 ? void 0 : _a.notify(ConsumerDebugEvents.Reset, ""); | ||
this.reset(this.opts, { orderedReset: true }); | ||
} | ||
consume() { | ||
return __awaiter(this, arguments, void 0, function* (opts = { | ||
max_messages: 100, | ||
expires: 30000, | ||
}) { | ||
const copts = opts; | ||
if (copts.bind) { | ||
return Promise.reject(new Error("bind is not supported")); | ||
} | ||
if (this.type === PullConsumerType.Fetch) { | ||
return Promise.reject(new Error("ordered consumer initialized as fetch")); | ||
} | ||
if (this.type === PullConsumerType.Consume) { | ||
return Promise.reject(new Error("ordered consumer doesn't support concurrent consume")); | ||
} | ||
const { callback } = opts; | ||
if (callback) { | ||
this.userCallback = callback; | ||
} | ||
this.type = PullConsumerType.Consume; | ||
this.opts = opts; | ||
yield this.reset(opts); | ||
return this.iter; | ||
}); | ||
} | ||
consume(opts = { | ||
max_messages: 100, | ||
expires: 30000, | ||
}) { | ||
const copts = opts; | ||
if (copts.bind) { | ||
return Promise.reject(new Error("bind is not supported")); | ||
} | ||
if (this.type === PullConsumerType.Fetch) { | ||
return Promise.reject(new Error("ordered consumer initialized as fetch")); | ||
} | ||
if (this.type === PullConsumerType.Consume) { | ||
return Promise.reject(new Error("ordered consumer doesn't support concurrent consume")); | ||
} | ||
const { callback } = opts; | ||
if (callback) { | ||
this.userCallback = callback; | ||
} | ||
this.type = PullConsumerType.Consume; | ||
this.opts = opts; | ||
return this.reset(opts); | ||
fetch() { | ||
return __awaiter(this, arguments, void 0, function* (opts = { max_messages: 100, expires: 30000 }) { | ||
var _a; | ||
const copts = opts; | ||
if (copts.bind) { | ||
return Promise.reject(new Error("bind is not supported")); | ||
} | ||
if (this.type === PullConsumerType.Consume) { | ||
return Promise.reject(new Error("ordered consumer already initialized as consume")); | ||
} | ||
if (((_a = this.iter) === null || _a === void 0 ? void 0 : _a.done) === false) { | ||
return Promise.reject(new Error("ordered consumer doesn't support concurrent fetch")); | ||
} | ||
//@ts-ignore: allow this for tests - api doesn't use it because | ||
// iterator close is the user signal that the pull is done. | ||
const { callback } = opts; | ||
if (callback) { | ||
this.userCallback = callback; | ||
} | ||
this.type = PullConsumerType.Fetch; | ||
this.opts = opts; | ||
yield this.reset(opts, { fromFetch: true }); | ||
return this.iter; | ||
}); | ||
} | ||
fetch(opts = { max_messages: 100, expires: 30000 }) { | ||
var _a; | ||
const copts = opts; | ||
if (copts.bind) { | ||
return Promise.reject(new Error("bind is not supported")); | ||
} | ||
if (this.type === PullConsumerType.Consume) { | ||
return Promise.reject(new Error("ordered consumer already initialized as consume")); | ||
} | ||
if (((_a = this.iter) === null || _a === void 0 ? void 0 : _a.done) === false) { | ||
return Promise.reject(new Error("ordered consumer doesn't support concurrent fetch")); | ||
} | ||
//@ts-ignore: allow this for tests - api doesn't use it because | ||
// iterator close is the user signal that the pull is done. | ||
const { callback } = opts; | ||
if (callback) { | ||
this.userCallback = callback; | ||
} | ||
this.type = PullConsumerType.Fetch; | ||
this.opts = opts; | ||
this.iter = new OrderedConsumerMessages(); | ||
return this.reset(opts, true); | ||
} | ||
next() { | ||
@@ -952,0 +980,0 @@ return __awaiter(this, arguments, void 0, function* (opts = { expires: 30000 }) { |
@@ -47,3 +47,3 @@ "use strict"; | ||
const dns = require("dns"); | ||
const VERSION = "2.25.0"; | ||
const VERSION = "2.26.0"; | ||
const LANG = "nats.js"; | ||
@@ -50,0 +50,0 @@ class NodeTransport { |
{ | ||
"name": "nats", | ||
"version": "2.25.0", | ||
"version": "2.26.0", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -45,3 +45,3 @@ "keywords": [ | ||
"clean": "shx rm -Rf ./lib/* ./nats-base-client ./.deps", | ||
"clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.25.0 https://github.com/nats-io/nats.deno.git", | ||
"clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.26.0 https://github.com/nats-io/nats.deno.git", | ||
"fmt": "deno fmt ./src/ ./examples/ ./test/", | ||
@@ -70,3 +70,3 @@ "prepack": "npm run clone-nbc && npm run cjs && npm run check-package && npm run build", | ||
"devDependencies": { | ||
"@types/node": "^20.12.8", | ||
"@types/node": "^20.12.12", | ||
"ava": "^5.3.x", | ||
@@ -73,0 +73,0 @@ "minimist": "^1.2.8", |
Sorry, the diff of this file is not supported yet
1160401
19711