Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Sign inDemoInstall


Package Overview
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies


amqp-extension - npm Package Compare versions

Comparing version 1.0.5 to 1.1.0



export * from './type';
export * from './module';
export * from './utils';
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {

@@ -18,3 +24,3 @@ if (k2 === undefined) k2 = k;

__exportStar(require("./type"), exports);
__exportStar(require("./module"), exports);
__exportStar(require("./utils"), exports);


import { Options } from 'amqplib';
import { ConsumeOptions } from '../consume';
import { PublishOptions } from '../publish';
export declare type ExchangeType = 'fanout' | 'direct' | 'topic' | 'match' | string;
export declare type Config = {
alias?: string;
import { ExchangeOptions } from '../exchange';
import { ConsumeOptions, PublishOptions } from '../type';
export type Config = {
alias: string;
connection: Options.Connect | string;
exchange: {
name: string;
type: ExchangeType;
options?: Options.AssertExchange;
publish?: PublishOptions;
consume?: ConsumeOptions;
exchange: ExchangeOptions;
publish: PublishOptions;
consume: ConsumeOptions;
export type ConfigInput = Partial<Exclude<Config, 'connection'>> & Pick<Config, 'connection'>;
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
Object.defineProperty(exports, "__esModule", { value: true });

@@ -1,4 +0,3 @@

import { Config } from './type';
export declare const DEFAULT_KEY = "default";
export declare function setConfig(key: string | Config, value?: Config): Config;
export declare function getConfig(key?: string | Config): Config;
import { Config, ConfigInput } from './type';
export declare function getConfigKey(alias?: string): string;
export declare function extendConfig(input: ConfigInput): Config;
"use strict";
* Copyright (c) 2022-2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
Object.defineProperty(exports, "__esModule", { value: true });
exports.getConfig = exports.setConfig = exports.DEFAULT_KEY = void 0;
exports.DEFAULT_KEY = 'default';
var configMap = new Map();
function setConfig(key, value) {
var _a;
if (typeof key === 'string') {
if (typeof value === 'undefined') {
throw new Error("A config must be defined for the alias: ".concat(key));
value.alias = key;
configMap.set(key, value);
return value;
(_a = key.alias) !== null && _a !== void 0 ? _a : (key.alias = exports.DEFAULT_KEY);
configMap.set(key.alias, key);
return key;
exports.extendConfig = exports.getConfigKey = void 0;
const exchange_1 = require("../exchange");
function getConfigKey(alias) {
return alias || 'default';
exports.setConfig = setConfig;
function getConfig(key) {
var _a;
key !== null && key !== void 0 ? key : (key = exports.DEFAULT_KEY);
if (typeof key === 'string') {
var data = configMap.get(key);
if (typeof data === 'undefined') {
throw new Error("A config must be defined for the alias: ".concat(key));
return data;
var config = key;
(_a = config.alias) !== null && _a !== void 0 ? _a : (config.alias = exports.DEFAULT_KEY);
return config;
exports.getConfigKey = getConfigKey;
function extendConfig(input) {
return {
alias: getConfigKey(input.alias),
connection: input.connection,
publish: input.publish || {},
consume: input.consume || {},
exchange: || {
name: '',
type: exchange_1.ExchangeType.DEFAULT,
exports.getConfig = getConfig;
exports.extendConfig = extendConfig;

@@ -1,1 +0,1 @@

export * from './utils';
export * from './module';
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {

@@ -17,3 +23,2 @@ if (k2 === undefined) k2 = k;

Object.defineProperty(exports, "__esModule", { value: true });
__exportStar(require("./utils"), exports);
__exportStar(require("./module"), exports);
export * from './static';
export * from './utils';
export * from './module';
export * from './type';
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {

@@ -18,4 +24,3 @@ if (k2 === undefined) k2 = k;

__exportStar(require("./static"), exports);
__exportStar(require("./utils"), exports);
__exportStar(require("./module"), exports);
__exportStar(require("./type"), exports);
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
Object.defineProperty(exports, "__esModule", { value: true });
exports.ConsumeHandlerAnyKey = void 0;
exports.ConsumeHandlerAnyKey = '$any';

@@ -1,33 +0,6 @@

import { Options } from 'amqplib';
import { Config } from '../config';
import { Message, MessageContext } from '../message';
import type { Channel, ConsumeMessage } from 'amqplib';
import { ConsumeHandlerAnyKey } from './static';
export declare type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;
export declare type ConsumeHandlerAnyKeyType = typeof ConsumeHandlerAnyKey;
export declare type ConsumeHandlers = Record<ConsumeHandlerAnyKeyType | string, ConsumeHandler>;
export declare type ConsumeOptions = {
* Queue routing key(s).
routingKey?: string | string[];
* Config key or object.
alias?: string | Config;
* Queue name.
* Default: ''
name?: string;
* Amqplib consume options.
* Default: { }
options?: Options.Consume;
* Default: false
requeueOnFailure?: boolean;
export { ConsumeMessage, };
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void>;
export type ConsumeHandlerAnyKeyType = typeof ConsumeHandlerAnyKey;
export type ConsumeHandlers = Record<ConsumeHandlerAnyKeyType | string, ConsumeMessageHandler>;
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
Object.defineProperty(exports, "__esModule", { value: true });

@@ -1,2 +0,3 @@

import { ConsumeHandlers, ConsumeOptions } from './type';
export declare function consumeQueue(options: ConsumeOptions, handlers: ConsumeHandlers): Promise<void>;
import { Options } from 'amqplib';
import { ConsumeOptions } from '../type';
export declare function buildDriverConsumeOptions(options: ConsumeOptions): Options.Consume;
"use strict";
var __assign = (this && this.__assign) || function () {
__assign = Object.assign || function(t) {
for (var s, i = 1, n = arguments.length; i < n; i++) {
s = arguments[i];
for (var p in s) if (, p))
t[p] = s[p];
return t;
return __assign.apply(this, arguments);
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(; } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) &&, 0) : && !(t =, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
op =, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
Object.defineProperty(exports, "__esModule", { value: true });
exports.consumeQueue = void 0;
var config_1 = require("../config");
var utils_1 = require("../utils");
var static_1 = require("./static");
/* istanbul ignore next */
function consumeQueue(options, handlers) {
var _a, _b, _c, _d;
return __awaiter(this, void 0, void 0, function () {
var config, _e, channel, connection, queueName, assertionQueue, routingKeys, promises, consumeOptions;
var _this = this;
return __generator(this, function (_f) {
switch (_f.label) {
case 0:
config = (0, config_1.getConfig)(options.alias);
return [4 /*yield*/, (0, utils_1.createChannel)(config)];
case 1:
_e = _f.sent(), channel =, connection = _e.connection;
queueName = (_a = !== null && _a !== void 0 ? _a : '';
return [4 /*yield*/, channel.assertQueue(queueName, {
durable: false,
autoDelete: true,
case 2:
assertionQueue = _f.sent();
if (!(typeof options.routingKey !== 'undefined')) return [3 /*break*/, 4];
routingKeys = Array.isArray(options.routingKey) ? options.routingKey : [options.routingKey];
promises = routingKeys
.map(function (routKey) { return channel.bindQueue(assertionQueue.queue,, routKey); });
return [4 /*yield*/, Promise.all(promises)];
case 3:
_f.label = 4;
case 4:
consumeOptions = __assign(__assign({}, ((_c = (_b = config.consume) === null || _b === void 0 ? void 0 : _b.options) !== null && _c !== void 0 ? _c : {})), ((_d = options.options) !== null && _d !== void 0 ? _d : {}));
return [4 /*yield*/, channel.consume(assertionQueue.queue, ((function (message) { return __awaiter(_this, void 0, void 0, function () {
var content, handler, context, requeueOnFailure, e_1;
var _a, _b, _c;
return __generator(this, function (_d) {
switch (_d.label) {
case 0:
if (!message) {
return [2 /*return*/];
content = JSON.parse(message.content.toString('utf-8'));
handler = (_a = handlers[content.type]) !== null && _a !== void 0 ? _a : handlers[static_1.ConsumeHandlerAnyKey];
context = {
channel: channel,
connection: connection,
messageFields: message.fields,
requeueOnFailure = (_c = (_b = config.consume) === null || _b === void 0 ? void 0 : _b.requeueOnFailure) !== null && _c !== void 0 ? _c : false;
if (typeof handler === 'undefined') {
channel.reject(message, requeueOnFailure);
return [2 /*return*/];
_d.label = 1;
case 1:
_d.trys.push([1, 3, , 4]);
return [4 /*yield*/, handler(content, context)];
case 2:
return [3 /*break*/, 4];
case 3:
e_1 = _d.sent();
channel.reject(message, requeueOnFailure);
return [3 /*break*/, 4];
case 4: return [2 /*return*/];
}); })), consumeOptions)];
case 5:
return [2 /*return*/];
exports.buildDriverConsumeOptions = void 0;
const utils_1 = require("../utils");
function buildDriverConsumeOptions(options) {
return (0, utils_1.removeKeysFromOptions)({ ...options }, [
exports.consumeQueue = consumeQueue;
exports.buildDriverConsumeOptions = buildDriverConsumeOptions;
export * from './config';
export * from './consume';
export * from './connection';
export * from './message';
export * from './exchange';
export * from './publish';
export * from './type';
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {

@@ -20,4 +26,4 @@ if (k2 === undefined) k2 = k;

__exportStar(require("./connection"), exports);
__exportStar(require("./message"), exports);
__exportStar(require("./exchange"), exports);
__exportStar(require("./publish"), exports);
__exportStar(require("./type"), exports);

@@ -1,2 +0,2 @@

export * from './utils';
export * from './module';
export * from './type';
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {

@@ -17,4 +23,3 @@ if (k2 === undefined) k2 = k;

Object.defineProperty(exports, "__esModule", { value: true });
__exportStar(require("./utils"), exports);
__exportStar(require("./module"), exports);
__exportStar(require("./type"), exports);

@@ -1,12 +0,13 @@

import { Options } from 'amqplib';
import { Config } from '../config';
export declare type PublishOptions = {
import { PublishOptions } from '../type';
export type PublishOptionsExtended = {
* Config key or object.
* Alias for: messageId
* Default: <generated uuid>
alias?: string | Config;
id?: string;
* Amqplib publish options.
* The message data.
options?: Options.Publish;
data: any;
} & PublishOptions;
"use strict";
* Copyright (c) 2022.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
Object.defineProperty(exports, "__esModule", { value: true });

@@ -1,3 +0,3 @@

import { Message } from '../message';
import { PublishOptions } from './type';
export declare function publishMessage(message: Message, options?: PublishOptions): Promise<void>;
import { Options } from 'amqplib';
import { PublishOptionsExtended } from './type';
export declare function buildDriverPublishOptions(options: PublishOptionsExtended): Options.Publish;
"use strict";
var __assign = (this && this.__assign) || function () {
__assign = Object.assign || function(t) {
for (var s, i = 1, n = arguments.length; i < n; i++) {
s = arguments[i];
for (var p in s) if (, p))
t[p] = s[p];
return t;
return __assign.apply(this, arguments);
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(; } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) &&, 0) : && !(t =, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
op =, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
var __rest = (this && this.__rest) || function (s, e) {
var t = {};
for (var p in s) if (, p) && e.indexOf(p) < 0)
t[p] = s[p];
if (s != null && typeof Object.getOwnPropertySymbols === "function")
for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) {
if (e.indexOf(p[i]) < 0 &&, p[i]))
t[p[i]] = s[p[i]];
return t;
Object.defineProperty(exports, "__esModule", { value: true });
exports.publishMessage = void 0;
var config_1 = require("../config");
var utils_1 = require("../utils");
/* istanbul ignore next */
function publishMessage(message, options) {
var _a, _b, _c, _d;
return __awaiter(this, void 0, void 0, function () {
var messageOptions, messagePayload, buffer, config, channel, publishOptions;
return __generator(this, function (_e) {
switch (_e.label) {
case 0:
messageOptions = message.options, messagePayload = __rest(message, ["options"]);
buffer = Buffer.from(JSON.stringify(messagePayload));
options !== null && options !== void 0 ? options : (options = {});
config = (0, config_1.getConfig)(options.alias);
return [4 /*yield*/, (0, utils_1.createChannel)(config)];
case 1:
channel = (_e.sent()).channel;
publishOptions = __assign(__assign(__assign({}, ((_b = (_a = config.publish) === null || _a === void 0 ? void 0 : _a.options) !== null && _b !== void 0 ? _b : {})), ((_c = messageOptions.publish) !== null && _c !== void 0 ? _c : {})), ((_d = options.options) !== null && _d !== void 0 ? _d : {}));
channel.publish(, messageOptions.routingKey, buffer, publishOptions);
return [2 /*return*/];
exports.buildDriverPublishOptions = void 0;
const utils_1 = require("../utils");
function buildDriverPublishOptions(options) {
return (0, utils_1.removeKeysFromOptions)({ ...options }, [
exports.publishMessage = publishMessage;
exports.buildDriverPublishOptions = buildDriverPublishOptions;

@@ -1,6 +0,1 @@

import { Channel, Connection } from 'amqplib';
import { Config } from './config';
export declare function createChannel(key: string | Config): Promise<{
channel: Channel;
connection: Connection;
export declare function removeKeysFromOptions<T extends Record<string, any>, K extends (keyof T)[]>(options: T, keys: K): Omit<T, K[number]>;
"use strict";
var __assign = (this && this.__assign) || function () {
__assign = Object.assign || function(t) {
for (var s, i = 1, n = arguments.length; i < n; i++) {
s = arguments[i];
for (var p in s) if (, p))
t[p] = s[p];
Object.defineProperty(exports, "__esModule", { value: true });
exports.removeKeysFromOptions = void 0;
const smob_1 = require("smob");
function removeKeysFromOptions(options, keys) {
for (let i = 0; i < keys.length; i++) {
if ((0, smob_1.hasOwnProperty)(options, keys[i])) {
delete options[keys[i]];
return t;
return __assign.apply(this, arguments);
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(; } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) &&, 0) : && !(t =, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
op =, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
Object.defineProperty(exports, "__esModule", { value: true });
exports.createChannel = void 0;
var config_1 = require("./config");
var connection_1 = require("./connection");
/* istanbul ignore next */
function createChannel(key) {
var _a, _b;
return __awaiter(this, void 0, void 0, function () {
var config, connection, channel, exchangeOptions;
return __generator(this, function (_c) {
switch (_c.label) {
case 0:
config = (0, config_1.getConfig)(key);
return [4 /*yield*/, (0, connection_1.useConnection)(config.alias)];
case 1:
connection = _c.sent();
return [4 /*yield*/, connection.createChannel()];
case 2:
channel = _c.sent();
exchangeOptions = __assign({ durable: true }, ((_b = (_a = === null || _a === void 0 ? void 0 : _a.options) !== null && _b !== void 0 ? _b : {}));
return [4 /*yield*/, channel.assertExchange(,, exchangeOptions)];
case 3:
return [2 /*return*/, {
channel: channel,
connection: connection,
return options;
exports.createChannel = createChannel;
exports.removeKeysFromOptions = removeKeysFromOptions;
"name": "amqp-extension",
"version": "1.0.5",
"version": "1.1.0",
"description": "An amqp extension with functions and utility functions to consume and publish queue messages.",

@@ -38,10 +38,19 @@ "main": "./dist/index.js",

"dependencies": {
"amqplib": "^0.10.2",
"uuid": "^8.3.2"
"amqplib": "^0.10.3",
"smob": "^0.1.0",
"uuid": "^9.0.0"
"devDependencies": {
"@tada5hi/eslint-config-typescript": "^1.0.5",
"@types/amqplib": "^0.8.2",
"@commitlint/cli": "^17.4.2",
"@commitlint/config-angular": "^17.4.2",
"@commitlint/config-conventional": "^17.4.2",
"@commitlint/cz-commitlint": "^17.4.2",
"@semantic-release/changelog": "^6.0.1",
"@semantic-release/git": "^10.0.1",
"@semantic-release/release-notes-generator": "^10.0.3",
"@tada5hi/eslint-config-typescript": "^1.1.1",
"@tada5hi/tsconfig": "^0.4.0",
"@types/amqplib": "^0.10.0",
"@types/jest": "^27.4.1",
"@types/node": "^18.7.9",
"@types/node": "^18.11.9",
"@types/uuid": "^8.3.4",

@@ -51,8 +60,14 @@ "codecov": "^3.8.3",

"cross-env": "^7.0.3",
"eslint": "^8.22.0",
"eslint": "^8.33.0",
"husky": "^8.0.2",
"jest": "^27.5.1",
"np": "^7.6.2",
"semantic-release": "^19.0.5",
"ts-jest": "^27.1.4",
"typescript": "^4.7.4"
"typescript": "^4.9.5"
"config": {
"commitizen": {
"path": "@commitlint/cz-commitlint"

@@ -0,7 +1,9 @@

# AMQP Extension 🏰
[![npm version](](
[![Master Workflow](](
[![Known Vulnerabilities](](
[![semantic-release: angular](](
# AMQP Extension 🏰
This is a library on top of the famous [amqplib]( library and defines a [message format](#message-types) for queue messages through a message broker across multiple standalone services.

@@ -14,17 +16,14 @@ All utility functions support the usage of multiple registered connections.

- [Usage](#usage)
- [Publish](#publish)
- [Consume](#consume)
- [Publish](#publish-to-queue)
- [Consume](#consume-queue)
- [Functions](#functions)
- [setConfig](#setconfig)
- [useConnection](#useconnection)
- [publishMessage](#publishmessage)
- [consumeQueue](#consumequeue)
- [publish](#publish)
- [consume](#consume)
- [Types](#types)
- [Config](#config-types)
- [Consume](#consume-types)
- [Message](#message-types)
- [Publish](#publish-types)
- [License](#license)
## Installation

@@ -38,13 +37,11 @@

### Publish
### Publish to Queue
To publish a queue message according the [Message Scheme](#message-types), use the `buildMessage` helper function
to build a message and the `publishMessage` function to submit it to the message broker.
The `publish` function allows you to send messages quickly.
Existing options can be added or overwritten
import {

@@ -62,20 +59,15 @@ } from "amqp-extension";

const message: Message = buildMessage({
type: 'resourceCreated',
options: {
routingKey: '<routing-key>'
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}
(async () => {
await publishMessage(message);
await publish({
data: {
type: 'resourceCreated',
name: 'foo'
### Consume
### Consume Queue
To consume a queue message use the `consumeMessage` function. As first argument it accepts a configuration object
To consume a queue use the `consume` function. As first argument it accepts a configuration object
and as second argument and object to specify an async callback function handler for a specific message `type`.

@@ -85,7 +77,5 @@

import {

@@ -104,11 +94,14 @@ } from "amqp-extension";

const options: ConsumeOptions = {
routingKey: '<routing-key>'
exchange: {
routingKey: '<routing-key>'
(async () => {
await consumeQueue(options, {
resourceCreated: async (message: Message, messageContext: MessageContext) => {
// do some async operation :)
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', data: {}, metadata: {}}
await consume(options, {
resourceCreated: async (message: ConsumeMessage) => {
const content = message.content.toString('utf-8');
const payload = JSON.parse(content);
// { type: 'resourceCreated', name: 'foo' }

@@ -126,8 +119,5 @@ });

import {

@@ -153,15 +143,20 @@ } from "amqp-extension";

(async () => {
await consume(
routingKey: '<routing-key>',
alias: 'foo' // <--- use another connection :)
// ... handlers
const consumeOptions: ConsumeOptions = {
routingKey: '<routing-key>',
alias: 'foo' // <--- use another connection :)
const publishOptions: PublishOptions = {
alias: 'foo' // <--- use another connection :)
(async () => {
await consumeQueue({/* handlers */}, consumeOptions);
await publishMessage({/* message */}, publishOptions);
await publish({
routingKey: '<routing-key>',
alias: 'foo', // <--- use another connection :)
data: {
foo: 'bar'

@@ -175,3 +170,3 @@

▸ `function` **setConfig**(`key?: string | Config`, `value?: Config`): `Config`
▸ `function` **setConfig**(`key?: string | ConfigInput`, `value?: ConfigInput`): `Config`

@@ -205,3 +200,3 @@ Register a connection as `default` alias or specify an `<alias>` as config property.

import {setConfig, useConnection} from "amqp-extension";
import { setConfig, useConnection } from "amqp-extension";

@@ -235,4 +230,4 @@ (async () => {

| Name | Description |
| :------ | :------ |
| Name | Description |

@@ -243,6 +238,6 @@

| Name | Type | Description |
| :------ | :------ | :------ |
| `key` | `string` or `Config` | Config object or alias of config. [more](#config-types) |
| `value` | `Config` | Config object. [more](#config-types) |
| Name | Type | Description |
| `key` | `string` or `ConfigInput` | Config object or alias of config. [more](#config-types) |
| `value` | `Config` | Config object. [more](#config-types) |

@@ -257,3 +252,3 @@ #### Returns

▸ `function` **useConnection**(`key?: string | Config`): `Promise<Connection>`
▸ `function` **useConnection**(`key?: string | ConfigInput`): `Promise<Connection>`

@@ -302,4 +297,4 @@ Either register a connection as `default` alias or specify an `alias` as config property.

| Name | Description |
| :------ | :------ |
| Name | Description |

@@ -310,4 +305,4 @@

| Name | Type | Description |
| :------ | :------ | :------ |
| Name | Type | Description |
| `key` | `string` or `Config` | Config or alias of config. [more](#config-types) |

@@ -321,5 +316,5 @@

### publishMessage
### publish
▸ `function` **publishMessage**(`message: Message`, `options?: PublishOptions`): `Promise<void>`
▸ `function` **publish**(`message: Message`, `options?: PublishOptions`): `Promise<void>`

@@ -333,18 +328,11 @@ Send the constructed queue message to the message broker.

import {
} from "amqp-extension";
const message: Message = buildMessage({
type: 'resourceCreated',
options: {
routingKey: '<routing-key>'
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}
(async () => {
await publishMessage(message);
await publish({
data: {
type: 'resourceCreated'

@@ -355,11 +343,11 @@ ```

| Name | Description |
| :------ | :------ |
| Name | Description |
#### Parameters
| Name | Type | Description |
| :------ | :------ | :------ |
| `message` | `Message` | Constructed message object. [more](#message-types)|
| `options` | `PublishOptions` | Publish options. [more](#publish-types) |
| Name | Type | Description |
| `message` | `Message` | Constructed message object. |
| `options` | `PublishOptions` | Publish options. |

@@ -372,5 +360,5 @@ #### Returns

### consumeQueue
### consume
▸ `function` **consumeQueue**(`options: ConsumeOptions`, `cb: ConsumeHandlers`): `Promise<void>`
▸ `function` **consume**(`options: ConsumeOptions`, `cb: ConsumeHandlers`): `Promise<void>`

@@ -384,6 +372,5 @@ Send the constructed queue message to the message broker.

import {
} from "amqp-extension";

@@ -396,7 +383,5 @@

(async () => {
await consumeQueue(options, {
'<type>': async (message: Message, messageContext: MessageContext) => {
await consume(options, {
'<type>': async (message: ConsumeMessage) => {
// do some async action :)
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', data: {}, metadata: {}}

@@ -410,4 +395,4 @@ });

| Name | Description |
| :------ | :------ |
| Name | Description |

@@ -418,6 +403,6 @@

| Name | Type | Description |
| :------ | :------ | :------ |
| `options` | `ConsumeOptions` | Consume options. [more](#consume-types)|
| `handlers` | `ConsumeHandlers` | Handlers object. [more](#consume-types) |
| Name | Type | Description |
| `options` | `ConsumeOptions` | Consume options. ) |
| `handlers` | `ConsumeHandlers` | Handlers object. |

@@ -435,121 +420,22 @@ #### Returns

import {Options} from "amqplib";
import {PublishOptions} from "amqp-extension";
import { Options } from 'amqplib';
import { ExchangeOptions } from '../exchange';
import { ConsumeOptions, PublishOptions } from '../type';
export type ExchangeType = 'fanout' | 'direct' | 'topic' | 'match' | string;
export type Config = {
alias?: string,
alias: string,
connection: Options.Connect | string,
exchange: {
name: string,
type: ExchangeType,
options?: Options.AssertExchange
publish?: PublishOptions,
consume?: ConsumeOptions
exchange: ExchangeOptions,
publish: PublishOptions,
consume: ConsumeOptions
### Consume Types
import {Options} from "amqplib";
import {Config, MessageContext, Message} from "amqp-extension";
export type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;
export type ConsumeHandlers = Record<'$any' | string, ConsumeHandler>;
export type ConsumeOptions = {
* Queue routing key(s).
routingKey?: string | string[],
* Config key or object.
alias?: string | Config,
* Queue name.
* Default: ''
name?: string,
* Amqplib consume options.
* Default: {}
options?: Options.Consume
export type ConfigInput = Partial<Exclude<Config, 'connection'>> &
Pick<Config, 'connection'>;
### Message Types
## License
import {Options} from "amqplib";
Made with 💚
export interface MessageOptions {
* Routing key for message broker.
routingKey?: string;
* Override default publish options.
publish?: Options.Publish;
export type Message = {
* Routing information for amqp library.
* This property will be removed, before it is passed to the message queue.
options?: MessageOptions;
* Default: <generated uuid>
id: string;
* Event- or Command-name.
type: string;
* Metadata object to provide details for the message broker.
* Default: {}
metadata: Record<string, any>;
* The message data.
* Default: {}
data: Record<string, any>;
### Publish Types
import {Options} from "amqplib";
import {Config} from "amqp-extension";
export type PublishOptions = {
* Config key or object.
alias?: string | Config;
* Amqplib publish options.
options?: Options.Publish;
Published under [MIT License](./LICENSE).

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo


  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog



Stay in touch

Get open source security insights delivered straight into your inbox.

  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc