New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

smqp

Package Overview
Dependencies
Maintainers
1
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

smqp - npm Package Compare versions

Comparing version 8.2.0 to 8.2.1

8

CHANGELOG.md
Changelog
=========
# 8.3.0
- export Message, Queue, Consumer, and Exchange
- allow queue event options `queue.on(event, handler[, options])`
- cancelling a consumer returns true if consumer was found and false if not
- `broker.get(queueName, { noAck: true })` not only dequeues message it also marks the actual message as consumed on the, until now, undocumented `message.pending` flag
- fix other inconsistent message pending stuff
# 8.2.0

@@ -5,0 +13,0 @@

30

dist/Errors.js

@@ -15,21 +15,11 @@ "use strict";

exports.SmqpError = SmqpError;
const ERR_CONSUMER_TAG_CONFLICT = 'ERR_SMQP_CONSUMER_TAG_CONFLICT';
exports.ERR_CONSUMER_TAG_CONFLICT = ERR_CONSUMER_TAG_CONFLICT;
const ERR_EXCHANGE_TYPE_MISMATCH = 'ERR_SMQP_EXCHANGE_TYPE_MISMATCH';
exports.ERR_EXCHANGE_TYPE_MISMATCH = ERR_EXCHANGE_TYPE_MISMATCH;
const ERR_EXCLUSIVE_CONFLICT = 'ERR_SMQP_EXCLUSIVE_CONFLICT';
exports.ERR_EXCLUSIVE_CONFLICT = ERR_EXCLUSIVE_CONFLICT;
const ERR_EXCLUSIVE_NOT_ALLOWED = 'ERR_SMQP_EXCLUSIVE_NOT_ALLOWED';
exports.ERR_EXCLUSIVE_NOT_ALLOWED = ERR_EXCLUSIVE_NOT_ALLOWED;
const ERR_QUEUE_DURABLE_MISMATCH = 'ERR_SMQP_QUEUE_DURABLE_MISMATCH';
exports.ERR_QUEUE_DURABLE_MISMATCH = ERR_QUEUE_DURABLE_MISMATCH;
const ERR_QUEUE_NAME_CONFLICT = 'ERR_SMQP_QUEUE_NAME_CONFLICT';
exports.ERR_QUEUE_NAME_CONFLICT = ERR_QUEUE_NAME_CONFLICT;
const ERR_QUEUE_NOT_FOUND = 'ERR_SMQP_QUEUE_NOT_FOUND';
exports.ERR_QUEUE_NOT_FOUND = ERR_QUEUE_NOT_FOUND;
const ERR_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND = 'ERR_SMQP_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND';
exports.ERR_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND = ERR_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND;
const ERR_SHOVEL_NAME_CONFLICT = 'ERR_SMQP_SHOVEL_NAME_CONFLICT';
exports.ERR_SHOVEL_NAME_CONFLICT = ERR_SHOVEL_NAME_CONFLICT;
const ERR_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND = 'ERR_SMQP_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND';
exports.ERR_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND = ERR_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND;
const ERR_CONSUMER_TAG_CONFLICT = exports.ERR_CONSUMER_TAG_CONFLICT = 'ERR_SMQP_CONSUMER_TAG_CONFLICT';
const ERR_EXCHANGE_TYPE_MISMATCH = exports.ERR_EXCHANGE_TYPE_MISMATCH = 'ERR_SMQP_EXCHANGE_TYPE_MISMATCH';
const ERR_EXCLUSIVE_CONFLICT = exports.ERR_EXCLUSIVE_CONFLICT = 'ERR_SMQP_EXCLUSIVE_CONFLICT';
const ERR_EXCLUSIVE_NOT_ALLOWED = exports.ERR_EXCLUSIVE_NOT_ALLOWED = 'ERR_SMQP_EXCLUSIVE_NOT_ALLOWED';
const ERR_QUEUE_DURABLE_MISMATCH = exports.ERR_QUEUE_DURABLE_MISMATCH = 'ERR_SMQP_QUEUE_DURABLE_MISMATCH';
const ERR_QUEUE_NAME_CONFLICT = exports.ERR_QUEUE_NAME_CONFLICT = 'ERR_SMQP_QUEUE_NAME_CONFLICT';
const ERR_QUEUE_NOT_FOUND = exports.ERR_QUEUE_NOT_FOUND = 'ERR_SMQP_QUEUE_NOT_FOUND';
const ERR_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND = exports.ERR_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND = 'ERR_SMQP_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND';
const ERR_SHOVEL_NAME_CONFLICT = exports.ERR_SHOVEL_NAME_CONFLICT = 'ERR_SMQP_SHOVEL_NAME_CONFLICT';
const ERR_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND = exports.ERR_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND = 'ERR_SMQP_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND';

@@ -174,6 +174,5 @@ "use strict";

ExchangeBase.prototype.getState = function getState() {
let bindings;
const bindings = [];
for (const binding of this[kBindings]) {
if (!binding.queue.options.durable) continue;
if (!bindings) bindings = [];
bindings.push(binding.getState());

@@ -191,3 +190,3 @@ }

} : undefined),
...(bindings ? {
...(bindings.length ? {
bindings

@@ -194,0 +193,0 @@ } : undefined)

@@ -6,2 +6,11 @@ "use strict";

});
var _exportNames = {
Broker: true,
Message: true,
Queue: true,
Consumer: true,
Shovel: true,
Exchange: true,
getRoutingKeyPattern: true
};
Object.defineProperty(exports, "Broker", {

@@ -13,14 +22,32 @@ enumerable: true,

});
Object.defineProperty(exports, "Shovel", {
Object.defineProperty(exports, "Consumer", {
enumerable: true,
get: function () {
return _Shovel.Shovel;
return _Queue.Consumer;
}
});
Object.defineProperty(exports, "SmqpError", {
Object.defineProperty(exports, "Exchange", {
enumerable: true,
get: function () {
return _Errors.SmqpError;
return _Exchange.Exchange;
}
});
Object.defineProperty(exports, "Message", {
enumerable: true,
get: function () {
return _Message.Message;
}
});
Object.defineProperty(exports, "Queue", {
enumerable: true,
get: function () {
return _Queue.Queue;
}
});
Object.defineProperty(exports, "Shovel", {
enumerable: true,
get: function () {
return _Shovel.Shovel;
}
});
exports.default = void 0;

@@ -34,6 +61,19 @@ Object.defineProperty(exports, "getRoutingKeyPattern", {

var _Broker = require("./Broker.js");
var _Message = require("./Message.js");
var _Queue = require("./Queue.js");
var _Shovel = require("./Shovel.js");
var _Exchange = require("./Exchange.js");
var _Errors = require("./Errors.js");
Object.keys(_Errors).forEach(function (key) {
if (key === "default" || key === "__esModule") return;
if (Object.prototype.hasOwnProperty.call(_exportNames, key)) return;
if (key in exports && exports[key] === _Errors[key]) return;
Object.defineProperty(exports, key, {
enumerable: true,
get: function () {
return _Errors[key];
}
});
});
var _shared = require("./shared.js");
var _Errors = require("./Errors.js");
var _default = _Broker.Broker;
exports.default = _default;
var _default = exports.default = _Broker.Broker;

@@ -7,4 +7,5 @@ "use strict";

exports.Message = Message;
exports.kPending = void 0;
var _shared = require("./shared.js");
const kPending = Symbol.for('pending');
const kPending = exports.kPending = Symbol.for('pending');
const kOnConsumed = Symbol.for('onConsumed');

@@ -11,0 +12,0 @@ function Message(fields, content, properties, onConsumed) {

@@ -16,3 +16,2 @@ "use strict";

const kIsReady = Symbol.for('isReady');
const kOnConsumed = Symbol.for('kOnConsumed');
const kAvailableCount = Symbol.for('availableCount');

@@ -33,3 +32,3 @@ const kStopped = Symbol.for('stopped');

this[kExclusive] = false;
this[kOnConsumed] = this._onMessageConsumed.bind(this);
this._onMessageConsumed = this._onMessageConsumed.bind(this);
}

@@ -74,3 +73,3 @@ Object.defineProperties(Queue.prototype, {

}
const message = new _Message.Message(fields, content, messageProperties, this[kOnConsumed]);
const message = new _Message.Message(fields, content, messageProperties, this._onMessageConsumed);
const capacity = this._getCapacity();

@@ -152,3 +151,6 @@ this.messages.push(message);

if (!message) return;
if (noAck) this._dequeueMessage(message);
if (noAck) {
this._dequeueMessage(message);
message[_Message.kPending] = false;
}
return message;

@@ -179,9 +181,9 @@ };

Queue.prototype.ack = function ack(message, allUpTo) {
this._onMessageConsumed(message, 'ack', allUpTo, false);
if (this._onMessageConsumed(message, 'ack', allUpTo, false)) message[_Message.kPending] = false;
};
Queue.prototype.nack = function nack(message, allUpTo, requeue = true) {
this._onMessageConsumed(message, 'nack', allUpTo, requeue);
if (this._onMessageConsumed(message, 'nack', allUpTo, requeue)) message[_Message.kPending] = false;
};
Queue.prototype.reject = function reject(message, requeue = true) {
this._onMessageConsumed(message, 'nack', false, requeue);
if (this._onMessageConsumed(message, 'nack', false, requeue)) message[_Message.kPending] = false;
};

@@ -191,3 +193,3 @@ Queue.prototype._onMessageConsumed = function onMessageConsumed(message, operation, allUpTo, requeue) {

const msgIdx = this._dequeueMessage(message);
if (msgIdx === -1) return;
if (msgIdx === -1) return false;
const messages = this.messages;

@@ -206,3 +208,3 @@ const pending = allUpTo && this._getPendingMessages(msgIdx);

redelivered: true
}, message.content, message.properties, this[kOnConsumed]));
}, message.content, message.properties, this._onMessageConsumed));
} else {

@@ -243,2 +245,3 @@ deadLetterExchange = this.options.deadLetterExchange;

}
return true;
};

@@ -280,5 +283,6 @@ Queue.prototype.ackAll = function ackAll() {

const idx = consumers.findIndex(c => c.consumerTag === consumerTag);
if (idx === -1) return;
if (idx === -1) return false;
const consumer = consumers[idx];
this.unbindConsumer(consumer, requeue);
return true;
};

@@ -307,6 +311,6 @@ Queue.prototype.dismiss = function dismiss(onMessage, requeue) {

};
Queue.prototype.on = function on(eventName, handler) {
Queue.prototype.on = function on(eventName, handler, options) {
const eventEmitter = this.events;
if (!eventEmitter) return;
return eventEmitter.on(`queue.${eventName}`, handler);
return eventEmitter.on(`queue.${eventName}`, handler, options);
};

@@ -371,3 +375,3 @@ Queue.prototype.off = function off(eventName, handler) {

if (!state.messages) return this;
const onConsumed = this[kOnConsumed];
const onConsumed = this._onMessageConsumed;
for (const {

@@ -374,0 +378,0 @@ fields,

{
"name": "smqp",
"version": "8.2.0",
"version": "8.2.1",
"type": "module",

@@ -60,9 +60,9 @@ "description": "Synchronous message queueing package",

"devDependencies": {
"@babel/cli": "^7.22.10",
"@babel/core": "^7.22.11",
"@babel/preset-env": "^7.22.14",
"@babel/cli": "^7.23.0",
"@babel/core": "^7.23.2",
"@babel/preset-env": "^7.23.2",
"c8": "^8.0.1",
"chai": "^4.3.8",
"chai": "^4.3.10",
"chronokinesis": "^6.0.0",
"eslint": "^8.48.0",
"eslint": "^8.51.0",
"eslint-config-exp": "^0.6.2",

@@ -69,0 +69,0 @@ "markdown-toc": "^1.2.0",

@@ -177,6 +177,5 @@ import { Message } from './Message.js';

ExchangeBase.prototype.getState = function getState() {
let bindings;
const bindings = [];
for (const binding of this[kBindings]) {
if (!binding.queue.options.durable) continue;
if (!bindings) bindings = [];
bindings.push(binding.getState());

@@ -191,3 +190,3 @@ }

...(deliveryQueue.messageCount ? { deliveryQueue: deliveryQueue.getState() } : undefined),
...(bindings ? { bindings } : undefined),
...(bindings.length ? { bindings } : undefined),
};

@@ -194,0 +193,0 @@ };

import { Broker } from './Broker.js';
import { Shovel } from './Shovel.js';
import { getRoutingKeyPattern } from './shared.js';
import { SmqpError } from './Errors.js';
export { Broker };
export { Message } from './Message.js';
export { Queue, Consumer } from './Queue.js';
export { Shovel } from './Shovel.js';
export { Exchange } from './Exchange.js';
export * from './Errors.js';
export { getRoutingKeyPattern } from './shared.js';
export default Broker;
export { Broker, Shovel, getRoutingKeyPattern, SmqpError };
import { generateId } from './shared.js';
const kPending = Symbol.for('pending');
export const kPending = Symbol.for('pending');
const kOnConsumed = Symbol.for('onConsumed');

@@ -5,0 +5,0 @@

import { generateId, sortByPriority } from './shared.js';
import { Message } from './Message.js';
import { kPending, Message } from './Message.js';
import { SmqpError, ERR_EXCLUSIVE_CONFLICT, ERR_EXCLUSIVE_NOT_ALLOWED } from './Errors.js';

@@ -10,3 +10,2 @@

const kIsReady = Symbol.for('isReady');
const kOnConsumed = Symbol.for('kOnConsumed');
const kAvailableCount = Symbol.for('availableCount');

@@ -28,3 +27,3 @@ const kStopped = Symbol.for('stopped');

this[kExclusive] = false;
this[kOnConsumed] = this._onMessageConsumed.bind(this);
this._onMessageConsumed = this._onMessageConsumed.bind(this);
}

@@ -71,3 +70,3 @@

}
const message = new Message(fields, content, messageProperties, this[kOnConsumed]);
const message = new Message(fields, content, messageProperties, this._onMessageConsumed);

@@ -163,3 +162,6 @@ const capacity = this._getCapacity();

if (!message) return;
if (noAck) this._dequeueMessage(message);
if (noAck) {
this._dequeueMessage(message);
message[kPending] = false;
}

@@ -198,11 +200,11 @@ return message;

Queue.prototype.ack = function ack(message, allUpTo) {
this._onMessageConsumed(message, 'ack', allUpTo, false);
if (this._onMessageConsumed(message, 'ack', allUpTo, false)) message[kPending] = false;
};
Queue.prototype.nack = function nack(message, allUpTo, requeue = true) {
this._onMessageConsumed(message, 'nack', allUpTo, requeue);
if (this._onMessageConsumed(message, 'nack', allUpTo, requeue)) message[kPending] = false;
};
Queue.prototype.reject = function reject(message, requeue = true) {
this._onMessageConsumed(message, 'nack', false, requeue);
if (this._onMessageConsumed(message, 'nack', false, requeue)) message[kPending] = false;
};

@@ -214,3 +216,3 @@

const msgIdx = this._dequeueMessage(message);
if (msgIdx === -1) return;
if (msgIdx === -1) return false;

@@ -227,3 +229,3 @@ const messages = this.messages;

this[kAvailableCount]++;
messages.splice(msgIdx, 0, new Message({ ...message.fields, redelivered: true }, message.content, message.properties, this[kOnConsumed]));
messages.splice(msgIdx, 0, new Message({ ...message.fields, redelivered: true }, message.content, message.properties, this._onMessageConsumed));
} else {

@@ -263,2 +265,3 @@ deadLetterExchange = this.options.deadLetterExchange;

}
return true;
};

@@ -310,6 +313,8 @@

const idx = consumers.findIndex((c) => c.consumerTag === consumerTag);
if (idx === -1) return;
if (idx === -1) return false;
const consumer = consumers[idx];
this.unbindConsumer(consumer, requeue);
return true;
};

@@ -347,6 +352,6 @@

Queue.prototype.on = function on(eventName, handler) {
Queue.prototype.on = function on(eventName, handler, options) {
const eventEmitter = this.events;
if (!eventEmitter) return;
return eventEmitter.on(`queue.${eventName}`, handler);
return eventEmitter.on(`queue.${eventName}`, handler, options);
};

@@ -420,3 +425,3 @@

const onConsumed = this[kOnConsumed];
const onConsumed = this._onMessageConsumed;
for (const { fields, content, properties } of state.messages) {

@@ -423,0 +428,0 @@ if (properties.persistent === false) continue;

@@ -63,2 +63,8 @@ import { Queue, Consumer, queueOptions, onMessage, consumeOptions, deleteQueueOptions, QueueState } from "./Queue.js";

consume(queueName: string, onMessage: onMessage, options?: consumeOptions): Consumer;
/**
* Cancel consumer
* @param consumerTag Consumer tag
* @param requeue optional boolean to requeue messages consumed by consumer, defaults to true
* @returns true if found, false if not
*/
cancel(consumerTag: string, requeue?: boolean): boolean;

@@ -90,3 +96,3 @@ getExchange(exchangeName: string): Exchange;

* Get broker state with durable entities
* @param onlyWithContent [boolean] only return state if any durable exchanges or queues that have messages
* @param {boolean} onlyWithContent only return state if any durable exchanges or queues that have messages
*/

@@ -98,3 +104,3 @@ getState(onlyWithContent: boolean): BrokerState | undefined;

noAck: boolean;
}): Message;
}): Message | undefined;
ack(message: Message, allUpTo?: boolean): void;

@@ -105,5 +111,16 @@ ackAll(): void;

reject(message: Message, requeue?: boolean): void;
/**
* Check if consumer tag is occupied
* @param consumerTag
* @returns {boolean} true if not occupied, throws SmqpError if it is
*/
validateConsumerTag(consumerTag: string): boolean;
on(eventName: string, callback: CallableFunction, options?: any): Consumer;
off(eventName: string, callbackOrObject: any): void;
/**
* Listen broker for events
* @param eventName event name, e.g. return for undelivered messages, a routing key pattern can be used, e.g. queue.# for all queue events
* @param callback event handler function
* @param options consume options, consumerTag is probably the most usable option, noAck is ignored and always true
*/
on(eventName: string, callback: CallableFunction, options?: consumeOptions): Consumer;
off(eventName: string, callbackOrObject: CallableFunction | consumeOptions): void;
prefetch(value?: number): void;

@@ -110,0 +127,0 @@ /** DANGER deletes all broker entities and closes broker */

import { MessageProperties } from './Message.js';
import { Queue, Consumer, QueueState } from './Queue.js';
import { Queue, Consumer, QueueState, consumeOptions } from './Queue.js';

@@ -45,3 +45,3 @@ type exchangeType = 'topic' | 'direct';

close(): void;
getState(): any;
getState(): ExchangeState;
stop(): void;

@@ -60,4 +60,4 @@ /**

emit(eventName: string, content?: any): any;
on(pattern: string, handler: CallableFunction, consumeOptions?: {consumerTag?: string, [x: string]: any}): Consumer;
off(pattern: string, handler: any): void;
on(pattern: string, handler: CallableFunction, consumeOptions?: consumeOptions): Consumer;
off(pattern: string, handler: CallableFunction | consumeOptions): void;
}

@@ -64,0 +64,0 @@

import { Broker } from './Broker.js';
import { Shovel } from './Shovel.js';
import { getRoutingKeyPattern } from './shared.js';
import { SmqpError } from './Errors.js';
export { Broker };
export { Message } from './Message.js';
export { Queue, Consumer } from './Queue.js';
export { Shovel } from './Shovel.js';
export { Exchange } from './Exchange.js';
export * from './Errors.js';
export { getRoutingKeyPattern } from './shared.js';
export default Broker;
export { Broker, Shovel, getRoutingKeyPattern, SmqpError };

@@ -50,2 +50,4 @@ export interface MessageFields {

reject(requeue?: boolean): void;
/** Message is pending ack */
get pending(): boolean
}

@@ -39,2 +39,21 @@ import { Message, MessageFields, MessageProperties, MessageMessage } from './Message.js';

export const enum QueueEventNames {
/** Consumer was cancelled */
QueueConsumerCancel = 'consumer.cancel',
/** Consumer was added */
QueueConsume = 'consume',
/** Message was dead-lettered, sends deadLetterExchange name and message */
QueueDeadLetter = 'dead-letter',
/** Queue was deleted */
QueueDelete = 'delete',
/** Queue is depleted */
QueueDepleted = 'depleted',
/** Message was queued */
QueueMessage = 'message',
/** Queue is ready to receive new messages */
QueueReady = 'ready',
/** Queue is saturated, i.e. max capacity was reached */
QueueSaturated = 'saturated',
}
export interface Queue {

@@ -52,3 +71,3 @@ name: string;

assertConsumer(onMessage: onMessage, consumeOptions?: consumeOptions, owner?: any): Consumer;
get(options?: consumeOptions): Message;
get(options?: consumeOptions): Message | undefined;
ack(message: Message, allUpTo?: boolean): void;

@@ -59,15 +78,13 @@ nack(message: Message, allUpTo?: boolean, requeue?: boolean): void;

nackAll(requeue?: boolean): void;
peek(ignoreDelivered?: boolean): Message;
cancel(consumerTag: string, requeue?: boolean): void;
peek(ignoreDelivered?: boolean): Message | undefined;
cancel(consumerTag: string, requeue?: boolean): boolean;
dismiss(onMessage: onMessage, requeue?: boolean): void;
unbindConsumer(consumer: Consumer, requeue?: boolean): void;
emit(eventName: string, content?: any): void;
on(eventName: string, handler: CallableFunction): Consumer;
off(eventName: string, handler: CallableFunction): Consumer;
on(eventName: string | QueueEventNames, handler: CallableFunction, options?: consumeOptions): Consumer;
off(eventName: string | QueueEventNames, handler: CallableFunction | consumeOptions): Consumer;
purge(): number;
getState(): QueueState;
recover(state?: QueueState): Queue;
delete(options?: deleteQueueOptions): {
messageCount: number;
};
delete(options?: deleteQueueOptions): { messageCount: number };
close(): void;

@@ -74,0 +91,0 @@ stop(): void;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc