amqp-connection-manager
Advanced tools
Comparing version 3.6.0 to 3.7.0
@@ -75,3 +75,37 @@ "use strict"; | ||
this._findServers = options.findServers || (() => Promise.resolve(urls)); | ||
} | ||
/** | ||
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the | ||
* reconnect attempts will continue in the background. | ||
* @param [options={}] - | ||
* @param [options.timeout] - Time to wait for initial connect | ||
*/ | ||
async connect({ timeout } = {}) { | ||
this._connect(); | ||
let reject; | ||
const onDisconnect = ({ err }) => { | ||
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials. | ||
if (err.isOperational) { | ||
reject(err); | ||
} | ||
}; | ||
try { | ||
await Promise.race([ | ||
(0, events_1.once)(this, 'connect'), | ||
new Promise((_resolve, innerReject) => { | ||
reject = innerReject; | ||
this.on('disconnect', onDisconnect); | ||
}), | ||
...(timeout | ||
? [ | ||
(0, helpers_js_1.wait)(timeout).promise.then(() => { | ||
throw new Error('amqp-connection-manager: connect timeout'); | ||
}), | ||
] | ||
: []), | ||
]); | ||
} | ||
finally { | ||
this.removeListener('disconnect', onDisconnect); | ||
} | ||
} | ||
@@ -78,0 +112,0 @@ // `options` here are any options that can be passed to ChannelWrapper. |
@@ -81,2 +81,3 @@ "use strict"; | ||
this.name = options.name; | ||
this._publishTimeout = options.publishTimeout; | ||
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false; | ||
@@ -184,3 +185,4 @@ // Array of setup functions to call. | ||
return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
const { timeout, ...opts } = options || {}; | ||
this._enqueueMessage({ | ||
type: 'publish', | ||
@@ -190,6 +192,7 @@ exchange, | ||
content, | ||
options, | ||
resolve, | ||
reject, | ||
}); | ||
options: opts, | ||
isTimedout: false, | ||
}, timeout || this._publishTimeout); | ||
this._startWorker(); | ||
@@ -209,13 +212,34 @@ })); | ||
return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
const { timeout, ...opts } = options || {}; | ||
this._enqueueMessage({ | ||
type: 'sendToQueue', | ||
queue, | ||
content, | ||
options, | ||
resolve, | ||
reject, | ||
}); | ||
return this._startWorker(); | ||
options: opts, | ||
isTimedout: false, | ||
}, timeout || this._publishTimeout); | ||
this._startWorker(); | ||
})); | ||
} | ||
_enqueueMessage(message, timeout) { | ||
if (timeout) { | ||
message.timeout = setTimeout(() => { | ||
let idx = this._messages.indexOf(message); | ||
if (idx !== -1) { | ||
this._messages.splice(idx, 1); | ||
} | ||
else { | ||
idx = this._unconfirmedMessages.indexOf(message); | ||
if (idx !== -1) { | ||
this._unconfirmedMessages.splice(idx, 1); | ||
} | ||
} | ||
message.isTimedout = true; | ||
message.reject(new Error('timeout')); | ||
}, timeout); | ||
} | ||
this._messages.push(message); | ||
} | ||
// Called whenever we connect to the broker. | ||
@@ -294,7 +318,17 @@ async _onConnect({ connection }) { | ||
// Reject any unsent messages. | ||
this._messages.forEach((message) => message.reject(new Error('Channel closed'))); | ||
this._messages.forEach((message) => { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
if (this._unconfirmedMessages.length !== 0) { | ||
// Reject any unconfirmed messages. | ||
this._unconfirmedMessages.forEach((message) => message.reject(new Error('Channel closed'))); | ||
this._unconfirmedMessages.forEach((message) => { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
@@ -387,2 +421,8 @@ this._connectionManager.removeListener('connect', this._onConnect); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
@@ -400,2 +440,8 @@ this._messageRejected(message, err); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
@@ -402,0 +448,0 @@ this._messageRejected(message, err); |
@@ -6,6 +6,11 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.connect = void 0; | ||
exports.AmqpConnectionManagerClass = exports.connect = void 0; | ||
const AmqpConnectionManager_js_1 = __importDefault(require("./AmqpConnectionManager.js")); | ||
exports.AmqpConnectionManagerClass = AmqpConnectionManager_js_1.default; | ||
function connect(urls, options) { | ||
return new AmqpConnectionManager_js_1.default(urls, options); | ||
const conn = new AmqpConnectionManager_js_1.default(urls, options); | ||
conn.connect().catch(() => { | ||
/* noop */ | ||
}); | ||
return conn; | ||
} | ||
@@ -12,0 +17,0 @@ exports.connect = connect; |
@@ -62,2 +62,3 @@ /// <reference types="node" /> | ||
}) => void): this; | ||
listeners(eventName: string | symbol): Function[]; | ||
on(event: string, listener: (...args: any[]) => void): this; | ||
@@ -100,2 +101,6 @@ on(event: 'connect', listener: ConnectListener): this; | ||
removeListener(event: string, listener: (...args: any[]) => void): this; | ||
connect(options?: { | ||
timeout?: number; | ||
}): Promise<void>; | ||
reconnect(): void; | ||
createChannel(options?: CreateChannelOpts): ChannelWrapper; | ||
@@ -105,5 +110,5 @@ close(): Promise<void>; | ||
/** The current connection. */ | ||
get connection(): Connection | undefined; | ||
readonly connection: Connection | undefined; | ||
/** Returns the number of registered channels. */ | ||
get channelCount(): number; | ||
readonly channelCount: number; | ||
} | ||
@@ -154,2 +159,11 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqpConnectionManager { | ||
constructor(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions); | ||
/** | ||
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the | ||
* reconnect attempts will continue in the background. | ||
* @param [options={}] - | ||
* @param [options.timeout] - Time to wait for initial connect | ||
*/ | ||
connect({ timeout }?: { | ||
timeout?: number; | ||
}): Promise<void>; | ||
createChannel(options?: CreateChannelOpts): ChannelWrapper; | ||
@@ -156,0 +170,0 @@ close(): Promise<void>; |
import amqp from 'amqplib'; | ||
import { EventEmitter } from 'events'; | ||
import { EventEmitter, once } from 'events'; | ||
import pb from 'promise-breaker'; | ||
@@ -70,3 +70,37 @@ import { URL } from 'url'; | ||
this._findServers = options.findServers || (() => Promise.resolve(urls)); | ||
} | ||
/** | ||
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the | ||
* reconnect attempts will continue in the background. | ||
* @param [options={}] - | ||
* @param [options.timeout] - Time to wait for initial connect | ||
*/ | ||
async connect({ timeout } = {}) { | ||
this._connect(); | ||
let reject; | ||
const onDisconnect = ({ err }) => { | ||
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials. | ||
if (err.isOperational) { | ||
reject(err); | ||
} | ||
}; | ||
try { | ||
await Promise.race([ | ||
once(this, 'connect'), | ||
new Promise((_resolve, innerReject) => { | ||
reject = innerReject; | ||
this.on('disconnect', onDisconnect); | ||
}), | ||
...(timeout | ||
? [ | ||
wait(timeout).promise.then(() => { | ||
throw new Error('amqp-connection-manager: connect timeout'); | ||
}), | ||
] | ||
: []), | ||
]); | ||
} | ||
finally { | ||
this.removeListener('disconnect', onDisconnect); | ||
} | ||
} | ||
@@ -73,0 +107,0 @@ // `options` here are any options that can be passed to ChannelWrapper. |
@@ -21,3 +21,11 @@ /// <reference types="node" /> | ||
json?: boolean; | ||
/** | ||
* Default publish timeout in ms. Messages not published within the given time are rejected with a timeout error. | ||
*/ | ||
publishTimeout?: number; | ||
} | ||
interface PublishOptions extends Options.Publish { | ||
/** Message will be rejected after timeout ms */ | ||
timeout?: number; | ||
} | ||
interface ConsumerOptions extends amqplib.Options.Consume { | ||
@@ -81,2 +89,6 @@ prefetch?: number; | ||
private _channelHasRoom; | ||
/** | ||
* Default publish timeout | ||
*/ | ||
private _publishTimeout?; | ||
name?: string; | ||
@@ -145,3 +157,3 @@ addListener(event: string, listener: (...args: any[]) => void): this; | ||
*/ | ||
removeSetup(setup: SetupFunc, teardown?: pb.Callback<void>, done?: pb.Callback<void>): Promise<void>; | ||
removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback<void>): Promise<void>; | ||
/** | ||
@@ -155,4 +167,5 @@ * Returns a Promise which resolves when this channel next connects. | ||
waitForConnect(done?: pb.Callback<void>): Promise<void>; | ||
publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: amqplib.Options.Publish, done?: pb.Callback<boolean>): Promise<boolean>; | ||
sendToQueue(queue: string, content: Buffer | string | unknown, options?: Options.Publish, done?: pb.Callback<boolean>): Promise<boolean>; | ||
publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>; | ||
sendToQueue(queue: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>; | ||
private _enqueueMessage; | ||
/** | ||
@@ -159,0 +172,0 @@ * Create a new ChannelWrapper. |
@@ -76,2 +76,3 @@ import { EventEmitter } from 'events'; | ||
this.name = options.name; | ||
this._publishTimeout = options.publishTimeout; | ||
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false; | ||
@@ -179,3 +180,4 @@ // Array of setup functions to call. | ||
return pb.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
const { timeout, ...opts } = options || {}; | ||
this._enqueueMessage({ | ||
type: 'publish', | ||
@@ -185,6 +187,7 @@ exchange, | ||
content, | ||
options, | ||
resolve, | ||
reject, | ||
}); | ||
options: opts, | ||
isTimedout: false, | ||
}, timeout || this._publishTimeout); | ||
this._startWorker(); | ||
@@ -204,13 +207,34 @@ })); | ||
return pb.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
const { timeout, ...opts } = options || {}; | ||
this._enqueueMessage({ | ||
type: 'sendToQueue', | ||
queue, | ||
content, | ||
options, | ||
resolve, | ||
reject, | ||
}); | ||
return this._startWorker(); | ||
options: opts, | ||
isTimedout: false, | ||
}, timeout || this._publishTimeout); | ||
this._startWorker(); | ||
})); | ||
} | ||
_enqueueMessage(message, timeout) { | ||
if (timeout) { | ||
message.timeout = setTimeout(() => { | ||
let idx = this._messages.indexOf(message); | ||
if (idx !== -1) { | ||
this._messages.splice(idx, 1); | ||
} | ||
else { | ||
idx = this._unconfirmedMessages.indexOf(message); | ||
if (idx !== -1) { | ||
this._unconfirmedMessages.splice(idx, 1); | ||
} | ||
} | ||
message.isTimedout = true; | ||
message.reject(new Error('timeout')); | ||
}, timeout); | ||
} | ||
this._messages.push(message); | ||
} | ||
// Called whenever we connect to the broker. | ||
@@ -289,7 +313,17 @@ async _onConnect({ connection }) { | ||
// Reject any unsent messages. | ||
this._messages.forEach((message) => message.reject(new Error('Channel closed'))); | ||
this._messages.forEach((message) => { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
if (this._unconfirmedMessages.length !== 0) { | ||
// Reject any unconfirmed messages. | ||
this._unconfirmedMessages.forEach((message) => message.reject(new Error('Channel closed'))); | ||
this._unconfirmedMessages.forEach((message) => { | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
@@ -382,2 +416,8 @@ this._connectionManager.removeListener('connect', this._onConnect); | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
@@ -395,2 +435,8 @@ this._messageRejected(message, err); | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (message.isTimedout) { | ||
return; | ||
} | ||
if (message.timeout) { | ||
clearTimeout(message.timeout); | ||
} | ||
if (err) { | ||
@@ -397,0 +443,0 @@ this._messageRejected(message, err); |
@@ -1,5 +0,6 @@ | ||
import { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js'; | ||
import AmqpConnectionManager, { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js'; | ||
export type { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager as AmqpConnectionManager, } from './AmqpConnectionManager.js'; | ||
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js'; | ||
export declare function connect(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions): IAmqpConnectionManager; | ||
export { AmqpConnectionManager as AmqpConnectionManagerClass }; | ||
declare const amqp: { | ||
@@ -6,0 +7,0 @@ connect: typeof connect; |
import AmqpConnectionManager from './AmqpConnectionManager.js'; | ||
export function connect(urls, options) { | ||
return new AmqpConnectionManager(urls, options); | ||
const conn = new AmqpConnectionManager(urls, options); | ||
conn.connect().catch(() => { | ||
/* noop */ | ||
}); | ||
return conn; | ||
} | ||
export { AmqpConnectionManager as AmqpConnectionManagerClass }; | ||
const amqp = { connect }; | ||
export default amqp; | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "3.6.0", | ||
"version": "3.7.0", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -19,5 +19,3 @@ "module": "./dist/esm/index.js", | ||
"dependencies": { | ||
"promise-breaker": "^5.0.0", | ||
"ts-node": "^10.2.1", | ||
"typescript": "^4.3.5" | ||
"promise-breaker": "^5.0.0" | ||
}, | ||
@@ -63,3 +61,5 @@ "peerDependencies": { | ||
"semantic-release": "^17.1.1", | ||
"ts-jest": "^27.0.5" | ||
"ts-jest": "^27.0.5", | ||
"ts-node": "^10.2.1", | ||
"typescript": "^4.3.5" | ||
}, | ||
@@ -66,0 +66,0 @@ "engines": { |
@@ -5,9 +5,4 @@ # amqp-connection-manager | ||
![Build Status](https://github.com/jwalton/node-amqp-connection-manager/workflows/GitHub%20CI/badge.svg) | ||
[![Coverage Status](https://coveralls.io/repos/jwalton/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/jwalton/node-amqp-connection-manager?branch=master) | ||
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release) | ||
[![Dependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager) | ||
[![devDependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager/dev-status.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager#info=devDependencies) | ||
[![peerDependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager/peer-status.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager#info=peerDependencies) | ||
Connection management for amqplib. This is a wrapper around [amqplib](http://www.squaremobius.net/amqp.node/) which provides automatic reconnects. | ||
@@ -143,2 +138,3 @@ | ||
are plain JSON objects. These will be encoded automatically before being sent. | ||
- `options.publishTimeout` - a default timeout for messages published to this channel. | ||
@@ -188,2 +184,8 @@ ### AmqpConnectionManager#isConnected() | ||
Both of these functions take an additional option when passing options: | ||
- `timeout` - If specified, if a messages is not acked by the amqp broker within the specified number of milliseconds, | ||
the message will be rejected. Note that the message _may_ still end up getting delivered after the timeout, as we | ||
have no way to cancel the in-flight request. | ||
### ChannelWrapper#ack and ChannelWrapper#nack | ||
@@ -190,0 +192,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
130256
2
2279
200
38
- Removedts-node@^10.2.1
- Removedtypescript@^4.3.5
- Removed@cspotcode/source-map-support@0.8.1(transitive)
- Removed@jridgewell/resolve-uri@3.1.2(transitive)
- Removed@jridgewell/sourcemap-codec@1.5.0(transitive)
- Removed@jridgewell/trace-mapping@0.3.9(transitive)
- Removed@tsconfig/node10@1.0.11(transitive)
- Removed@tsconfig/node12@1.0.11(transitive)
- Removed@tsconfig/node14@1.0.3(transitive)
- Removed@tsconfig/node16@1.0.4(transitive)
- Removed@types/node@22.1.0(transitive)
- Removedacorn@8.12.1(transitive)
- Removedacorn-walk@8.3.3(transitive)
- Removedarg@4.1.3(transitive)
- Removedcreate-require@1.1.1(transitive)
- Removeddiff@4.0.2(transitive)
- Removedmake-error@1.3.6(transitive)
- Removedts-node@10.9.2(transitive)
- Removedtypescript@4.9.5(transitive)
- Removedundici-types@6.13.0(transitive)
- Removedv8-compile-cache-lib@3.0.1(transitive)
- Removedyn@3.1.1(transitive)