Comparing version 1.1.4 to 1.2.0
@@ -33,2 +33,3 @@ const express = require ('express'); | ||
const url = req.headers['x-dest-url']; | ||
const cb_url = req.headers['x-cb-url']; | ||
const q_name = req.headers['x-queue']; | ||
@@ -60,2 +61,3 @@ const q_ns = req.headers['x-queue-ns']; | ||
delete req.headers['x-dest-url']; | ||
delete req.headers['x-cb-url']; | ||
delete req.headers['x-queue']; | ||
@@ -73,3 +75,4 @@ delete req.headers['x-queue-ns']; | ||
if (agent) pl.xtra.agent = agent; | ||
if (agent) pl.xtra.agent = agent; | ||
if (cb_url) pl.cb_url = cb_url; | ||
@@ -76,0 +79,0 @@ // ...and queue it |
{ | ||
"name": "blackbox-test", | ||
"version": "1.1.4", | ||
"version": "1.2.0", | ||
"dependencies": {}, | ||
@@ -5,0 +5,0 @@ "devDependencies": { |
@@ -108,2 +108,70 @@ const should = require ('should'); | ||
[ | ||
'get', | ||
'delete' | ||
].forEach (verb => it (`forwards a ${verb} ok, element does callback via __completed__cb__`, done => { | ||
app = new express (); | ||
app.use (bodyParser.json ()); | ||
app[verb] ('/this/is/the/path', (req, res) => { | ||
res.status (200).send ('ok'); | ||
}); | ||
app.post ('/cb', (req, res) => { | ||
res.send ('ok'); | ||
req.body.should.match ({ | ||
req: { | ||
url:"http://tests:36677/this/is/the/path?a=1&bb=ww", | ||
method: verb.toUpperCase (), | ||
headers:{ | ||
a_a_a:"123", | ||
b_b_b:"qwe" | ||
}, | ||
xtra:{}, | ||
cb_url:"http://tests:36677/cb" | ||
}, | ||
res:{ | ||
status:200, | ||
body:{}, | ||
text:"ok", | ||
headers:{ | ||
"content-type":/.+/, | ||
"content-length":"2" | ||
} | ||
} | ||
}); | ||
async.series ([ | ||
cb => setTimeout (cb, 1000), | ||
cb => tools.getQueueContents (mq, 'default', cb), | ||
], (err, res) => { | ||
if (err) return done (err); | ||
res[1].should.eql([]); | ||
done (); | ||
}); | ||
}); | ||
app_http = app.listen (36677, () => { | ||
request (cfg.aswh.base_url) | ||
[verb](cfg.aswh.api_path) | ||
.set ({ | ||
'x-queue-ns': mq, | ||
'x-cb-url': 'http://tests:36677/cb', | ||
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww', | ||
a_a_a: '123', | ||
b_b_b: 'qwe' | ||
}) | ||
.expect (201) | ||
.end ((err, res) => { | ||
if (err) return done (err); | ||
res.body.should.match ({ | ||
res: 'ok', | ||
id: /.+/, | ||
q: 'default', | ||
ns: mq | ||
}); | ||
}); | ||
}); | ||
})); | ||
[ | ||
'post', | ||
@@ -160,4 +228,75 @@ 'put', | ||
[ | ||
'post', | ||
'put', | ||
'patch' | ||
].forEach (verb => it (`forwards a text ${verb} ok, element does callback via __completed__cb__`, done => { | ||
app = new express (); | ||
app.use (bodyParser.json ()); | ||
app[verb] ('/this/is/the/path', (req, res) => { | ||
res.status (200).send ('ok'); | ||
}); | ||
app.post ('/cb', (req, res) => { | ||
res.send ('ok'); | ||
req.body.should.match ({ | ||
req: { | ||
url:"http://tests:36677/this/is/the/path?a=1&bb=ww", | ||
method: verb.toUpperCase (), | ||
headers:{ | ||
a_a_a:"123", | ||
b_b_b:"qwe" | ||
}, | ||
body: 'qwertyuiop', | ||
xtra:{}, | ||
cb_url:"http://tests:36677/cb" | ||
}, | ||
res:{ | ||
status:200, | ||
body:{}, | ||
text:"ok", | ||
headers:{ | ||
"content-type":/.+/, | ||
"content-length":"2" | ||
} | ||
} | ||
}); | ||
async.series ([ | ||
cb => setTimeout (cb, 1000), | ||
cb => tools.getQueueContents (mq, 'default', cb), | ||
], (err, res) => { | ||
if (err) return done (err); | ||
res[1].should.eql([]); | ||
done (); | ||
}); | ||
}); | ||
app_http = app.listen (36677, () => { | ||
request (cfg.aswh.base_url) | ||
[verb] (cfg.aswh.api_path) | ||
.set ({ | ||
'x-queue-ns': mq, | ||
'x-cb-url': 'http://tests:36677/cb', | ||
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww', | ||
a_a_a: '123', | ||
b_b_b: 'qwe' | ||
}) | ||
.type ('text') | ||
.send ('qwertyuiop') | ||
.expect (201) | ||
.end ((err, res) => { | ||
if (err) return done (err); | ||
res.body.should.match ({ | ||
res: 'ok', | ||
id: /.+/, | ||
q: 'default', | ||
ns: mq | ||
}); | ||
}); | ||
}); | ||
})); | ||
}); | ||
}); |
@@ -113,2 +113,72 @@ const should = require ('should'); | ||
'delete' | ||
].forEach (verb => it (`forwards a ${verb} ok, gets a 400, does not retry, element does callback via __failed__cb__`, done => { | ||
app = new express (); | ||
app.use (bodyParser.json ()); | ||
app[verb] ('/this/is/the/path', (req, res) => { | ||
res.status (400).send ('ko'); | ||
}); | ||
app.post ('/cb', (req, res) => { | ||
res.send ('ok'); | ||
req.body.should.match ({ | ||
req: { | ||
url:"http://tests:36677/this/is/the/path?a=1&bb=ww", | ||
method: verb.toUpperCase (), | ||
headers:{ | ||
a_a_a:"123", | ||
b_b_b:"qwe" | ||
}, | ||
xtra:{}, | ||
cb_url:"http://tests:36677/cb" | ||
}, | ||
res:{ | ||
status:400, | ||
body:{}, | ||
text:"ko", | ||
headers:{ | ||
"content-type":/.+/, | ||
"content-length":"2" | ||
} | ||
} | ||
}); | ||
async.series ([ | ||
cb => setTimeout (cb, 1000), | ||
cb => tools.getQueueContents (mq, 'default', cb), | ||
cb => tools.getQueueContents (mq, '__failed__', cb), | ||
], (err, res) => { | ||
if (err) return done (err); | ||
res[1].should.eql([]); | ||
res[2].should.eql([]); | ||
done (); | ||
}); | ||
}); | ||
app_http = app.listen (36677, () => { | ||
request (cfg.aswh.base_url) | ||
[verb](cfg.aswh.api_path) | ||
.set ({ | ||
'x-queue-ns': mq, | ||
'x-cb-url': 'http://tests:36677/cb', | ||
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww', | ||
a_a_a: '123', | ||
b_b_b: 'qwe' | ||
}) | ||
.expect (201) | ||
.end ((err, res) => { | ||
if (err) return done (err); | ||
res.body.should.match ({ | ||
res: 'ok', | ||
id: /.+/, | ||
q: 'default', | ||
ns: mq | ||
}); | ||
}); | ||
}); | ||
})); | ||
[ | ||
'get', | ||
'delete' | ||
].forEach (verb => it (`forwards a ${verb} ok, gets a 500, retries to deadletter`, done => { | ||
@@ -263,3 +333,75 @@ let tries = 0; | ||
[ | ||
'post', | ||
'put', | ||
'patch' | ||
].forEach (verb => it (`forwards a text ${verb} ok, gets a 400, does not retry, element does callback via __failed__cb__`, done => { | ||
app = new express (); | ||
app.use (bodyParser.json ()); | ||
app[verb] ('/this/is/the/path', (req, res) => { | ||
res.status (400).send ('ko'); | ||
}); | ||
app.post ('/cb', (req, res) => { | ||
res.send ('ok'); | ||
req.body.should.match ({ | ||
req: { | ||
url:"http://tests:36677/this/is/the/path?a=1&bb=ww", | ||
method: verb.toUpperCase (), | ||
headers:{ | ||
a_a_a:"123", | ||
b_b_b:"qwe" | ||
}, | ||
body: 'qwertyuiop', | ||
xtra:{}, | ||
cb_url:"http://tests:36677/cb" | ||
}, | ||
res:{ | ||
status:400, | ||
body:{}, | ||
text:"ko", | ||
headers:{ | ||
"content-type":/.+/, | ||
"content-length":"2" | ||
} | ||
} | ||
}); | ||
async.series ([ | ||
cb => setTimeout (cb, 1000), | ||
cb => tools.getQueueContents (mq, 'default', cb), | ||
cb => tools.getQueueContents (mq, '__failed__', cb), | ||
], (err, res) => { | ||
if (err) return done (err); | ||
res[1].should.eql([]); | ||
res[2].should.eql([]); | ||
done (); | ||
}); | ||
}); | ||
app_http = app.listen (36677, () => { | ||
request (cfg.aswh.base_url) | ||
[verb] (cfg.aswh.api_path) | ||
.set ({ | ||
'x-queue-ns': mq, | ||
'x-cb-url': 'http://tests:36677/cb', | ||
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww', | ||
a_a_a: '123', | ||
b_b_b: 'qwe' | ||
}) | ||
.type ('text') | ||
.send ('qwertyuiop') | ||
.expect (201) | ||
.end ((err, res) => { | ||
if (err) return done (err); | ||
res.body.should.match ({ | ||
res: 'ok', | ||
id: /.+/, | ||
q: 'default', | ||
ns: mq | ||
}); | ||
}); | ||
}); | ||
})); | ||
[ | ||
@@ -266,0 +408,0 @@ 'post', |
@@ -24,7 +24,13 @@ const Log = require ('winston-log-space'); | ||
_.each (context.components.Keuss.queues(), (q, qn) => { | ||
const q_config = this._opts.keuss.queue_groups[q.ns()].queues[q.name()]; | ||
let q_config = this._opts.keuss.queue_groups[q.ns()].queues[q.name()]; | ||
if (!q_config) { | ||
// not a consumable queue. __failed__, for example | ||
return; | ||
// special queues such as __*__cb__ | ||
if ((q.name() == '__failed__cb__') || (q.name() == '__completed__cb__')) { | ||
q_config = {}; | ||
} | ||
else { | ||
// not a consumable queue. __failed__, for example | ||
return; | ||
} | ||
} | ||
@@ -43,6 +49,10 @@ | ||
const failed_q = context.components.Keuss.queues() [`__failed__@${q.ns()}`]; | ||
this._clients[qn] = new consumer (q, {failed_q}, this, opts); | ||
// TODO not for __failed__, __failed__cb__ or __completed__cb__ | ||
const failed_q = context.components.Keuss.queues() [`__failed__@${q.ns()}`]; | ||
const failed_cb_q = context.components.Keuss.queues() [`__failed__cb__@${q.ns()}`]; | ||
const completed_cb_q = context.components.Keuss.queues() [`__completed__cb__@${q.ns()}`]; | ||
this._clients[qn] = new consumer (q, {failed_q, failed_cb_q, completed_cb_q}, this, opts); | ||
log.info ('created consumer on queue %s@%s', q.name (), q.ns()); | ||
}) | ||
}); | ||
@@ -49,0 +59,0 @@ cb (null, this); |
@@ -58,2 +58,18 @@ var _ = require('lodash'); | ||
}); | ||
tasks_q.push (cb => { | ||
const failed_cb_queue = '__failed__cb__'; | ||
const fcbqn = `${failed_cb_queue}@${qg_name}`; | ||
this._queues[fcbqn] = this._factories[qg_name].queue (failed_cb_queue, {}); | ||
log.info ('created *callback for failed* queue %s', fcbqn); | ||
cb (); | ||
}); | ||
tasks_q.push (cb => { | ||
const completed_cb_queue = '__completed__cb__'; | ||
const ccbqn = `${completed_cb_queue}@${qg_name}`; | ||
this._queues[ccbqn] = this._factories[qg_name].queue (completed_cb_queue, {}); | ||
log.info ('created *callback for completed* queue %s', ccbqn); | ||
cb (); | ||
}); | ||
}); | ||
@@ -60,0 +76,0 @@ |
@@ -9,2 +9,4 @@ const request = require ('superagent'); | ||
this._failed_q = env.failed_q; | ||
this._failed_cb_q = env.failed_cb_q; | ||
this._completed_cb_q = env.completed_cb_q; | ||
this._consumer = consumer; | ||
@@ -66,6 +68,6 @@ this._opts = opts || {}; | ||
switch (status_series) { | ||
case 3: return cb (null, true); // http 3xx, not an error | ||
case 4: return cb (false); // http 4xx, error, do not retry | ||
case 5: return cb (true); // http 5xx, error, retry | ||
default: return cb (true); // unknown http error, retry | ||
case 3: return cb (null, true, res); // http 3xx, not an error | ||
case 4: return cb (false, null, res); // http 4xx, error, do not retry | ||
case 5: return cb (true, null, res); // http 5xx, error, retry | ||
default: return cb (true, null, res); // unknown http error, retry | ||
} | ||
@@ -81,3 +83,3 @@ } | ||
).observe (delta); | ||
cb (null, true); | ||
cb (null, true, res); | ||
} | ||
@@ -106,2 +108,3 @@ }); | ||
} | ||
// error: log it, wait a bit and continue | ||
@@ -117,3 +120,3 @@ this._log.error ('error while popping: %o', err); | ||
this._do_http_call (elem, (retry, ok) => { | ||
this._do_http_call (elem, (retry, ok, res) => { | ||
this._w_size--; | ||
@@ -129,33 +132,10 @@ this._log.verbose ('window released, occupation now (%d/%d)', this._w_size, this._w_max); | ||
if (ok) { | ||
this._log.verbose ('sent elem with id %s', elem._id); | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
this._manage_process_ok (elem, res); | ||
} | ||
else { | ||
if (retry) { | ||
const delay = this._get_delay (elem); | ||
this._log.info ('not sent elem with id %s, retrying with %d ms delay...', elem._id, delay); | ||
this._q.ko (elem, (new Date().getTime () + delay), err => { | ||
if (err) this._log.error ('error while rolling-back, continuing anyway: %o', err); | ||
else this._log.verbose ('rolled back elem %s', elem._id); | ||
}); | ||
this._manage_process_retry (elem); | ||
} | ||
else { | ||
this._log.warn ('not sent elem with id %s, NOT retrying and moving to queue %s', elem._id, this._failed_q.name ()); | ||
// insert element into __failed__ queue | ||
this._failed_q.push (elem.payload, (err, id) => { | ||
if (err) { | ||
this._log.error ('can not insert failed element into %s queue. Error is %j, elem is %j', this._failed_q.name (), err, elem); | ||
} | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
}); | ||
this._manage_process_ko (elem, res); | ||
} | ||
@@ -171,2 +151,107 @@ } | ||
/////////////////////////////////////////////////////////////////// | ||
_manage_process_ok (elem, res) { | ||
this._log.verbose ('sent elem with id %s', elem._id); | ||
if (elem.payload.cb_url) { | ||
const pl = { | ||
url: elem.payload.cb_url, | ||
method: 'POST', | ||
headers: {}, | ||
body: { | ||
req: elem.payload, | ||
res: _.pick (res, 'status', 'body', 'text', 'headers') | ||
}, | ||
xtra: {} | ||
}; | ||
// ...and queue it | ||
this._completed_cb_q.push (pl, {}, (err, id) => { | ||
// error while queuing? | ||
if (err) { | ||
this._log.error ('error while pushing payload in completed_cb_q:', err); | ||
} | ||
else { | ||
this._log.verbose ('inserted callback-completed call for %s', id); | ||
} | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
}); | ||
} | ||
else { | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
} | ||
} | ||
/////////////////////////////////////////////////////////////////// | ||
_manage_process_retry (elem) { | ||
const delay = this._get_delay (elem); | ||
this._log.info ('not sent elem with id %s, retrying with %d ms delay...', elem._id, delay); | ||
this._q.ko (elem, (new Date().getTime () + delay), err => { | ||
if (err) this._log.error ('error while rolling-back, continuing anyway: %o', err); | ||
else this._log.verbose ('rolled back elem %s', elem._id); | ||
}); | ||
} | ||
/////////////////////////////////////////////////////////////////// | ||
_manage_process_ko (elem, res) { | ||
this._log.verbose ('not-sent elem with id %s', elem._id); | ||
if (elem.payload.cb_url) { | ||
const pl = { | ||
url: elem.payload.cb_url, | ||
method: 'POST', | ||
headers: {}, | ||
body: { | ||
req: elem.payload, | ||
res: _.pick (res, 'status', 'body', 'text', 'headers') | ||
}, | ||
xtra: {} | ||
}; | ||
// ...and queue it | ||
this._failed_cb_q.push (pl, {}, (err, id) => { | ||
// error while queuing? | ||
if (err) { | ||
this._log.error ('error while pushing payload in failed_cb_q:', err); | ||
} | ||
else { | ||
this._log.verbose ('inserted callback-failed call for %s', id); | ||
} | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
}); | ||
} | ||
else { | ||
this._log.warn ('not sent elem with id %s, NOT retrying and moving to queue %s', elem._id, this._failed_q.name ()); | ||
// insert element into __failed__ queue | ||
elem.payload.xtra.res = _.pick (res, 'status', 'body', 'text', 'headers'); | ||
this._failed_q.push (elem.payload, (err, id) => { | ||
if (err) { | ||
this._log.error ('can not insert failed element into %s queue. Error is %j, elem is %j', this._failed_q.name (), err, elem); | ||
} | ||
this._q.ok (elem, err => { | ||
if (err) this._log.error ('error while committing, continuing anyway: %o', err); | ||
else this._log.verbose ('committed elem %s', elem._id); | ||
}); | ||
}); | ||
} | ||
} | ||
/////////////////////////////////////////////////////////////////// | ||
run () { | ||
@@ -173,0 +258,0 @@ this._in_consume_loop = true; |
{ | ||
"name": "aswh", | ||
"description": "Asynchronous WebHook delivery, or generic store-and-forward HTTP proxy", | ||
"version": "1.1.4", | ||
"version": "1.2.0", | ||
"keywords": [ | ||
@@ -6,0 +6,0 @@ "webhook", |
@@ -44,2 +44,12 @@ # aswh: Asynchronous WebHook delivery | ||
### HTTP callbacks | ||
`aswh` can produce an http callback for each webhook, when it is either completed ot failed: this is activated on a per-webhook basis, simply adding an extra header `x-cb-url`: | ||
if the webhook is called successfuly or is rejected permanently, a post to the url at `x-cb-url` is done with a json body including both the webhook's request and its response | ||
At this point, ther is no callback when an element is retried too many times: it will always go to `__deadletter__` | ||
The callbacks are implemented also as webhooks, delayed HTTP calls queued on `__completed__cb__` (for successful webhooks) and `__failed__cb__` (for failed webhooks) queues, | ||
which are pre-created by `aswh` on each queue group; you can in fact add configuration for them as if they were regular queues (which in fact are) | ||
## Configuration | ||
@@ -46,0 +56,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
188611
36
4109
237