AMQP Extension 🏰
This library builds on the amqplib library and defines a message format for queue messages that are exchanged through a message broker between multiple standalone services.
All utility functions support the usage of multiple registered connections.
Installation
npm install amqp-extension --save
Usage
Publish
To publish a queue message according to the Message Scheme, use the buildMessage helper function to build a message and the publishMessage function to submit it to the message broker.
import {
    buildMessage,
    Message,
    useConnection,
    publishMessage,
    setConfig
} from "amqp-extension";

setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const message: Message = buildMessage({
    type: 'resourceCreated',
    options: {
        routingKey: '<routing-key>'
    }
});

console.log(message);

(async () => {
    await publishMessage(message);
})();
Consume
To consume a queue message, use the consumeQueue function. As its first argument it accepts a configuration object, and as its second argument an object that maps a specific message type to an async callback handler.
import {
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext,
    setConfig
} from "amqp-extension";

setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    // Handlers are keyed by message type.
    await consumeQueue(options, {
        resourceCreated: async (message: Message, messageContext: MessageContext) => {
            console.log(message);
        }
    });
})();
Multiple Connections
To define multiple concurrent connections, specify a different alias in each configuration object so that the configurations do not overwrite each other.
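import {
    buildMessage,
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext,
    publishMessage,
    PublishOptions,
    setConfig
} from "amqp-extension";

// Registered under the default alias.
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

// Registered under the alias 'foo'.
setConfig({
    alias: 'foo',
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const consumeOptions: ConsumeOptions = {
    routingKey: '<routing-key>',
    alias: 'foo'
};

const publishOptions: PublishOptions = {
    alias: 'foo'
};

(async () => {
    // Consume from the 'foo' connection; handlers are keyed by message type.
    await consumeQueue(consumeOptions, {
        resourceCreated: async (message: Message, messageContext: MessageContext) => {
            console.log(message);
        }
    });

    // Publish through the 'foo' connection.
    const message: Message = buildMessage({
        type: 'resourceCreated',
        options: {
            routingKey: '<routing-key>'
        }
    });
    await publishMessage(message, publishOptions);
})();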
Functions
setConfig
▸ function setConfig(key?: string | Config, value?: Config): Config
Register a connection config under the default alias, or specify an <alias> as config property to register it under that alias.
Example
Simple
Define the default connection config.
import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection();
})();
Advanced
Define non-default connections, either with an alias config property or with the alias as first argument.
import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    setConfig('foo', {
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
    const fooConnection = await useConnection('foo');
})();
Type parameters
Parameters
| Name | Type | Description |
|---|---|---|
| key | string or Config | Config object or alias of config. See Config Types. |
| value | Config | Config object. See Config Types. |
Returns
Config
The function returns the Config object.
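The two call forms of setConfig (a config object with an optional alias, or an alias string followed by a config object) can be pictured as writes to an alias-keyed map. The following is a minimal sketch of that idea, not the library's implementation: the helpers `registerConfig` and `lookupConfig` are hypothetical, the `Config` type is a trimmed local copy, and the default alias is assumed to be the literal string `'default'`.

```typescript
// Trimmed local copy of the documented Config type (assumption: string connection only).
type Config = {
    alias?: string;
    connection: string;
    exchange: { name: string; type: string };
};

const DEFAULT_ALIAS = 'default'; // assumption: the library's actual default key may differ
const registry = new Map<string, Config>();

// Hypothetical helper mirroring the two call forms of setConfig.
function registerConfig(key: string | Config, value?: Config): Config {
    let alias: string;
    let config: Config;

    if (typeof key === 'string') {
        // Form 2: alias as first argument, config as second argument.
        if (!value) {
            throw new Error(`No config given for alias "${key}"`);
        }
        alias = key;
        config = value;
    } else {
        // Form 1: config object, alias taken from its property or defaulted.
        alias = key.alias ?? DEFAULT_ALIAS;
        config = key;
    }

    const stored: Config = { ...config, alias };
    registry.set(alias, stored); // a second call with the same alias overwrites the first
    return stored;
}

// Hypothetical helper: look a config up by alias, default alias if omitted.
function lookupConfig(alias: string = DEFAULT_ALIAS): Config {
    const config = registry.get(alias);
    if (!config) {
        throw new Error(`No config registered for alias "${alias}"`);
    }
    return config;
}

registerConfig({ connection: 'amqp://localhost', exchange: { name: 'app', type: 'topic' } });
registerConfig('foo', { connection: 'amqp://localhost', exchange: { name: 'foo', type: 'topic' } });
```

In this sketch, registering a second config without an alias would replace the first one, which is the overwrite behavior that distinct aliases avoid.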
useConnection
▸ function useConnection(key?: string | Config): Promise<Connection>
Pass a config object to register a connection, either under the default alias or under the alias given as config property.
If a connection is already registered, call the function without arguments to retrieve the default connection, or pass the alias name to retrieve a specific one.
Example
Simple
Receive underlying driver connection.
import {useConnection} from "amqp-extension";

(async () => {
    const connection = await useConnection();
})();
Advanced
Use a non-default connection.
import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
})();
Type parameters
Parameters
| Name | Type | Description |
|---|---|---|
| key | string or Config | Config object or alias of config. See Config Types. |
Returns
Promise<Connection>
The function returns the Connection object of the amqplib library.
publishMessage
▸ function publishMessage(message: Message, options?: PublishOptions): Promise<void>
Send the constructed queue message to the message broker.
Through the alias property of the second parameter, a registered config can be selected by specifying its alias, or the full config object can be provided.
Example
Simple
import {
    buildMessage,
    Message,
    publishMessage
} from "amqp-extension";

const message: Message = buildMessage({
    type: 'resourceCreated',
    options: {
        routingKey: '<routing-key>'
    }
});

console.log(message);

(async () => {
    await publishMessage(message);
})();
Type parameters
Parameters
| Name | Type | Description |
|---|---|---|
| message | Message | Constructed message object. See Message Types. |
| options | PublishOptions | Publish options. See Publish Types. |
Returns
Promise<void>
The function does not return a value.
consumeQueue
▸ function consumeQueue(options: ConsumeOptions, cb: ConsumeHandlers): Promise<void>
Consume queue messages from the message broker.
The first parameter holds the consume options; through its alias property a registered config can be selected by specifying its alias, or the full config object can be provided. The second parameter maps message types to async handler functions.
Example
Simple
import {
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext
} from "amqp-extension";

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    await consumeQueue(options, {
        '<type>': async (message: Message, messageContext: MessageContext) => {
            console.log(message);
        }
    });
})();
Type parameters
Parameters
| Name | Type | Description |
|---|---|---|
| options | ConsumeOptions | Consume options. See Consume Types. |
| handlers | ConsumeHandlers | Handlers object. See Consume Types. |
Returns
Promise<void>
The function does not return a value.
Types
Config Types
import {Options} from "amqplib";
import {ConsumeOptions, PublishOptions} from "amqp-extension";

export type ExchangeType = 'fanout' | 'direct' | 'topic' | 'match' | string;

export type Config = {
    alias?: string,
    connection: Options.Connect | string,
    exchange: {
        name: string,
        type: ExchangeType,
        options?: Options.AssertExchange
    },
    publish?: PublishOptions,
    consume?: ConsumeOptions
};
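The connection property accepts either a connection string or an options object. To illustrate how the two forms relate, here is a minimal sketch that turns an options object into the string form used in the examples. The helper `toConnectionString` is hypothetical and not part of amqp-extension or amqplib; `ConnectOptions` is a locally declared subset of the fields of amqplib's Options.Connect, and the defaults (protocol `amqp`, host `localhost`, port 5672) are assumptions of this sketch.

```typescript
// Locally declared subset of amqplib's Options.Connect (assumption: only these fields matter here).
type ConnectOptions = {
    protocol?: string;
    hostname?: string;
    port?: number;
    username?: string;
    password?: string;
};

// Hypothetical helper: build the 'amqp://<user>:<password>@<host>' form from an options object.
function toConnectionString(options: ConnectOptions): string {
    const protocol = options.protocol ?? 'amqp';
    const hostname = options.hostname ?? 'localhost';
    const port = options.port ?? 5672; // standard AMQP port

    // Credentials are percent-encoded so that characters such as '@' do not break the URL.
    const credentials = options.username
        ? `${encodeURIComponent(options.username)}:${encodeURIComponent(options.password ?? '')}@`
        : '';

    return `${protocol}://${credentials}${hostname}:${port}`;
}

const url = toConnectionString({
    hostname: 'broker.local',
    username: 'user',
    password: 'p@ss'
});

console.log(url);
```

The object form avoids manual escaping of special characters in credentials, which is the main reason to prefer it over a hand-written connection string.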
Consume Types
import {Options} from "amqplib";
import {Config, MessageContext, Message} from "amqp-extension";

export type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;

export type ConsumeHandlers = Record<'$any' | string, ConsumeHandler>;

export type ConsumeOptions = {
    routingKey?: string | string[],
    alias?: string | Config,
    name?: string,
    options?: Options.Consume
}
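The '$any' key in ConsumeHandlers suggests a catch-all handler. The following is a minimal sketch of how a handler might be selected for an incoming message, assuming '$any' is used as a fallback when no handler matches the message type. The source does not confirm this behavior; `selectHandler` is a hypothetical helper, and the types are re-declared locally (with `MessageContext` reduced to a placeholder) to keep the snippet self-contained.

```typescript
// Local copies of the documented types, trimmed to what the sketch needs.
type Message = {
    id: string;
    type: string;
    metadata: Record<string, any>;
    data: Record<string, any>;
};
type MessageContext = Record<string, any>; // placeholder: the real shape is not documented here
type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;
type ConsumeHandlers = Record<'$any' | string, ConsumeHandler>;

// Hypothetical helper: prefer the handler registered for the exact type,
// otherwise fall back to '$any' (assumed catch-all), otherwise undefined.
function selectHandler(handlers: ConsumeHandlers, type: string): ConsumeHandler | undefined {
    if (Object.prototype.hasOwnProperty.call(handlers, type)) {
        return handlers[type];
    }
    return handlers['$any'];
}

const handlers: ConsumeHandlers = {
    resourceCreated: async (message) => {
        console.log('created', message.id);
    },
    $any: async (message) => {
        console.log('unhandled type', message.type);
    }
};

// 'resourceDeleted' has no dedicated handler, so the '$any' handler is chosen.
const chosen = selectHandler(handlers, 'resourceDeleted');
```

Checking with hasOwnProperty rather than a plain property lookup keeps inherited names such as toString from being mistaken for registered handlers.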
Message Types
import {Options} from "amqplib";

export interface MessageOptions {
    routingKey?: string;
    publish?: Options.Publish;
}

export type Message = {
    options?: MessageOptions;
    id: string;
    type: string;
    metadata: Record<string, any>;
    data: Record<string, any>;
};
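The Message type requires id, type, metadata, and data, while options is optional. The following is a minimal sketch of filling in those fields by hand. `createMessage` is a hypothetical helper and says nothing about how buildMessage works internally; the id generator is a simple stand-in, and the empty-object defaults for metadata and data are assumptions.

```typescript
// Local copies of the documented types; the amqplib-typed 'publish' option is omitted
// to keep the sketch dependency-free.
interface MessageOptions {
    routingKey?: string;
}
type Message = {
    options?: MessageOptions;
    id: string;
    type: string;
    metadata: Record<string, any>;
    data: Record<string, any>;
};

// Stand-in id generator (assumption): timestamp plus random suffix, not a real UUID.
function generateId(): string {
    return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2)}`;
}

// Hypothetical helper: only 'type' is mandatory, everything else gets a default.
function createMessage(input: Pick<Message, 'type'> & Partial<Message>): Message {
    return {
        id: input.id ?? generateId(),
        type: input.type,
        metadata: input.metadata ?? {},
        data: input.data ?? {},
        // Only attach 'options' when the caller provided them.
        ...(input.options ? { options: input.options } : {})
    };
}

const example = createMessage({
    type: 'resourceCreated',
    data: { name: 'demo' },
    options: { routingKey: 'resource.created' }
});

console.log(example);
```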
Publish Types
import {Options} from "amqplib";
import {Config} from "amqp-extension";

export type PublishOptions = {
    alias?: string | Config;
    options?: Options.Publish;
}