New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

queues

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queues - npm Package Compare versions

Comparing version 0.2.1 to 0.3.1

lib/backoff.js

153

lib/sqs.js

@@ -7,4 +7,6 @@

var Emitter = require('event-emitter');
var SQS = require('aws-sqs');
//var SQS = require('aws-sqs');
var AWS = require('aws-sdk');
var debug = require('debug')('queues:sqs');
var Backoff = require('./backoff');

@@ -28,3 +30,8 @@ /**

this.client = new SQS(id, secret);
this.client = new AWS.SQS({
accessKeyId: id,
secretAccessKey: secret,
region: 'us-east-1'
});
this.queues = {};

@@ -80,4 +87,5 @@ }

this.client.createQueue(this.name, {},
function(err, res) {
this.client.createQueue({
QueueName: this.name
}, function(err, res) {
if (err) {

@@ -88,6 +96,8 @@ debug('Failed to initialize queue', err);

var id = res.QueueUrl;
// initialize queue
self.init(res);
self.init(id);
debug('connected, waiting for messages..');
debug('connected to queue %s', id);

@@ -100,2 +110,23 @@ self.emit('connected');

/**
* Define backoff strategy
*
* @param {String} type - linear, exponential.
* @param {Integer} max - max seconds backoff.
*/
Queue.prototype.idleBackoff = function(type, max) {
var backoff;
if (type === 'linear') {
backoff = new Backoff.Linear(max);
} else {
throw new Error('invalid idle backoff strategy');
}
this._idleBackoff = backoff;
return this;
};
/**
* Pull messages from the queue

@@ -107,6 +138,9 @@ *

Queue.prototype.pull = function() {
this.client.receiveMessage(this.id, {
maxNumberOfMessages: 5
debug('polling queue');
this.client.receiveMessage({
QueueUrl: this.id,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20
}, this.onMessage.bind(this));
};

@@ -154,4 +188,10 @@

if (body) {
body.forEach(function(msg){
debug('received messages %j', body);
var messages = body.Messages || [];
var count = messages.length;
if (messages) {
debug('received %s messages', count);
messages.forEach(function(msg){
self.emit('message', msg);

@@ -161,3 +201,15 @@ });

this.pull();
if (this._idleBackoff) {
if (!count) {
var secs = this._idleBackoff.next();
debug('idle, backing-off %s seconds', secs);
setTimeout(this.pull.bind(this), secs*1000);
} else {
this._idleBackoff.reset();
this.pull();
}
} else {
this.pull();
}
};

@@ -172,7 +224,16 @@

Queue.prototype.post = function(msg, fn) {
debug('posting message');
var params = {
QueueUrl: this.id,
MessageBody: JSON.stringify(msg)
};
this.client.sendMessage(this.id, JSON.stringify(msg), null,
function(err, res) {
debug('message posted');
debug('posting message %j', params, this.id);
this.client.sendMessage(params, function(err, res) {
if (err) {
debug('error posting message %j', err);
} else {
debug('message posted ok');
}
fn && fn(err, res);

@@ -192,3 +253,6 @@ }

this.client.deleteMessage(this.id, receipt, function(err, body) {
this.client.deleteMessage({
QueueUrl: this.id,
ReceiptHandle: receipt
}, function(err, body) {
if (err)

@@ -201,26 +265,33 @@ debug('could not delete "%s"', receipt, err);

/**
// MAIN
//var creds = {
// id: 'XXX',
// secret: 'YYY'
//};
//
//var provider = new Provider(creds.id, creds.secret);
//var queue = provider.get('dev-notifications');
//
//queue.on('connected', function() {
// queue.post('hello!', function(err, res) {
// if (err)
// debug('error posting callback %j, %j', err, res);
// else
// debug('posted message with id', res.MessageId);
// });
//});
//
//queue.on('message', function(msg) {
// debug('message received', msg.Body);
// queue.remove(msg);
//});
//
//queue.connect();
//
var creds = {
"id": "XXX",
"secret": "XXX"
};
var provider = new Provider(creds.id, creds.secret);
var queue = provider.get('dev-notifications');
queue.on('connected', function() {
for (var i=0; i<10; i++) {
queue.post('hello!', function(err, res) {
if (err)
debug('error posting callback %j, %j', err, res);
else
debug('posted message with id', res.MessageId);
});
}
queue.start();
});
queue.idleBackoff('linear', 5);
queue.on('message', function(msg) {
debug('message received', msg.Body);
queue.remove(msg);
});
queue.connect();
**/
{
"name": "queues",
"version": "0.2.1",
"version": "0.3.1",
"description": "Collection of queue wrappers with a common interface.",

@@ -26,4 +26,5 @@ "main": "index.js",

"event-emitter": "^0.3.1",
"aws-sqs": "0.0.6"
"aws-sqs": "0.0.6",
"aws-sdk": "^2.0.11"
}
}

@@ -15,2 +15,5 @@ queues

// set a backoff strategy
q.idleBackoff('linear', 5);
q.connect();

@@ -17,0 +20,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc