amqp-extension
An AMQP extension with functions and utilities to consume and publish queue messages.
This library builds on top of the amqplib library and defines a message format for queue messages exchanged through a message broker between multiple standalone services. All utility functions support multiple registered connections.
npm install amqp-extension --save
To publish a queue message according to the message scheme, use the buildMessage helper function to build a message and the publishMessage function to submit it to the message broker.
import {
buildMessage,
Message,
useConnection,
publishMessage,
setConfig
} from "amqp-extension";
// This will set the default connection :)
setConfig({
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const message: Message = buildMessage({
type: 'resourceCreated',
options: {
routingKey: '<routing-key>'
}
});
console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}
(async () => {
await publishMessage(message);
})();
To consume queue messages, use the consumeQueue function. It accepts a configuration object as first argument and, as second argument, an object that maps a message type to an async handler function.
import {
consumeQueue,
ConsumeOptions,
Message,
MessageContext,
setConfig
} from "amqp-extension";
// This will set the default connection :)
setConfig({
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const options: ConsumeOptions = {
routingKey: '<routing-key>'
}; // the semicolon is required: the next statement starts with "("
(async () => {
await consumeQueue(options, {
resourceCreated: async (message: Message, messageContext: MessageContext) => {
// do some async operation :)
console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', data: {}, metadata: {}}
}
});
})();
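The examples use a topic exchange together with a routing key. As a rough illustration of how a broker matches a routing key against a topic binding pattern under AMQP 0-9-1 rules (`*` stands for exactly one word, `#` for zero or more words), here is a minimal sketch. The matching is done by the broker, not by this library, and `topicMatchesSketch` is a hypothetical name used only for this illustration.

```typescript
// Hypothetical helper: checks a routing key against a topic binding pattern.
// Words are separated by dots; "*" matches one word, "#" matches zero or more.
function topicMatchesSketch(pattern: string, routingKey: string): boolean {
    const patternWords = pattern.split(".");
    const keyWords = routingKey.split(".");

    const match = (p: number, k: number): boolean => {
        if (p === patternWords.length) {
            // Pattern exhausted: only a match if the key is exhausted too.
            return k === keyWords.length;
        }
        if (patternWords[p] === "#") {
            // "#" may absorb any number of remaining words, including none.
            for (let next = k; next <= keyWords.length; next++) {
                if (match(p + 1, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k === keyWords.length) {
            return false;
        }
        const sameWord = patternWords[p] === "*" || patternWords[p] === keyWords[k];
        return sameWord && match(p + 1, k + 1);
    };

    return match(0, 0);
}

console.log(topicMatchesSketch("resource.*", "resource.created"));
console.log(topicMatchesSketch("resource.*", "resource.created.v2"));
console.log(topicMatchesSketch("resource.#", "resource.created.v2"));
```

A consumer bound with `resource.*` would therefore see `resource.created` but not `resource.created.v2`, while `resource.#` covers both.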
To define multiple concurrent connections, specify a different alias in each configuration object so that the configs do not overwrite each other.
import {
consumeQueue,
ConsumeOptions,
publishMessage,
PublishOptions,
setConfig
} from "amqp-extension";
// This will set the default connection :)
setConfig({
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
setConfig({
alias: 'foo',
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const consumeOptions: ConsumeOptions = {
routingKey: '<routing-key>',
alias: 'foo' // <--- use another connection :)
};
const publishOptions: PublishOptions = {
alias: 'foo' // <--- use another connection :)
};
(async () => {
// options come first, handlers second (see the consumeQueue signature)
await consumeQueue(consumeOptions, {/* handlers */});
await publishMessage({/* message */}, publishOptions);
})();
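To make the alias mechanism more concrete, the following is a minimal sketch of a config registry keyed by alias. It is not the library's implementation: the names `ConfigSketch`, `setConfigSketch` and `getConfigSketch` are hypothetical, and storing alias-less configs under the key `"default"` is an assumption.

```typescript
// Trimmed-down stand-in for the documented Config type.
type ConfigSketch = {
    alias?: string;
    connection: string;
    exchange: { name: string; type: string };
};

// Assumption: configs without an alias are stored under the key "default".
const DEFAULT_ALIAS = "default";
const registry = new Map<string, ConfigSketch>();

// Accepts either (config) or (alias, config), mirroring the two call forms.
function setConfigSketch(key: string | ConfigSketch, value?: ConfigSketch): ConfigSketch {
    let alias: string;
    let config: ConfigSketch;
    if (typeof key === "string") {
        if (!value) {
            throw new Error(`No config given for alias "${key}".`);
        }
        alias = key;
        config = value;
    } else {
        alias = key.alias ?? DEFAULT_ALIAS;
        config = key;
    }
    const stored: ConfigSketch = { ...config, alias };
    // A second call with the same alias overwrites the earlier entry.
    registry.set(alias, stored);
    return stored;
}

function getConfigSketch(alias: string = DEFAULT_ALIAS): ConfigSketch {
    const config = registry.get(alias);
    if (!config) {
        throw new Error(`No config registered for alias "${alias}".`);
    }
    return config;
}

setConfigSketch({ connection: "amqp://localhost", exchange: { name: "events", type: "topic" } });
setConfigSketch("foo", { connection: "amqp://other-host", exchange: { name: "events", type: "topic" } });

console.log(getConfigSketch().connection, getConfigSketch("foo").connection);
```

Because each alias maps to exactly one entry, registering two configs without distinct aliases would leave only the later one in the registry.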
▸ function
setConfig(key?: string | Config, value?: Config): Config
Register a connection config under the default alias, or under a custom alias given either as the first argument or as the alias property of the config object.
Simple
Define the default connection config.
import {setConfig, useConnection} from "amqp-extension";
(async () => {
setConfig({
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const connection = await useConnection();
})();
Define a non-default connection, either with the alias property or by passing the alias as first argument.
import {setConfig, useConnection} from "amqp-extension";
(async () => {
setConfig({
alias: '<alias>',
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
setConfig('foo', {
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const connection = await useConnection('<alias>');
const fooConnection = await useConnection('foo');
})();
Name | Type | Description |
---|---|---|
key | string or Config | Config object or alias of the config. |
value | Config | Config object. |
Config
The function returns the Config object.
▸ function
useConnection(key?: string | Config): Promise<Connection>
Register a connection config first, either under the default alias or with an alias set as config property.
Once a connection is registered, retrieve it by calling the function without arguments (default alias) or with the alias name.
Simple
Retrieve the underlying driver connection.
import {useConnection} from "amqp-extension";
(async () => {
const connection = await useConnection();
})();
Advanced
Use a non-default connection.
import {setConfig, useConnection} from "amqp-extension";
(async () => {
setConfig({
alias: '<alias>',
connection: 'amqp://<user>:<password>@<host>',
exchange: {
name: '<name>',
type: 'topic'
}
});
const connection = await useConnection('<alias>');
})();
Name | Type | Description |
---|---|---|
key | string or Config | Config object or alias of the config. |
Promise<Connection>
The function returns the Connection object of amqplib.
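One plausible way to hand out the same connection on repeated calls is to cache the connection promise per alias. The sketch below illustrates that idea only; whether the library caches connections this way is an assumption, and `createUseConnectionSketch`, `ConnectFn` and `fakeConnect` are hypothetical names. The driver call is injected so the example runs without a broker.

```typescript
// Signature of whatever opens the real connection (for example amqplib's connect()).
type ConnectFn<C> = (url: string) => Promise<C>;

// Hypothetical factory: returns a function that opens each alias at most once.
function createUseConnectionSketch<C>(
    urls: Record<string, string>,
    connect: ConnectFn<C>
): (alias?: string) => Promise<C> {
    const cache = new Map<string, Promise<C>>();

    return (alias: string = "default"): Promise<C> => {
        const cached = cache.get(alias);
        if (cached) {
            return cached;
        }
        const url = urls[alias];
        if (url === undefined) {
            return Promise.reject(new Error(`No connection registered for alias "${alias}".`));
        }
        // Cache the promise itself so concurrent callers share one attempt.
        const created = connect(url);
        cache.set(alias, created);
        return created;
    };
}

// Stand-in for a real driver call; it only counts invocations.
let connectCalls = 0;
const fakeConnect: ConnectFn<{ url: string }> = async (url) => {
    connectCalls += 1;
    return { url };
};

const useConnectionSketch = createUseConnectionSketch(
    { default: "amqp://localhost", foo: "amqp://other-host" },
    fakeConnect
);

(async () => {
    const first = await useConnectionSketch();
    const second = await useConnectionSketch();
    console.log(first === second, connectCalls);
})();
```

Caching the promise rather than the resolved connection avoids opening two connections when two callers ask for the same alias at the same time.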
▸ function
publishMessage(message: Message, options?: PublishOptions): Promise<void>
Send the constructed queue message to the message broker. The optional second parameter selects the connection config through its alias property, which takes either the alias of a registered config or a full config object.
Simple
import {
buildMessage,
Message,
publishMessage
} from "amqp-extension";
const message: Message = buildMessage({
type: 'resourceCreated',
options: {
routingKey: '<routing-key>'
}
});
console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}
(async () => {
await publishMessage(message);
})();
Name | Type | Description |
---|---|---|
message | Message | Constructed message object. |
options | PublishOptions | Publish options. |
Promise<void>
The function does not return a value.
▸ function
consumeQueue(options: ConsumeOptions, cb: ConsumeHandlers): Promise<void>
Consume queue messages from the message broker. The first parameter holds the consume options, such as the routing key(s) and the alias of a registered config or a full config object. The second parameter maps message types to async handler functions.
Simple
import {
consumeQueue,
ConsumeOptions,
Message,
MessageContext
} from "amqp-extension";
const options: ConsumeOptions = {
routingKey: '<routing-key>'
}; // the semicolon is required: the next statement starts with "("
(async () => {
await consumeQueue(options, {
'<type>': async (message: Message, messageContext: MessageContext) => {
// do some async action :)
console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: '<type>', data: {}, metadata: {}}
}
});
})();
Name | Type | Description |
---|---|---|
options | ConsumeOptions | Consume options. |
cb | ConsumeHandlers | Handlers object, keyed by message type. |
Promise<void>
The function does not return a value.
import {Options} from "amqplib";
import {ConsumeOptions, PublishOptions} from "amqp-extension";
export type ExchangeType = 'fanout' | 'direct' | 'topic' | 'match' | string;
export type Config = {
alias?: string,
connection: Options.Connect | string,
exchange: {
name: string,
type: ExchangeType,
options?: Options.AssertExchange
},
publish?: PublishOptions,
consume?: ConsumeOptions
};
import {Options} from "amqplib";
import {Config, MessageContext, Message} from "amqp-extension";
export type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;
export type ConsumeHandlers = Record<'$any' | string, ConsumeHandler>;
export type ConsumeOptions = {
/**
* Queue routing key(s).
*/
routingKey?: string | string[],
/**
* Config key or object.
*/
alias?: string | Config,
/**
* Queue name.
*
* Default: ''
*/
name?: string,
/**
* Amqplib consume options.
*
* Default: {}
*/
options?: Options.Consume
}
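The handlers object can be read as a dispatch table keyed by message type. Below is a minimal sketch of such a dispatch, under two assumptions that the types above do not confirm: that the payload arrives as JSON, and that the `$any` key acts as a catch-all for types without a dedicated handler. `dispatchSketch` is a hypothetical name, and the MessageContext argument is left out because its shape is not documented here.

```typescript
// Local, trimmed copies of the documented types.
type MessageSketch = {
    id: string;
    type: string;
    data: Record<string, any>;
    metadata: Record<string, any>;
};
type HandlerSketch = (message: MessageSketch) => Promise<void>;
type HandlersSketch = Record<string, HandlerSketch>;

// Hypothetical dispatcher: returns true if some handler processed the message.
async function dispatchSketch(raw: string, handlers: HandlersSketch): Promise<boolean> {
    // Assumption: the broker payload is the JSON-serialized message.
    const message = JSON.parse(raw) as MessageSketch;
    // Assumption: "$any" is used when no type-specific handler exists.
    const handler = handlers[message.type] ?? handlers["$any"];
    if (!handler) {
        return false;
    }
    await handler(message);
    return true;
}

const seen: string[] = [];
const handlers: HandlersSketch = {
    resourceCreated: async (message) => {
        seen.push(`created:${message.id}`);
    },
    $any: async (message) => {
        seen.push(`any:${message.type}`);
    },
};

(async () => {
    await dispatchSketch(JSON.stringify({ id: "1", type: "resourceCreated", data: {}, metadata: {} }), handlers);
    await dispatchSketch(JSON.stringify({ id: "2", type: "resourceDeleted", data: {}, metadata: {} }), handlers);
    console.log(seen);
})();
```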
import {Options} from "amqplib";
export interface MessageOptions {
/**
* Routing key for message broker.
*/
routingKey?: string;
/**
* Override default publish options.
*/
publish?: Options.Publish;
}
export type Message = {
/**
* Routing information for amqp library.
* This property will be removed, before it is passed to the message queue.
*/
options?: MessageOptions;
/**
* Unique message identifier.
* Default: <generated uuid>
*/
id: string;
/**
* Event- or Command-name.
*/
type: string;
/**
* Metadata object to provide details for the message broker.
*
* Default: {}
*/
metadata: Record<string, any>;
/**
* The message data.
*
* Default: {}
*/
data: Record<string, any>;
};
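The defaults listed in the Message type suggest that only the type has to be supplied when building a message. The sketch below shows one way such defaults could be filled in and how the options property could be dropped before serialization. It is an illustration under those assumptions, not the library's buildMessage; `buildMessageSketch` and `toPayloadSketch` are hypothetical names.

```typescript
import { randomUUID } from "node:crypto";

// Local copies of the documented shapes, trimmed to what the sketch needs.
type MessageOptionsSketch = { routingKey?: string };
type MessageSketch = {
    options?: MessageOptionsSketch;
    id: string;
    type: string;
    metadata: Record<string, any>;
    data: Record<string, any>;
};

// Hypothetical helper: only `type` is required, the rest falls back
// to the documented defaults (generated uuid, empty objects).
function buildMessageSketch(
    input: Partial<MessageSketch> & Pick<MessageSketch, "type">
): MessageSketch {
    return {
        id: input.id ?? randomUUID(),
        type: input.type,
        options: input.options ?? {},
        metadata: input.metadata ?? {},
        data: input.data ?? {},
    };
}

// Hypothetical helper: removes the routing information before the message
// is serialized, as the comment on `options` describes.
function toPayloadSketch(message: MessageSketch): Omit<MessageSketch, "options"> {
    const { options, ...payload } = message;
    return payload;
}

const message = buildMessageSketch({
    type: "resourceCreated",
    options: { routingKey: "resource.created" },
});

console.log(Object.keys(toPayloadSketch(message)));
```

Keeping the routing key on the message object while excluding it from the payload lets the publisher read it locally without leaking transport details to consumers.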
import {Options} from "amqplib";
import {Config} from "amqp-extension";
export type PublishOptions = {
/**
* Config key or object.
*/
alias?: string | Config;
/**
* Amqplib publish options.
*/
options?: Options.Publish;
}