@techteamer/mq
Advanced tools
Comparing version 4.1.4 to 5.0.0
@@ -0,1 +1,7 @@ | ||
5.0.0 | ||
- added Protobuf support | ||
- added more options to configure queue and exchange assertions | ||
- updated and upgraded dependencies | ||
- added option to exit on connection closing | ||
4.1.4 | ||
@@ -2,0 +8,0 @@ - dependency updates |
{ | ||
"name": "@techteamer/mq", | ||
"version": "4.1.4", | ||
"version": "5.0.0", | ||
"description": "A RabbitMQ wrapper for node", | ||
@@ -14,3 +14,3 @@ "main": "index.js", | ||
"type": "git", | ||
"url": "https://github.com/TechTeamer/techteamer_mq" | ||
"url": "https://github.com/TechTeamer/mq" | ||
}, | ||
@@ -26,3 +26,3 @@ "engines": { | ||
"dependencies": { | ||
"amqplib": "^0.9.1", | ||
"amqplib": "^0.10.1", | ||
"uuid": "^8.2.0" | ||
@@ -41,4 +41,8 @@ }, | ||
"nyc": "^15.1.0", | ||
"protobufjs": "^7.0.0", | ||
"seed-random": "^2.2.0" | ||
}, | ||
"peerDependencies": { | ||
"protobufjs": "^7.0.0" | ||
} | ||
} |
@@ -5,4 +5,4 @@ TechTeamer MQ | ||
[![npm](https://img.shields.io/npm/v/@techteamer/mq.svg)](https://www.npmjs.com/package/@techteamer/mq) | ||
[![Build Status](https://travis-ci.org/TechTeamer/techteamer_mq.svg)](https://travis-ci.org/TechTeamer/techteamer_mq) | ||
[![Coverage Status](https://coveralls.io/repos/github/TechTeamer/techteamer_mq/badge.svg?branch=master)](https://coveralls.io/github/TechTeamer/techteamer_mq?branch=master) | ||
[![Build Status](https://travis-ci.org/TechTeamer/mq.svg)](https://travis-ci.org/TechTeamer/mq) | ||
[![Coverage Status](https://coveralls.io/repos/github/TechTeamer/mq/badge.svg?branch=master)](https://coveralls.io/github/TechTeamer/mq?branch=master) | ||
@@ -9,0 +9,0 @@ A RabbitMQ wrapper for node |
@@ -20,6 +20,18 @@ const { v4: uuid } = require('uuid') | ||
const { queueMaxSize, timeoutMs, serverCount = 0 } = options | ||
const { | ||
queueMaxSize, | ||
timeoutMs, | ||
serverCount = 0, | ||
assertQueueOptions, | ||
assertExchange = true, | ||
assertExchangeOptions = null | ||
} = options | ||
this._rpcQueueMaxSize = queueMaxSize | ||
this._rpcTimeoutMs = timeoutMs | ||
this._gatheringServerCount = serverCount | ||
this._assertExchange = assertExchange === true | ||
this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) | ||
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) | ||
} | ||
@@ -30,5 +42,7 @@ | ||
const channel = await this._connection.getChannel() | ||
await channel.assertExchange(this.name, 'fanout', { durable: true }) | ||
if (this._assertExchange) { | ||
await channel.assertExchange(this.name, 'fanout', this._assertExchangeOptions) | ||
} | ||
const replyQueue = await channel.assertQueue('', { exclusive: true }) | ||
const replyQueue = await channel.assertQueue('', this._assertQueueOptions) | ||
this._replyQueue = replyQueue.queue | ||
@@ -35,0 +49,0 @@ |
@@ -17,6 +17,17 @@ const QueueMessage = require('./QueueMessage') | ||
const { prefetchCount, timeoutMs } = options || {} | ||
const { | ||
prefetchCount, | ||
timeoutMs, | ||
assertQueueOptions, | ||
assertExchange = true, | ||
assertExchangeOptions = null | ||
} = options || {} | ||
this._prefetchCount = prefetchCount | ||
this._responseTimeoutMs = timeoutMs | ||
this._assertExchange = assertExchange === true | ||
this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) | ||
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) | ||
this.actions = new Map() | ||
@@ -28,4 +39,6 @@ } | ||
const channel = await this._connection.getChannel() | ||
await channel.assertExchange(this.name, 'fanout', { durable: true }) | ||
const serverQueue = await channel.assertQueue('', { exclusive: true }) | ||
if (this._assertExchange) { | ||
await channel.assertExchange(this.name, 'fanout', this._assertExchangeOptions) | ||
} | ||
const serverQueue = await channel.assertQueue('', this._assertQueueOptions) | ||
const serverQueueName = serverQueue.queue | ||
@@ -32,0 +45,0 @@ |
@@ -8,4 +8,5 @@ const QueueMessage = require('./QueueMessage') | ||
* @param {String} exchange | ||
* @param {Object} options | ||
*/ | ||
constructor (queueConnection, logger, exchange) { | ||
constructor (queueConnection, logger, exchange, options) { | ||
this._connection = queueConnection | ||
@@ -15,2 +16,16 @@ this._logger = logger | ||
this.routingKey = '' | ||
this.options = options | ||
const { | ||
MessageModel, | ||
ContentSchema, | ||
assertExchange = true, | ||
assertExchangeOptions = null | ||
} = options || {} | ||
this._assertExchange = assertExchange === true | ||
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) | ||
this.MessageModel = MessageModel || QueueMessage | ||
this.ContentSchema = ContentSchema || JSON | ||
} | ||
@@ -25,3 +40,5 @@ | ||
assertExchangeOrQueue (channel) { | ||
return channel.assertExchange(this.exchange, 'fanout', { durable: true }) | ||
if (this._assertExchange) { | ||
return channel.assertExchange(this.exchange, 'fanout', this._assertExchangeOptions) | ||
} | ||
} | ||
@@ -64,3 +81,3 @@ | ||
try { | ||
param = new QueueMessage('ok', message, timeOut) | ||
param = new this.MessageModel('ok', message, timeOut, this.ContentSchema) | ||
if (attachments instanceof Map) { | ||
@@ -67,0 +84,0 @@ for (const [key, value] of attachments) { |
@@ -8,6 +8,16 @@ const Publisher = require('./Publisher') | ||
* @param {String} name | ||
* @param {Object} options | ||
*/ | ||
constructor (queueConnection, logger, name) { | ||
super(queueConnection, logger, '') | ||
constructor (queueConnection, logger, name, options) { | ||
super(queueConnection, logger, '', options) | ||
this.routingKey = name | ||
this._assertQueue = null | ||
const { | ||
assertQueue = true, | ||
assertQueueOptions | ||
} = options || {} | ||
this._assertQueue = assertQueue === true | ||
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {}) | ||
} | ||
@@ -20,3 +30,5 @@ | ||
assertExchangeOrQueue (channel) { | ||
return channel.assertQueue(this.routingKey, { durable: true }) | ||
if (this._assertQueue) { | ||
return channel.assertQueue(this.routingKey, this._assertQueueOptions) | ||
} | ||
} | ||
@@ -23,0 +35,0 @@ } |
@@ -36,3 +36,4 @@ | ||
rpcQueueMaxSize = 100, | ||
logger = console | ||
logger = console, | ||
exitOnConnectionClose = 2 | ||
} = config | ||
@@ -46,2 +47,3 @@ | ||
this.logger = logger | ||
this.exitOnConnectionClose = exitOnConnectionClose | ||
} | ||
@@ -48,0 +50,0 @@ } |
@@ -56,3 +56,5 @@ const fs = require('fs') | ||
this._logger.error('RabbitMQ closed') | ||
process.exit(2) | ||
if (this._config.exitOnConnectionClose) { | ||
process.exit(this._config.exitOnConnectionClose) | ||
} | ||
}) | ||
@@ -59,0 +61,0 @@ conn.on('blocked', (reason) => { |
@@ -165,5 +165,6 @@ const QueueConfig = require('./QueueConfig') | ||
* @param {Publisher|function() : Publisher} OverrideClass | ||
* @param {Object} [options] | ||
* @return Publisher | ||
*/ | ||
getPublisher (exchangeName, OverrideClass = Publisher) { | ||
getPublisher (exchangeName, OverrideClass = Publisher, options = {}) { | ||
if (this.publishers.has(exchangeName)) { | ||
@@ -173,2 +174,7 @@ return this.publishers.get(exchangeName) | ||
if (arguments.length === 2 && typeof OverrideClass !== 'function') { | ||
options = OverrideClass | ||
OverrideClass = Publisher | ||
} | ||
if (OverrideClass !== Publisher && !(OverrideClass.prototype instanceof Publisher)) { | ||
@@ -178,3 +184,3 @@ throw new Error('Override must be a subclass of Publisher') | ||
const publisher = new OverrideClass(this.connection, this._logger, exchangeName) | ||
const publisher = new OverrideClass(this.connection, this._logger, exchangeName, options) | ||
@@ -280,5 +286,6 @@ this.publishers.set(exchangeName, publisher) | ||
* @param {QueueClient|function() : QueueClient} OverrideClass | ||
* @param {Object} [options={}] | ||
* @return QueueClient | ||
*/ | ||
getQueueClient (queueName, OverrideClass = QueueClient) { | ||
getQueueClient (queueName, OverrideClass = QueueClient, options = {}) { | ||
if (this.queueClients.has(queueName)) { | ||
@@ -288,2 +295,7 @@ return this.queueClients.get(queueName) | ||
if (arguments.length === 2 && typeof OverrideClass !== 'function') { | ||
options = OverrideClass | ||
OverrideClass = QueueClient | ||
} | ||
if (OverrideClass !== QueueClient && !(OverrideClass.prototype instanceof QueueClient)) { | ||
@@ -293,3 +305,3 @@ throw new Error('Override must be a subclass of QueueClient') | ||
const queueClient = new OverrideClass(this.connection, this._logger, queueName) | ||
const queueClient = new OverrideClass(this.connection, this._logger, queueName, options) | ||
@@ -296,0 +308,0 @@ this.queueClients.set(queueName, queueClient) |
class QueueMessage { | ||
constructor (status, data, timeOut) { | ||
constructor (status, data, timeOut, ContentSchema = JSON) { | ||
this.status = status | ||
@@ -7,2 +7,3 @@ this.data = data | ||
this.attachments = new Map() | ||
this.ContentSchema = ContentSchema | ||
} | ||
@@ -46,17 +47,27 @@ | ||
static unserialize (buffer) { | ||
if (buffer.toString('utf8', 0, 1) === '+') { | ||
const jsonLength = buffer.slice(1, 5).readUInt32BE() | ||
const { status, data, timeOut, attachArray } = JSON.parse(buffer.toString('utf8', 5, 5 + jsonLength)) | ||
let prevAttachmentLength = 5 + jsonLength | ||
const queueMessage = new QueueMessage(status, data, timeOut) | ||
for (const [key, length] of attachArray) { | ||
queueMessage.addAttachment(key, buffer.slice(prevAttachmentLength, prevAttachmentLength + length)) | ||
prevAttachmentLength = prevAttachmentLength + length | ||
/** | ||
* @param {Buffer} buffer | ||
* @param ContentSchema | ||
* @returns {QueueMessage} | ||
*/ | ||
static unserialize (buffer, ContentSchema = JSON) { | ||
if (!ContentSchema || ContentSchema === JSON) { | ||
if (buffer.toString('utf8', 0, 1) === '+') { | ||
const jsonLength = buffer.slice(1, 5).readUInt32BE() | ||
const { status, data, timeOut, attachArray } = JSON.parse(buffer.toString('utf8', 5, 5 + jsonLength)) | ||
let prevAttachmentLength = 5 + jsonLength | ||
const queueMessage = new this(status, data, timeOut, ContentSchema) | ||
for (const [key, length] of attachArray) { | ||
queueMessage.addAttachment(key, buffer.slice(prevAttachmentLength, prevAttachmentLength + length)) | ||
prevAttachmentLength = prevAttachmentLength + length | ||
} | ||
return queueMessage | ||
} else if (buffer.toString('utf8', 0, 1) === '{') { | ||
return this.fromJSON(buffer.toString('utf8')) | ||
} else { | ||
throw new Error('Impossible to deserialize the message: unrecognized format') | ||
} | ||
return queueMessage | ||
} else if (buffer.toString('utf8', 0, 1) === '{') { | ||
return this.fromJSON(buffer.toString('utf8')) | ||
} else { | ||
throw new Error('Impossible to deserialize the message') | ||
throw new Error('Impossible to deserialize the message: unknown content schema') | ||
} | ||
@@ -63,0 +74,0 @@ } |
@@ -12,3 +12,13 @@ const Subscriber = require('./Subscriber') | ||
super(queueConnection, logger, name, options) | ||
this._prefetchCount = options.prefetchCount | ||
const { | ||
assertQueue = true, | ||
assertQueueOptions = null, | ||
prefetchCount | ||
} = options || {} | ||
this._assertQueue = null | ||
this._prefetchCount = prefetchCount | ||
this._assertQueue = assertQueue === true | ||
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {}) | ||
} | ||
@@ -22,3 +32,5 @@ | ||
const channel = await this._connection.getChannel() | ||
await channel.assertQueue(this.name, { durable: true }) | ||
if (this._assertQueue) { | ||
await channel.assertQueue(this.name, this._assertQueueOptions) | ||
} | ||
await channel.prefetch(this._prefetchCount) | ||
@@ -29,3 +41,3 @@ await channel.consume(this.name, (msg) => { | ||
} catch (err) { | ||
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', err) | ||
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', this.name, this._assertQueueOptions, err) | ||
throw err | ||
@@ -32,0 +44,0 @@ } |
@@ -16,11 +16,29 @@ const { v4: uuid } = require('uuid') | ||
constructor (queueConnection, logger, rpcName, options) { | ||
const { | ||
queueMaxSize, | ||
timeoutMs, | ||
RequestMessageModel, | ||
ResponseMessageModel, | ||
RequestContentSchema, | ||
ResponseContentSchema, | ||
replyQueueName = '', | ||
assertReplyQueue = true, | ||
assertReplyQueueOptions = null | ||
} = options || {} | ||
this._connection = queueConnection | ||
this._logger = logger | ||
this.name = rpcName | ||
this._replyQueue = '' | ||
this._replyQueue = replyQueueName || '' | ||
this._correlationIdMap = new Map() | ||
const { queueMaxSize, timeoutMs } = options | ||
this._assertReplyQueue = assertReplyQueue === true | ||
this._assertReplyQueueOptions = Object.assign({ exclusive: true }, assertReplyQueueOptions || {}) | ||
this._rpcQueueMaxSize = queueMaxSize | ||
this._rpcTimeoutMs = timeoutMs | ||
this.RequestMessageModel = RequestMessageModel || QueueMessage | ||
this.ResponseMessageModel = ResponseMessageModel || QueueMessage | ||
this.RequestContentSchema = RequestContentSchema || JSON | ||
this.ResponseContentSchema = ResponseContentSchema || JSON | ||
} | ||
@@ -96,3 +114,3 @@ | ||
try { | ||
param = new QueueMessage('ok', message, timeoutMs) | ||
param = new this.RequestMessageModel('ok', message, timeoutMs, this.RequestContentSchema) | ||
if (attachments instanceof Map) { | ||
@@ -140,4 +158,6 @@ for (const [key, value] of attachments) { | ||
try { | ||
const replyQueue = await ch.assertQueue('', { exclusive: true }) | ||
this._replyQueue = replyQueue.queue | ||
if (this._assertReplyQueue) { | ||
const assertResult = await ch.assertQueue(this._replyQueue, this._assertReplyQueueOptions) | ||
this._replyQueue = assertResult.queue | ||
} | ||
@@ -179,3 +199,3 @@ ch.consume(this._replyQueue, (msg) => { | ||
const replyContent = QueueMessage.unserialize(reply.content) | ||
const replyContent = this.ResponseMessageModel.unserialize(reply.content, this.ResponseContentSchema) | ||
@@ -182,0 +202,0 @@ if (replyContent.status === 'ok') { |
const QueueMessage = require('./QueueMessage') | ||
const QueueResponse = require('./QueueResponse') | ||
const RPCError = require('./RPCError') | ||
@@ -16,2 +15,13 @@ /** | ||
constructor (queueConnection, logger, rpcName, options) { | ||
const { | ||
prefetchCount, | ||
timeoutMs, | ||
RequestMessageModel, | ||
ResponseMessageModel, | ||
RequestContentSchema, | ||
ResponseContentSchema, | ||
assertQueue = true, | ||
assertQueueOptions = null | ||
} = options || {} | ||
this._connection = queueConnection | ||
@@ -21,5 +31,11 @@ this._logger = logger | ||
const { prefetchCount, timeoutMs } = options | ||
this._assertQueue = assertQueue === true | ||
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {}) | ||
this._prefetchCount = prefetchCount | ||
this._timeoutMs = timeoutMs | ||
this.RequestModel = RequestMessageModel || QueueMessage | ||
this.ResponseModel = ResponseMessageModel || QueueMessage | ||
this.RequestContentSchema = RequestContentSchema || JSON | ||
this.ResponseContentSchema = ResponseContentSchema || JSON | ||
@@ -67,6 +83,6 @@ this.actions = new Map() | ||
const channel = await this._connection.getChannel() | ||
await channel.assertQueue(this.name, { durable: true }) | ||
if (this._assertQueue) { | ||
await channel.assertQueue(this.name, this._assertQueueOptions) | ||
} | ||
await channel.prefetch(this._prefetchCount) | ||
await channel.consume(this.name, (msg) => { | ||
@@ -95,2 +111,60 @@ this._processMessage(channel, msg) | ||
_createResponseTimeoutReply (_msg, _request) { | ||
return new this.ResponseModel('error', 'timeout', null, this.ResponseContentSchema) | ||
} | ||
onResponseTimeout (ch, msg, request) { | ||
ch.sendToQueue(msg.properties.replyTo, this._createResponseTimeoutReply(msg, request).serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
} | ||
handleResponseTimeout (ch, msg, request) { | ||
try { | ||
this.onResponseTimeout(ch, msg, request) | ||
} catch (err) { | ||
this._logger.error('Error handling RPC response timeout', err) | ||
this._ack(ch, msg) | ||
} | ||
} | ||
_createRequestErrorReply (_msg, _request) { | ||
return new this.ResponseModel('error', 'cannot decode parameters', null, this.ResponseContentSchema) | ||
} | ||
onRequestError (ch, msg, request) { | ||
ch.sendToQueue(msg.properties.replyTo, this._createRequestErrorReply(msg, request).serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
} | ||
handleRequestError (ch, msg, request) { | ||
try { | ||
this.onRequestError(ch, msg, request) | ||
} catch (err) { | ||
this._logger.error('Error handling RPC request error', err) | ||
this._ack(ch, msg) | ||
} | ||
} | ||
_createResponseErrorReply (_msg, _error, _request) { | ||
return new this.ResponseModel('error', 'cannot answer', null, this.ResponseContentSchema) | ||
} | ||
onResponseError (ch, msg, error, request) { | ||
ch.sendToQueue(msg.properties.replyTo, this._createResponseErrorReply(msg, error, request).serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
} | ||
handleResponseError (ch, msg, error, request) { | ||
try { | ||
this.onResponseError(ch, msg, error, request) | ||
} catch (err) { | ||
this._logger.error('Error handling RPC response error', err) | ||
this._ack(ch, msg) | ||
} | ||
} | ||
_createReply (_msg, answer) { | ||
return new this.ResponseModel('ok', answer, null, this.ResponseContentSchema) | ||
} | ||
/** | ||
@@ -103,3 +177,3 @@ * @param ch | ||
async _processMessage (ch, msg) { | ||
const request = QueueMessage.unserialize(msg.content) | ||
const request = this.RequestModel.unserialize(msg.content, this.RequestContentSchema) | ||
@@ -111,4 +185,3 @@ const response = new QueueResponse() | ||
ch.sendToQueue(msg.properties.replyTo, new QueueMessage('error', 'cannot decode parameters').serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
this.handleRequestError(ch, msg, request) | ||
return | ||
@@ -121,5 +194,4 @@ } | ||
timedOut = true | ||
this._logger.error('timeout in RPCServer', this.name, request.data) | ||
ch.sendToQueue(msg.properties.replyTo, new QueueMessage('error', 'timeout').serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
this._logger.error('RPCServer response timeout', this.name, request.data) | ||
this.handleResponseTimeout(ch, msg, request) | ||
}, timeoutMs) | ||
@@ -134,6 +206,6 @@ | ||
clearTimeout(timer) | ||
let reply | ||
let replyData | ||
const replyAttachments = response.getAttachments() | ||
try { | ||
reply = new QueueMessage('ok', answer) | ||
const reply = this._createReply(msg, answer) | ||
if (replyAttachments instanceof Map) { | ||
@@ -144,12 +216,10 @@ for (const [key, value] of replyAttachments) { | ||
} | ||
replyData = reply.serialize() | ||
} catch (err) { | ||
this._logger.error('CANNOT SEND RPC REPLY', this.name, err) | ||
ch.sendToQueue(msg.properties.replyTo, new QueueMessage('error', 'cannot encode anwser').serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
this._logger.error('CANNOT CREATE RPC REPLY', this.name, err) | ||
this.handleResponseError(ch, msg, err, request) | ||
return | ||
} | ||
ch.sendToQueue(msg.properties.replyTo, reply.serialize(), { correlationId: msg.properties.correlationId }) | ||
ch.sendToQueue(msg.properties.replyTo, replyData, { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
@@ -162,12 +232,5 @@ } catch (err) { | ||
clearTimeout(timer) | ||
let message = `cannot answer to rpc call: ${err}` | ||
if (!(err instanceof RPCError)) { | ||
this._logger.error('RPC REPLY FAILED %s', this.name, err) | ||
} else { | ||
message = err.message | ||
} | ||
ch.sendToQueue(msg.properties.replyTo, new QueueMessage('error', message).serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
this._logger.error('CANNOT SEND RPC REPLY', this.name, err) | ||
this.handleResponseError(ch, msg, err, request) | ||
} | ||
@@ -174,0 +237,0 @@ } |
@@ -15,6 +15,21 @@ const QueueMessage = require('./QueueMessage') | ||
const { maxRetry, timeoutMs } = options | ||
const { | ||
maxRetry, | ||
timeoutMs, | ||
MessageModel, | ||
ContentSchema, | ||
assertQueueOptions = null, | ||
assertExchange = true, | ||
assertExchangeOptions = null | ||
} = options || {} | ||
this._maxRetry = maxRetry | ||
this._timeoutMs = timeoutMs | ||
this.MessageModel = MessageModel || QueueMessage | ||
this.ContentSchema = ContentSchema || JSON | ||
this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) | ||
this._assertExchange = assertExchange === true | ||
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) | ||
this._retryMap = new Map() | ||
@@ -55,4 +70,6 @@ | ||
const channel = await this._connection.getChannel() | ||
await channel.assertExchange(this.name, 'fanout', { durable: true }) | ||
const queue = await channel.assertQueue('', { exclusive: true }) | ||
if (this._assertExchange) { | ||
await channel.assertExchange(this.name, 'fanout', this._assertExchangeOptions) | ||
} | ||
const queue = await channel.assertQueue('', this._assertQueueOptions) | ||
@@ -97,12 +114,57 @@ await channel.bindQueue(queue.queue, this.name, '') | ||
_parseMessage (msg) { | ||
try { | ||
const request = this.MessageModel.unserialize(msg.content, this.ContentSchema) | ||
if (request.status !== 'ok') { | ||
this._logger.error('CANNOT GET QUEUE MESSAGE PARAMS', this.name, request) | ||
return null | ||
} | ||
return request | ||
} catch (err) { | ||
this._logger.error('CANNOT PROCESS QUEUE MESSAGE', this.name, msg.properties, err) | ||
return null | ||
} | ||
} | ||
/** | ||
* @param channel | ||
* @param msg | ||
* @return {Promise} | ||
* @param request | ||
* @returns {boolean} true if too many retries reached | ||
* @private | ||
*/ | ||
_processMessage (channel, msg) { | ||
const request = QueueMessage.unserialize(msg.content) | ||
if (request.status !== 'ok') { | ||
this._logger.error('CANNOT GET QUEUE MESSAGE PARAMS', this.name, request) | ||
_handleMessageRetry (msg, request) { | ||
if (!msg.fields || !msg.fields.redelivered || !msg.fields.consumerTag) { | ||
return false | ||
} | ||
const consumerTag = msg.fields.consumerTag | ||
let counter = 1 | ||
if (this._retryMap.has(consumerTag)) { | ||
counter = this._retryMap.get(consumerTag) + 1 | ||
this._retryMap.set(consumerTag, counter) | ||
} else { | ||
this._retryMap.set(consumerTag, counter) | ||
} | ||
if (counter > this._maxRetry) { | ||
this._logger.error('SUBSCRIBER TRIED TOO MANY TIMES', this.name, request, msg) | ||
this._retryMap.delete(consumerTag) | ||
return true | ||
} | ||
return false | ||
} | ||
/** | ||
* @param channel | ||
* @param msg | ||
* @return {Promise} | ||
* @protected | ||
*/ | ||
async _processMessage (channel, msg) { | ||
const request = this._parseMessage(msg) | ||
if (!request) { | ||
this._ack(channel, msg) | ||
@@ -112,19 +174,6 @@ return | ||
if (msg.fields && msg.fields.redelivered && msg.fields.consumerTag) { | ||
let counter = 1 | ||
if (this._retryMap.has(msg.fields.consumerTag)) { | ||
counter = this._retryMap.get(msg.fields.consumerTag) + 1 | ||
this._retryMap.set(msg.fields.consumerTag, counter) | ||
} else { | ||
this._retryMap.set(msg.fields.consumerTag, counter) | ||
} | ||
if (counter > this._maxRetry) { | ||
this._logger.error('SUBSCRIBER TRIED TOO MANY TIMES', this.name, request, msg) | ||
this._ack(channel, msg) | ||
if (msg.fields.consumerTag) { | ||
this._retryMap.delete(msg.fields.consumerTag) | ||
} | ||
return | ||
} | ||
const tooManyRetries = this._handleMessageRetry(msg, request) | ||
if (tooManyRetries) { | ||
this._ack(channel, msg) | ||
return | ||
} | ||
@@ -140,5 +189,5 @@ | ||
return Promise.resolve().then(() => { | ||
return this._callback(request.data, msg.properties, request, msg) | ||
}).then(() => { | ||
try { | ||
await this._callback(request.data, msg.properties, request, msg) | ||
if (!timedOut) { | ||
@@ -151,3 +200,3 @@ clearTimeout(timer) | ||
} | ||
}).catch((err) => { | ||
} catch (err) { | ||
if (!timedOut) { | ||
@@ -158,3 +207,3 @@ clearTimeout(timer) | ||
} | ||
}) | ||
} | ||
} | ||
@@ -161,0 +210,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
1934
0
66774
3
12
+ Added@acuminous/bitsyntax@0.1.2(transitive)
+ Added@protobufjs/aspromise@1.1.2(transitive)
+ Added@protobufjs/base64@1.1.2(transitive)
+ Added@protobufjs/codegen@2.0.4(transitive)
+ Added@protobufjs/eventemitter@1.1.0(transitive)
+ Added@protobufjs/fetch@1.1.0(transitive)
+ Added@protobufjs/float@1.0.2(transitive)
+ Added@protobufjs/inquire@1.1.0(transitive)
+ Added@protobufjs/path@1.1.2(transitive)
+ Added@protobufjs/pool@1.1.0(transitive)
+ Added@protobufjs/utf8@1.1.0(transitive)
+ Added@types/node@22.9.0(transitive)
+ Addedamqplib@0.10.4(transitive)
+ Addeddebug@4.3.7(transitive)
+ Addedlong@5.2.3(transitive)
+ Addedms@2.1.3(transitive)
+ Addedprotobufjs@7.4.0(transitive)
+ Addedundici-types@6.19.8(transitive)
- Removedamqplib@0.9.1(transitive)
- Removedbitsyntax@0.1.0(transitive)
- Removedbluebird@3.7.2(transitive)
- Removeddebug@2.6.9(transitive)
- Removedms@2.0.0(transitive)
Updatedamqplib@^0.10.1