New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

amqp-extension

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-extension - npm Package Compare versions

Comparing version 3.0.0 to 3.1.0

8

CHANGELOG.md

@@ -0,1 +1,9 @@

# [3.1.0](https://github.com/Tada5hi/amqp-extension/compare/v3.0.0...v3.1.0) (2024-03-06)
### Features
* reconnect strategy ([#320](https://github.com/Tada5hi/amqp-extension/issues/320)) ([4c31551](https://github.com/Tada5hi/amqp-extension/commit/4c31551029916dcaf055f78d53a8af7c626c393d))
* reconnect strategy ([#320](https://github.com/Tada5hi/amqp-extension/issues/320)) ([17b141f](https://github.com/Tada5hi/amqp-extension/commit/17b141f6bd6256172e8c03a1006bfc1cab6225eb))
# [3.0.0](https://github.com/Tada5hi/amqp-extension/compare/v2.0.2...v3.0.0) (2024-02-28)

@@ -2,0 +10,0 @@

2

dist/config/module.js

@@ -13,2 +13,4 @@ "use strict";

return {
reconnectAttempts: 10,
reconnectTimeout: 1000,
connection: input.connection,

@@ -15,0 +17,0 @@ publish: input.publish || {},

@@ -5,2 +5,4 @@ import type { Options } from 'amqplib';

export type Config = {
reconnectAttempts: number;
reconnectTimeout: number;
connection: Options.Connect | string;

@@ -7,0 +9,0 @@ exchange: ExchangeOptions;

8

dist/consume/type.d.ts
import type { Channel, ConsumeMessage } from 'amqplib';
import type { ConsumeHandlerAnyKey } from './static';
export { ConsumeMessage, };
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void>;
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void> | void;
export type ConsumeHandlerAnyKeyType = typeof ConsumeHandlerAnyKey;
export type ConsumeHandlers = Record<ConsumeHandlerAnyKeyType | string, ConsumeMessageHandler>;
export type ConsumeHandlers = {
[ConsumeHandlerAnyKey]?: ConsumeMessageHandler;
} & {
[key: string]: ConsumeMessageHandler;
};

@@ -6,9 +6,18 @@ import type { Connection } from 'amqplib';

import type { ConsumeOptions } from './type';
type Consumer = {
options: ConsumeOptions;
handlers: ConsumeHandlers;
};
export declare class Client {
protected connection: Connection | undefined;
protected config: Config;
protected reconnectAttempts: number;
protected consumers: Consumer[];
constructor(options: ConfigInput);
protected createConnection(): Promise<Connection>;
useConnection(): Promise<Connection>;
protected recreateConsumers(): Promise<void>;
consume(options: ConsumeOptions, handlers: ConsumeHandlers): Promise<void>;
publish(options: PublishOptionsExtended): Promise<void>;
publish(options: PublishOptionsExtended): Promise<boolean>;
}
export {};

@@ -8,4 +8,8 @@ "use strict";

*/
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.Client = void 0;
const node_process_1 = __importDefault(require("node:process"));
const amqplib_1 = require("amqplib");

@@ -18,6 +22,30 @@ const smob_1 = require("smob");

const publish_1 = require("./publish");
const utils_1 = require("./utils");
class Client {
constructor(options) {
this.config = (0, config_1.buildConfig)(options);
this.reconnectAttempts = 0;
this.consumers = [];
node_process_1.default.once('SIGINT', async () => {
if (this.connection) {
await this.connection.close();
}
});
}
async createConnection() {
let connection;
try {
connection = await (0, amqplib_1.connect)(this.config.connection);
this.reconnectAttempts = 0;
}
catch (e) {
if (this.reconnectAttempts < this.config.reconnectAttempts) {
this.reconnectAttempts++;
await (0, utils_1.wait)(this.config.reconnectTimeout);
return this.createConnection();
}
throw e;
}
return connection;
}
async useConnection() {

@@ -27,5 +55,19 @@ if (typeof this.connection !== 'undefined') {

}
this.connection = await (0, amqplib_1.connect)(this.config.connection);
const connection = await this.createConnection();
const handleDisconnect = async (err) => {
if (!err)
return;
this.connection = await this.createConnection();
await this.recreateConsumers();
};
connection.once('close', handleDisconnect);
connection.once('error', handleDisconnect);
this.connection = connection;
return this.connection;
}
async recreateConsumers() {
for (let i = 0; i < this.consumers.length; i++) {
await this.consume(this.consumers[i].options, this.consumers[i].handlers);
}
}
async consume(options, handlers) {

@@ -84,2 +126,6 @@ const connection = await this.useConnection();

await channel.consume(queueName, (message) => handleMessage(message), (0, consume_1.buildDriverConsumeOptions)(options));
this.consumers.push({
options,
handlers,
});
}

@@ -114,7 +160,6 @@ async publish(options) {

}
channel.publish(this.config.exchange.name, exchangeOptions.routingKey, buffer, (0, publish_1.buildDriverPublishOptions)({
return channel.publish(this.config.exchange.name, exchangeOptions.routingKey, buffer, (0, publish_1.buildDriverPublishOptions)({
persistent: true,
...options,
}));
return;
}

@@ -130,3 +175,3 @@ // publish to default exchange

});
channel.sendToQueue(queueName, buffer, (0, publish_1.buildDriverPublishOptions)({
return channel.sendToQueue(queueName, buffer, (0, publish_1.buildDriverPublishOptions)({
persistent: true,

@@ -133,0 +178,0 @@ ...options,

export declare function removeKeysFromOptions<T extends Record<string, any>, K extends (keyof T)[]>(options: T, keys: K): Omit<T, K[number]>;
export declare function wait(ms: number): Promise<void>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.removeKeysFromOptions = void 0;
exports.wait = exports.removeKeysFromOptions = void 0;
const smob_1 = require("smob");

@@ -14,1 +14,9 @@ function removeKeysFromOptions(options, keys) {

exports.removeKeysFromOptions = removeKeysFromOptions;
async function wait(ms) {
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, ms);
});
}
exports.wait = wait;
{
"name": "amqp-extension",
"version": "3.0.0",
"version": "3.1.0",
"description": "An amqp extension with functions and utility functions to consume and publish queue messages.",

@@ -9,3 +9,3 @@ "main": "./dist/index.js",

"build": "rm -rf ./dist && tsc",
"test": "cross-env NODE_ENV=test jest --config ./test/jest.config.js",
"test": "cross-env NODE_ENV=test jest --config ./test/jest.config.js --detectOpenHandles",
"test:coverage": "cross-env NODE_ENV=test jest --config ./test/jest.config.js --coverage",

@@ -47,8 +47,9 @@ "lint": "eslint --ext .js,.vue,.ts ./src",

"@tada5hi/eslint-config-typescript": "^1.2.6",
"@tada5hi/semantic-release": "^0.3.0",
"@tada5hi/semantic-release": "^0.3.1",
"@tada5hi/tsconfig": "^0.5.0",
"@types/jest": "^29.5.7",
"@types/jest": "^29.5.12",
"@types/node": "^20.8.10",
"@types/uuid": "^9.0.8",
"cross-env": "^7.0.3",
"envix": "^1.5.0",
"eslint": "^8.57.0",

@@ -58,2 +59,3 @@ "husky": "^8.0.3",

"semantic-release": "^22.0.7",
"testcontainers": "^10.7.2",
"ts-jest": "^29.1.2",

@@ -60,0 +62,0 @@ "typescript": "^5.2.2"

@@ -5,2 +5,3 @@ {

"src/**/*.ts",
"test/**/*.ts",
"release.config.js",

@@ -7,0 +8,0 @@ "commitlint.config.js"

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc