New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

smqp

Package Overview
Dependencies
Maintainers
1
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

smqp - npm Package Compare versions

Comparing version 7.1.4 to 8.0.0

6

CHANGELOG.md
Changelog
=========
# 8.0.0
- shovel ignores shoveling if destination exchange lacks bindings, could be breaking if cloneMessage function option was used to make things happen
- abide to new lint rules
- bump all dev dependencies
# 7.1.4

@@ -5,0 +11,0 @@

4

dist/Broker.js

@@ -289,3 +289,3 @@ "use strict";

if (self.getQueue(queueName)) throw new Error(`Queue named ${queueName} already exists`);
const queueEmitter = (0, _Exchange.EventExchange)(queueName + '__events');
const queueEmitter = new _Exchange.EventExchange(`${queueName}__events`);
this[kEventHandler].listen(queueEmitter);

@@ -475,3 +475,3 @@ const queue = new _Queue.Queue(queueName, options, queueEmitter);

} = msg.content;
this.broker.events.publish('message.' + operation, message);
this.broker.events.publish(`message.${operation}`, message);
break;

@@ -478,0 +478,0 @@ }

@@ -18,3 +18,3 @@ "use strict";

if (['topic', 'direct'].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct');
const eventExchange = EventExchange(name + '__events');
const eventExchange = new EventExchange(`${name}__events`);
return new ExchangeBase(name, type, options, eventExchange);

@@ -21,0 +21,0 @@ }

@@ -210,3 +210,3 @@ "use strict";

if (!requeue && message.properties.confirm) {
this.emit('message.consumed.' + operation, {
this.emit(`message.consumed.${operation}`, {
operation,

@@ -424,3 +424,3 @@ message: {

};
if (!this.options.consumerTag) this.options.consumerTag = 'smq.ctag-' + (0, _shared.generateId)();
if (!this.options.consumerTag) this.options.consumerTag = `smq.ctag-${(0, _shared.generateId)()}`;
this.queue = queue;

@@ -433,3 +433,3 @@ this.onMessage = onMessage;

this[kConsuming] = false;
this[kInternalQueue] = new Queue(this.options.consumerTag + '-q', {
this[kInternalQueue] = new Queue(`${this.options.consumerTag}-q`, {
autoDelete: false,

@@ -436,0 +436,0 @@ maxLength: this.options.prefetch

@@ -134,2 +134,4 @@ "use strict";

Shovel.prototype._onShovelMessage = function onShovelMessage(routingKey, message) {
const destinationExchange = this[kDestinationExchange];
if (!destinationExchange.bindingCount && !message.properties.mandatory) return message.ack();
const {

@@ -145,4 +147,4 @@ content,

if (!this[kBrokerInternal]) props['shovel-name'] = this.name;
this[kDestinationExchange].publish(this.destination.exchangeKey || routingKey, content, props);
destinationExchange.publish(this.destination.exchangeKey || routingKey, content, props);
message.ack();
};
{
"name": "smqp",
"version": "7.1.4",
"version": "8.0.0",
"type": "module",

@@ -60,9 +60,10 @@ "description": "Synchronous message queueing package",

"devDependencies": {
"@babel/cli": "^7.21.0",
"@babel/core": "^7.21.4",
"@babel/preset-env": "^7.21.4",
"c8": "^7.13.0",
"@babel/cli": "^7.22.5",
"@babel/core": "^7.22.5",
"@babel/preset-env": "^7.22.5",
"c8": "^8.0.0",
"chai": "^4.3.7",
"chronokinesis": "^4.0.1",
"eslint": "^8.37.0",
"chronokinesis": "^5.0.2",
"eslint": "^8.43.0",
"eslint-config-exp": "^0.6.2",
"markdown-toc": "^1.2.0",

@@ -69,0 +70,0 @@ "mocha": "^10.2.0"

@@ -1,4 +0,4 @@

import {Exchange, EventExchange} from './Exchange.js';
import {Queue} from './Queue.js';
import {Shovel} from './Shovel.js';
import { Exchange, EventExchange } from './Exchange.js';
import { Queue } from './Queue.js';
import { Shovel } from './Shovel.js';

@@ -44,3 +44,3 @@ const kEntities = Symbol.for('entities');

Broker.prototype.subscribe = function subscribe(exchangeName, pattern, queueName, onMessage, options = {durable: true}) {
Broker.prototype.subscribe = function subscribe(exchangeName, pattern, queueName, onMessage, options = { durable: true }) {
if (!exchangeName || !pattern || typeof onMessage !== 'function') throw new Error('exchange name, pattern, and message callback are required');

@@ -58,3 +58,3 @@ if (options && options.consumerTag) this.validateConsumerTag(options.consumerTag);

Broker.prototype.subscribeTmp = function subscribeTmp(exchangeName, pattern, onMessage, options) {
return this.subscribe(exchangeName, pattern, null, onMessage, {...options, durable: false});
return this.subscribe(exchangeName, pattern, null, onMessage, { ...options, durable: false });
};

@@ -67,3 +67,3 @@

const exchange = this.assertExchange(exchangeName);
const onceOptions = {autoDelete: true, durable: false, priority: options.priority || 0};
const onceOptions = { autoDelete: true, durable: false, priority: options.priority || 0 };

@@ -73,3 +73,3 @@ const onceQueue = this.createQueue(null, onceOptions);

return this.consume(onceQueue.name, wrappedOnMessage, {noAck: true, consumerTag: options.consumerTag});
return this.consume(onceQueue.name, wrappedOnMessage, { noAck: true, consumerTag: options.consumerTag });

@@ -137,3 +137,3 @@ function wrappedOnMessage(...args) {

consumerTag: consumer.options.consumerTag,
options: {...consumer.options},
options: { ...consumer.options },
};

@@ -148,6 +148,6 @@ });

Broker.prototype.getExchange = function getExchange(exchangeName) {
return this[kEntities].exchanges.find(({name}) => name === exchangeName);
return this[kEntities].exchanges.find(({ name }) => name === exchangeName);
};
Broker.prototype.deleteExchange = function deleteExchange(exchangeName, {ifUnused} = {}) {
Broker.prototype.deleteExchange = function deleteExchange(exchangeName, { ifUnused } = {}) {
const exchanges = this[kEntities].exchanges;

@@ -166,3 +166,3 @@ const idx = exchanges.findIndex((exchange) => exchange.name === exchangeName);

Broker.prototype.stop = function stop() {
const {exchanges, queues} = this[kEntities];
const { exchanges, queues } = this[kEntities];
for (const exchange of exchanges) exchange.stop();

@@ -173,3 +173,3 @@ for (const queue of queues) queue.stop();

Broker.prototype.close = function close() {
const {shovels, exchanges, queues} = this[kEntities];
const { shovels, exchanges, queues } = this[kEntities];
for (const shovel of shovels) shovel.close();

@@ -208,3 +208,3 @@ for (const exchange of exchanges) exchange.close();

} else {
const {queues, exchanges} = self[kEntities];
const { queues, exchanges } = self[kEntities];
for (const queue of queues) {

@@ -233,3 +233,3 @@ if (queue.stopped) queue.recover();

const name = `e2e-${source}2${destination}-${pattern}`;
const {priority} = args;
const { priority } = args;
const shovel = this.createShovel(name, {

@@ -244,7 +244,5 @@ broker: this,

exchange: destination,
}, {
...args,
});
}, { ...args });
const {consumerTag, source: shovelSource} = shovel;
const { consumerTag, source: shovelSource } = shovel;

@@ -313,3 +311,3 @@ return {

const queueEmitter = EventExchange(queueName + '__events');
const queueEmitter = new EventExchange(`${queueName}__events`);
this[kEventHandler].listen(queueEmitter);

@@ -333,3 +331,3 @@ const queue = new Queue(queueName, options, queueEmitter);

const queue = this.getQueue(queueName);
options = {durable: true, ...options};
options = { durable: true, ...options };
if (!queue) return this.createQueue(queueName, options);

@@ -348,7 +346,7 @@

Broker.prototype.get = function getMessageFromQueue(queueName, {noAck} = {}) {
Broker.prototype.get = function getMessageFromQueue(queueName, { noAck } = {}) {
const queue = this.getQueue(queueName);
if (!queue) return;
return queue.get({noAck});
return queue.get({ noAck });
};

@@ -389,3 +387,3 @@

if (this.getShovel(name)) throw new Error(`Shovel name must be unique, ${name} is occupied`);
const shovel = new Shovel(name, {...source, broker: this}, destination, options);
const shovel = new Shovel(name, { ...source, broker: this }, destination, options);
this[kEventHandler].listen(shovel.events);

@@ -414,3 +412,3 @@ shovels.push(shovel);

Broker.prototype.on = function on(eventName, callback, options) {
return this.events.on(eventName, getEventCallback(), {...options, origin: callback});
return this.events.on(eventName, getEventCallback(), { ...options, origin: callback });

@@ -428,3 +426,3 @@ function getEventCallback() {

Broker.prototype.off = function off(eventName, callbackOrObject) {
const {consumerTag} = callbackOrObject;
const { consumerTag } = callbackOrObject;
for (const binding of this.events.bindings) {

@@ -485,3 +483,3 @@ if (binding.pattern === eventName) {

if (!exchange) return;
const {fields, content, properties} = msg.content.message;
const { fields, content, properties } = msg.content.message;
exchange.publish(fields.routingKey, content, properties);

@@ -502,4 +500,4 @@ break;

case 'queue.message.consumed.nack': {
const {operation, message} = msg.content;
this.broker.events.publish('message.' + operation, message);
const { operation, message } = msg.content;
this.broker.events.publish(`message.${operation}`, message);
break;

@@ -506,0 +504,0 @@ }

@@ -1,6 +0,6 @@

import {Message} from './Message.js';
import {Queue} from './Queue.js';
import {sortByPriority, getRoutingKeyPattern, generateId} from './shared.js';
import { Message } from './Message.js';
import { Queue } from './Queue.js';
import { sortByPriority, getRoutingKeyPattern, generateId } from './shared.js';
export {Exchange, EventExchange};
export { Exchange, EventExchange };

@@ -14,4 +14,4 @@ const kType = Symbol.for('type');

if (!name) throw new Error('Exchange name is required');
if (['topic', 'direct'].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct');
const eventExchange = EventExchange(name + '__events');
if ([ 'topic', 'direct' ].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct');
const eventExchange = new EventExchange(`${name}__events`);
return new ExchangeBase(name, type, options, eventExchange);

@@ -22,3 +22,3 @@ }

if (!name) name = `smq.ename-${generateId()}`;
return new ExchangeBase(name, 'topic', {durable: false, autoDelete: true});
return new ExchangeBase(name, 'topic', { durable: false, autoDelete: true });
}

@@ -31,8 +31,8 @@

this[kStopped] = false;
this.options = {durable: true, autoDelete: true, ...options};
this.options = { durable: true, autoDelete: true, ...options };
this.events = eventExchange;
const deliveryQueue = this[kDeliveryQueue] = new Queue('delivery-q', {autoDelete: false});
const deliveryQueue = this[kDeliveryQueue] = new Queue('delivery-q', { autoDelete: false });
const onMessage = (type === 'topic' ? this._onTopicMessage : this._onDirectMessage).bind(this);
deliveryQueue.consume(onMessage, {exclusive: true, consumerTag: '_exchange-tag'});
deliveryQueue.consume(onMessage, { exclusive: true, consumerTag: '_exchange-tag' });
}

@@ -74,3 +74,3 @@

return this[kDeliveryQueue].queueMessage({routingKey}, {
return this[kDeliveryQueue].queueMessage({ routingKey }, {
content,

@@ -124,3 +124,3 @@ properties,

ExchangeBase.prototype._publishToQueue = function publishToQueue(queue, routingKey, content, properties) {
queue.queueMessage({routingKey, exchange: this.name}, content, properties);
queue.queueMessage({ routingKey, exchange: this.name }, content, properties);
};

@@ -132,6 +132,6 @@

if (properties.confirm) {
this.emit('message.undelivered', new Message({routingKey, exchange: this.name}, content, properties));
this.emit('message.undelivered', new Message({ routingKey, exchange: this.name }, content, properties));
}
if (properties.mandatory) {
this.emit('return', new Message({routingKey, exchange: this.name}, content, properties));
this.emit('return', new Message({ routingKey, exchange: this.name }, content, properties));
}

@@ -159,3 +159,3 @@ };

const [binding] = bindings.splice(idx, 1);
const [ binding ] = bindings.splice(idx, 1);
binding.close();

@@ -196,5 +196,5 @@

type: this.type,
options: {...this.options},
...(deliveryQueue.messageCount ? {deliveryQueue: deliveryQueue.getState()} : undefined),
...(bindings ? {bindings} : undefined),
options: { ...this.options },
...(deliveryQueue.messageCount ? { deliveryQueue: deliveryQueue.getState() } : undefined),
...(bindings ? { bindings } : undefined),
};

@@ -222,3 +222,3 @@ };

const onMessage = (this[kType] === 'topic' ? this._onTopicMessage : this._onDirectMessage).bind(this);
deliveryQueue.consume(onMessage, {exclusive: true, consumerTag: '_exchange-tag'});
deliveryQueue.consume(onMessage, { exclusive: true, consumerTag: '_exchange-tag' });
}

@@ -241,3 +241,3 @@

const eventQueue = new Queue(null, {durable: false, autoDelete: true});
const eventQueue = new Queue(null, { durable: false, autoDelete: true });
const binding = this.bindQueue(eventQueue, pattern);

@@ -250,3 +250,3 @@ eventQueue.events = {

return eventQueue.consume(handler, {...consumeOptions, noAck: true}, this);
return eventQueue.consume(handler, { ...consumeOptions, noAck: true }, this);
};

@@ -257,3 +257,3 @@

const {consumerTag} = handler;
const { consumerTag } = handler;

@@ -270,3 +270,3 @@ for (const binding of this[kBindings]) {

this.id = `${queue.name}/${pattern}`;
this.options = {priority: 0, ...bindOptions};
this.options = { priority: 0, ...bindOptions };
this.pattern = pattern;

@@ -293,3 +293,3 @@ this.exchange = exchange;

id: this.id,
options: {...this.options},
options: { ...this.options },
queueName: this.queue.name,

@@ -296,0 +296,0 @@ pattern: this.pattern,

@@ -1,6 +0,6 @@

import {Broker} from './Broker.js';
import {Shovel} from './Shovel.js';
import {getRoutingKeyPattern} from './shared.js';
import { Broker } from './Broker.js';
import { Shovel } from './Shovel.js';
import { getRoutingKeyPattern } from './shared.js';
export default Broker;
export {Broker, Shovel, getRoutingKeyPattern};
export { Broker, Shovel, getRoutingKeyPattern };

@@ -9,3 +9,3 @@ import { generateId } from './shared.js';

function Message(fields, content, properties, onConsumed) {
this[kOnConsumed] = [null, onConsumed];
this[kOnConsumed] = [ null, onConsumed ];
this[kPending] = false;

@@ -22,3 +22,3 @@

this.fields = {...fields, consumerTag: undefined};
this.fields = { ...fields, consumerTag: undefined };
this.content = content;

@@ -34,3 +34,3 @@ this.properties = mproperties;

Message.prototype._consume = function consume({consumerTag}, consumedCb) {
Message.prototype._consume = function consume({ consumerTag }, consumedCb) {
this[kPending] = true;

@@ -37,0 +37,0 @@ this.fields.consumerTag = consumerTag;

@@ -1,5 +0,5 @@

import {generateId, sortByPriority} from './shared.js';
import {Message} from './Message.js';
import { generateId, sortByPriority } from './shared.js';
import { Message } from './Message.js';
export {Queue, Consumer};
export { Queue, Consumer };

@@ -19,3 +19,3 @@ const kConsumers = Symbol.for('consumers');

this.options = {autoDelete: true, ...options};
this.options = { autoDelete: true, ...options };

@@ -68,3 +68,3 @@ this.messages = [];

const messageTtl = this.options.messageTtl;
const messageProperties = {...properties};
const messageProperties = { ...properties };
if (messageTtl && !('expiration' in messageProperties)) {

@@ -161,4 +161,4 @@ messageProperties.expiration = messageTtl;

Queue.prototype.get = function getMessage({noAck, consumerTag} = {}) {
const message = this._consumeMessages(1, {noAck, consumerTag})[0];
Queue.prototype.get = function getMessage({ noAck, consumerTag } = {}) {
const message = this._consumeMessages(1, { noAck, consumerTag })[0];
if (!message) return;

@@ -226,3 +226,3 @@ if (noAck) this._dequeueMessage(message);

this[kAvailableCount]++;
messages.splice(msgIdx, 0, new Message({...message.fields, redelivered: true}, message.content, message.properties, this[kOnConsumed]));
messages.splice(msgIdx, 0, new Message({ ...message.fields, redelivered: true }, message.content, message.properties, this[kOnConsumed]));
} else {

@@ -243,3 +243,3 @@ deadLetterExchange = this.options.deadLetterExchange;

if (!requeue && message.properties.confirm) {
this.emit('message.consumed.' + operation, {operation, message: {...message}});
this.emit(`message.consumed.${operation}`, { operation, message: { ...message } });
}

@@ -249,3 +249,3 @@

const deadLetterRoutingKey = this.options.deadLetterRoutingKey;
const deadMessage = new Message(message.fields, message.content, {...message.properties, expiration: undefined});
const deadMessage = new Message(message.fields, message.content, { ...message.properties, expiration: undefined });
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey;

@@ -359,3 +359,3 @@

Queue.prototype.purge = function purge() {
const toDelete = this.messages.filter(({pending}) => !pending);
const toDelete = this.messages.filter(({ pending }) => !pending);
this[kAvailableCount] = 0;

@@ -383,3 +383,3 @@

name: this.name,
options: {...this.options},
options: { ...this.options },
};

@@ -421,5 +421,5 @@ if (msgs.length) {

const onConsumed = this[kOnConsumed];
for (const {fields, content, properties} of state.messages) {
for (const { fields, content, properties } of state.messages) {
if (properties.persistent === false) continue;
const msg = new Message({...fields, redelivered: true}, content, properties, onConsumed);
const msg = new Message({ ...fields, redelivered: true }, content, properties, onConsumed);
this.messages.push(msg);

@@ -436,3 +436,3 @@ }

Queue.prototype.delete = function deleteQueue({ifUnused, ifEmpty} = {}) {
Queue.prototype.delete = function deleteQueue({ ifUnused, ifEmpty } = {}) {
const consumers = this[kConsumers];

@@ -452,3 +452,3 @@ if (ifUnused && consumers.length) return;

this.emit('delete', this);
return {messageCount};
return { messageCount };
};

@@ -479,4 +479,4 @@

this.options = {prefetch: 1, priority: 0, noAck: false, ...options};
if (!this.options.consumerTag) this.options.consumerTag = 'smq.ctag-' + generateId();
this.options = { prefetch: 1, priority: 0, noAck: false, ...options };
if (!this.options.consumerTag) this.options.consumerTag = `smq.ctag-${generateId()}`;

@@ -491,3 +491,3 @@ this.queue = queue;

this[kInternalQueue] = new Queue(this.options.consumerTag + '-q', {
this[kInternalQueue] = new Queue(`${this.options.consumerTag}-q`, {
autoDelete: false,

@@ -494,0 +494,0 @@ maxLength: this.options.prefetch,

@@ -1,2 +0,2 @@

export {generateId, getRoutingKeyPattern, sortByPriority};
export { generateId, getRoutingKeyPattern, sortByPriority };

@@ -3,0 +3,0 @@ const allDots = /\./g;

@@ -1,2 +0,2 @@

import {EventExchange} from './Exchange.js';
import { EventExchange } from './Exchange.js';

@@ -13,4 +13,4 @@ const kBrokerInternal = Symbol.for('brokerInternal');

export function Shovel(name, source, destination, options = {}) {
const {broker: sourceBroker, exchange: sourceExchangeName, pattern, queue, priority} = source;
const {broker: destinationBroker, exchange: destinationExchangeName} = destination;
const { broker: sourceBroker, exchange: sourceExchangeName, pattern, queue, priority } = source;
const { broker: destinationBroker, exchange: destinationExchangeName } = destination;

@@ -35,4 +35,4 @@ const sourceExchange = sourceBroker.getExchange(sourceExchangeName);

this.name = name;
this.source = {...source, pattern: routingKeyPattern};
this.destination = {...destination};
this.source = { ...source, pattern: routingKeyPattern };
this.destination = { ...destination };
this.events = new EventExchange('shovel__events');

@@ -57,5 +57,5 @@

if (queue) {
consumer = sourceBroker.subscribe(sourceExchangeName, routingKeyPattern, queue, shovelHandler, {consumerTag, priority});
consumer = sourceBroker.subscribe(sourceExchangeName, routingKeyPattern, queue, shovelHandler, { consumerTag, priority });
} else {
consumer = sourceBroker.subscribeTmp(sourceExchangeName, routingKeyPattern, shovelHandler, {consumerTag, priority});
consumer = sourceBroker.subscribeTmp(sourceExchangeName, routingKeyPattern, shovelHandler, { consumerTag, priority });
this.source.queue = consumer.queue.name;

@@ -106,7 +106,7 @@ }

const {fields, content, properties} = message;
const {content: newContent, properties: newProperties} = cloneMessage({
fields: {...fields},
const { fields, content, properties } = message;
const { content: newContent, properties: newProperties } = cloneMessage({
fields: { ...fields },
content,
properties: {...properties},
properties: { ...properties },
});

@@ -117,3 +117,3 @@

content: newContent,
properties: {...properties, ...newProperties},
properties: { ...properties, ...newProperties },
};

@@ -123,7 +123,10 @@ };

Shovel.prototype._onShovelMessage = function onShovelMessage(routingKey, message) {
const {content, properties} = this._messageHandler(message);
const props = {...properties, ...this.destination.publishProperties, 'source-exchange': this[kSourceExchange].name};
const destinationExchange = this[kDestinationExchange];
if (!destinationExchange.bindingCount && !message.properties.mandatory) return message.ack();
const { content, properties } = this._messageHandler(message);
const props = { ...properties, ...this.destination.publishProperties, 'source-exchange': this[kSourceExchange].name };
if (!this[kBrokerInternal]) props['shovel-name'] = this.name;
this[kDestinationExchange].publish(this.destination.exchangeKey || routingKey, content, props);
destinationExchange.publish(this.destination.exchangeKey || routingKey, content, props);
message.ack();
};

@@ -20,4 +20,10 @@ import { Consumer } from './Queue.js';

export interface ShovelDestination {
/** destination broker */
broker: Broker;
/** destination exchange */
exchange: string;
/** optional destination exchange routing key, defaults to original message's routing key */
exchangeKey?: string;
/** optional object with message properties to overwrite when shovelling messages */
publishProperties?: Record<string, any>;
}

@@ -24,0 +30,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc