Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version 1.0.21 to 2.0.1

138

bus/middleware/retry.js

@@ -1,139 +0,3 @@

var extend = require('extend');
var log = require('debug')('servicebus:retry');
var util = require('util');
var maxRetries = 3;
var localRejected = {};
var methodMax = {};
function createOnly (method, max) {
var calledByMethod = {};
methodMax[method] = max;
return function only (message) {
if (calledByMethod[method] === undefined) {
calledByMethod[method] = 1;
} else {
calledByMethod[method]++;
}
if (Object.keys(calledByMethod).length && calledByMethod[method] > methodMax[method]) {
var methods = Object.keys(calledByMethod).join(',');
throw new Error(util.format('message type: %s cid: %s handle already called with %s', message.content.type, message.content.cid, methods));
}
};
}
function retryLocal (channel, message, options, next) {
var onlyAckOnce = createOnly('ack', 1);
var onlyRejectMax = createOnly('reject', maxRetries);
if (options && options.ack) {
if ( ! message.properties.headers) message.properties.headers = {};
message.properties.headers.rejected = localRejected[message.content.cid];
message.content.handle = {
ack: function () {
onlyAckOnce(message);
channel.ack(message);
},
acknowledge: function () {
onlyAckOnce(message);
channel.ack(message);
},
reject: function () {
onlyRejectMax(message);
if (localRejected[message.content.cid] === undefined) {
localRejected[message.content.cid] = 1;
} else {
localRejected[message.content.cid] = localRejected[message.content.cid] + 1;
}
if (localRejected[message.content.cid] > maxRetries) {
var errorQueueName = util.format('%s.error', message.fields.routingKey);
log('sending message %s to error queue %s', message.content.cid, errorQueueName);
channel.sendToQueue(errorQueueName, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: localRejected[message.content.cid] } }));
channel.reject(message, false);
delete localRejected[message.content.cid];
} else {
log('retrying message %s', message.content.cid);
channel.reject(message, true);
}
}
};
}
next(null, channel, message, options);
}
function retryDistributed (channel, message, options, next) {
if (message.properties.headers.rejected === undefined) {
message.properties.headers.rejected = 0;
}
var onlyAckOnce = createOnly('ack', 1);
var onlyRejectMax = createOnly('reject', maxRetries);
if (options && options.ack) {
message.content.handle = {
ack: function () {
onlyAckOnce(message);
channel.ack(message);
},
acknowledge: function () {
onlyAckOnce(message);
channel.ack(message);
},
reject: function () {
onlyRejectMax(message);
var rejected = message.properties.headers.rejected + 1;
if (rejected > maxRetries) {
var errorQueueName = util.format('%s.error', message.fields.routingKey);
log('sending message %s to error queue %s', message.content.cid, errorQueueName);
channel.sendToQueue(errorQueueName, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } }));
channel.reject(message, false);
} else {
log('retrying message %s', message.content.cid);
message.properties.headers.rejected = rejected;
channel.reject(message, false);
if (options.queueType === 'queue') {
channel.sendToQueue(message.fields.routingKey, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } }));
} else {
channel.publish(message.fields.exchange, message.fields.routingKey, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } }));
}
}
}
};
}
next(null, channel, message, options);
}
module.exports = function (options) {
options = options || { localOnly: false };
if (options.maxRetries) maxRetries = options.maxRetries;
return {
handleIncoming: options.localOnly ? retryLocal : retryDistributed
};
throw new Error('bus.retry() middleware is deprecated. please use https://github.com/mateodelnorte/servicebus-retry instead')
};

3

package.json

@@ -10,3 +10,3 @@ {

"description": "Simple service bus for sending events between processes using amqp.",
"version": "1.0.21",
"version": "2.0.1",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -30,2 +30,3 @@ "repository": {

"mocha": ">=2.3.3",
"servicebus-retry": "0.0.9",
"should": "7.1.0",

@@ -32,0 +33,0 @@ "sinon": "~1.17.1"

@@ -9,2 +9,3 @@ require('longjohn');

var bus = require('../').bus({ url: busUrl, enableConfirms: true });
var retry = require('servicebus-retry');

@@ -15,4 +16,6 @@ bus.use(bus.messageDomain());

bus.use(bus.logger());
bus.use(bus.retry());
bus.use(retry({
store: retry.MemoryStore()
}))
module.exports.bus = bus;

@@ -7,2 +7,3 @@ require('longjohn');

var busUrl = process.env.RABBITMQ_URL;
var retry = require('servicebus-retry');

@@ -18,4 +19,6 @@ var bus = require('../').bus({

bus.use(bus.logger());
bus.use(bus.retry());
bus.use(retry({
store: retry.MemoryStore()
}))
module.exports.bus = bus;

@@ -7,2 +7,3 @@ var noop = function () {};

var retry = require('servicebus-retry');
var should = require('should');

@@ -17,66 +18,8 @@

describe('middleware', function () {
it('should throw if ack called more than once on message', function () {
var channel = {
ack: function () {},
publish: function () {}
};
var message = {
content: {},
fields: {},
properties: {
headers: {}
}
};
var middleware = retry().handleIncoming;
middleware(channel, message, { ack: true }, function (err, channel, message, options, next) {
message.content.handle.ack();
(function () {
message.content.handle.ack();
}).should.throw(Error);
});
});
it('should throw if reject called more than max on message', function () {
var channel = {
reject: function () {},
publish: function () {}
};
var message = {
content: {},
fields: {},
properties: {
headers: {}
}
};
var middleware = retry().handleIncoming;
middleware(channel, message, { ack: true, maxRetries: 3 }, function (err, channel, message, options, next) {
message.content.handle.reject();
message.content.handle.reject();
message.content.handle.reject();
(function () {
message.content.handle.reject();
}).should.throw(Error);
});
});
});
it('should throw deprecated feature error with link to servicebus-retry', function () {
describe('send & listen', function () {
should(function () {
bus.use(bus.retry());
}).throw(Error)
it('rejected messages should retry until max retries', function (done) {
var count = 0;
bus.listen('my.event.5', { ack: true }, function (event) {
count++;
event.handle.reject();
});
bus.listen('my.event.5.error', { ack: true }, function (event) {
count.should.equal(4); // one send and three retries
event.handle.ack();
bus.destroyListener('my.event.5').on('success', function () {
bus.destroyListener('my.event.5.error').on('success', function () {
done();
});
});
});
setTimeout(function () {
bus.send('my.event.5', { my: 'event' });
}, 100);
});

@@ -86,26 +29,2 @@

describe('publish & subscribe', function () {
it('rejected messages should retry until max retries', function (done){
var count = 0;
var subscription = bus.subscribe('my.event.15', { ack: true }, function (event) {
count++;
event.handle.reject();
});
bus.listen('my.event.15.error', { ack: true }, function (event) {
count.should.equal(4); // one send and three retries
event.handle.ack();
// subscription.unsubscribe(function () {
bus.destroyListener('my.event.15.error').on('success', function () {
done();
});
// });
});
setTimeout(function () {
bus.publish('my.event.15', { data: Math.random() });
}, 1000);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc