Comparing version 0.5.1 to 0.6.0
@@ -279,4 +279,4 @@ 'use strict'; | ||
const directQueue = Queue('messages-queue', { autoDelete: false }); | ||
directQueue.addConsumer(direct); | ||
const publishQueue = Queue('messages-queue', { autoDelete: false }); | ||
publishQueue.addConsumer(type === 'topic' ? topic : direct); | ||
@@ -317,21 +317,18 @@ const exchange = { | ||
if (stopped) return; | ||
return publishQueue.queueMessage(routingKey, content, msgOptions); | ||
} | ||
if (type === 'direct') return directQueue.queueMessage(routingKey, content, msgOptions); | ||
function topic(routingKey, message) { | ||
const deliverTo = getConcernedBindings(routingKey); | ||
if (!deliverTo.length) { | ||
message.ack(); | ||
return 0; | ||
} | ||
const deliverTo = bindings.reduce((result, { queue, testPattern }) => { | ||
if (testPattern(routingKey)) result.push(queue); | ||
return result; | ||
}, []); | ||
if (!deliverTo.length) return 0; | ||
deliverTo.forEach(queue => queue.queueMessage(routingKey, content, msgOptions)); | ||
deliverTo.forEach(({ queue }) => queue.queueMessage(routingKey, message.content, message.options)); | ||
message.ack(); | ||
} | ||
function direct(routingKey, message) { | ||
const deliverTo = bindings.reduce((result, bound) => { | ||
if (bound.testPattern(routingKey)) result.push(bound); | ||
return result; | ||
}, []); | ||
const deliverTo = getConcernedBindings(routingKey); | ||
const first = deliverTo[0]; | ||
@@ -342,2 +339,3 @@ if (!first) { | ||
} | ||
if (deliverTo.length > 1) shift(deliverTo[0]); | ||
@@ -347,2 +345,9 @@ first.queue.queueMessage(routingKey, message.content, message.options, message.ack); | ||
function getConcernedBindings(routingKey) { | ||
return bindings.reduce((result, bound) => { | ||
if (bound.testPattern(routingKey)) result.push(bound); | ||
return result; | ||
}, []); | ||
} | ||
function shift(bound) { | ||
@@ -380,4 +385,4 @@ const idx = bindings.indexOf(bound); | ||
bindings.forEach(q => q.close()); | ||
directQueue.removeConsumer(direct, false); | ||
directQueue.close(); | ||
publishQueue.removeConsumer(type === 'topic' ? topic : direct, false); | ||
publishQueue.close(); | ||
} | ||
@@ -404,4 +409,3 @@ | ||
function getUndelivered() { | ||
if (type !== 'direct') return; | ||
return directQueue.getState().messages; | ||
return publishQueue.getState().messages; | ||
} | ||
@@ -416,13 +420,6 @@ } | ||
stopped = false; | ||
if (!state) { | ||
if (type === 'direct') { | ||
directQueue.recover(); | ||
} | ||
return; | ||
} | ||
recoverBindings(); | ||
if (state && type === 'direct') { | ||
directQueue.recover({ messages: state.undelivered }); | ||
if (state) { | ||
publishQueue.recover({ messages: state.undelivered }); | ||
} | ||
@@ -433,3 +430,3 @@ | ||
function recoverBindings() { | ||
if (!state.bindings) return; | ||
if (!state || !state.bindings) return; | ||
state.bindings.forEach(bindingState => { | ||
@@ -545,3 +542,3 @@ const queue = getQueue(bindingState.queueName); | ||
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed`); | ||
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed by ${queueConsumers[0].consumerTag}`); | ||
if (consumeOptions && consumeOptions.exclusive) { | ||
@@ -553,2 +550,3 @@ if (queueConsumers.length) throw new Error(`Cannot exclusively subscribe to queue ${queueName} since it is already consumed`); | ||
consumer = Consumer(queueName, onMessage, consumeOptions); | ||
consumers.push(consumer); | ||
@@ -555,0 +553,0 @@ queueConsumers.push(consumer); |
60
index.js
@@ -273,4 +273,4 @@ export function Broker(source) { | ||
const directQueue = Queue('messages-queue', {autoDelete: false}); | ||
directQueue.addConsumer(direct); | ||
const publishQueue = Queue('messages-queue', {autoDelete: false}); | ||
publishQueue.addConsumer(type === 'topic' ? topic : direct); | ||
@@ -311,21 +311,18 @@ const exchange = { | ||
if (stopped) return; | ||
return publishQueue.queueMessage(routingKey, content, msgOptions); | ||
} | ||
if (type === 'direct') return directQueue.queueMessage(routingKey, content, msgOptions); | ||
function topic(routingKey, message) { | ||
const deliverTo = getConcernedBindings(routingKey); | ||
if (!deliverTo.length) { | ||
message.ack(); | ||
return 0; | ||
} | ||
const deliverTo = bindings.reduce((result, {queue, testPattern}) => { | ||
if (testPattern(routingKey)) result.push(queue); | ||
return result; | ||
}, []); | ||
if (!deliverTo.length) return 0; | ||
deliverTo.forEach((queue) => queue.queueMessage(routingKey, content, msgOptions)); | ||
deliverTo.forEach(({queue}) => queue.queueMessage(routingKey, message.content, message.options)); | ||
message.ack(); | ||
} | ||
function direct(routingKey, message) { | ||
const deliverTo = bindings.reduce((result, bound) => { | ||
if (bound.testPattern(routingKey)) result.push(bound); | ||
return result; | ||
}, []); | ||
const deliverTo = getConcernedBindings(routingKey); | ||
const first = deliverTo[0]; | ||
@@ -336,2 +333,3 @@ if (!first) { | ||
} | ||
if (deliverTo.length > 1) shift(deliverTo[0]); | ||
@@ -341,2 +339,9 @@ first.queue.queueMessage(routingKey, message.content, message.options, message.ack); | ||
function getConcernedBindings(routingKey) { | ||
return bindings.reduce((result, bound) => { | ||
if (bound.testPattern(routingKey)) result.push(bound); | ||
return result; | ||
}, []); | ||
} | ||
function shift(bound) { | ||
@@ -374,4 +379,4 @@ const idx = bindings.indexOf(bound); | ||
bindings.forEach((q) => q.close()); | ||
directQueue.removeConsumer(direct, false); | ||
directQueue.close(); | ||
publishQueue.removeConsumer(type === 'topic' ? topic : direct, false); | ||
publishQueue.close(); | ||
} | ||
@@ -398,4 +403,3 @@ | ||
function getUndelivered() { | ||
if (type !== 'direct') return; | ||
return directQueue.getState().messages; | ||
return publishQueue.getState().messages; | ||
} | ||
@@ -410,13 +414,6 @@ } | ||
stopped = false; | ||
if (!state) { | ||
if (type === 'direct') { | ||
directQueue.recover(); | ||
} | ||
return; | ||
} | ||
recoverBindings(); | ||
if (state && type === 'direct') { | ||
directQueue.recover({messages: state.undelivered}); | ||
if (state) { | ||
publishQueue.recover({messages: state.undelivered}); | ||
} | ||
@@ -427,3 +424,3 @@ | ||
function recoverBindings() { | ||
if (!state.bindings) return; | ||
if (!state || !state.bindings) return; | ||
state.bindings.forEach((bindingState) => { | ||
@@ -541,3 +538,3 @@ const queue = getQueue(bindingState.queueName); | ||
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed`); | ||
if (exclusive) throw new Error(`Queue ${queueName} is exclusively consumed by ${queueConsumers[0].consumerTag}`); | ||
if (consumeOptions && consumeOptions.exclusive) { | ||
@@ -549,2 +546,3 @@ if (queueConsumers.length) throw new Error(`Cannot exclusively subscribe to queue ${queueName} since it is already consumed`); | ||
consumer = Consumer(queueName, onMessage, consumeOptions); | ||
consumers.push(consumer); | ||
@@ -551,0 +549,0 @@ queueConsumers.push(consumer); |
{ | ||
"name": "smqp", | ||
"version": "0.5.1", | ||
"version": "0.6.0", | ||
"description": "Synchronous message queuing package", | ||
@@ -54,3 +54,3 @@ "author": { | ||
"chai": "^4.1.2", | ||
"eslint": "^5.0.0", | ||
"eslint": "^5.0.1", | ||
"markdown-toc": "^1.2.0", | ||
@@ -57,0 +57,0 @@ "mocha": "^5.2.0" |
49695
1441