New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

smqp

Package Overview
Dependencies
Maintainers
1
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

smqp - npm Package Compare versions

Comparing version 0.5.1 to 0.6.0

60

dist/index.js

@@ -279,4 +279,4 @@ 'use strict';

const directQueue = Queue('messages-queue', { autoDelete: false });
directQueue.addConsumer(direct);
const publishQueue = Queue('messages-queue', { autoDelete: false });
publishQueue.addConsumer(type === 'topic' ? topic : direct);

@@ -317,21 +317,18 @@ const exchange = {

if (stopped) return;
return publishQueue.queueMessage(routingKey, content, msgOptions);
}
if (type === 'direct') return directQueue.queueMessage(routingKey, content, msgOptions);
function topic(routingKey, message) {
const deliverTo = getConcernedBindings(routingKey);
if (!deliverTo.length) {
message.ack();
return 0;
}
const deliverTo = bindings.reduce((result, { queue, testPattern }) => {
if (testPattern(routingKey)) result.push(queue);
return result;
}, []);
if (!deliverTo.length) return 0;
deliverTo.forEach(queue => queue.queueMessage(routingKey, content, msgOptions));
deliverTo.forEach(({ queue }) => queue.queueMessage(routingKey, message.content, message.options));
message.ack();
}
function direct(routingKey, message) {
const deliverTo = bindings.reduce((result, bound) => {
if (bound.testPattern(routingKey)) result.push(bound);
return result;
}, []);
const deliverTo = getConcernedBindings(routingKey);
const first = deliverTo[0];

@@ -342,2 +339,3 @@ if (!first) {

}
if (deliverTo.length > 1) shift(deliverTo[0]);

@@ -347,2 +345,9 @@ first.queue.queueMessage(routingKey, message.content, message.options, message.ack);

function getConcernedBindings(routingKey) {
return bindings.reduce((result, bound) => {
if (bound.testPattern(routingKey)) result.push(bound);
return result;
}, []);
}
function shift(bound) {

@@ -380,4 +385,4 @@ const idx = bindings.indexOf(bound);

bindings.forEach(q => q.close());
directQueue.removeConsumer(direct, false);
directQueue.close();
publishQueue.removeConsumer(type === 'topic' ? topic : direct, false);
publishQueue.close();
}

@@ -404,4 +409,3 @@

function getUndelivered() {
if (type !== 'direct') return;
return directQueue.getState().messages;
return publishQueue.getState().messages;
}

@@ -416,13 +420,6 @@ }

stopped = false;
if (!state) {
if (type === 'direct') {
directQueue.recover();
}
return;
}
recoverBindings();
if (state && type === 'direct') {
directQueue.recover({ messages: state.undelivered });
if (state) {
publishQueue.recover({ messages: state.undelivered });
}

@@ -433,3 +430,3 @@

function recoverBindings() {
if (!state.bindings) return;
if (!state || !state.bindings) return;
state.bindings.forEach(bindingState => {

@@ -545,3 +542,3 @@ const queue = getQueue(bindingState.queueName);

if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed`);
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed by ${queueConsumers[0].consumerTag}`);
if (consumeOptions && consumeOptions.exclusive) {

@@ -553,2 +550,3 @@ if (queueConsumers.length) throw new Error(`Cannot exclusively subscribe to queue ${queueName} since it is already consumed`);

consumer = Consumer(queueName, onMessage, consumeOptions);
consumers.push(consumer);

@@ -555,0 +553,0 @@ queueConsumers.push(consumer);

@@ -273,4 +273,4 @@ export function Broker(source) {

const directQueue = Queue('messages-queue', {autoDelete: false});
directQueue.addConsumer(direct);
const publishQueue = Queue('messages-queue', {autoDelete: false});
publishQueue.addConsumer(type === 'topic' ? topic : direct);

@@ -311,21 +311,18 @@ const exchange = {

if (stopped) return;
return publishQueue.queueMessage(routingKey, content, msgOptions);
}
if (type === 'direct') return directQueue.queueMessage(routingKey, content, msgOptions);
function topic(routingKey, message) {
const deliverTo = getConcernedBindings(routingKey);
if (!deliverTo.length) {
message.ack();
return 0;
}
const deliverTo = bindings.reduce((result, {queue, testPattern}) => {
if (testPattern(routingKey)) result.push(queue);
return result;
}, []);
if (!deliverTo.length) return 0;
deliverTo.forEach((queue) => queue.queueMessage(routingKey, content, msgOptions));
deliverTo.forEach(({queue}) => queue.queueMessage(routingKey, message.content, message.options));
message.ack();
}
function direct(routingKey, message) {
const deliverTo = bindings.reduce((result, bound) => {
if (bound.testPattern(routingKey)) result.push(bound);
return result;
}, []);
const deliverTo = getConcernedBindings(routingKey);
const first = deliverTo[0];

@@ -336,2 +333,3 @@ if (!first) {

}
if (deliverTo.length > 1) shift(deliverTo[0]);

@@ -341,2 +339,9 @@ first.queue.queueMessage(routingKey, message.content, message.options, message.ack);

function getConcernedBindings(routingKey) {
return bindings.reduce((result, bound) => {
if (bound.testPattern(routingKey)) result.push(bound);
return result;
}, []);
}
function shift(bound) {

@@ -374,4 +379,4 @@ const idx = bindings.indexOf(bound);

bindings.forEach((q) => q.close());
directQueue.removeConsumer(direct, false);
directQueue.close();
publishQueue.removeConsumer(type === 'topic' ? topic : direct, false);
publishQueue.close();
}

@@ -398,4 +403,3 @@

function getUndelivered() {
if (type !== 'direct') return;
return directQueue.getState().messages;
return publishQueue.getState().messages;
}

@@ -410,13 +414,6 @@ }

stopped = false;
if (!state) {
if (type === 'direct') {
directQueue.recover();
}
return;
}
recoverBindings();
if (state && type === 'direct') {
directQueue.recover({messages: state.undelivered});
if (state) {
publishQueue.recover({messages: state.undelivered});
}

@@ -427,3 +424,3 @@

function recoverBindings() {
if (!state.bindings) return;
if (!state || !state.bindings) return;
state.bindings.forEach((bindingState) => {

@@ -541,3 +538,3 @@ const queue = getQueue(bindingState.queueName);

if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed`);
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed by ${queueConsumers[0].consumerTag}`);
if (consumeOptions && consumeOptions.exclusive) {

@@ -549,2 +546,3 @@ if (queueConsumers.length) throw new Error(`Cannot exclusively subscribe to queue ${queueName} since it is already consumed`);

consumer = Consumer(queueName, onMessage, consumeOptions);
consumers.push(consumer);

@@ -551,0 +549,0 @@ queueConsumers.push(consumer);

{
"name": "smqp",
"version": "0.5.1",
"version": "0.6.0",
"description": "Synchronous message queuing package",

@@ -54,3 +54,3 @@ "author": {

"chai": "^4.1.2",
"eslint": "^5.0.0",
"eslint": "^5.0.1",
"markdown-toc": "^1.2.0",

@@ -57,0 +57,0 @@ "mocha": "^5.2.0"

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc