@techteamer/mq
Advanced tools
Comparing version 2.3.0 to 2.4.0
@@ -0,1 +1,4 @@ | ||
2.4.0 | ||
- RPC reply attachment | ||
2.3.0 | ||
@@ -2,0 +5,0 @@ - Collection Pool |
{ | ||
"name": "@techteamer/mq", | ||
"version": "2.3.0", | ||
"version": "2.4.0", | ||
"description": "A RabbitMQ wrapper for node", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -66,2 +66,6 @@ class QueueMessage { | ||
/** | ||
* @param {String} name | ||
* @param {Buffer} buffer | ||
*/ | ||
addAttachment (name, buffer) { | ||
@@ -71,2 +75,28 @@ this.attachments.set(name, buffer) | ||
/** | ||
* @param {String} name | ||
* @returns {Buffer} | ||
*/ | ||
getAttachment (name) { | ||
return this.attachments.get(name) | ||
} | ||
/** | ||
* @param {String} name | ||
* @returns {boolean} | ||
*/ | ||
hasAttachment (name) { | ||
return this.attachments.has(name) | ||
} | ||
/** | ||
* @returns {boolean} | ||
*/ | ||
hasAnyAttachments () { | ||
return this.attachments.size > 0 | ||
} | ||
/** | ||
* @returns {Map<String, Buffer>} | ||
*/ | ||
getAttachments () { | ||
@@ -73,0 +103,0 @@ return this.attachments |
@@ -35,3 +35,3 @@ const uuid = require('uuid/v4') | ||
*/ | ||
_registerMessage (resolve, reject, timeoutMs) { | ||
_registerMessage (resolve, reject, timeoutMs, resolveWithFullResponse) { | ||
let correlationId | ||
@@ -46,2 +46,3 @@ let timeoutId | ||
this._correlationIdMap.set(correlationId, { | ||
resolveWithFullResponse: resolveWithFullResponse, | ||
resolve: (result) => { | ||
@@ -79,5 +80,6 @@ if (!timedOut) { | ||
* @param {Map} attachments | ||
* @return {Promise} | ||
* @param {Boolean} [resolveWithFullResponse=false] | ||
* @return {Promise<QueueMessage|*>} | ||
* */ | ||
call (message, timeoutMs = null, attachments = null) { | ||
call (message, timeoutMs = null, attachments = null, resolveWithFullResponse = false) { | ||
let channel | ||
@@ -111,3 +113,3 @@ | ||
let correlationId = this._registerMessage(resolve, reject, timeoutMs) | ||
let correlationId = this._registerMessage(resolve, reject, timeoutMs, resolveWithFullResponse) | ||
@@ -189,3 +191,3 @@ channel.sendToQueue(this.name, param.serialize(), { | ||
const { resolve, reject } = this._correlationIdMap.get(reply.properties.correlationId) | ||
const { resolve, reject, resolveWithFullResponse } = this._correlationIdMap.get(reply.properties.correlationId) | ||
@@ -197,3 +199,7 @@ this._correlationIdMap.delete(reply.properties.correlationId) | ||
if (replyContent.status === 'ok') { | ||
resolve(replyContent.data) | ||
if (resolveWithFullResponse) { | ||
resolve(replyContent) | ||
} else { | ||
resolve(replyContent.data) | ||
} | ||
} else { | ||
@@ -200,0 +206,0 @@ this._logger.error('RPC CLIENT GOT ERROR', this.name, reply.properties.correlationId, replyContent) |
const QueueMessage = require('./QueueMessage') | ||
const QueueResponse = require('./QueueResponse') | ||
const RPCError = require('./RPCError') | ||
@@ -29,3 +30,10 @@ | ||
_callback (msg) { | ||
/** | ||
* @param {*} msg | ||
* @param {QueueMessage} request | ||
* @param {QueueResponse} response | ||
* @protected | ||
* @returns {Promise} | ||
*/ | ||
_callback (msg, request, response) { | ||
let { action, data } = msg || {} | ||
@@ -100,2 +108,4 @@ if (!this.actions.has(action)) { | ||
let response = new QueueResponse() | ||
if (request.status !== 'ok') { | ||
@@ -119,3 +129,3 @@ this._logger.error('CANNOT GET RPC CALL PARAMS', this.name, request) | ||
return Promise.resolve().then(() => { | ||
return this._callback(request.data, request) | ||
return this._callback(request.data, request, response) | ||
}).then((answer) => { | ||
@@ -128,4 +138,10 @@ if (timedOut) { | ||
let reply | ||
let replyAttachments = response.getAttachments() | ||
try { | ||
reply = new QueueMessage('ok', answer).serialize() | ||
reply = new QueueMessage('ok', answer) | ||
if (replyAttachments instanceof Map) { | ||
for (const [key, value] of replyAttachments) { | ||
reply.addAttachment(key, value) | ||
} | ||
} | ||
} catch (err) { | ||
@@ -140,3 +156,3 @@ this._logger.error('CANNOT SEND RPC REPLY', this.name, err) | ||
ch.sendToQueue(msg.properties.replyTo, reply, { correlationId: msg.properties.correlationId }) | ||
ch.sendToQueue(msg.properties.replyTo, reply.serialize(), { correlationId: msg.properties.correlationId }) | ||
this._ack(ch, msg) | ||
@@ -143,0 +159,0 @@ }).catch((err) => { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
38378
17
1189