amqp-extension

An AMQP extension with utility functions to consume and publish queue messages.

Version: 2.0.0

AMQP Extension 🏰

This library builds on amqplib and defines a message format for queue messages that are exchanged through a message broker between multiple standalone services. All utility functions support multiple registered connections.

Installation

npm install amqp-extension --save

Usage

Publish to Queue

The publish function sends a message to the message broker. Options from the registered config can be extended or overwritten per call.

import {
    publish,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

(async () => {
    await publish({
        content: {
            type: 'resourceCreated',
            name: 'foo'
        }
    });
})();

Consume Queue

To consume a queue, use the consume function. Its first argument is a configuration object; its second argument is an object that maps each message type to an async callback handler.

import {
    consume,
    ConsumeMessage,
    ConsumeOptions,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const options: ConsumeOptions = {
    exchange: {
        routingKey: '<routing-key>'
    }
};

(async () => {
    await consume(options, {
        resourceCreated: async (message: ConsumeMessage) => {
            const content = message.content.toString('utf-8');
            const payload = JSON.parse(content);
            console.log(payload);
            // { type: 'resourceCreated', name: 'foo' }
        }
    });
})();
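
The per-type handler lookup described above can be illustrated without a broker. The following is a minimal sketch, not the library's implementation: `Handler`, `Handlers` and `selectHandler` are hypothetical names, and the message body is assumed to be a JSON string with a `type` property, as in the example above.

```typescript
// Hypothetical types and helper, not exports of amqp-extension.
type Handler = (body: string) => Promise<void>;
type Handlers = Record<string, Handler>;

// Parse the JSON body and return the handler registered for its `type`,
// or undefined when the body has no string `type` or no handler matches.
// Note: JSON.parse throws if the body is not valid JSON.
function selectHandler(handlers: Handlers, body: string): Handler | undefined {
    const payload = JSON.parse(body);
    if (typeof payload?.type !== 'string') {
        return undefined;
    }
    // Only own properties count, so keys such as "toString" never match.
    return Object.prototype.hasOwnProperty.call(handlers, payload.type)
        ? handlers[payload.type]
        : undefined;
}

const sketchHandlers: Handlers = {
    resourceCreated: async (body) => {
        console.log('created:', body);
    },
};

const sketchBody = JSON.stringify({ type: 'resourceCreated', name: 'foo' });
const selected = selectHandler(sketchHandlers, sketchBody);
if (selected) {
    void selected(sketchBody);
}
```

A message whose type has no registered handler yields `undefined` here; how the library itself treats such messages is not covered by this sketch.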

Multiple Connections

To define multiple concurrent connections, give each configuration object a different alias so that it does not overwrite an existing one.

import {
    consume,
    publish,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

setConfig({
    alias: 'foo',
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

(async () => {
    await consume(
        {
            routingKey: '<routing-key>',
            alias: 'foo' // <--- use another connection :)
        },
        {
            // ... handlers
        }
    );

    await publish({
        routingKey: '<routing-key>',
        alias: 'foo', // <--- use another connection :)
        content: {
            foo: 'bar'
        }
    });
})();
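
Why a distinct alias prevents overwriting can be pictured as an alias-keyed registry. This is only a sketch under assumptions: `ConnectionConfig`, `ConfigRegistry`, `DEFAULT_ALIAS` and the URLs are hypothetical, and the library's internal storage may differ.

```typescript
// Hypothetical names for illustration; not the library's internals.
type ConnectionConfig = { alias?: string; connection: string };

const DEFAULT_ALIAS = 'default';

class ConfigRegistry {
    private readonly configs = new Map<string, ConnectionConfig>();

    // Store the config under its alias; a config without alias becomes the default.
    // Registering the same alias twice replaces the earlier entry.
    register(config: ConnectionConfig): void {
        this.configs.set(config.alias ?? DEFAULT_ALIAS, config);
    }

    // Look up a config by alias, using the default alias when none is given.
    resolve(alias: string = DEFAULT_ALIAS): ConnectionConfig {
        const config = this.configs.get(alias);
        if (!config) {
            throw new Error(`No config registered for alias "${alias}"`);
        }
        return config;
    }
}

const registry = new ConfigRegistry();
registry.register({ connection: 'amqp://localhost' });
registry.register({ alias: 'foo', connection: 'amqp://other-host' });

console.log(registry.resolve().connection);
console.log(registry.resolve('foo').connection);
```

Both entries coexist because they are stored under different keys; registering a second config without an alias would replace the default entry.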

Functions

setConfig

function setConfig(key?: string | ConfigInput, value?: ConfigInput): Config

Register a connection config under the default alias, or under a custom alias passed either as the first argument or as the alias property of the config.

Example

Simple

Define the default connection config.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection();
})();

Define a non-default connection.

import { setConfig, useConnection } from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    setConfig('foo', {
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
    const fooConnection = await useConnection('foo');
})();
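
The two call forms shown above, `setConfig(config)` and `setConfig(alias, config)`, can be reduced to a single shape before the config is stored. The sketch below shows one way to do that; `SketchConfigInput` and `normalizeSetConfigArgs` are hypothetical and not part of the library.

```typescript
// Hypothetical helper; a simplified stand-in for the library's ConfigInput.
type SketchConfigInput = { alias?: string; connection: string };

function normalizeSetConfigArgs(
    key?: string | SketchConfigInput,
    value?: SketchConfigInput,
): SketchConfigInput | undefined {
    // Form setConfig('<alias>', { ... }): the string key becomes the alias.
    if (typeof key === 'string') {
        return value ? { ...value, alias: key } : undefined;
    }
    // Form setConfig({ alias?, ... }): the object already carries its alias, if any.
    return key;
}

console.log(normalizeSetConfigArgs('foo', { connection: 'amqp://localhost' }));
console.log(normalizeSetConfigArgs({ alias: 'bar', connection: 'amqp://localhost' }));
```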

Parameters

Name | Type | Description
key | string or ConfigInput | Config object or alias of config.
value | Config | Config object.
Returns

Config

The function returns the Config object.

useConnection

function useConnection(key?: string | ConfigInput): Promise<Connection>

Pass a config to register a connection, either under the default alias or under the alias given as a config property. Once a connection is registered, retrieve it by calling the function with no arguments (default alias) or with the alias name.

Example

Simple

Retrieve the underlying driver connection.

import {useConnection} from "amqp-extension";

(async () => {
    const connection = await useConnection();
})();

Advanced

Use a non-default connection.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
})();

Parameters

Name | Type | Description
key | string or Config | Config or alias of config.
Returns

Promise<Connection>

The function returns the amqplib Connection object.

publish

function publish(message: Message, options?: PublishOptions): Promise<void>

Send the constructed queue message to the message broker. To use a registered config, specify its alias in the second parameter, or provide the full config object instead.

Example

Simple

import {
    publish
} from "amqp-extension";

(async () => {
    await publish({
        content: {
            type: 'resourceCreated'
        }
    });
})();

Parameters

Name | Type | Description
message | Message | Constructed message object.
options | PublishOptions | Publish options.
Returns

Promise<void>

The function does not return a value.

consume

function consume(options: ConsumeOptions, cb: ConsumeHandlers): Promise<void>

Consume messages from the message broker. The first parameter is the consume options object; the second is an object that maps each message type to an async handler function.

Example

Simple

import {
    consume,
    ConsumeOptions,
    ConsumeMessage,
} from "amqp-extension";

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    await consume(options, {
        '<type>': async (message: ConsumeMessage) => {
            // do some async action :)
        }
    });
})();

Parameters

Name | Type | Description
options | ConsumeOptions | Consume options.
handlers | ConsumeHandlers | Handlers object.
Returns

Promise<void>

The function does not return a value.

Types

Config Types

import { Options } from 'amqplib';
import { ExchangeOptions } from '../exchange';
import { ConsumeOptions, PublishOptions } from '../type';

export type Config = {
    alias: string,
    connection: Options.Connect | string,
    exchange: ExchangeOptions,
    publish: PublishOptions,
    consume: ConsumeOptions
};

export type ConfigInput = Partial<Exclude<Config, 'connection'>> &
    Pick<Config, 'connection'>;
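
In practice, the ConfigInput definition makes every property optional except connection. Since `Exclude` applied to an object type and a string literal leaves the object type unchanged, the required connection comes from the intersection with `Pick`. The sketch below reproduces the same shape with simplified stand-in types; `SketchExchange`, `SketchConfig`, `SketchInput` and the URLs are assumptions for illustration only.

```typescript
// Simplified stand-ins for the amqplib and library types.
type SketchExchange = { name: string; type: string };
type SketchConfig = {
    alias: string;
    connection: string;
    exchange: SketchExchange;
};

// Same construction as ConfigInput, applied to the simplified type.
type SketchInput = Partial<Exclude<SketchConfig, 'connection'>> &
    Pick<SketchConfig, 'connection'>;

// Only `connection` is mandatory.
const minimal: SketchInput = { connection: 'amqp://localhost' };

// Every other property may be supplied as needed.
const full: SketchInput = {
    alias: 'foo',
    connection: 'amqp://localhost',
    exchange: { name: 'events', type: 'topic' },
};

// @ts-expect-error `connection` is required, so an empty object is rejected.
const invalid: SketchInput = {};

console.log(minimal, full, invalid);
```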

License

Made with 💚

Published under MIT License.

Package last updated on 03 Feb 2023
