amqp-extension
Advanced tools
Comparing version 3.0.0 to 3.1.0
@@ -0,1 +1,9 @@ | ||
# [3.1.0](https://github.com/Tada5hi/amqp-extension/compare/v3.0.0...v3.1.0) (2024-03-06) | ||
### Features | ||
* reconnect strategy ([#320](https://github.com/Tada5hi/amqp-extension/issues/320)) ([4c31551](https://github.com/Tada5hi/amqp-extension/commit/4c31551029916dcaf055f78d53a8af7c626c393d)) | ||
* reconnect strategy ([#320](https://github.com/Tada5hi/amqp-extension/issues/320)) ([17b141f](https://github.com/Tada5hi/amqp-extension/commit/17b141f6bd6256172e8c03a1006bfc1cab6225eb)) | ||
# [3.0.0](https://github.com/Tada5hi/amqp-extension/compare/v2.0.2...v3.0.0) (2024-02-28) | ||
@@ -2,0 +10,0 @@ |
@@ -13,2 +13,4 @@ "use strict"; | ||
return { | ||
reconnectAttempts: 10, | ||
reconnectTimeout: 1000, | ||
connection: input.connection, | ||
@@ -15,0 +17,0 @@ publish: input.publish || {}, |
@@ -5,2 +5,4 @@ import type { Options } from 'amqplib'; | ||
export type Config = { | ||
reconnectAttempts: number; | ||
reconnectTimeout: number; | ||
connection: Options.Connect | string; | ||
@@ -7,0 +9,0 @@ exchange: ExchangeOptions; |
import type { Channel, ConsumeMessage } from 'amqplib'; | ||
import type { ConsumeHandlerAnyKey } from './static'; | ||
export { ConsumeMessage, }; | ||
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void>; | ||
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void> | void; | ||
export type ConsumeHandlerAnyKeyType = typeof ConsumeHandlerAnyKey; | ||
export type ConsumeHandlers = Record<ConsumeHandlerAnyKeyType | string, ConsumeMessageHandler>; | ||
export type ConsumeHandlers = { | ||
[ConsumeHandlerAnyKey]?: ConsumeMessageHandler; | ||
} & { | ||
[key: string]: ConsumeMessageHandler; | ||
}; |
@@ -6,9 +6,18 @@ import type { Connection } from 'amqplib'; | ||
import type { ConsumeOptions } from './type'; | ||
type Consumer = { | ||
options: ConsumeOptions; | ||
handlers: ConsumeHandlers; | ||
}; | ||
export declare class Client { | ||
protected connection: Connection | undefined; | ||
protected config: Config; | ||
protected reconnectAttempts: number; | ||
protected consumers: Consumer[]; | ||
constructor(options: ConfigInput); | ||
protected createConnection(): Promise<Connection>; | ||
useConnection(): Promise<Connection>; | ||
protected recreateConsumers(): Promise<void>; | ||
consume(options: ConsumeOptions, handlers: ConsumeHandlers): Promise<void>; | ||
publish(options: PublishOptionsExtended): Promise<void>; | ||
publish(options: PublishOptionsExtended): Promise<boolean>; | ||
} | ||
export {}; |
@@ -8,4 +8,8 @@ "use strict"; | ||
*/ | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Client = void 0; | ||
const node_process_1 = __importDefault(require("node:process")); | ||
const amqplib_1 = require("amqplib"); | ||
@@ -18,6 +22,30 @@ const smob_1 = require("smob"); | ||
const publish_1 = require("./publish"); | ||
const utils_1 = require("./utils"); | ||
class Client { | ||
constructor(options) { | ||
this.config = (0, config_1.buildConfig)(options); | ||
this.reconnectAttempts = 0; | ||
this.consumers = []; | ||
node_process_1.default.once('SIGINT', async () => { | ||
if (this.connection) { | ||
await this.connection.close(); | ||
} | ||
}); | ||
} | ||
async createConnection() { | ||
let connection; | ||
try { | ||
connection = await (0, amqplib_1.connect)(this.config.connection); | ||
this.reconnectAttempts = 0; | ||
} | ||
catch (e) { | ||
if (this.reconnectAttempts < this.config.reconnectAttempts) { | ||
this.reconnectAttempts++; | ||
await (0, utils_1.wait)(this.config.reconnectTimeout); | ||
return this.createConnection(); | ||
} | ||
throw e; | ||
} | ||
return connection; | ||
} | ||
async useConnection() { | ||
@@ -27,5 +55,19 @@ if (typeof this.connection !== 'undefined') { | ||
} | ||
this.connection = await (0, amqplib_1.connect)(this.config.connection); | ||
const connection = await this.createConnection(); | ||
const handleDisconnect = async (err) => { | ||
if (!err) | ||
return; | ||
this.connection = await this.createConnection(); | ||
await this.recreateConsumers(); | ||
}; | ||
connection.once('close', handleDisconnect); | ||
connection.once('error', handleDisconnect); | ||
this.connection = connection; | ||
return this.connection; | ||
} | ||
async recreateConsumers() { | ||
for (let i = 0; i < this.consumers.length; i++) { | ||
await this.consume(this.consumers[i].options, this.consumers[i].handlers); | ||
} | ||
} | ||
async consume(options, handlers) { | ||
@@ -84,2 +126,6 @@ const connection = await this.useConnection(); | ||
await channel.consume(queueName, (message) => handleMessage(message), (0, consume_1.buildDriverConsumeOptions)(options)); | ||
this.consumers.push({ | ||
options, | ||
handlers, | ||
}); | ||
} | ||
@@ -114,7 +160,6 @@ async publish(options) { | ||
} | ||
channel.publish(this.config.exchange.name, exchangeOptions.routingKey, buffer, (0, publish_1.buildDriverPublishOptions)({ | ||
return channel.publish(this.config.exchange.name, exchangeOptions.routingKey, buffer, (0, publish_1.buildDriverPublishOptions)({ | ||
persistent: true, | ||
...options, | ||
})); | ||
return; | ||
} | ||
@@ -130,3 +175,3 @@ // publish to default exchange | ||
}); | ||
channel.sendToQueue(queueName, buffer, (0, publish_1.buildDriverPublishOptions)({ | ||
return channel.sendToQueue(queueName, buffer, (0, publish_1.buildDriverPublishOptions)({ | ||
persistent: true, | ||
@@ -133,0 +178,0 @@ ...options, |
export declare function removeKeysFromOptions<T extends Record<string, any>, K extends (keyof T)[]>(options: T, keys: K): Omit<T, K[number]>; | ||
export declare function wait(ms: number): Promise<void>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.removeKeysFromOptions = void 0; | ||
exports.wait = exports.removeKeysFromOptions = void 0; | ||
const smob_1 = require("smob"); | ||
@@ -14,1 +14,9 @@ function removeKeysFromOptions(options, keys) { | ||
exports.removeKeysFromOptions = removeKeysFromOptions; | ||
async function wait(ms) { | ||
return new Promise((resolve) => { | ||
setTimeout(() => { | ||
resolve(); | ||
}, ms); | ||
}); | ||
} | ||
exports.wait = wait; |
{ | ||
"name": "amqp-extension", | ||
"version": "3.0.0", | ||
"version": "3.1.0", | ||
"description": "An amqp extension with functions and utility functions to consume and publish queue messages.", | ||
@@ -9,3 +9,3 @@ "main": "./dist/index.js", | ||
"build": "rm -rf ./dist && tsc", | ||
"test": "cross-env NODE_ENV=test jest --config ./test/jest.config.js", | ||
"test": "cross-env NODE_ENV=test jest --config ./test/jest.config.js --detectOpenHandles", | ||
"test:coverage": "cross-env NODE_ENV=test jest --config ./test/jest.config.js --coverage", | ||
@@ -47,8 +47,9 @@ "lint": "eslint --ext .js,.vue,.ts ./src", | ||
"@tada5hi/eslint-config-typescript": "^1.2.6", | ||
"@tada5hi/semantic-release": "^0.3.0", | ||
"@tada5hi/semantic-release": "^0.3.1", | ||
"@tada5hi/tsconfig": "^0.5.0", | ||
"@types/jest": "^29.5.7", | ||
"@types/jest": "^29.5.12", | ||
"@types/node": "^20.8.10", | ||
"@types/uuid": "^9.0.8", | ||
"cross-env": "^7.0.3", | ||
"envix": "^1.5.0", | ||
"eslint": "^8.57.0", | ||
@@ -58,2 +59,3 @@ "husky": "^8.0.3", | ||
"semantic-release": "^22.0.7", | ||
"testcontainers": "^10.7.2", | ||
"ts-jest": "^29.1.2", | ||
@@ -60,0 +62,0 @@ "typescript": "^5.2.2" |
@@ -5,2 +5,3 @@ { | ||
"src/**/*.ts", | ||
"test/**/*.ts", | ||
"release.config.js", | ||
@@ -7,0 +8,0 @@ "commitlint.config.js" |
34084
597
16