Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@nats-io/jetstream

Package Overview
Dependencies
Maintainers
0
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@nats-io/jetstream - npm Package Compare versions

Comparing version 3.0.0-25 to 3.0.0-26

11

lib/consumer.d.ts

@@ -5,4 +5,4 @@ import type { CallbackFn, Delay, QueuedIterator, Status, Subscription, Timeout } from "@nats-io/nats-core/internal";

import type { JsMsg } from "./jsmsg";
import type { ConsumerConfig, ConsumerInfo, PullOptions } from "./jsapi_types";
import type { ConsumeOptions, Consumer, ConsumerAPI, ConsumerCallbackFn, ConsumerMessages, ConsumerStatus, FetchOptions, NextOptions, OrderedConsumerOptions, PullConsumerOptions } from "./types";
import type { ConsumerConfig, ConsumerInfo, OverflowMinPendingAndMinAck, PullOptions } from "./jsapi_types";
import type { ConsumeOptions, Consumer, ConsumerAPI, ConsumerCallbackFn, ConsumerMessages, ConsumerStatus, Expires, FetchOptions, IdleHeartbeat, MaxBytes, MaxMessages, NextOptions, OrderedConsumerOptions, PullConsumerOptions, ThresholdBytes, ThresholdMessages } from "./types";
import { ConsumerDebugEvents, ConsumerEvents } from "./types";

@@ -32,5 +32,7 @@ import { JetStreamStatus } from "./jserrors";

};
type InternalPullOptions = MaxMessages & MaxBytes & Expires & IdleHeartbeat & ThresholdMessages & OverflowMinPendingAndMinAck & ThresholdBytes;
export declare function isOverflowOptions(opts: unknown): opts is OverflowMinPendingAndMinAck;
export declare class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg> implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
opts: InternalPullOptions;
sub: Subscription;

@@ -74,3 +76,3 @@ monitor: IdleHeartbeatMonitor | null;

stop(err?: Error): void;
parseOptions(opts: PullConsumerOptions, refilling?: boolean): Record<string, number>;
parseOptions(opts: PullConsumerOptions, refilling?: boolean): InternalPullOptions;
status(): AsyncIterable<ConsumerStatus>;

@@ -101,2 +103,3 @@ }

}
export declare function validateOverflowPullOptions(opts: unknown): void;
export {};

@@ -18,2 +18,4 @@ "use strict";

exports.PullConsumerImpl = exports.PullConsumerMessagesImpl = void 0;
exports.isOverflowOptions = isOverflowOptions;
exports.validateOverflowPullOptions = validateOverflowPullOptions;
const internal_1 = require("@nats-io/nats-core/internal");

@@ -24,2 +26,3 @@ const jsmsg_1 = require("./jsmsg");

const jserrors_1 = require("./jserrors");
const jsutil_1 = require("./jsutil");
var PullConsumerType;

@@ -31,2 +34,8 @@ (function (PullConsumerType) {

})(PullConsumerType || (PullConsumerType = {}));
function isOverflowOptions(opts) {
const oo = opts;
return oo && typeof oo.group === "string" ||
typeof oo.min_pending === "number" ||
typeof oo.min_ack_pending === "number";
}
class PullConsumerMessagesImpl extends internal_1.QueuedIteratorImpl {

@@ -57,2 +66,9 @@ consumer;

if (this.consumer.ordered) {
if (isOverflowOptions(opts)) {
throw internal_1.errors.InvalidArgumentError.format([
"group",
"min_pending",
"min_ack_pending",
], "cannot be specified for ordered consumers");
}
if (this.consumer.orderedConsumerState === undefined) {

@@ -458,3 +474,13 @@ // initialize the state for the order consumer

const expires = (0, internal_1.nanos)(this.opts.expires);
return { batch, max_bytes, idle_heartbeat, expires };
const opts = { batch, max_bytes, idle_heartbeat, expires };
if (isOverflowOptions(this.opts)) {
opts.group = this.opts.group;
if (this.opts.min_pending) {
opts.min_pending = this.opts.min_pending;
}
if (this.opts.min_ack_pending) {
opts.min_ack_pending = this.opts.min_ack_pending;
}
}
return opts;
}

@@ -497,3 +523,3 @@ trackTimeout(t) {

if (args.max_messages !== 0 && args.max_bytes !== 0) {
throw new Error(`only specify one of max_messages or max_bytes`);
throw internal_1.errors.InvalidArgumentError.format(["max_messages", "max_bytes"], "are mutually exclusive");
}

@@ -527,2 +553,18 @@ // we must have at least one limit - default to 100 msgs

}
if (isOverflowOptions(opts)) {
const { min, ok } = this.consumer.api.nc.features.get(internal_1.Feature.JS_PRIORITY_GROUPS);
if (!ok) {
throw new Error(`priority_groups require server ${min}`);
}
validateOverflowPullOptions(opts);
if (opts.group) {
args.group = opts.group;
}
if (opts.min_ack_pending) {
args.min_ack_pending = opts.min_ack_pending;
}
if (opts.min_pending) {
args.min_pending = opts.min_pending;
}
}
return args;

@@ -682,2 +724,20 @@ }

exports.PullConsumerImpl = PullConsumerImpl;
function validateOverflowPullOptions(opts) {
if (isOverflowOptions(opts)) {
(0, jsutil_1.minValidation)("group", opts.group);
if (opts.group.length > 16) {
throw internal_1.errors.InvalidArgumentError.format("group", "must be 16 characters or less");
}
const { min_pending, min_ack_pending } = opts;
if (!min_pending && !min_ack_pending) {
throw internal_1.errors.InvalidArgumentError.format(["min_pending", "min_ack_pending"], "at least one must be specified");
}
if (min_pending && typeof min_pending !== "number") {
throw internal_1.errors.InvalidArgumentError.format(["min_pending"], "must be a number");
}
if (min_ack_pending && typeof min_ack_pending !== "number") {
throw internal_1.errors.InvalidArgumentError.format(["min_ack_pending"], "must be a number");
}
}
}
//# sourceMappingURL=consumer.js.map

@@ -823,3 +823,7 @@ import type { Nanos } from "@nats-io/nats-core";

}
export interface ConsumerConfig extends ConsumerUpdateConfig {
export type PriorityGroups = {
priority_groups?: string[];
priority_policy?: PriorityPolicy;
};
export type ConsumerConfig = ConsumerUpdateConfig & {
/**

@@ -880,4 +884,4 @@ * The type of acknowledgment required by the Consumer

"pause_until"?: string;
}
export interface ConsumerUpdateConfig {
};
export type ConsumerUpdateConfig = PriorityGroups & {
/**

@@ -933,4 +937,4 @@ * A short description of the purpose of this consume

/**
* List of durations in nanoseconds format that represents a retry timescale for
* NaK'd messages or those being normally retried
* List of durations in nanoseconds that represents a retry timescale for
* the redelivery of messages
*/

@@ -966,4 +970,43 @@ "backoff"?: Nanos[];

metadata?: Record<string, string>;
};
export declare enum PriorityPolicy {
None = "none",
Overflow = "overflow"
}
export declare function defaultConsumer(name: string, opts?: Partial<ConsumerConfig>): ConsumerConfig;
export type OverflowMinPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
};
export type OverflowMinAckPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};
export type OverflowMinPendingAndMinAck = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};
export type OverflowOptions = OverflowMinPending | OverflowMinAckPending | OverflowMinPendingAndMinAck;
/**

@@ -974,3 +1017,3 @@ * Options for a JetStream pull subscription which define how long

*/
export interface PullOptions {
export type PullOptions = Partial<OverflowMinPendingAndMinAck> & {
/**

@@ -994,4 +1037,7 @@ * Max number of messages to retrieve in a pull.

"max_bytes": number;
/**
* Number of nanos between messages for the server to emit an idle_heartbeat
*/
"idle_heartbeat": number;
}
};
export interface DeliveryInfo {

@@ -998,0 +1044,0 @@ /**

@@ -17,3 +17,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.PubHeaders = exports.ConsumerApiAction = exports.StoreCompression = exports.ReplayPolicy = exports.AckPolicy = exports.DeliverPolicy = exports.StorageType = exports.DiscardPolicy = exports.RetentionPolicy = void 0;
exports.PubHeaders = exports.PriorityPolicy = exports.ConsumerApiAction = exports.StoreCompression = exports.ReplayPolicy = exports.AckPolicy = exports.DeliverPolicy = exports.StorageType = exports.DiscardPolicy = exports.RetentionPolicy = void 0;
exports.defaultConsumer = defaultConsumer;

@@ -132,2 +132,7 @@ const nats_core_1 = require("@nats-io/nats-core");

})(ConsumerApiAction || (exports.ConsumerApiAction = ConsumerApiAction = {}));
var PriorityPolicy;
(function (PriorityPolicy) {
PriorityPolicy["None"] = "none";
PriorityPolicy["Overflow"] = "overflow";
})(PriorityPolicy || (exports.PriorityPolicy = PriorityPolicy = {}));
function defaultConsumer(name, opts = {}) {

@@ -134,0 +139,0 @@ return Object.assign({

import { BaseApiClientImpl } from "./jsbaseclient_api";
import type { Nanos, NatsConnection } from "@nats-io/nats-core/internal";
import type { ConsumerConfig, ConsumerInfo, ConsumerUpdateConfig } from "./jsapi_types";
import { ConsumerApiAction } from "./jsapi_types";
import type { ConsumerConfig, ConsumerInfo, ConsumerUpdateConfig } from "./jsapi_types";
import type { ConsumerAPI, JetStreamOptions, Lister } from "./types";

@@ -6,0 +6,0 @@ export declare class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI {

@@ -35,2 +35,12 @@ "use strict";

}
if (isPriorityGroup(cfg)) {
const { min, ok } = this.nc.features.get(internal_1.Feature.JS_PRIORITY_GROUPS);
if (!ok) {
throw new Error(`priority_groups require server ${min}`);
}
if (cfg.deliver_subject) {
throw internal_1.InvalidArgumentError.format("deliver_subject", "cannot be set when using priority groups");
}
validatePriorityGroups(cfg);
}
const cr = {};

@@ -144,2 +154,27 @@ cr.config = cfg;

exports.ConsumerAPIImpl = ConsumerAPIImpl;
function isPriorityGroup(config) {
const pg = config;
return pg && pg.priority_groups !== undefined ||
pg.priority_policy !== undefined;
}
function validatePriorityGroups(pg) {
if (isPriorityGroup(pg)) {
if (!Array.isArray(pg.priority_groups)) {
throw internal_1.InvalidArgumentError.format(["priority_groups"], "must be an array");
}
if (pg.priority_groups.length === 0) {
throw internal_1.InvalidArgumentError.format(["priority_groups"], "must have at least one group");
}
pg.priority_groups.forEach((g) => {
(0, jsutil_1.minValidation)("priority_group", g);
if (g.length > 16) {
throw internal_1.errors.InvalidArgumentError.format("group", "must be 16 characters or less");
}
});
if (pg.priority_policy !== jsapi_types_1.PriorityPolicy.None &&
pg.priority_policy !== jsapi_types_1.PriorityPolicy.Overflow) {
throw internal_1.InvalidArgumentError.format(["priority_policy"], "must be 'none' or 'overflow'");
}
}
}
//# sourceMappingURL=jsmconsumer_api.js.map

@@ -37,2 +37,10 @@ import type { Msg, MsgHdrs } from "@nats-io/nats-core/internal";

/**
* The time the message was received
*/
time: Date;
/**
* The time the message was received as an ISO formatted date string
*/
timestamp: string;
/**
* Indicate to the JetStream server that the message was processed

@@ -113,2 +121,4 @@ * successfully.

get seq(): number;
get time(): Date;
get timestamp(): string;
doAck(payload: Uint8Array): void;

@@ -115,0 +125,0 @@ isWIP(p: Uint8Array): boolean;

@@ -93,2 +93,9 @@ "use strict";

}
get time() {
const ms = (0, internal_1.millis)(this.info.timestampNanos);
return new Date(ms);
}
get timestamp() {
return this.time.toISOString();
}
doAck(payload) {

@@ -95,0 +102,0 @@ if (!this.didAck) {

import type { MsgHdrs, Nanos, Payload, QueuedIterator, ReviverFn } from "@nats-io/nats-core/internal";
import type { DeliverPolicy, ReplayPolicy } from "./jsapi_types";
import type { ConsumerConfig, ConsumerInfo, ConsumerUpdateConfig, DirectBatchOptions, DirectMsgRequest, JetStreamAccountStats, MsgRequest, PurgeOpts, PurgeResponse, StreamAlternate, StreamConfig, StreamInfo, StreamInfoRequestOptions, StreamUpdateConfig } from "./jsapi_types";
import type { ConsumerConfig, ConsumerInfo, ConsumerUpdateConfig, DirectBatchOptions, DirectMsgRequest, JetStreamAccountStats, MsgRequest, OverflowOptions, PurgeOpts, PurgeResponse, StreamAlternate, StreamConfig, StreamInfo, StreamInfoRequestOptions, StreamUpdateConfig } from "./jsapi_types";
import type { JsMsg } from "./jsmsg";

@@ -263,4 +263,4 @@ export interface JetStreamOptions {

export type NextOptions = Expires & Bind;
export type ConsumeBytes = MaxBytes & Partial<MaxMessages> & Partial<ThresholdBytes> & Expires & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource & Bind;
export type ConsumeMessages = Partial<MaxMessages> & Partial<ThresholdMessages> & Expires & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource & Bind;
export type ConsumeBytes = MaxBytes & Partial<MaxMessages> & Partial<ThresholdBytes> & Expires & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource & Bind & Partial<OverflowOptions>;
export type ConsumeMessages = Partial<MaxMessages> & Partial<ThresholdMessages> & Expires & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource & Bind & Partial<OverflowOptions>;
export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

@@ -270,7 +270,7 @@ /**

*/
export type FetchBytes = MaxBytes & Partial<MaxMessages> & Expires & IdleHeartbeat & Bind;
export type FetchBytes = MaxBytes & Partial<MaxMessages> & Expires & IdleHeartbeat & Bind & Partial<OverflowOptions>;
/**
* Options for fetching messages
*/
export type FetchMessages = Partial<MaxMessages> & Expires & IdleHeartbeat & Bind;
export type FetchMessages = Partial<MaxMessages> & Expires & IdleHeartbeat & Bind & Partial<OverflowOptions>;
export type FetchOptions = FetchBytes | FetchMessages;

@@ -277,0 +277,0 @@ export type PullConsumerOptions = FetchOptions | ConsumeOptions;

{
"name": "@nats-io/jetstream",
"version": "3.0.0-25",
"version": "3.0.0-26",
"files": [

@@ -37,3 +37,3 @@ "lib/",

"dependencies": {
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},

@@ -46,2 +46,2 @@ "devDependencies": {

}
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc