Comparing version 1.0.0 to 1.1.0
@@ -12,2 +12,3 @@ /// <reference types="node" /> | ||
retryStrategy?: (times: number) => number; | ||
exchange?: string; | ||
} | ||
@@ -14,0 +15,0 @@ export interface AMQPConnection { |
@@ -6,2 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.ConnectionStatus = void 0; | ||
const amqp_node_1 = __importDefault(require("./adapters/amqp-node")); | ||
@@ -8,0 +9,0 @@ const service_1 = __importDefault(require("./service")); |
@@ -6,2 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.emptyMessageError = exports.amqpConnectError = exports.amqpConnectGracefullyStopped = exports.connectServicesError = exports.ConnectionNotInitialized = exports.EmptyMessageError = exports.AmqpConnectGracefullyStopped = exports.AmqpConnectError = exports.ConnectServicesError = void 0; | ||
const omit_1 = __importDefault(require("lodash/omit")); | ||
@@ -8,0 +9,0 @@ class ConnectServicesError extends Error { |
@@ -16,2 +16,4 @@ import { AMQPOptions } from './adapters/amqp-node'; | ||
correlationId?: string; | ||
routingKey?: string; | ||
isOriginalContent?: boolean; | ||
} | ||
@@ -18,0 +20,0 @@ declare type Listener = (eventName: string, listener: (...args: any[]) => void) => void; |
@@ -6,2 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.createClient = void 0; | ||
const connection_1 = __importDefault(require("./connection")); | ||
@@ -16,3 +17,3 @@ /** | ||
send: async (clientSendMessageOptions) => connection.then(async () => { | ||
const { payload, action, requestId, recipients = [], correlationId } = clientSendMessageOptions; | ||
const { payload, action, requestId, recipients = [], correlationId, routingKey, isOriginalContent = false } = clientSendMessageOptions; | ||
const sendMessageOptions = { | ||
@@ -25,3 +26,5 @@ replyTo: serviceName, | ||
recipients: recipients.join(','), | ||
action | ||
action, | ||
routingKey, | ||
isOriginalContent | ||
} | ||
@@ -41,6 +44,6 @@ }; | ||
var create_error_1 = require("./create-error"); | ||
exports.createMessageError = create_error_1.default; | ||
Object.defineProperty(exports, "createMessageError", { enumerable: true, get: function () { return create_error_1.default; } }); | ||
var service_1 = require("./service"); | ||
exports.connectService = service_1.default; | ||
exports.ServiceConnection = service_1.ServiceConnection; | ||
Object.defineProperty(exports, "connectService", { enumerable: true, get: function () { return service_1.default; } }); | ||
Object.defineProperty(exports, "ServiceConnection", { enumerable: true, get: function () { return service_1.ServiceConnection; } }); | ||
//# sourceMappingURL=index.js.map |
@@ -7,2 +7,4 @@ /// <reference types="node" /> | ||
type?: string; | ||
routingKey?: string; | ||
isOriginalContent?: boolean; | ||
} | ||
@@ -9,0 +11,0 @@ /** |
@@ -12,2 +12,12 @@ /// <reference types="node" /> | ||
static validateMessage(message: Message): void | never; | ||
/** | ||
* The method of converting input content, in case of an error, returns an object with a string placed inside | ||
* @param {Buffer} content | ||
*/ | ||
static getContent(content: Buffer): {}; | ||
/** | ||
* Return exchange name, if @exchange is empty, returns the default value(ServiceConnection.topicExchange) | ||
* @param exchange | ||
*/ | ||
static getTopicExchange(exchange?: string): string; | ||
static topicExchange: string; | ||
@@ -14,0 +24,0 @@ name: string; |
@@ -6,2 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.ServiceConnection = void 0; | ||
const uuid_1 = __importDefault(require("uuid")); | ||
@@ -17,378 +18,407 @@ const omit_1 = __importDefault(require("lodash/omit")); | ||
const DEFAULT_HEART_BEAT = 30; | ||
class ServiceConnection extends events_1.default { | ||
constructor(adapter, options, serviceName, log) { | ||
super(); | ||
this.status = connection_1.ConnectionStatus.CONNECTING; | ||
this.queuesConsumerTags = {}; | ||
this.handlers = {}; | ||
this.connection = null; | ||
this.options = options; | ||
this.name = serviceName; | ||
this.log = log; | ||
this.amqp = adapter; | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTING); | ||
this.handlers = { | ||
defaultAction: async ({ message, ack }) => { | ||
ack(); | ||
const { fields } = message; // TODO check for {} | ||
log.error('[amqp-connection] No action for message', fields); | ||
let ServiceConnection = /** @class */ (() => { | ||
class ServiceConnection extends events_1.default { | ||
constructor(adapter, options, serviceName, log) { | ||
super(); | ||
this.status = connection_1.ConnectionStatus.CONNECTING; | ||
this.queuesConsumerTags = {}; | ||
this.handlers = {}; | ||
this.connection = null; | ||
this.options = options; | ||
this.name = serviceName; | ||
this.log = log; | ||
this.amqp = adapter; | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTING); | ||
this.handlers = { | ||
defaultAction: async ({ message, ack }) => { | ||
ack(); | ||
const { fields } = message; // TODO check for {} | ||
log.error('[amqp-connection] No action for message', fields); | ||
} | ||
}; | ||
} | ||
/** | ||
* Validate AMQP message against service rules | ||
*/ | ||
static validateMessage(message) { | ||
const isEmptyMessage = () => message === null; | ||
if (isEmptyMessage()) { | ||
throw errors_1.emptyMessageError(); | ||
} | ||
}; | ||
} | ||
/** | ||
* Validate AMQP message against service rules | ||
*/ | ||
static validateMessage(message) { | ||
const isEmptyMessage = () => message === null; | ||
if (isEmptyMessage()) { | ||
throw errors_1.emptyMessageError(); | ||
} | ||
} | ||
hasHandlers() { | ||
return Object.keys(omit_1.default(this.handlers, ['defaultAction'])).length > 0; | ||
} | ||
/** | ||
* Method returns queue options according to connection options | ||
* | ||
* In cluster mode: | ||
* According to RabbitMQ Highly Available (Mirrored) Queues configuration | ||
* ha-mode property should be set to 'all' to force queue replication | ||
* | ||
* In standalone mode no additional properties are provided | ||
*/ | ||
getQueueOptions() { | ||
const options = { | ||
durable: true | ||
}; | ||
if (this.isClusterConnection()) { | ||
options.arguments = { | ||
'ha-mode': 'all' | ||
/** | ||
* The method of converting input content, in case of an error, returns an object with a string placed inside | ||
* @param {Buffer} content | ||
*/ | ||
static getContent(content) { | ||
let data = {}; | ||
try { | ||
data = JSON.parse(content.toString()); | ||
} | ||
catch (ignore) { | ||
data = { data: content.toString() }; | ||
} | ||
return data; | ||
} | ||
/** | ||
* Return exchange name, if @exchange is empty, returns the default value(ServiceConnection.topicExchange) | ||
* @param exchange | ||
*/ | ||
static getTopicExchange(exchange) { | ||
if (exchange === undefined) { | ||
return ServiceConnection.topicExchange; | ||
} | ||
return exchange; | ||
} | ||
hasHandlers() { | ||
return Object.keys(omit_1.default(this.handlers, ['defaultAction'])).length > 0; | ||
} | ||
/** | ||
* Method returns queue options according to connection options | ||
* | ||
* In cluster mode: | ||
* According to RabbitMQ Highly Available (Mirrored) Queues configuration | ||
* ha-mode property should be set to 'all' to force queue replication | ||
* | ||
* In standalone mode no additional properties are provided | ||
*/ | ||
getQueueOptions() { | ||
const options = { | ||
durable: true | ||
}; | ||
if (this.isClusterConnection()) { | ||
options.arguments = { | ||
'ha-mode': 'all' | ||
}; | ||
} | ||
return options; | ||
} | ||
return options; | ||
} | ||
/** | ||
* Detects if connection configured as 'cluster' of rabbitMQ's or | ||
* as standalone. | ||
* | ||
* In standalone mode - only 'host' option required. | ||
* In cluster mode - an array of nodes should be provided via 'cluster' | ||
* property | ||
*/ | ||
isClusterConnection() { | ||
const { cluster = [] } = this.options; | ||
return !!cluster.length; | ||
} | ||
/** | ||
* Connect to AMQP server with service options | ||
*/ | ||
async connect() { | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTING); | ||
this.connection = this.getConnection(); | ||
await this.assertTopicExchange(); | ||
await this.assertServiceQueue(); | ||
} | ||
/** | ||
* Assert service queue with name equal to service name | ||
*/ | ||
async assertServiceQueue() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
/** | ||
* Detects if connection configured as 'cluster' of rabbitMQ's or | ||
* as standalone. | ||
* | ||
* In standalone mode - only 'host' option required. | ||
* In cluster mode - an array of nodes should be provided via 'cluster' | ||
* property | ||
*/ | ||
isClusterConnection() { | ||
const { cluster = [] } = this.options; | ||
return !!cluster.length; | ||
} | ||
const connection = await this.connection; | ||
await connection.assertQueue(this.name, this.getQueueOptions()); | ||
} | ||
/** | ||
* Assert topic exchange. Service posts messages to topic exchange when no recipients provided | ||
*/ | ||
async assertTopicExchange() { | ||
if (!this.connection) { | ||
throw new Error(); | ||
/** | ||
* Connect to AMQP server with service options | ||
*/ | ||
async connect() { | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTING); | ||
this.connection = this.getConnection(); | ||
await this.assertTopicExchange(); | ||
await this.assertServiceQueue(); | ||
} | ||
const connection = await this.connection; | ||
await connection.assertExchange(ServiceConnection.topicExchange, 'topic', { durable: true }); | ||
} | ||
/** | ||
* Tries to connect with AMQP with provided retry strategy or uses | ||
* default one. Default retry strategy implements exponential backoff algorithm. | ||
*/ | ||
async getConnection(attempt = 1) { | ||
const { retryStrategy = infinite_1.default, maxReconnects = Infinity } = this.options; | ||
let connection; | ||
if (this.status === connection_1.ConnectionStatus.DISCONNECTING) { | ||
throw errors_1.amqpConnectGracefullyStopped(); | ||
/** | ||
* Assert service queue with name equal to service name | ||
*/ | ||
async assertServiceQueue() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
const connection = await this.connection; | ||
await connection.assertQueue(this.name, this.getQueueOptions()); | ||
} | ||
try { | ||
const connectionString = this.getConnectionString(); | ||
connection = await this.amqp.connect(connectionString, this.options, (eventName, message) => { | ||
this.connectionEventHandler(eventName, message); | ||
}); | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTED); | ||
/** | ||
* Assert topic exchange. Service posts messages to topic exchange when no recipients provided | ||
*/ | ||
async assertTopicExchange() { | ||
if (!this.connection) { | ||
throw new Error(); | ||
} | ||
const connection = await this.connection; | ||
await connection.assertExchange(ServiceConnection.getTopicExchange(this.options.exchange), 'topic', { durable: true }); | ||
} | ||
catch (error) { | ||
await timeout_1.default(retryStrategy(attempt)); | ||
this.log.error(`[amqp-connection] ${error.message}`, error); | ||
this.log.info(`[amqp-connection] Retry connection to RabbitMQ. Attemt ${attempt}/${maxReconnects}`); | ||
if (attempt > maxReconnects) { | ||
throw errors_1.amqpConnectError(this.options, 'Maximum attemts exceeded.'); | ||
/** | ||
* Tries to connect with AMQP with provided retry strategy or uses | ||
* default one. Default retry strategy implements exponential backoff algorithm. | ||
*/ | ||
async getConnection(attempt = 1) { | ||
const { retryStrategy = infinite_1.default, maxReconnects = Infinity } = this.options; | ||
let connection; | ||
if (this.status === connection_1.ConnectionStatus.DISCONNECTING) { | ||
throw errors_1.amqpConnectGracefullyStopped(); | ||
} | ||
return this.getConnection(attempt + 1); | ||
try { | ||
const connectionString = this.getConnectionString(); | ||
connection = await this.amqp.connect(connectionString, this.options, (eventName, message) => { | ||
this.connectionEventHandler(eventName, message); | ||
}); | ||
this.setConnectionStatus(connection_1.ConnectionStatus.CONNECTED); | ||
} | ||
catch (error) { | ||
await timeout_1.default(retryStrategy(attempt)); | ||
this.log.error(`[amqp-connection] ${error.message}`, error); | ||
this.log.info(`[amqp-connection] Retry connection to RabbitMQ. Attemt ${attempt}/${maxReconnects}`); | ||
if (attempt > maxReconnects) { | ||
throw errors_1.amqpConnectError(this.options, 'Maximum attemts exceeded.'); | ||
} | ||
return this.getConnection(attempt + 1); | ||
} | ||
return connection; | ||
} | ||
return connection; | ||
} | ||
/** | ||
* Handle connection and channel events | ||
*/ | ||
connectionEventHandler(eventName, eventMessage) { | ||
switch (eventName) { | ||
case 'close': | ||
this.handleConnectionClose(eventMessage).catch(this.log.error); | ||
break; | ||
default: | ||
/** | ||
* Handle connection and channel events | ||
*/ | ||
connectionEventHandler(eventName, eventMessage) { | ||
switch (eventName) { | ||
case 'close': | ||
this.handleConnectionClose(eventMessage).catch(this.log.error); | ||
break; | ||
default: | ||
} | ||
} | ||
} | ||
/** | ||
* Handle connection errors | ||
*/ | ||
async handleConnectionClose(eventMessage) { | ||
this.log.error('[amqp-connection] Connection closed.', eventMessage, omit_1.default(this.options, ['password'])); | ||
this.emit(connection_1.ConnectionStatus.DISCONNECTED); | ||
await this.unsubscribe(); | ||
if (this.status !== connection_1.ConnectionStatus.DISCONNECTING) { | ||
await this.connect(); | ||
if (this.hasHandlers()) { | ||
await this.initQueue(this.name); | ||
/** | ||
* Handle connection errors | ||
*/ | ||
async handleConnectionClose(eventMessage) { | ||
this.log.error('[amqp-connection] Connection closed.', eventMessage, omit_1.default(this.options, ['password'])); | ||
this.emit(connection_1.ConnectionStatus.DISCONNECTED); | ||
await this.unsubscribe(); | ||
if (this.status !== connection_1.ConnectionStatus.DISCONNECTING) { | ||
await this.connect(); | ||
if (this.hasHandlers()) { | ||
await this.initQueue(this.name); | ||
} | ||
} | ||
} | ||
} | ||
/** | ||
* Set current connection status | ||
*/ | ||
setConnectionStatus(status) { | ||
this.log.info(`[amqp-connection] --> ${status}`); | ||
this.status = status; | ||
this.emitCurrentStatus(); | ||
} | ||
/** | ||
* Emits current connection status; | ||
*/ | ||
emitCurrentStatus() { | ||
this.emit(this.status); | ||
} | ||
/** | ||
* Gets connection string from options. Throws an error if configuration is not valid. | ||
*/ | ||
getConnectionString() { | ||
const connectionString = this.isClusterConnection() | ||
? this.getConnectionStringFromCluster() | ||
: this.getConnectionStringStandalone(); | ||
if (!connectionString) { | ||
throw errors_1.amqpConnectError('Wrong configuration. Either cluster or standalone mode should be enabled', this.options); | ||
/** | ||
* Set current connection status | ||
*/ | ||
setConnectionStatus(status) { | ||
this.log.info(`[amqp-connection] --> ${status}`); | ||
this.status = status; | ||
this.emitCurrentStatus(); | ||
} | ||
return connectionString; | ||
} | ||
/** | ||
* Extract connection string from options using 'host' parameter | ||
*/ | ||
getConnectionStringStandalone() { | ||
const { username, password, host = '', vhost = '', heartbeat = DEFAULT_HEART_BEAT } = this.options; | ||
const connectionString = `amqp://${username}:${password}@${host}/${vhost}?heartbeat=${heartbeat}`; | ||
this.log.info('[amqp-connection] Configured for standalone'); | ||
return connectionString; | ||
} | ||
/** | ||
* Pick random connection string from 'cluster' property | ||
*/ | ||
getConnectionStringFromCluster() { | ||
const { username, password, cluster = [], vhost = '', heartbeat = DEFAULT_HEART_BEAT } = this.options; | ||
const connectionStrings = cluster.map(host => `amqp://${username}:${password}@${host}/${vhost}?heartbeat=${heartbeat}`); | ||
this.log.info('[amqp-connection] Configured for cluster'); | ||
return random_pick_1.default(connectionStrings); | ||
} | ||
/** | ||
* Post message to recipients. If recipients array not empty, posts message directly to | ||
* recipient's queues. Otherwise message will be sent to topic exchange. | ||
* | ||
* @example | ||
* | ||
* service.postMessage(['news'], 'message', { persistent: true }) | ||
* service.postMessage([], 'message', { replyTo: 'news' }) | ||
*/ | ||
async postMessage(recipients = [], message, options = {}) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
/** | ||
* Emits current connection status; | ||
*/ | ||
emitCurrentStatus() { | ||
this.emit(this.status); | ||
} | ||
this.log.info(`[amqp-client] send message to ${recipients.toString()}\n`, options); | ||
const defaultMessageOptions = { | ||
messageId: uuid_1.default.v4(), | ||
timestamp: Date.now(), | ||
persistent: true, | ||
replyTo: this.name | ||
}; | ||
const { headers: { action = 'default' } = {} } = options; | ||
const computedOptions = defaultsDeep_1.default({}, options, defaultMessageOptions); | ||
const connection = await this.connection; | ||
const content = Buffer.from(JSON.stringify(message)); | ||
if (recipients.length) { | ||
return Promise.all(recipients.map(async (recipient) => connection.sendToQueue(recipient, content, computedOptions))); | ||
/** | ||
* Gets connection string from options. Throws an error if configuration is not valid. | ||
*/ | ||
getConnectionString() { | ||
const connectionString = this.isClusterConnection() | ||
? this.getConnectionStringFromCluster() | ||
: this.getConnectionStringStandalone(); | ||
if (!connectionString) { | ||
throw errors_1.amqpConnectError('Wrong configuration. Either cluster or standalone mode should be enabled', this.options); | ||
} | ||
return connectionString; | ||
} | ||
return connection.publish('dispatcher', `${this.name}.${action}`, content, computedOptions); | ||
} | ||
/** | ||
* Handles messages from queue. | ||
* Runs one of registered message handlers with message. If no suitable handlers | ||
* present runs with 'defaultAction' handler | ||
*/ | ||
async messageHandler(message) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
/** | ||
* Extract connection string from options using 'host' parameter | ||
*/ | ||
getConnectionStringStandalone() { | ||
const { username, password, host = '', vhost = '', heartbeat = DEFAULT_HEART_BEAT } = this.options; | ||
const connectionString = `amqp://${username}:${password}@${host}/${vhost}?heartbeat=${heartbeat}`; | ||
this.log.info('[amqp-connection] Configured for standalone'); | ||
return connectionString; | ||
} | ||
try { | ||
ServiceConnection.validateMessage(message); | ||
const { properties: { headers: { action: messageAction = 'defaultAction' } } } = message; | ||
const handler = this.getActionHandler(messageAction); | ||
/** | ||
* Pick random connection string from 'cluster' property | ||
*/ | ||
getConnectionStringFromCluster() { | ||
const { username, password, cluster = [], vhost = '', heartbeat = DEFAULT_HEART_BEAT } = this.options; | ||
const connectionStrings = cluster.map(host => `amqp://${username}:${password}@${host}/${vhost}?heartbeat=${heartbeat}`); | ||
this.log.info('[amqp-connection] Configured for cluster'); | ||
return random_pick_1.default(connectionStrings); | ||
} | ||
/** | ||
* Post message to recipients. If recipients array not empty, posts message directly to | ||
* recipient's queues. Otherwise message will be sent to topic exchange. | ||
* | ||
* @example | ||
* | ||
* service.postMessage(['news'], 'message', { persistent: true }) | ||
* service.postMessage([], 'message', { replyTo: 'news' }) | ||
*/ | ||
async postMessage(recipients = [], message, options = {}) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
this.log.info(`[amqp-client] send message to ${recipients.toString()}\n`, options); | ||
const defaultMessageOptions = { | ||
messageId: uuid_1.default.v4(), | ||
timestamp: Date.now(), | ||
persistent: true, | ||
replyTo: this.name | ||
}; | ||
const { headers: { action = 'default' } = {} } = options; | ||
const { headers: { isOriginalContent = false } = {} } = options; | ||
const { headers: { routingKey = `${this.name}.${action}` } = {} } = options; | ||
const computedOptions = defaultsDeep_1.default({}, options, defaultMessageOptions); | ||
const connection = await this.connection; | ||
await handler({ | ||
message: { | ||
fields: message.fields, | ||
properties: message.properties, | ||
content: JSON.parse(message.content.toString()) | ||
}, | ||
ack: () => { | ||
connection.ack(message); | ||
}, | ||
nack: () => { | ||
connection.nack(message); | ||
} | ||
}); | ||
const content = isOriginalContent ? message : Buffer.from(JSON.stringify(message)); | ||
if (recipients.length) { | ||
return Promise.all(recipients.map(async (recipient) => connection.sendToQueue(recipient, content, computedOptions))); | ||
} | ||
return connection.publish(ServiceConnection.getTopicExchange(this.options.exchange), routingKey, content, computedOptions); | ||
} | ||
catch (error) { | ||
if (error instanceof errors_1.EmptyMessageError) { | ||
this.unsubscribe().catch(this.log.error); | ||
/** | ||
* Handles messages from queue. | ||
* Runs one of registered message handlers with message. If no suitable handlers | ||
* present runs with 'defaultAction' handler | ||
*/ | ||
async messageHandler(message) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
this.log.error(error); | ||
try { | ||
ServiceConnection.validateMessage(message); | ||
const { properties: { headers: { action: messageAction = 'defaultAction' } } } = message; | ||
const handler = this.getActionHandler(messageAction); | ||
const connection = await this.connection; | ||
await handler({ | ||
message: { | ||
fields: message.fields, | ||
properties: message.properties, | ||
content: ServiceConnection.getContent(message.content) | ||
}, | ||
ack: () => { | ||
connection.ack(message); | ||
}, | ||
nack: () => { | ||
connection.nack(message); | ||
} | ||
}); | ||
} | ||
catch (error) { | ||
if (error instanceof errors_1.EmptyMessageError) { | ||
this.unsubscribe().catch(this.log.error); | ||
} | ||
this.log.error(error); | ||
} | ||
} | ||
} | ||
/** | ||
* Register action handler in service. | ||
*/ | ||
setActionHandler(handlerName, handler) { | ||
this.handlers[handlerName] = handler; | ||
} | ||
/** | ||
* Unregister action handler from service | ||
*/ | ||
getActionHandler(handlerName) { | ||
return this.handlers[handlerName] || this.handlers.defaultAction; | ||
} | ||
/** | ||
* Subscribe to messages from service queue | ||
* @example | ||
* | ||
* service.subscribe(({ message, ack, nack }) => { | ||
* // do something with message | ||
* ack(); | ||
* }); | ||
*/ | ||
async subscribe(onConsume) { | ||
this.setActionHandler('defaultAction', onConsume); | ||
await this.initQueue(this.name); | ||
} | ||
/** | ||
* Subscribe to messages from service queue | ||
* with specific actionType. | ||
* | ||
* @example | ||
* | ||
* service.subscribeOn('someAction', ({ message, ack, nack }) => { | ||
* // do something with message | ||
* ack(); | ||
* }); | ||
*/ | ||
async subscribeOn(actionType, onConsume) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
/** | ||
* Register action handler in service. | ||
*/ | ||
setActionHandler(handlerName, handler) { | ||
this.handlers[handlerName] = handler; | ||
} | ||
this.setActionHandler(actionType, onConsume); | ||
const connection = await this.connection; | ||
await this.initQueue(this.name); | ||
await connection.bindQueue(this.name, 'dispatcher', `*.${actionType}`); | ||
} | ||
/** | ||
* Consumes queue if it's not already consumed | ||
* | ||
* Note: One ServiceConnection instance = one consumer. This should be not | ||
* changed in future due to vast amount of bugs occured when using | ||
* multiple consumers per queue in this class. | ||
* | ||
* These bugs are: circular message redelivery, because of | ||
* nature of 'nack' function. It tries to place message closer to | ||
* the head. See more info on this: http://www.rabbitmq.com/nack.html | ||
*/ | ||
async initQueue(queue) { | ||
if (!this.queuesConsumerTags[queue]) { | ||
await this.consumeQueue(queue, (message) => { | ||
this.messageHandler(message).catch(this.log.error); | ||
}); | ||
this.log.info(`[amqp-connection] subscribed for queue "${queue}"`); | ||
/** | ||
* Unregister action handler from service | ||
*/ | ||
getActionHandler(handlerName) { | ||
return this.handlers[handlerName] || this.handlers.defaultAction; | ||
} | ||
} | ||
/** | ||
* Safely unsubscribe from queue | ||
*/ | ||
async unsubscribe() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
/** | ||
* Subscribe to messages from service queue | ||
* @example | ||
* | ||
* service.subscribe(({ message, ack, nack }) => { | ||
* // do something with message | ||
* ack(); | ||
* }); | ||
*/ | ||
async subscribe(onConsume) { | ||
this.setActionHandler('defaultAction', onConsume); | ||
await this.initQueue(this.name); | ||
} | ||
this.log.warn(`[amqp-client] unsubscribing from input queue of "${this.name}"`); | ||
let result; | ||
try { | ||
/** | ||
* Subscribe to messages from service queue | ||
* with specific actionType. | ||
* | ||
* @example | ||
* | ||
* service.subscribeOn('someAction', ({ message, ack, nack }) => { | ||
* // do something with message | ||
* ack(); | ||
* }); | ||
*/ | ||
async subscribeOn(actionType, onConsume) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
this.setActionHandler(actionType, onConsume); | ||
const connection = await this.connection; | ||
result = await connection.cancel(this.queuesConsumerTags[this.name]); | ||
this.log.info(`[amqp-connection] unsubscribed from queue "${this.name}"\n`); | ||
await this.initQueue(this.name); | ||
await connection.bindQueue(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${actionType}`); | ||
} | ||
catch (error) { | ||
this.log.error('[amqp-connection] cannot unsubscribe. \n', error.message); | ||
/** | ||
* Consumes queue if it's not already consumed | ||
* | ||
* Note: One ServiceConnection instance = one consumer. This should be not | ||
* changed in future due to vast amount of bugs occured when using | ||
* multiple consumers per queue in this class. | ||
* | ||
* These bugs are: circular message redelivery, because of | ||
* nature of 'nack' function. It tries to place message closer to | ||
* the head. See more info on this: http://www.rabbitmq.com/nack.html | ||
*/ | ||
async initQueue(queue) { | ||
if (!this.queuesConsumerTags[queue]) { | ||
await this.consumeQueue(queue, (message) => { | ||
this.messageHandler(message).catch(this.log.error); | ||
}); | ||
this.log.info(`[amqp-connection] subscribed for queue "${queue}"`); | ||
} | ||
} | ||
finally { | ||
delete this.queuesConsumerTags[this.name]; | ||
/** | ||
* Safely unsubscribe from queue | ||
*/ | ||
async unsubscribe() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
this.log.warn(`[amqp-client] unsubscribing from input queue of "${this.name}"`); | ||
let result; | ||
try { | ||
const connection = await this.connection; | ||
result = await connection.cancel(this.queuesConsumerTags[this.name]); | ||
this.log.info(`[amqp-connection] unsubscribed from queue "${this.name}"\n`); | ||
} | ||
catch (error) { | ||
this.log.error('[amqp-connection] cannot unsubscribe. \n', error.message); | ||
} | ||
finally { | ||
delete this.queuesConsumerTags[this.name]; | ||
} | ||
return result; | ||
} | ||
return result; | ||
} | ||
async close() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
async close() { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
this.setConnectionStatus(connection_1.ConnectionStatus.DISCONNECTING); | ||
try { | ||
const connection = await this.connection; | ||
await connection.close(); | ||
} | ||
catch (error) { | ||
if (error instanceof errors_1.AmqpConnectGracefullyStopped) { | ||
this.log.info('[amqp-connection] Connection retry process gracefully stopped'); | ||
return; | ||
} | ||
this.log.error(`[amqp-connection] Cannot close connection. ${error.message}\n`); | ||
} | ||
finally { | ||
this.setConnectionStatus(connection_1.ConnectionStatus.DISCONNECTED); | ||
} | ||
} | ||
this.setConnectionStatus(connection_1.ConnectionStatus.DISCONNECTING); | ||
try { | ||
/** | ||
* Consume queue and bind handler to incoming messages | ||
*/ | ||
async consumeQueue(queue, handler) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
const connection = await this.connection; | ||
await connection.close(); | ||
await connection.prefetch(1); | ||
const { consumerTag } = await connection.consume(queue, handler); | ||
this.queuesConsumerTags[queue] = consumerTag; | ||
return true; | ||
} | ||
catch (error) { | ||
if (error instanceof errors_1.AmqpConnectGracefullyStopped) { | ||
this.log.info('[amqp-connection] Connection retry process gracefully stopped'); | ||
return; | ||
} | ||
this.log.error(`[amqp-connection] Cannot close connection. ${error.message}\n`); | ||
} | ||
finally { | ||
this.setConnectionStatus(connection_1.ConnectionStatus.DISCONNECTED); | ||
} | ||
} | ||
/** | ||
* Consume queue and bind handler to incoming messages | ||
*/ | ||
async consumeQueue(queue, handler) { | ||
if (!this.connection) { | ||
throw new errors_1.ConnectionNotInitialized(); | ||
} | ||
const connection = await this.connection; | ||
await connection.prefetch(1); | ||
const { consumerTag } = await connection.consume(queue, handler); | ||
this.queuesConsumerTags[queue] = consumerTag; | ||
return true; | ||
} | ||
} | ||
ServiceConnection.topicExchange = 'dispatcher'; | ||
return ServiceConnection; | ||
})(); | ||
exports.ServiceConnection = ServiceConnection; | ||
ServiceConnection.topicExchange = 'dispatcher'; | ||
/** | ||
@@ -395,0 +425,0 @@ * Function with side-effects. Creates service, calls it's "connect" method |
{ | ||
"name": "mbclient", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"main": "dist/index.js", | ||
@@ -42,18 +42,18 @@ "types": "dist/index.d.ts", | ||
"devDependencies": { | ||
"@semantic-release/changelog": "^5.0.1", | ||
"@semantic-release/git": "^9.0.0", | ||
"@types/amqplib": "^0.5.13", | ||
"@types/jest": "^24.0.22", | ||
"@types/jest": "^25.2.3", | ||
"@types/lodash": "^4.14.146", | ||
"@types/node": "^8.10.59", | ||
"@types/uuid": "^3.4.6", | ||
"coveralls": "^3.0.7", | ||
"coveralls": "^3.1.0", | ||
"cz-conventional-changelog": "^3.0.2", | ||
"semantic-release": "^15.13.30", | ||
"@semantic-release/changelog": "^3.0.5", | ||
"@semantic-release/git": "^7.0.18", | ||
"jest": "^24.9.0", | ||
"ts-jest": "^24.1.0", | ||
"jest": "^25.5.4", | ||
"semantic-release": "^17.0.8", | ||
"ts-jest": "^25.5.1", | ||
"ts-node": "^8.4.1", | ||
"tslint": "^5.20.1", | ||
"tslint-config-unional": "^0.10.0", | ||
"typescript": "^3.7.2" | ||
"typescript": "^3.9.3" | ||
}, | ||
@@ -60,0 +60,0 @@ "config": { |
# Message Broker Client/Server | ||
The MB client creates an abstraction over the interservice interaction on top of RabbitMQ. The library defines a common interface for messages and provides ways to send and subscribe to them. The client supports automatic reconnections to RabbitMQ and support for the Rabbit cluster. | ||
The MB client creates an abstraction over the inter-service interaction on top of RabbitMQ. The library defines a common interface for messages and provides ways to send and subscribe to them. The client supports automatic re-connections to RabbitMQ and support for the Rabbit cluster. | ||
The mechanism is quite simple and currently supports 2 simple operation modes (sending directly to the queue, sending to topic exchange) | ||
When a client is created, a durable topic exchange ("dispatcher" by default) is automatically created, and a service queue (with the name that was passed as serviceName during initialization). | ||
When a client created, a durable topic exchange ("dispatcher" by default) is automatically created, and a service queue (with the name that was passed as serviceName during initialization). | ||
When sending a message indicating the recipients, the message is sent to their queue directly. Otherwise, the message is sent via routingKey "{serviceName}. {Action}" to the dispatcher exchange. | ||
When sending a message indicating the recipients, the message sent to their queue directly. Otherwise, the message sent via routingKey "{serviceName}. {Action}" to the dispatcher exchange. | ||
@@ -28,3 +28,3 @@ # Examples | ||
// слушаем сообщения | ||
// listening to messages | ||
client.consumeByAction('logAction', ({ message, ack, nack }) => { | ||
@@ -31,0 +31,0 @@ // do something |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
74437
40
1060