Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@scriptollc/amqp-worker

Package Overview
Dependencies
Maintainers
5
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@scriptollc/amqp-worker

Class for creating AMQP connections

  • 1.3.2
  • latest
  • Source
  • npm
  • Socket score

Version published
Weekly downloads
8
increased by100%
Maintainers
5
Weekly downloads
 
Created
Source

amqp-worker

Build Status

A base worker class for connecting to and dealing with RabbitMQ.

This is mostly a convenience wrapper around amqplib.

Install

Requirements

  • Node.js 8+ (for async/await)
  • RabbitMQ
npm install @scriptollc/amqp-worker

Usage

const QueueWorker = require('@scriptollc/amqp-worker')

class MyWorker extends QueueWorker {
  constructor () {
    super()
    this.queue = 'my-queue'
  }

  messageHandler (msg) {
    const data = msg.content
    this.channel.ack(data)
  }
}

const worker = new MyWorker()

worker.listen()
  .then(() => console.log('listening!'))
  .catch((err) => console.log('ERROR!', err))

If you're not into ES6 Classes, you can use a function and the prototype chain to manage this as well. Since this is an ES6 class however, prototyping requires the use of Reflect:

const QueueWorker = require('@scriptollc/amqp-worker')

function MyWorker () {
  Object.assign(this, Reflect.construct(QueueWorker, arguments, MyWorker))
  this.queue = 'my-queue'
}

Reflect.setPrototypeOf(MyWorker.prototype, QueueWorker.prototype)

MyWorker.prototype.messageHandler = function (msg) {
  const data = msg.content
  this.channel.ack(data)
}

const worker = new MyWorker()
worker.listen()
  .then(() => console.log('listening!'))
  .catch((err) => console.log('ERROR!', err))

The upside to this construct is that it works for both ES6 style classes and traditional JS function prototypes.

API

new QueueWorker(host?:string, port?:number, opts?:object):QueueWorker

Create a new instance of a queue worker. Optionally pass in a hostname and a port for the RabbitMQ server. These may also be specified in environment variables:

  • RABBIT_MQ_HOST - default: localhost
  • RABBIT_MQ_POST - default: 5672

An object of objects may be passed in to provide default options for asserting a queue, consuming a queue or sending a message to a queue. Options may be overridden or additional options may be provided at runtime

  • opts.assertOpts - default options for this.channel.assertQueue
  • opts.consumeOpts - default options for this.channel.consume
  • opts.sendOpts - default options for this.channel.sendToQueue

All queue worker instances must implement:

  • messageHandler(msg:object):undefined - the message handler
  • queue:string - the name of the queue

They may as well implement:

  • handleError(err:Error):undefined - error handler (default: throws errors)
  • beforeDisconnect():Promise - do something before disconnection
  • serializeMessage():Buffer - called on the msg passed in to sendMessage. noop by default, allows you to control how messages get changed into Buffers

Methods

async QueueWorker#initialize():Promise

Connect to the specified RabbitMQ server and attach a listener for error messages on the connection

async QueueWorker#getChannel():Promise

Create a channel on the active connection, or, make a connection if one doesn't exist and create a channel

async QueueWorker#disconnect():Promise

Disconnect the channel and the server, running the optional beforeDisconnect handler.

async QueueWorker#listen(assertOpts?:object, consumeOpts?:object):Promise

Assert the specified queue and attach the messageHandler as a queue consumer.

Will create a connection and/or a channel as necessary. The options for assertQueue and consume are the same as for the amqplib functions.

async QueueWorker#sendMessage(msg:Buffer|Any, sendOpts?object):Promise

Send a message to the queue, using the specified options, if any. Your message must either be an instance of the Buffer object, or you must have overridden QueueWorker#serializeMessage to return a Buffer.

You may use a different channel to send a message to a queue by passing in a channel key on the sendOpts object.

QueueWorker#messageHandler(msg:Object):undefined

This MUST be implemented! Handle messages coming in from RabbitMQ. The msg object is the same as provided for the consume method in amqplib:

{
  content: Buffer,
  fields: Object,
  properties: Object
}

You can use this.channel.ack and this.channel.nack (or any of the channel methods) to tell the server you've handled (or rejected) the message.

async QueueWorker#beforeDisconnect():Promise

This MAY be implemented

Do something before disconnecting the channel and the connecftion

async QueueWorker#handleError(err:Error):undefined

This MAY be implemented

Do something with any errors that might be thrown

Properties

QueueWorker.queue:string

What queue to use when asserting

License

Copyright © 2018, Scripto LLC. Apache-2.0 licensed.

Keywords

FAQs

Package last updated on 18 Apr 2018

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc