servicebus
Advanced tools
Comparing version 1.0.21 to 2.0.1
@@ -1,139 +0,3 @@ | ||
var extend = require('extend'); | ||
var log = require('debug')('servicebus:retry'); | ||
var util = require('util'); | ||
var maxRetries = 3; | ||
var localRejected = {}; | ||
var methodMax = {}; | ||
function createOnly (method, max) { | ||
var calledByMethod = {}; | ||
methodMax[method] = max; | ||
return function only (message) { | ||
if (calledByMethod[method] === undefined) { | ||
calledByMethod[method] = 1; | ||
} else { | ||
calledByMethod[method]++; | ||
} | ||
if (Object.keys(calledByMethod).length && calledByMethod[method] > methodMax[method]) { | ||
var methods = Object.keys(calledByMethod).join(','); | ||
throw new Error(util.format('message type: %s cid: %s handle already called with %s', message.content.type, message.content.cid, methods)); | ||
} | ||
}; | ||
} | ||
function retryLocal (channel, message, options, next) { | ||
var onlyAckOnce = createOnly('ack', 1); | ||
var onlyRejectMax = createOnly('reject', maxRetries); | ||
if (options && options.ack) { | ||
if ( ! message.properties.headers) message.properties.headers = {}; | ||
message.properties.headers.rejected = localRejected[message.content.cid]; | ||
message.content.handle = { | ||
ack: function () { | ||
onlyAckOnce(message); | ||
channel.ack(message); | ||
}, | ||
acknowledge: function () { | ||
onlyAckOnce(message); | ||
channel.ack(message); | ||
}, | ||
reject: function () { | ||
onlyRejectMax(message); | ||
if (localRejected[message.content.cid] === undefined) { | ||
localRejected[message.content.cid] = 1; | ||
} else { | ||
localRejected[message.content.cid] = localRejected[message.content.cid] + 1; | ||
} | ||
if (localRejected[message.content.cid] > maxRetries) { | ||
var errorQueueName = util.format('%s.error', message.fields.routingKey); | ||
log('sending message %s to error queue %s', message.content.cid, errorQueueName); | ||
channel.sendToQueue(errorQueueName, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: localRejected[message.content.cid] } })); | ||
channel.reject(message, false); | ||
delete localRejected[message.content.cid]; | ||
} else { | ||
log('retrying message %s', message.content.cid); | ||
channel.reject(message, true); | ||
} | ||
} | ||
}; | ||
} | ||
next(null, channel, message, options); | ||
} | ||
function retryDistributed (channel, message, options, next) { | ||
if (message.properties.headers.rejected === undefined) { | ||
message.properties.headers.rejected = 0; | ||
} | ||
var onlyAckOnce = createOnly('ack', 1); | ||
var onlyRejectMax = createOnly('reject', maxRetries); | ||
if (options && options.ack) { | ||
message.content.handle = { | ||
ack: function () { | ||
onlyAckOnce(message); | ||
channel.ack(message); | ||
}, | ||
acknowledge: function () { | ||
onlyAckOnce(message); | ||
channel.ack(message); | ||
}, | ||
reject: function () { | ||
onlyRejectMax(message); | ||
var rejected = message.properties.headers.rejected + 1; | ||
if (rejected > maxRetries) { | ||
var errorQueueName = util.format('%s.error', message.fields.routingKey); | ||
log('sending message %s to error queue %s', message.content.cid, errorQueueName); | ||
channel.sendToQueue(errorQueueName, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } })); | ||
channel.reject(message, false); | ||
} else { | ||
log('retrying message %s', message.content.cid); | ||
message.properties.headers.rejected = rejected; | ||
channel.reject(message, false); | ||
if (options.queueType === 'queue') { | ||
channel.sendToQueue(message.fields.routingKey, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } })); | ||
} else { | ||
channel.publish(message.fields.exchange, message.fields.routingKey, new Buffer(JSON.stringify(message.content)), extend(options, { headers: { rejected: rejected } })); | ||
} | ||
} | ||
} | ||
}; | ||
} | ||
next(null, channel, message, options); | ||
} | ||
module.exports = function (options) { | ||
options = options || { localOnly: false }; | ||
if (options.maxRetries) maxRetries = options.maxRetries; | ||
return { | ||
handleIncoming: options.localOnly ? retryLocal : retryDistributed | ||
}; | ||
throw new Error('bus.retry() middleware is deprecated. please use https://github.com/mateodelnorte/servicebus-retry instead') | ||
}; |
@@ -10,3 +10,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "1.0.21", | ||
"version": "2.0.1", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -30,2 +30,3 @@ "repository": { | ||
"mocha": ">=2.3.3", | ||
"servicebus-retry": "0.0.9", | ||
"should": "7.1.0", | ||
@@ -32,0 +33,0 @@ "sinon": "~1.17.1" |
@@ -9,2 +9,3 @@ require('longjohn'); | ||
var bus = require('../').bus({ url: busUrl, enableConfirms: true }); | ||
var retry = require('servicebus-retry'); | ||
@@ -15,4 +16,6 @@ bus.use(bus.messageDomain()); | ||
bus.use(bus.logger()); | ||
bus.use(bus.retry()); | ||
bus.use(retry({ | ||
store: retry.MemoryStore() | ||
})) | ||
module.exports.bus = bus; |
@@ -7,2 +7,3 @@ require('longjohn'); | ||
var busUrl = process.env.RABBITMQ_URL; | ||
var retry = require('servicebus-retry'); | ||
@@ -18,4 +19,6 @@ var bus = require('../').bus({ | ||
bus.use(bus.logger()); | ||
bus.use(bus.retry()); | ||
bus.use(retry({ | ||
store: retry.MemoryStore() | ||
})) | ||
module.exports.bus = bus; |
@@ -7,2 +7,3 @@ var noop = function () {}; | ||
var retry = require('servicebus-retry'); | ||
var should = require('should'); | ||
@@ -17,66 +18,8 @@ | ||
describe('middleware', function () { | ||
it('should throw if ack called more than once on message', function () { | ||
var channel = { | ||
ack: function () {}, | ||
publish: function () {} | ||
}; | ||
var message = { | ||
content: {}, | ||
fields: {}, | ||
properties: { | ||
headers: {} | ||
} | ||
}; | ||
var middleware = retry().handleIncoming; | ||
middleware(channel, message, { ack: true }, function (err, channel, message, options, next) { | ||
message.content.handle.ack(); | ||
(function () { | ||
message.content.handle.ack(); | ||
}).should.throw(Error); | ||
}); | ||
}); | ||
it('should throw if reject called more than max on message', function () { | ||
var channel = { | ||
reject: function () {}, | ||
publish: function () {} | ||
}; | ||
var message = { | ||
content: {}, | ||
fields: {}, | ||
properties: { | ||
headers: {} | ||
} | ||
}; | ||
var middleware = retry().handleIncoming; | ||
middleware(channel, message, { ack: true, maxRetries: 3 }, function (err, channel, message, options, next) { | ||
message.content.handle.reject(); | ||
message.content.handle.reject(); | ||
message.content.handle.reject(); | ||
(function () { | ||
message.content.handle.reject(); | ||
}).should.throw(Error); | ||
}); | ||
}); | ||
}); | ||
it('should throw deprecated feature error with link to servicebus-retry', function () { | ||
describe('send & listen', function () { | ||
should(function () { | ||
bus.use(bus.retry()); | ||
}).throw(Error) | ||
it('rejected messages should retry until max retries', function (done) { | ||
var count = 0; | ||
bus.listen('my.event.5', { ack: true }, function (event) { | ||
count++; | ||
event.handle.reject(); | ||
}); | ||
bus.listen('my.event.5.error', { ack: true }, function (event) { | ||
count.should.equal(4); // one send and three retries | ||
event.handle.ack(); | ||
bus.destroyListener('my.event.5').on('success', function () { | ||
bus.destroyListener('my.event.5.error').on('success', function () { | ||
done(); | ||
}); | ||
}); | ||
}); | ||
setTimeout(function () { | ||
bus.send('my.event.5', { my: 'event' }); | ||
}, 100); | ||
}); | ||
@@ -86,26 +29,2 @@ | ||
describe('publish & subscribe', function () { | ||
it('rejected messages should retry until max retries', function (done){ | ||
var count = 0; | ||
var subscription = bus.subscribe('my.event.15', { ack: true }, function (event) { | ||
count++; | ||
event.handle.reject(); | ||
}); | ||
bus.listen('my.event.15.error', { ack: true }, function (event) { | ||
count.should.equal(4); // one send and three retries | ||
event.handle.ack(); | ||
// subscription.unsubscribe(function () { | ||
bus.destroyListener('my.event.15.error').on('success', function () { | ||
done(); | ||
}); | ||
// }); | ||
}); | ||
setTimeout(function () { | ||
bus.publish('my.event.15', { data: Math.random() }); | ||
}, 1000); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
54427
5
1309