Comparing version 7.1.4 to 8.0.0
Changelog | ||
========= | ||
# 8.0.0 | ||
- shovel ignores shoveling if destination exchange lacks bindings, could be breaking if cloneMessage function option was used to make things happen | ||
- abide to new lint rules | ||
- bump all dev dependencies | ||
# 7.1.4 | ||
@@ -5,0 +11,0 @@ |
@@ -289,3 +289,3 @@ "use strict"; | ||
if (self.getQueue(queueName)) throw new Error(`Queue named ${queueName} already exists`); | ||
const queueEmitter = (0, _Exchange.EventExchange)(queueName + '__events'); | ||
const queueEmitter = new _Exchange.EventExchange(`${queueName}__events`); | ||
this[kEventHandler].listen(queueEmitter); | ||
@@ -475,3 +475,3 @@ const queue = new _Queue.Queue(queueName, options, queueEmitter); | ||
} = msg.content; | ||
this.broker.events.publish('message.' + operation, message); | ||
this.broker.events.publish(`message.${operation}`, message); | ||
break; | ||
@@ -478,0 +478,0 @@ } |
@@ -18,3 +18,3 @@ "use strict"; | ||
if (['topic', 'direct'].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct'); | ||
const eventExchange = EventExchange(name + '__events'); | ||
const eventExchange = new EventExchange(`${name}__events`); | ||
return new ExchangeBase(name, type, options, eventExchange); | ||
@@ -21,0 +21,0 @@ } |
@@ -210,3 +210,3 @@ "use strict"; | ||
if (!requeue && message.properties.confirm) { | ||
this.emit('message.consumed.' + operation, { | ||
this.emit(`message.consumed.${operation}`, { | ||
operation, | ||
@@ -424,3 +424,3 @@ message: { | ||
}; | ||
if (!this.options.consumerTag) this.options.consumerTag = 'smq.ctag-' + (0, _shared.generateId)(); | ||
if (!this.options.consumerTag) this.options.consumerTag = `smq.ctag-${(0, _shared.generateId)()}`; | ||
this.queue = queue; | ||
@@ -433,3 +433,3 @@ this.onMessage = onMessage; | ||
this[kConsuming] = false; | ||
this[kInternalQueue] = new Queue(this.options.consumerTag + '-q', { | ||
this[kInternalQueue] = new Queue(`${this.options.consumerTag}-q`, { | ||
autoDelete: false, | ||
@@ -436,0 +436,0 @@ maxLength: this.options.prefetch |
@@ -134,2 +134,4 @@ "use strict"; | ||
Shovel.prototype._onShovelMessage = function onShovelMessage(routingKey, message) { | ||
const destinationExchange = this[kDestinationExchange]; | ||
if (!destinationExchange.bindingCount && !message.properties.mandatory) return message.ack(); | ||
const { | ||
@@ -145,4 +147,4 @@ content, | ||
if (!this[kBrokerInternal]) props['shovel-name'] = this.name; | ||
this[kDestinationExchange].publish(this.destination.exchangeKey || routingKey, content, props); | ||
destinationExchange.publish(this.destination.exchangeKey || routingKey, content, props); | ||
message.ack(); | ||
}; |
{ | ||
"name": "smqp", | ||
"version": "7.1.4", | ||
"version": "8.0.0", | ||
"type": "module", | ||
@@ -60,9 +60,10 @@ "description": "Synchronous message queueing package", | ||
"devDependencies": { | ||
"@babel/cli": "^7.21.0", | ||
"@babel/core": "^7.21.4", | ||
"@babel/preset-env": "^7.21.4", | ||
"c8": "^7.13.0", | ||
"@babel/cli": "^7.22.5", | ||
"@babel/core": "^7.22.5", | ||
"@babel/preset-env": "^7.22.5", | ||
"c8": "^8.0.0", | ||
"chai": "^4.3.7", | ||
"chronokinesis": "^4.0.1", | ||
"eslint": "^8.37.0", | ||
"chronokinesis": "^5.0.2", | ||
"eslint": "^8.43.0", | ||
"eslint-config-exp": "^0.6.2", | ||
"markdown-toc": "^1.2.0", | ||
@@ -69,0 +70,0 @@ "mocha": "^10.2.0" |
@@ -1,4 +0,4 @@ | ||
import {Exchange, EventExchange} from './Exchange.js'; | ||
import {Queue} from './Queue.js'; | ||
import {Shovel} from './Shovel.js'; | ||
import { Exchange, EventExchange } from './Exchange.js'; | ||
import { Queue } from './Queue.js'; | ||
import { Shovel } from './Shovel.js'; | ||
@@ -44,3 +44,3 @@ const kEntities = Symbol.for('entities'); | ||
Broker.prototype.subscribe = function subscribe(exchangeName, pattern, queueName, onMessage, options = {durable: true}) { | ||
Broker.prototype.subscribe = function subscribe(exchangeName, pattern, queueName, onMessage, options = { durable: true }) { | ||
if (!exchangeName || !pattern || typeof onMessage !== 'function') throw new Error('exchange name, pattern, and message callback are required'); | ||
@@ -58,3 +58,3 @@ if (options && options.consumerTag) this.validateConsumerTag(options.consumerTag); | ||
Broker.prototype.subscribeTmp = function subscribeTmp(exchangeName, pattern, onMessage, options) { | ||
return this.subscribe(exchangeName, pattern, null, onMessage, {...options, durable: false}); | ||
return this.subscribe(exchangeName, pattern, null, onMessage, { ...options, durable: false }); | ||
}; | ||
@@ -67,3 +67,3 @@ | ||
const exchange = this.assertExchange(exchangeName); | ||
const onceOptions = {autoDelete: true, durable: false, priority: options.priority || 0}; | ||
const onceOptions = { autoDelete: true, durable: false, priority: options.priority || 0 }; | ||
@@ -73,3 +73,3 @@ const onceQueue = this.createQueue(null, onceOptions); | ||
return this.consume(onceQueue.name, wrappedOnMessage, {noAck: true, consumerTag: options.consumerTag}); | ||
return this.consume(onceQueue.name, wrappedOnMessage, { noAck: true, consumerTag: options.consumerTag }); | ||
@@ -137,3 +137,3 @@ function wrappedOnMessage(...args) { | ||
consumerTag: consumer.options.consumerTag, | ||
options: {...consumer.options}, | ||
options: { ...consumer.options }, | ||
}; | ||
@@ -148,6 +148,6 @@ }); | ||
Broker.prototype.getExchange = function getExchange(exchangeName) { | ||
return this[kEntities].exchanges.find(({name}) => name === exchangeName); | ||
return this[kEntities].exchanges.find(({ name }) => name === exchangeName); | ||
}; | ||
Broker.prototype.deleteExchange = function deleteExchange(exchangeName, {ifUnused} = {}) { | ||
Broker.prototype.deleteExchange = function deleteExchange(exchangeName, { ifUnused } = {}) { | ||
const exchanges = this[kEntities].exchanges; | ||
@@ -166,3 +166,3 @@ const idx = exchanges.findIndex((exchange) => exchange.name === exchangeName); | ||
Broker.prototype.stop = function stop() { | ||
const {exchanges, queues} = this[kEntities]; | ||
const { exchanges, queues } = this[kEntities]; | ||
for (const exchange of exchanges) exchange.stop(); | ||
@@ -173,3 +173,3 @@ for (const queue of queues) queue.stop(); | ||
Broker.prototype.close = function close() { | ||
const {shovels, exchanges, queues} = this[kEntities]; | ||
const { shovels, exchanges, queues } = this[kEntities]; | ||
for (const shovel of shovels) shovel.close(); | ||
@@ -208,3 +208,3 @@ for (const exchange of exchanges) exchange.close(); | ||
} else { | ||
const {queues, exchanges} = self[kEntities]; | ||
const { queues, exchanges } = self[kEntities]; | ||
for (const queue of queues) { | ||
@@ -233,3 +233,3 @@ if (queue.stopped) queue.recover(); | ||
const name = `e2e-${source}2${destination}-${pattern}`; | ||
const {priority} = args; | ||
const { priority } = args; | ||
const shovel = this.createShovel(name, { | ||
@@ -244,7 +244,5 @@ broker: this, | ||
exchange: destination, | ||
}, { | ||
...args, | ||
}); | ||
}, { ...args }); | ||
const {consumerTag, source: shovelSource} = shovel; | ||
const { consumerTag, source: shovelSource } = shovel; | ||
@@ -313,3 +311,3 @@ return { | ||
const queueEmitter = EventExchange(queueName + '__events'); | ||
const queueEmitter = new EventExchange(`${queueName}__events`); | ||
this[kEventHandler].listen(queueEmitter); | ||
@@ -333,3 +331,3 @@ const queue = new Queue(queueName, options, queueEmitter); | ||
const queue = this.getQueue(queueName); | ||
options = {durable: true, ...options}; | ||
options = { durable: true, ...options }; | ||
if (!queue) return this.createQueue(queueName, options); | ||
@@ -348,7 +346,7 @@ | ||
Broker.prototype.get = function getMessageFromQueue(queueName, {noAck} = {}) { | ||
Broker.prototype.get = function getMessageFromQueue(queueName, { noAck } = {}) { | ||
const queue = this.getQueue(queueName); | ||
if (!queue) return; | ||
return queue.get({noAck}); | ||
return queue.get({ noAck }); | ||
}; | ||
@@ -389,3 +387,3 @@ | ||
if (this.getShovel(name)) throw new Error(`Shovel name must be unique, ${name} is occupied`); | ||
const shovel = new Shovel(name, {...source, broker: this}, destination, options); | ||
const shovel = new Shovel(name, { ...source, broker: this }, destination, options); | ||
this[kEventHandler].listen(shovel.events); | ||
@@ -414,3 +412,3 @@ shovels.push(shovel); | ||
Broker.prototype.on = function on(eventName, callback, options) { | ||
return this.events.on(eventName, getEventCallback(), {...options, origin: callback}); | ||
return this.events.on(eventName, getEventCallback(), { ...options, origin: callback }); | ||
@@ -428,3 +426,3 @@ function getEventCallback() { | ||
Broker.prototype.off = function off(eventName, callbackOrObject) { | ||
const {consumerTag} = callbackOrObject; | ||
const { consumerTag } = callbackOrObject; | ||
for (const binding of this.events.bindings) { | ||
@@ -485,3 +483,3 @@ if (binding.pattern === eventName) { | ||
if (!exchange) return; | ||
const {fields, content, properties} = msg.content.message; | ||
const { fields, content, properties } = msg.content.message; | ||
exchange.publish(fields.routingKey, content, properties); | ||
@@ -502,4 +500,4 @@ break; | ||
case 'queue.message.consumed.nack': { | ||
const {operation, message} = msg.content; | ||
this.broker.events.publish('message.' + operation, message); | ||
const { operation, message } = msg.content; | ||
this.broker.events.publish(`message.${operation}`, message); | ||
break; | ||
@@ -506,0 +504,0 @@ } |
@@ -1,6 +0,6 @@ | ||
import {Message} from './Message.js'; | ||
import {Queue} from './Queue.js'; | ||
import {sortByPriority, getRoutingKeyPattern, generateId} from './shared.js'; | ||
import { Message } from './Message.js'; | ||
import { Queue } from './Queue.js'; | ||
import { sortByPriority, getRoutingKeyPattern, generateId } from './shared.js'; | ||
export {Exchange, EventExchange}; | ||
export { Exchange, EventExchange }; | ||
@@ -14,4 +14,4 @@ const kType = Symbol.for('type'); | ||
if (!name) throw new Error('Exchange name is required'); | ||
if (['topic', 'direct'].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct'); | ||
const eventExchange = EventExchange(name + '__events'); | ||
if ([ 'topic', 'direct' ].indexOf(type) === -1) throw Error('Exchange type must be one of topic or direct'); | ||
const eventExchange = new EventExchange(`${name}__events`); | ||
return new ExchangeBase(name, type, options, eventExchange); | ||
@@ -22,3 +22,3 @@ } | ||
if (!name) name = `smq.ename-${generateId()}`; | ||
return new ExchangeBase(name, 'topic', {durable: false, autoDelete: true}); | ||
return new ExchangeBase(name, 'topic', { durable: false, autoDelete: true }); | ||
} | ||
@@ -31,8 +31,8 @@ | ||
this[kStopped] = false; | ||
this.options = {durable: true, autoDelete: true, ...options}; | ||
this.options = { durable: true, autoDelete: true, ...options }; | ||
this.events = eventExchange; | ||
const deliveryQueue = this[kDeliveryQueue] = new Queue('delivery-q', {autoDelete: false}); | ||
const deliveryQueue = this[kDeliveryQueue] = new Queue('delivery-q', { autoDelete: false }); | ||
const onMessage = (type === 'topic' ? this._onTopicMessage : this._onDirectMessage).bind(this); | ||
deliveryQueue.consume(onMessage, {exclusive: true, consumerTag: '_exchange-tag'}); | ||
deliveryQueue.consume(onMessage, { exclusive: true, consumerTag: '_exchange-tag' }); | ||
} | ||
@@ -74,3 +74,3 @@ | ||
return this[kDeliveryQueue].queueMessage({routingKey}, { | ||
return this[kDeliveryQueue].queueMessage({ routingKey }, { | ||
content, | ||
@@ -124,3 +124,3 @@ properties, | ||
ExchangeBase.prototype._publishToQueue = function publishToQueue(queue, routingKey, content, properties) { | ||
queue.queueMessage({routingKey, exchange: this.name}, content, properties); | ||
queue.queueMessage({ routingKey, exchange: this.name }, content, properties); | ||
}; | ||
@@ -132,6 +132,6 @@ | ||
if (properties.confirm) { | ||
this.emit('message.undelivered', new Message({routingKey, exchange: this.name}, content, properties)); | ||
this.emit('message.undelivered', new Message({ routingKey, exchange: this.name }, content, properties)); | ||
} | ||
if (properties.mandatory) { | ||
this.emit('return', new Message({routingKey, exchange: this.name}, content, properties)); | ||
this.emit('return', new Message({ routingKey, exchange: this.name }, content, properties)); | ||
} | ||
@@ -159,3 +159,3 @@ }; | ||
const [binding] = bindings.splice(idx, 1); | ||
const [ binding ] = bindings.splice(idx, 1); | ||
binding.close(); | ||
@@ -196,5 +196,5 @@ | ||
type: this.type, | ||
options: {...this.options}, | ||
...(deliveryQueue.messageCount ? {deliveryQueue: deliveryQueue.getState()} : undefined), | ||
...(bindings ? {bindings} : undefined), | ||
options: { ...this.options }, | ||
...(deliveryQueue.messageCount ? { deliveryQueue: deliveryQueue.getState() } : undefined), | ||
...(bindings ? { bindings } : undefined), | ||
}; | ||
@@ -222,3 +222,3 @@ }; | ||
const onMessage = (this[kType] === 'topic' ? this._onTopicMessage : this._onDirectMessage).bind(this); | ||
deliveryQueue.consume(onMessage, {exclusive: true, consumerTag: '_exchange-tag'}); | ||
deliveryQueue.consume(onMessage, { exclusive: true, consumerTag: '_exchange-tag' }); | ||
} | ||
@@ -241,3 +241,3 @@ | ||
const eventQueue = new Queue(null, {durable: false, autoDelete: true}); | ||
const eventQueue = new Queue(null, { durable: false, autoDelete: true }); | ||
const binding = this.bindQueue(eventQueue, pattern); | ||
@@ -250,3 +250,3 @@ eventQueue.events = { | ||
return eventQueue.consume(handler, {...consumeOptions, noAck: true}, this); | ||
return eventQueue.consume(handler, { ...consumeOptions, noAck: true }, this); | ||
}; | ||
@@ -257,3 +257,3 @@ | ||
const {consumerTag} = handler; | ||
const { consumerTag } = handler; | ||
@@ -270,3 +270,3 @@ for (const binding of this[kBindings]) { | ||
this.id = `${queue.name}/${pattern}`; | ||
this.options = {priority: 0, ...bindOptions}; | ||
this.options = { priority: 0, ...bindOptions }; | ||
this.pattern = pattern; | ||
@@ -293,3 +293,3 @@ this.exchange = exchange; | ||
id: this.id, | ||
options: {...this.options}, | ||
options: { ...this.options }, | ||
queueName: this.queue.name, | ||
@@ -296,0 +296,0 @@ pattern: this.pattern, |
@@ -1,6 +0,6 @@ | ||
import {Broker} from './Broker.js'; | ||
import {Shovel} from './Shovel.js'; | ||
import {getRoutingKeyPattern} from './shared.js'; | ||
import { Broker } from './Broker.js'; | ||
import { Shovel } from './Shovel.js'; | ||
import { getRoutingKeyPattern } from './shared.js'; | ||
export default Broker; | ||
export {Broker, Shovel, getRoutingKeyPattern}; | ||
export { Broker, Shovel, getRoutingKeyPattern }; |
@@ -9,3 +9,3 @@ import { generateId } from './shared.js'; | ||
function Message(fields, content, properties, onConsumed) { | ||
this[kOnConsumed] = [null, onConsumed]; | ||
this[kOnConsumed] = [ null, onConsumed ]; | ||
this[kPending] = false; | ||
@@ -22,3 +22,3 @@ | ||
this.fields = {...fields, consumerTag: undefined}; | ||
this.fields = { ...fields, consumerTag: undefined }; | ||
this.content = content; | ||
@@ -34,3 +34,3 @@ this.properties = mproperties; | ||
Message.prototype._consume = function consume({consumerTag}, consumedCb) { | ||
Message.prototype._consume = function consume({ consumerTag }, consumedCb) { | ||
this[kPending] = true; | ||
@@ -37,0 +37,0 @@ this.fields.consumerTag = consumerTag; |
@@ -1,5 +0,5 @@ | ||
import {generateId, sortByPriority} from './shared.js'; | ||
import {Message} from './Message.js'; | ||
import { generateId, sortByPriority } from './shared.js'; | ||
import { Message } from './Message.js'; | ||
export {Queue, Consumer}; | ||
export { Queue, Consumer }; | ||
@@ -19,3 +19,3 @@ const kConsumers = Symbol.for('consumers'); | ||
this.options = {autoDelete: true, ...options}; | ||
this.options = { autoDelete: true, ...options }; | ||
@@ -68,3 +68,3 @@ this.messages = []; | ||
const messageTtl = this.options.messageTtl; | ||
const messageProperties = {...properties}; | ||
const messageProperties = { ...properties }; | ||
if (messageTtl && !('expiration' in messageProperties)) { | ||
@@ -161,4 +161,4 @@ messageProperties.expiration = messageTtl; | ||
Queue.prototype.get = function getMessage({noAck, consumerTag} = {}) { | ||
const message = this._consumeMessages(1, {noAck, consumerTag})[0]; | ||
Queue.prototype.get = function getMessage({ noAck, consumerTag } = {}) { | ||
const message = this._consumeMessages(1, { noAck, consumerTag })[0]; | ||
if (!message) return; | ||
@@ -226,3 +226,3 @@ if (noAck) this._dequeueMessage(message); | ||
this[kAvailableCount]++; | ||
messages.splice(msgIdx, 0, new Message({...message.fields, redelivered: true}, message.content, message.properties, this[kOnConsumed])); | ||
messages.splice(msgIdx, 0, new Message({ ...message.fields, redelivered: true }, message.content, message.properties, this[kOnConsumed])); | ||
} else { | ||
@@ -243,3 +243,3 @@ deadLetterExchange = this.options.deadLetterExchange; | ||
if (!requeue && message.properties.confirm) { | ||
this.emit('message.consumed.' + operation, {operation, message: {...message}}); | ||
this.emit(`message.consumed.${operation}`, { operation, message: { ...message } }); | ||
} | ||
@@ -249,3 +249,3 @@ | ||
const deadLetterRoutingKey = this.options.deadLetterRoutingKey; | ||
const deadMessage = new Message(message.fields, message.content, {...message.properties, expiration: undefined}); | ||
const deadMessage = new Message(message.fields, message.content, { ...message.properties, expiration: undefined }); | ||
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey; | ||
@@ -359,3 +359,3 @@ | ||
Queue.prototype.purge = function purge() { | ||
const toDelete = this.messages.filter(({pending}) => !pending); | ||
const toDelete = this.messages.filter(({ pending }) => !pending); | ||
this[kAvailableCount] = 0; | ||
@@ -383,3 +383,3 @@ | ||
name: this.name, | ||
options: {...this.options}, | ||
options: { ...this.options }, | ||
}; | ||
@@ -421,5 +421,5 @@ if (msgs.length) { | ||
const onConsumed = this[kOnConsumed]; | ||
for (const {fields, content, properties} of state.messages) { | ||
for (const { fields, content, properties } of state.messages) { | ||
if (properties.persistent === false) continue; | ||
const msg = new Message({...fields, redelivered: true}, content, properties, onConsumed); | ||
const msg = new Message({ ...fields, redelivered: true }, content, properties, onConsumed); | ||
this.messages.push(msg); | ||
@@ -436,3 +436,3 @@ } | ||
Queue.prototype.delete = function deleteQueue({ifUnused, ifEmpty} = {}) { | ||
Queue.prototype.delete = function deleteQueue({ ifUnused, ifEmpty } = {}) { | ||
const consumers = this[kConsumers]; | ||
@@ -452,3 +452,3 @@ if (ifUnused && consumers.length) return; | ||
this.emit('delete', this); | ||
return {messageCount}; | ||
return { messageCount }; | ||
}; | ||
@@ -479,4 +479,4 @@ | ||
this.options = {prefetch: 1, priority: 0, noAck: false, ...options}; | ||
if (!this.options.consumerTag) this.options.consumerTag = 'smq.ctag-' + generateId(); | ||
this.options = { prefetch: 1, priority: 0, noAck: false, ...options }; | ||
if (!this.options.consumerTag) this.options.consumerTag = `smq.ctag-${generateId()}`; | ||
@@ -491,3 +491,3 @@ this.queue = queue; | ||
this[kInternalQueue] = new Queue(this.options.consumerTag + '-q', { | ||
this[kInternalQueue] = new Queue(`${this.options.consumerTag}-q`, { | ||
autoDelete: false, | ||
@@ -494,0 +494,0 @@ maxLength: this.options.prefetch, |
@@ -1,2 +0,2 @@ | ||
export {generateId, getRoutingKeyPattern, sortByPriority}; | ||
export { generateId, getRoutingKeyPattern, sortByPriority }; | ||
@@ -3,0 +3,0 @@ const allDots = /\./g; |
@@ -1,2 +0,2 @@ | ||
import {EventExchange} from './Exchange.js'; | ||
import { EventExchange } from './Exchange.js'; | ||
@@ -13,4 +13,4 @@ const kBrokerInternal = Symbol.for('brokerInternal'); | ||
export function Shovel(name, source, destination, options = {}) { | ||
const {broker: sourceBroker, exchange: sourceExchangeName, pattern, queue, priority} = source; | ||
const {broker: destinationBroker, exchange: destinationExchangeName} = destination; | ||
const { broker: sourceBroker, exchange: sourceExchangeName, pattern, queue, priority } = source; | ||
const { broker: destinationBroker, exchange: destinationExchangeName } = destination; | ||
@@ -35,4 +35,4 @@ const sourceExchange = sourceBroker.getExchange(sourceExchangeName); | ||
this.name = name; | ||
this.source = {...source, pattern: routingKeyPattern}; | ||
this.destination = {...destination}; | ||
this.source = { ...source, pattern: routingKeyPattern }; | ||
this.destination = { ...destination }; | ||
this.events = new EventExchange('shovel__events'); | ||
@@ -57,5 +57,5 @@ | ||
if (queue) { | ||
consumer = sourceBroker.subscribe(sourceExchangeName, routingKeyPattern, queue, shovelHandler, {consumerTag, priority}); | ||
consumer = sourceBroker.subscribe(sourceExchangeName, routingKeyPattern, queue, shovelHandler, { consumerTag, priority }); | ||
} else { | ||
consumer = sourceBroker.subscribeTmp(sourceExchangeName, routingKeyPattern, shovelHandler, {consumerTag, priority}); | ||
consumer = sourceBroker.subscribeTmp(sourceExchangeName, routingKeyPattern, shovelHandler, { consumerTag, priority }); | ||
this.source.queue = consumer.queue.name; | ||
@@ -106,7 +106,7 @@ } | ||
const {fields, content, properties} = message; | ||
const {content: newContent, properties: newProperties} = cloneMessage({ | ||
fields: {...fields}, | ||
const { fields, content, properties } = message; | ||
const { content: newContent, properties: newProperties } = cloneMessage({ | ||
fields: { ...fields }, | ||
content, | ||
properties: {...properties}, | ||
properties: { ...properties }, | ||
}); | ||
@@ -117,3 +117,3 @@ | ||
content: newContent, | ||
properties: {...properties, ...newProperties}, | ||
properties: { ...properties, ...newProperties }, | ||
}; | ||
@@ -123,7 +123,10 @@ }; | ||
Shovel.prototype._onShovelMessage = function onShovelMessage(routingKey, message) { | ||
const {content, properties} = this._messageHandler(message); | ||
const props = {...properties, ...this.destination.publishProperties, 'source-exchange': this[kSourceExchange].name}; | ||
const destinationExchange = this[kDestinationExchange]; | ||
if (!destinationExchange.bindingCount && !message.properties.mandatory) return message.ack(); | ||
const { content, properties } = this._messageHandler(message); | ||
const props = { ...properties, ...this.destination.publishProperties, 'source-exchange': this[kSourceExchange].name }; | ||
if (!this[kBrokerInternal]) props['shovel-name'] = this.name; | ||
this[kDestinationExchange].publish(this.destination.exchangeKey || routingKey, content, props); | ||
destinationExchange.publish(this.destination.exchangeKey || routingKey, content, props); | ||
message.ack(); | ||
}; |
@@ -20,4 +20,10 @@ import { Consumer } from './Queue.js'; | ||
export interface ShovelDestination { | ||
/** destination broker */ | ||
broker: Broker; | ||
/** destination exchange */ | ||
exchange: string; | ||
/** optional destination exchange routing key, defaults to original message's routing key */ | ||
exchangeKey?: string; | ||
/** optional object with message properties to overwrite when shovelling messages */ | ||
publishProperties?: Record<string, any>; | ||
} | ||
@@ -24,0 +30,0 @@ |
119660
3263
10