amqp-connection-manager
Advanced tools
Comparing version 3.7.0 to 3.8.0
@@ -50,3 +50,3 @@ "use strict"; | ||
constructor(connectionManager, options = {}) { | ||
var _a; | ||
var _a, _b; | ||
super(); | ||
@@ -64,2 +64,6 @@ /** If we're in the process of creating a channel, this is a Promise which | ||
/** | ||
* True to create a ConfirmChannel. False to create a regular Channel. | ||
*/ | ||
this._confirm = true; | ||
/** | ||
* True if the "worker" is busy sending messages. False if we need to | ||
@@ -82,5 +86,6 @@ * start the worker to get stuff done. | ||
this._connectionManager = connectionManager; | ||
this._confirm = (_a = options.confirm) !== null && _a !== void 0 ? _a : true; | ||
this.name = options.name; | ||
this._publishTimeout = options.publishTimeout; | ||
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false; | ||
this._json = (_b = options.json) !== null && _b !== void 0 ? _b : false; | ||
// Array of setup functions to call. | ||
@@ -192,3 +197,3 @@ this._setups = []; | ||
routingKey, | ||
content, | ||
content: this._getEncodedMessage(content), | ||
resolve, | ||
@@ -212,2 +217,3 @@ reject, | ||
sendToQueue(queue, content, options, done) { | ||
const encodedContent = this._getEncodedMessage(content); | ||
return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => { | ||
@@ -218,3 +224,3 @@ const { timeout, ...opts } = options || {}; | ||
queue, | ||
content, | ||
content: encodedContent, | ||
resolve, | ||
@@ -251,3 +257,9 @@ reject, | ||
try { | ||
const channel = await connection.createConfirmChannel(); | ||
let channel; | ||
if (this._confirm) { | ||
channel = await connection.createConfirmChannel(); | ||
} | ||
else { | ||
channel = await connection.createChannel(); | ||
} | ||
this._channel = channel; | ||
@@ -417,39 +429,56 @@ this._channelHasRoom = true; | ||
} | ||
this._unconfirmedMessages.push(message); | ||
const encodedMessage = this._getEncodedMessage(message.content); | ||
let thisCanSend = true; | ||
switch (message.type) { | ||
case 'publish': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (this._confirm) { | ||
this._unconfirmedMessages.push(message); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
} | ||
else { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options); | ||
message.resolve(thisCanSend); | ||
} | ||
break; | ||
} | ||
case 'sendToQueue': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (this._confirm) { | ||
this._unconfirmedMessages.push(message); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
} | ||
else { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options); | ||
message.resolve(thisCanSend); | ||
} | ||
break; | ||
@@ -456,0 +485,0 @@ } |
/// <reference types="node" /> | ||
import * as amqplib from 'amqplib'; | ||
import { ConfirmChannel, Options } from 'amqplib'; | ||
import type * as amqplib from 'amqplib'; | ||
import { Options } from 'amqplib'; | ||
import { EventEmitter } from 'events'; | ||
import pb from 'promise-breaker'; | ||
import { IAmqpConnectionManager } from './AmqpConnectionManager.js'; | ||
export declare type SetupFunc = ((channel: ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: ConfirmChannel) => Promise<void>); | ||
export declare type Channel = amqplib.ConfirmChannel | amqplib.Channel; | ||
export declare type SetupFunc = ((channel: Channel, callback: (error?: Error) => void) => void) | ((channel: Channel) => Promise<void>) | ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: amqplib.ConfirmChannel) => Promise<void>); | ||
export interface CreateChannelOpts { | ||
@@ -17,2 +18,6 @@ /** Name for this channel. Used for debugging. */ | ||
/** | ||
* True to create a ConfirmChannel (default). False to create a regular Channel. | ||
*/ | ||
confirm?: boolean; | ||
/** | ||
* if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects. | ||
@@ -75,2 +80,6 @@ * These will be encoded automatically before being sent. | ||
/** | ||
* True to create a ConfirmChannel. False to create a regular Channel. | ||
*/ | ||
private _confirm; | ||
/** | ||
* True if the "worker" is busy sending messages. False if we need to | ||
@@ -77,0 +86,0 @@ * start the worker to get stuff done. |
@@ -45,3 +45,3 @@ import { EventEmitter } from 'events'; | ||
constructor(connectionManager, options = {}) { | ||
var _a; | ||
var _a, _b; | ||
super(); | ||
@@ -59,2 +59,6 @@ /** If we're in the process of creating a channel, this is a Promise which | ||
/** | ||
* True to create a ConfirmChannel. False to create a regular Channel. | ||
*/ | ||
this._confirm = true; | ||
/** | ||
* True if the "worker" is busy sending messages. False if we need to | ||
@@ -77,5 +81,6 @@ * start the worker to get stuff done. | ||
this._connectionManager = connectionManager; | ||
this._confirm = (_a = options.confirm) !== null && _a !== void 0 ? _a : true; | ||
this.name = options.name; | ||
this._publishTimeout = options.publishTimeout; | ||
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false; | ||
this._json = (_b = options.json) !== null && _b !== void 0 ? _b : false; | ||
// Array of setup functions to call. | ||
@@ -187,3 +192,3 @@ this._setups = []; | ||
routingKey, | ||
content, | ||
content: this._getEncodedMessage(content), | ||
resolve, | ||
@@ -207,2 +212,3 @@ reject, | ||
sendToQueue(queue, content, options, done) { | ||
const encodedContent = this._getEncodedMessage(content); | ||
return pb.addCallback(done, new Promise((resolve, reject) => { | ||
@@ -213,3 +219,3 @@ const { timeout, ...opts } = options || {}; | ||
queue, | ||
content, | ||
content: encodedContent, | ||
resolve, | ||
@@ -246,3 +252,9 @@ reject, | ||
try { | ||
const channel = await connection.createConfirmChannel(); | ||
let channel; | ||
if (this._confirm) { | ||
channel = await connection.createConfirmChannel(); | ||
} | ||
else { | ||
channel = await connection.createChannel(); | ||
} | ||
this._channel = channel; | ||
@@ -412,39 +424,56 @@ this._channelHasRoom = true; | ||
} | ||
this._unconfirmedMessages.push(message); | ||
const encodedMessage = this._getEncodedMessage(message.content); | ||
let thisCanSend = true; | ||
switch (message.type) { | ||
case 'publish': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (this._confirm) { | ||
this._unconfirmedMessages.push(message); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
} | ||
else { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options); | ||
message.resolve(thisCanSend); | ||
} | ||
break; | ||
} | ||
case 'sendToQueue': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (this._confirm) { | ||
this._unconfirmedMessages.push(message); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
} | ||
else { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options); | ||
message.resolve(thisCanSend); | ||
} | ||
break; | ||
@@ -451,0 +480,0 @@ } |
import AmqpConnectionManager, { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js'; | ||
export type { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager as AmqpConnectionManager, } from './AmqpConnectionManager.js'; | ||
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js'; | ||
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc, Channel, } from './ChannelWrapper.js'; | ||
export declare function connect(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions): IAmqpConnectionManager; | ||
@@ -5,0 +5,0 @@ export { AmqpConnectionManager as AmqpConnectionManagerClass }; |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "3.7.0", | ||
"version": "3.8.0", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js", |
@@ -135,4 +135,5 @@ # amqp-connection-manager | ||
arbitrary data in. | ||
- `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` | ||
- `options.json` - if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` | ||
are plain JSON objects. These will be encoded automatically before being sent. | ||
- `options.confirm` - if true (default), the created channel will be a ConfirmChannel | ||
- `options.publishTimeout` - a default timeout for messages published to this channel. | ||
@@ -139,0 +140,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
134688
2346
201