New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

akx-mq

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

akx-mq

This is a small library to handle rabbitmq messages. Mostly a wrapper.

latest
npmnpm
Version
0.1.1
Version published
Maintainers
1
Created
Source

AkxMQ

================================ A wrapper around '''amqplib''' to handle retries, adding bulk consumers, and connect style middleware.

#Installation

npm install akx-mq --save

Usage

file: akx-mq-config.js

var mqConfig = require( './config.json' );
var akxMq = require( 'akx-mq' )( mqConfig );
var logger = require( './logger' );

akxMq.addErrorHandling( function( err ){
    logger.log( 'mq', err );
} );

exports.akxMqMiddleware =  function(){
    var getMessage = function( req, res ){
        return res._body.wire.dataValues;
    };
    var callback = function( err, req, res, next ){
        if( error ){
            logger.log( 'mq', err, { req: req } )
        }
        return next();
    };
    return akxMq.publishMiddleware( { queue: 'wiresQ', getMessage: getMessage, callback: callback } );
};

exports.akxMqAddConsumer = akxMq.addConsumer;

Then in your routes file call the akx-mq-config.js file and create publisherMiddleware to publish messages to the queues, and add consumers to consume messages for they're respected queues.

file: routes.js

var akxMqConfig = require( './akx-mq-config' );

var users = require('../controllers/users');
server.post('/users', authMiddleware, users.create, akxMqConfig.akxMqMiddleware() );

akxMqConfig.akxMqAddConsumer( {
    queueNameHere: [ fn1, fn2, fn3, ... ],
    anotherQueueName: [ fn1, fn4, ... ]
} );

Note: The addConsumer function uses the keys as the queue names here so make sure that they are in the config file before adding them here. Also the array of functions as the value will be called in order synchronously (connect style middleware). Note that the publish and consumer share the same connection and channel, this is by design. The above example calls akxMqConfig.akxMqMiddleware() function returns back the middleware that will be used. Below are the api references and options.

API Reference

#publishMiddleware

instance.publishMiddleware({[queue, [getMessage, [callback]]]}) Returns a middleware function with the regular req, res, and next arguments.

Takes a POJO with three properties:

  • queue: String name of the queue you want to publish to.
  • getMessage: Function that are given the req, res objects to extract the message.
  • callback: Function that are given the req, res, and next arguments if an error occurs you can log.

example: instance.publishMiddleware({queue:'users', getMessage: getMessageFunc, callback: callbackFunc}) Note: All three are required.

#addConsumer

instance.addConsumer({[queue, [array of functions]]}) Adds consumers to the respected queue.

Takes a POJO. Uses the keys as the queue name and the values as the consumer.

  • queue {queueName:[ fn1, fn2, ...]}

example: instance.addConsume({queueName:[ fn1, fn2, ...]}) Note: that functions will be called in order with the arguments: queueName, parsedMsg, next. The next callback is similar to connect style but you can pass in next( true ) which will raise a flag that will ignore the rest of the functions and call the last function in the list. Else you can just call next() and it will call them in order one at a time. There is no limit to how many consumers you can add. Also the message wont be acknowledge until the last function has finished.

#addErrorHandling

instance.addErrorHandling([function]) Adds a catch all error handling function

Takes a function to be the general error handling function.

  • function: Just a regular function, named or anonymous.

example: instance.addErrorHandling(function(){ logger.log('message')}) Note: This is optional.

Options

PropertyDataTypeDefaultDescription
retryNumber60000How many milliseconds before retrying to connect to the server
hostString'amqp://localhost'The host for the server
persistentBooleanfalseIf you want the message to persist if server goes down
prefetchNumber0How many unAcked messages you want to allow before sending more down to the consumer
noAckBooleanfalseIf you want no Acknowledge meant
queuesArray[ ]An array of objects with 'name', and 'durable' properties

Keywords

rabbitmq

FAQs

Package last updated on 31 Jul 2017

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts