keuss
Advanced tools
Comparing version 1.6.10 to 1.6.11
{ | ||
"name": "keuss", | ||
"version": "1.6.10", | ||
"version": "1.6.11", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "queue", |
@@ -170,2 +170,6 @@ const _ = require ('lodash'); | ||
if (opts.hdrs) { | ||
_.each (opts.hdrs, (v, k) => upd.$set['hdrs.' + k] = v); | ||
} | ||
this._col.updateOne (q, upd, {}, (err, result) => { | ||
@@ -278,5 +282,11 @@ if (err) return callback (err); | ||
////////////////////////////////////////////// | ||
// redefnition | ||
// redefinition from Queue | ||
_move_to_deadletter (obj, cb) { | ||
this.pl_step (obj._id, this._factory.deadletter_queue (), {}, (err, res) => { | ||
const hdrs = { | ||
'x-dl-from-queue': this.name (), | ||
'x-dl-t': new Date().toISOString (), | ||
'x-dl-tries': obj.tries | ||
}; | ||
this.pl_step (obj._id, this._factory.deadletter_queue (), {hdrs}, (err, res) => { | ||
if (err) return cb (err); | ||
@@ -283,0 +293,0 @@ this._stats.incr ('get'); |
22
Queue.js
@@ -436,10 +436,10 @@ var async = require ('async'); | ||
if ( | ||
(obj.tries) && | ||
(this._factory.deadletter_queue ()) && | ||
(this._factory.max_ko ()) && | ||
(obj.tries > this._factory.max_ko ()) | ||
(obj.tries) && // only if we got tries | ||
(this._factory.deadletter_queue ()) && // AND the factory has a deadletter queue | ||
(this._factory.max_ko ()) && // AND thee's a max ko attempts | ||
(obj.tries > this._factory.max_ko ()) && // AND we got enough tries | ||
(this.name () != '__deadletter__') // and this queue is not deadletter already | ||
) { | ||
debug ('%s: too many retries (%d), moving to deadletter', obj._id, obj.tries); | ||
this._move_to_deadletter (obj, cb); | ||
// TODO add from-what-queue to deadletter elements | ||
} | ||
@@ -714,4 +714,12 @@ else { | ||
// commit and move to deadletter | ||
// ALSO NOT IN deadletter queue (to void loop) | ||
// commit element in origin queue, push in deadletter afterwards | ||
const opts = { | ||
hdrs: _.clone (obj.hdrs || {}) | ||
}; | ||
// add some extra x-dl-* headers | ||
opts.hdrs['x-dl-from-queue'] = this.name (); | ||
opts.hdrs['x-dl-t'] = new Date().toISOString (); | ||
opts.hdrs['x-dl-tries'] = obj.tries; | ||
this.commit (obj._id, err => { | ||
@@ -723,3 +731,3 @@ if (err) { | ||
this._factory.deadletter_queue ().push (obj.payload, (err, res) => { | ||
this._factory.deadletter_queue ().push (obj.payload, opts, (err, res) => { | ||
if (err) { | ||
@@ -726,0 +734,0 @@ debug ('while moving %s to deadletter: %j', obj._id, err); |
@@ -114,2 +114,4 @@ var async = require ('async'); | ||
const hdrs = {aaa: 'qw', bbb: '666'}; | ||
async.waterfall ([ | ||
@@ -124,3 +126,3 @@ cb => get_mq_factory (MQ, factory_opts, cb), | ||
cb => async.series([ | ||
cb => q.push (pl, cb), | ||
cb => q.push (pl, {hdrs}, cb), | ||
cb => pop (q, stage, cb), | ||
@@ -144,2 +146,9 @@ cb => reject (q, stage, (err) => {tries++;cb()}), | ||
res.payload.should.eql (pl); | ||
res.hdrs.should.match ({ | ||
aaa: "qw", | ||
bbb: "666", | ||
'x-dl-from-queue': "test_queue_deadletter", | ||
'x-dl-t': /.+/, | ||
'x-dl-tries': 4 | ||
}); | ||
tries.should.equal (5); | ||
@@ -146,0 +155,0 @@ cb (err); |
Sorry, the diff of this file is not supported yet
374557
74
10238