New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 2.25.0 to 2.26.0

10

lib/jetstream/consumer.d.ts

@@ -163,2 +163,6 @@ import { Timeout } from "../nats-base-client/util";

/**
* Notifies that the current consumer will be reset
*/
Reset = "reset",
/**
* Notifies whenever there's a request for additional messages from the server.

@@ -292,3 +296,7 @@ * This notification telegraphs the request options, which should be treated as

internalHandler(serial: number): (m: JsMsg) => void;
reset(opts?: ConsumeOptions | FetchOptions, fromFetch?: boolean): Promise<ConsumerMessages>;
reset(opts?: ConsumeOptions | FetchOptions, info?: Partial<{
fromFetch: boolean;
orderedReset: boolean;
}>): Promise<void>;
notifyOrderedResetAndReset(): void;
consume(opts?: ConsumeOptions): Promise<ConsumerMessages>;

@@ -295,0 +303,0 @@ fetch(opts?: FetchOptions): Promise<ConsumerMessages>;

132

lib/jetstream/consumer.js

@@ -112,2 +112,6 @@ "use strict";

/**
* Notifies that the current consumer will be reset
*/
ConsumerDebugEvents["Reset"] = "reset";
/**
* Notifies whenever there's a request for additional messages from the server.

@@ -858,3 +862,3 @@ * This notification telegraphs the request options, which should be treated as

if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
this.notifyOrderedResetAndReset();
return;

@@ -876,5 +880,23 @@ }

expires: 30000,
}, fromFetch = false) {
this.currentConsumer = yield this.resetConsumer(this.cursor.stream_seq + 1);
if (this.iter === null) {
}, info) {
var _a, _b;
info = info || {};
// this is known to be directly related to a pull
const fromFetch = info.fromFetch || false;
// a sequence order caused the reset
const orderedReset = info.orderedReset || false;
if (this.type === PullConsumerType.Fetch && orderedReset) {
// the fetch pull simply needs to end the iterator
(_a = this.iter) === null || _a === void 0 ? void 0 : _a.src.stop();
yield ((_b = this.iter) === null || _b === void 0 ? void 0 : _b.closed());
this.currentConsumer = null;
return;
}
if (this.currentConsumer === null || orderedReset) {
this.currentConsumer = yield this.resetConsumer(this.cursor.stream_seq + 1);
}
// if we don't have an iterator, or it is a fetch request
// we create the iterator - otherwise this is a reset that is happening
// while the OC is active, so simply bind the new OC to current iterator.
if (this.iter === null || fromFetch) {
this.iter = new OrderedConsumerMessages();

@@ -893,59 +915,65 @@ }

}
else {
return Promise.reject("reset called with unset consumer type");
}
const msgsImpl = msgs;
msgsImpl.forOrderedConsumer = true;
msgsImpl.resetHandler = () => {
this.reset(this.opts);
this.notifyOrderedResetAndReset();
};
this.iter.setSource(msgsImpl);
});
}
notifyOrderedResetAndReset() {
var _a;
(_a = this.iter) === null || _a === void 0 ? void 0 : _a.notify(ConsumerDebugEvents.Reset, "");
this.reset(this.opts, { orderedReset: true });
}
consume() {
return __awaiter(this, arguments, void 0, function* (opts = {
max_messages: 100,
expires: 30000,
}) {
const copts = opts;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
if (this.type === PullConsumerType.Fetch) {
return Promise.reject(new Error("ordered consumer initialized as fetch"));
}
if (this.type === PullConsumerType.Consume) {
return Promise.reject(new Error("ordered consumer doesn't support concurrent consume"));
}
const { callback } = opts;
if (callback) {
this.userCallback = callback;
}
this.type = PullConsumerType.Consume;
this.opts = opts;
yield this.reset(opts);
return this.iter;
});
}
consume(opts = {
max_messages: 100,
expires: 30000,
}) {
const copts = opts;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
if (this.type === PullConsumerType.Fetch) {
return Promise.reject(new Error("ordered consumer initialized as fetch"));
}
if (this.type === PullConsumerType.Consume) {
return Promise.reject(new Error("ordered consumer doesn't support concurrent consume"));
}
const { callback } = opts;
if (callback) {
this.userCallback = callback;
}
this.type = PullConsumerType.Consume;
this.opts = opts;
return this.reset(opts);
fetch() {
return __awaiter(this, arguments, void 0, function* (opts = { max_messages: 100, expires: 30000 }) {
var _a;
const copts = opts;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
if (this.type === PullConsumerType.Consume) {
return Promise.reject(new Error("ordered consumer already initialized as consume"));
}
if (((_a = this.iter) === null || _a === void 0 ? void 0 : _a.done) === false) {
return Promise.reject(new Error("ordered consumer doesn't support concurrent fetch"));
}
//@ts-ignore: allow this for tests - api doesn't use it because
// iterator close is the user signal that the pull is done.
const { callback } = opts;
if (callback) {
this.userCallback = callback;
}
this.type = PullConsumerType.Fetch;
this.opts = opts;
yield this.reset(opts, { fromFetch: true });
return this.iter;
});
}
fetch(opts = { max_messages: 100, expires: 30000 }) {
var _a;
const copts = opts;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
if (this.type === PullConsumerType.Consume) {
return Promise.reject(new Error("ordered consumer already initialized as consume"));
}
if (((_a = this.iter) === null || _a === void 0 ? void 0 : _a.done) === false) {
return Promise.reject(new Error("ordered consumer doesn't support concurrent fetch"));
}
//@ts-ignore: allow this for tests - api doesn't use it because
// iterator close is the user signal that the pull is done.
const { callback } = opts;
if (callback) {
this.userCallback = callback;
}
this.type = PullConsumerType.Fetch;
this.opts = opts;
this.iter = new OrderedConsumerMessages();
return this.reset(opts, true);
}
next() {

@@ -952,0 +980,0 @@ return __awaiter(this, arguments, void 0, function* (opts = { expires: 30000 }) {

@@ -47,3 +47,3 @@ "use strict";

const dns = require("dns");
const VERSION = "2.25.0";
const VERSION = "2.26.0";
const LANG = "nats.js";

@@ -50,0 +50,0 @@ class NodeTransport {

{
"name": "nats",
"version": "2.25.0",
"version": "2.26.0",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -45,3 +45,3 @@ "keywords": [

"clean": "shx rm -Rf ./lib/* ./nats-base-client ./.deps",
"clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.25.0 https://github.com/nats-io/nats.deno.git",
"clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.26.0 https://github.com/nats-io/nats.deno.git",
"fmt": "deno fmt ./src/ ./examples/ ./test/",

@@ -70,3 +70,3 @@ "prepack": "npm run clone-nbc && npm run cjs && npm run check-package && npm run build",

"devDependencies": {
"@types/node": "^20.12.8",
"@types/node": "^20.12.12",
"ava": "^5.3.x",

@@ -73,0 +73,0 @@ "minimist": "^1.2.8",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc