Socket
Socket
Sign inDemoInstall

sqs-batch

Package Overview
Dependencies
42
Maintainers
1
Versions
2
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    sqs-batch

A wrapper on top of AWS SQS (Amazon Simple Queue Service) including a memory buffer for batch message processing


Version published
Weekly downloads
0
decreased by-100%
Maintainers
1
Install size
69.3 MB
Created
Weekly downloads
 

Readme

Source

sqs-batch

An AWS.SQS wrapper that let's you build queue-based applications easier. It has a built-in memory buffer to extend the 10 batch limit.

Installation

npm install sqs-buffer

Usage

const Receiver = require('sqs-buffer')

const worker = new Receiver({
  queueUrl: '...',
  messageReceiver: (messages, done) => {
    // process message(s)
    done(messages)
  }
})

worker.start()
  • By default the queue will be polled one message at a time using sqs long polling.
  • In order to process multiple messages either specify batchSize and/or bufferSize options explained below
  • The message parameter in the messageReceiver callback contains an array of messages even if there's only one message polled.
  • If no bufferSize specified then the next polling will be made after the current one is process and acknowledged by calling done(messages).
  • Calling done(err) with an error will leave the message in the queue and the processing:error event will be fired. See events below.
  • In order to delete a message form the queue after it has been processed, call done(messages) where messages can be either one message or an array of messages.

Credentials

By default it uses the Environment Variables credentials as specified here. In order to specify them manually you can do:

const AWS = require('aws-sdk')
const Receiver = require('sqs-buffer')

const SQS = new AWS.SQS({
  region: 'eu-west-1',
  accessKeyId: '...',
  secretAccessKey: '...'
})

const worker = new Receiver({
  queueUrl: '...',
  sqs: SQS,
  messageReceiver: (messages, done) => {
    // ...
    done(messages)
  }
})

worker.start()

API

new Receiver(options)

Creates a new SQS receiver instance

options
  • queueUrl - String - REQUIRED - SQS queue URL
  • messageReceiver - Function - REQUIRED - A callback function to be called when a message is received or the buffer is flushed. (messageReceiver(messages, done)).
  • batchSize - Number - (default 1) - The number of messages to poll from SQS. Max. 10.
  • waitTimeSeconds - Number - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning reference.
  • visibilityTimeout - Number - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request reference.
  • attributeNames - Array - A list of attributes that need to be returned along with each message reference.
  • messageAttributeNames - Array - A list of message attributes to include in the response reference.
  • authenticationErrorTimeout - Number - (default 10000) - The duration (in milliseconds) to wait before making another request after an authentication error.
  • bufferSize - Number - The number of messages to be placed in buffer before processing. Should be greater than batchSize.
  • bufferTimeout - Number - The duration (in milliseconds) to wait before the buffer is flushed.
  • sqs - Object - An AWS.SQS instance in case you want to configure it's options manually.
  • buffer - Object - (default memory) - A class instance used to store the buffer into. More info bellow

receiver.start()

Start message polling

receiver.stop()

Stop message polling

Events

EventParamsDescription
errorerrFired when a request error occurs
message:receivedmessageFired when new message(s) received
processing:errorerrFired when you call don(error)
message:processedmessageFired when the message(s) is/are processed
stoppedNoneFired when receiver is stopped

Buffer

The buffer is used to temporarily store SQS messages in order to be later processed in batches larger than the Amazon's 10 messages per batch limit. The buffer fires a flush event wich is then captured by the Receiver and then proxied to the messageReceiver(messages, done) callback. The flush event is fired either on bufferTimeout or bufferSize reached.

API

new Buffer(options)

Creates a new buffer instance

options
  • bufferSize - Number - Buffer size.
  • bufferTimeout - Number - The duration (in milliseconds) after which the buffer will flush.
Buffer.add(messages)
  • messages - |[] - The data type of each item is constrained by Amazon SQS. It can be a single item or an array of items.
Events
EventParamsDescription
flushmessagesFired with either bufferSize is full or bufferTimeout reached.

Custom Buffer

In order to use your own custom buffer (redis, mongo etc.) you would need to implement two things:

  1. Have a Buffer.add(messages) method that stores messages into the buffer
  2. Fire a flush event when the buffer is full or timer completed.

Your custom buffer

class MyBuffer extends EventEmitter {
    constructor (options) {
        super()
        // constructor
        
        this.buffer = []
    }
    
    add (messages) {
        // add messages into this.buffer
        
        // on bufferSize reached
        this.emit('flush', this.buffer)
        
        // on bufferTimeout
        this.emit('flush', this.buffer)
    }
}
const worker = new Receiver({
  queueUrl: '...',
  buffer: new MyBuffer(options),
  messageReceiver: (messages, done) => {
    // will be called once buffer is filled or timed out.
    done(messages)
  }
})

worker.start()

Licence

MIT Licence Š Copyright 2016 C8 MANAGEMENT LIMITED

Keywords

FAQs

Last updated on 06 Sep 2016

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc