amqp-extension

An amqp extension with functions and utility functions to consume and publish queue messages.

  • 1.0.1

AMQP Extension 🏰

This library builds on amqplib and defines a common message format for queue messages exchanged between standalone services through a message broker. All utility functions support multiple registered connections.

Installation

npm install amqp-extension --save

Usage

Publish

To publish a queue message that follows the message scheme, build it with the buildMessage helper function and submit it to the message broker with the publishMessage function.

import {
    buildMessage,
    Message,
    useConnection,
    publishMessage,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const message: Message = buildMessage({
    type: 'resourceCreated',
    options: {
        routingKey: '<routing-key>'
    }
});

console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}

(async () => {
    await publishMessage(message);
})();
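
The logged output above suggests that buildMessage fills in an id plus empty data and metadata objects when they are omitted. The following is a minimal sketch of that defaulting behaviour, not the library's implementation. It assumes the id is a UUID taken from Node's crypto module, and the names MessageSketch, MessageInput and buildMessageSketch are hypothetical.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical local copy of the documented message shape.
type MessageSketch = {
    id: string;
    type: string;
    options?: { routingKey?: string };
    data: Record<string, unknown>;
    metadata: Record<string, unknown>;
};

// Only `type` is required; every other field falls back to a default.
type MessageInput = Pick<MessageSketch, "type"> & Partial<MessageSketch>;

function buildMessageSketch(input: MessageInput): MessageSketch {
    return {
        ...input,
        id: input.id ?? randomUUID(), // assumption: ids are UUIDs
        data: input.data ?? {},
        metadata: input.metadata ?? {},
    };
}

const sketch = buildMessageSketch({
    type: "resourceCreated",
    options: { routingKey: "<routing-key>" },
});

console.log(sketch);
```

Fields passed by the caller take precedence, so an explicit id or data object is kept as given.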

Consume

To consume queue messages, use the consumeQueue function. Its first argument is a configuration object; its second argument is an object that maps each message type to an async handler function.

import {
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    await consumeQueue(options, {
        resourceCreated: async (message: Message, messageContext: MessageContext) => {
            // do some async operation :)
            console.log(message);
            // {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', data: {}, metadata: {}}
        }
    });
})();
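
The handlers object is keyed by message type, and the ConsumeHandlers type also allows a '$any' key. Below is a rough sketch of how such a handler map could be dispatched. It assumes that '$any' acts as a fallback for types without a dedicated handler, which this README does not state, and the names selectHandler and dispatch are hypothetical.

```typescript
type Msg = {
    id: string;
    type: string;
    data: Record<string, unknown>;
    metadata: Record<string, unknown>;
};
type Handler = (message: Msg) => Promise<void>;
type Handlers = Partial<Record<string, Handler>>;

// Pick the handler registered for the message type,
// otherwise the assumed '$any' fallback (may be undefined).
function selectHandler(handlers: Handlers, type: string): Handler | undefined {
    return handlers[type] ?? handlers["$any"];
}

// Run the selected handler; report whether anything handled the message.
async function dispatch(handlers: Handlers, message: Msg): Promise<boolean> {
    const handler = selectHandler(handlers, message.type);
    if (!handler) {
        return false; // nothing registered for this type
    }
    await handler(message);
    return true;
}

const seen: string[] = [];
const handlers: Handlers = {
    resourceCreated: async (m) => { seen.push(`created:${m.id}`); },
    $any: async (m) => { seen.push(`any:${m.type}`); },
};

void dispatch(handlers, { id: "1", type: "resourceCreated", data: {}, metadata: {} });
```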

Multiple Connections

To define multiple concurrent connections, give each configuration object its own alias so that a later setConfig call does not overwrite an earlier one.

import {
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext,
    publishMessage,
    PublishOptions,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

setConfig({
    alias: 'foo',
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const consumeOptions: ConsumeOptions = {
    routingKey: '<routing-key>',
    alias: 'foo' // <--- use another connection :)
};

const publishOptions: PublishOptions = {
    alias: 'foo' // <--- use another connection :)
};

(async () => {
    // consumeQueue takes the options first, then the handlers.
    await consumeQueue(consumeOptions, {/* handlers */});
    await publishMessage({/* message */}, publishOptions);
})();
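
One way to picture the alias mechanism is a registry keyed by alias, where a config without an alias lands in a default slot. The sketch below is a simplified stand-in, not the library's code. The 'default' key and the names ConfigSketch, registerConfig and lookupConfig are assumptions made for illustration.

```typescript
type ConfigSketch = {
    alias?: string;
    connection: string;
    exchange: { name: string; type: string };
};

const DEFAULT_ALIAS = "default"; // assumed name of the default slot
const registry = new Map<string, ConfigSketch>();

function registerConfig(config: ConfigSketch): ConfigSketch {
    // Registering the same alias twice overwrites the earlier entry.
    registry.set(config.alias ?? DEFAULT_ALIAS, config);
    return config;
}

function lookupConfig(alias: string = DEFAULT_ALIAS): ConfigSketch {
    const config = registry.get(alias);
    if (!config) {
        throw new Error(`No config registered for alias "${alias}"`);
    }
    return config;
}

registerConfig({
    connection: "amqp://localhost",
    exchange: { name: "main", type: "topic" },
});
registerConfig({
    alias: "foo",
    connection: "amqp://localhost",
    exchange: { name: "foo-events", type: "topic" },
});

console.log(lookupConfig().exchange.name);      // main
console.log(lookupConfig("foo").exchange.name); // foo-events
```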

Functions

setConfig

function setConfig(key?: string | Config, value?: Config): Config

Register a connection config. The config is registered as the default unless an alias is given, either as the alias property of the config object or as the first argument.

Example

Simple

Define the default connection config.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection();
})();

Advanced

Define a non-default connection.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    setConfig('foo', {
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
    const fooConnection = await useConnection('foo');
})();

Type parameters

Name | Description

Parameters

Name | Type | Description
key | string or Config | Config object or alias of config.
value | Config | Config object.

Returns

Config

The function returns the Config object.

useConnection

function useConnection(key?: string | Config): Promise<Connection>

Retrieve the connection for a registered config. Call it without arguments to get the default connection, or pass an alias to get the connection registered under that alias. A Config object can also be passed directly.

Example

Simple

Retrieve the underlying driver connection.

import {useConnection} from "amqp-extension";

(async () => {
    const connection = await useConnection();
})();

Advanced

Use a non-default connection.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
})();

Type parameters

Name | Description

Parameters

Name | Type | Description
key | string or Config | Config or alias of config.

Returns

Promise<Connection>

The function returns a promise that resolves to the amqplib Connection object.

publishMessage

function publishMessage(message: Message, options?: PublishOptions): Promise<void>

Send the constructed queue message to the message broker. The optional second parameter selects the connection: pass the alias of a registered config or a full config object.

Example

Simple

import {
    buildMessage,
    Message,
    publishMessage
} from "amqp-extension";

const message: Message = buildMessage({
    type: 'resourceCreated',
    options: {
        routingKey: '<routing-key>'
    }
});

console.log(message);
// {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', options: {routingKey: '<routing-key>'}, data: {}, metadata: {}}

(async () => {
    await publishMessage(message);
})();

Type parameters

Name | Description

Parameters

Name | Type | Description
message | Message | Constructed message object.
options | PublishOptions | Publish options.

Returns

Promise<void>

The function does not return a value.

consumeQueue

function consumeQueue(options: ConsumeOptions, cb: ConsumeHandlers): Promise<void>

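Consume queue messages from the message broker. The first parameter holds the consume options, such as the routing key and the alias of a registered config (or a full config object). The second parameter is an object that maps message types to async handler functions.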

Example

Simple

import {
    consumeQueue,
    ConsumeOptions,
    Message,
    MessageContext
} from "amqp-extension";

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    await consumeQueue(options, {
        '<type>': async (message: Message, messageContext: MessageContext) => {
            // do some async action :)
            console.log(message);
            // {id: 'xxxx-xxxx-xxxx-xxxx', type: 'resourceCreated', data: {}, metadata: {}}
        }
    });
})();

Type parameters

Name | Description

Parameters

Name | Type | Description
options | ConsumeOptions | Consume options.
handlers | ConsumeHandlers | Handlers object.

Returns

Promise<void>

The function does not return a value.

Types

Config Types

import {Options} from "amqplib";
import {ConsumeOptions, PublishOptions} from "amqp-extension";

export type ExchangeType = 'fanout' | 'direct' | 'topic' | 'match' | string;

export type Config = {
    alias?: string,
    connection: Options.Connect | string,
    exchange: {
        name: string,
        type: ExchangeType,
        options?: Options.AssertExchange
    },
    publish?: PublishOptions,
    consume?: ConsumeOptions
};

Consume Types

import {Options} from "amqplib";
import {Config, MessageContext, Message} from "amqp-extension";

export type ConsumeHandler = (message: Message, context: MessageContext) => Promise<void>;
export type ConsumeHandlers = Record<'$any' | string, ConsumeHandler>;

export type ConsumeOptions = {
    /**
     * Queue routing key(s).
     */
    routingKey?: string | string[],

    /**
     * Config key or object.
     */
    alias?: string | Config,
    /**
     * Queue name.
     *
     * Default: ''
     */
    name?: string,
    /**
     * Amqplib consume options.
     *
     * Default: {}
     */
    options?: Options.Consume
}
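
When the exchange type is 'topic', the broker compares a message's routing key against the keys a queue is bound with. In AMQP topic matching, keys are dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The sketch below illustrates that matching rule only. It assumes the routingKey values from ConsumeOptions are used as binding keys, which this README does not spell out, and topicMatches is a hypothetical helper, not part of amqp-extension or amqplib.

```typescript
// Illustrative matcher for AMQP topic patterns; the real matching happens in the broker.
function topicMatches(bindingKey: string, routingKey: string): boolean {
    const pattern = bindingKey.split(".");
    const words = routingKey.split(".");

    const match = (p: number, w: number): boolean => {
        if (p === pattern.length) {
            return w === words.length; // pattern used up: all words must be too
        }
        if (pattern[p] === "#") {
            // '#' may consume zero or more words
            for (let next = w; next <= words.length; next++) {
                if (match(p + 1, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w === words.length) {
            return false; // pattern still expects a word, none left
        }
        return (pattern[p] === "*" || pattern[p] === words[w]) && match(p + 1, w + 1);
    };

    return match(0, 0);
}

console.log(topicMatches("resource.*", "resource.created"));    // true
console.log(topicMatches("resource.*", "resource.created.v2")); // false
console.log(topicMatches("resource.#", "resource.created.v2")); // true
```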

Message Types

import {Options} from "amqplib";

export interface MessageOptions {
    /**
     * Routing key for message broker.
     */
    routingKey?: string;
    /**
     * Override default publish options.
     */
    publish?: Options.Publish;
}

export type Message = {
    /**
     * Routing information for amqp library.
     * This property will be removed, before it is passed to the message queue.
     */
    options?: MessageOptions;

    /**
     * Message identifier.
     *
     * Default: <generated uuid>
     */
    id: string;

    /**
     * Event- or Command-name.
     */
    type: string;

    /**
     * Metadata object to provide details for the message broker.
     *
     * Default: {}
     */
    metadata: Record<string, any>;

    /**
     * The message data.
     *
     * Default: {}
     */
    data: Record<string, any>;
};
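
The comment on options states that the property is removed before the message is passed to the queue. The following is a minimal sketch of that step, assuming the remaining fields are serialized as JSON into a Buffer. The serialization format is an assumption, and OutgoingMessage and prepareForPublish are hypothetical names.

```typescript
type OutgoingMessage = {
    options?: { routingKey?: string };
    id: string;
    type: string;
    metadata: Record<string, unknown>;
    data: Record<string, unknown>;
};

// Split routing information from the body that actually goes over the wire.
function prepareForPublish(message: OutgoingMessage): { routingKey: string; payload: Buffer } {
    const { options, ...body } = message;
    return {
        routingKey: options?.routingKey ?? "",
        payload: Buffer.from(JSON.stringify(body)), // assumption: JSON encoding
    };
}

const prepared = prepareForPublish({
    id: "xxxx-xxxx-xxxx-xxxx",
    type: "resourceCreated",
    options: { routingKey: "resource.created" },
    data: {},
    metadata: {},
});

console.log(prepared.routingKey);         // resource.created
console.log(prepared.payload.toString()); // JSON without the options property
```

This matches the consume examples, where the logged message has no options property.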

Publish Types

import {Options} from "amqplib";
import {Config} from "amqp-extension";

export type PublishOptions = {
    /**
     * Config key or object.
     */
    alias?: string | Config;

    /**
     * Amqplib publish options.
     */
    options?: Options.Publish;
}


Package last updated on 22 Dec 2021
