Socket
Socket
Sign inDemoInstall

aswh

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aswh - npm Package Compare versions

Comparing version 1.1.4 to 1.2.0

docker-compose.yaml

5

app.js

@@ -33,2 +33,3 @@ const express = require ('express');

const url = req.headers['x-dest-url'];
const cb_url = req.headers['x-cb-url'];
const q_name = req.headers['x-queue'];

@@ -60,2 +61,3 @@ const q_ns = req.headers['x-queue-ns'];

delete req.headers['x-dest-url'];
delete req.headers['x-cb-url'];
delete req.headers['x-queue'];

@@ -73,3 +75,4 @@ delete req.headers['x-queue-ns'];

if (agent) pl.xtra.agent = agent;
if (agent) pl.xtra.agent = agent;
if (cb_url) pl.cb_url = cb_url;

@@ -76,0 +79,0 @@ // ...and queue it

2

blackbox-test/package.json
{
"name": "blackbox-test",
"version": "1.1.4",
"version": "1.2.0",
"dependencies": {},

@@ -5,0 +5,0 @@ "devDependencies": {

@@ -108,2 +108,70 @@ const should = require ('should');

[
'get',
'delete'
].forEach (verb => it (`forwards a ${verb} ok, element does callback via __completed__cb__`, done => {
app = new express ();
app.use (bodyParser.json ());
app[verb] ('/this/is/the/path', (req, res) => {
res.status (200).send ('ok');
});
app.post ('/cb', (req, res) => {
res.send ('ok');
req.body.should.match ({
req: {
url:"http://tests:36677/this/is/the/path?a=1&bb=ww",
method: verb.toUpperCase (),
headers:{
a_a_a:"123",
b_b_b:"qwe"
},
xtra:{},
cb_url:"http://tests:36677/cb"
},
res:{
status:200,
body:{},
text:"ok",
headers:{
"content-type":/.+/,
"content-length":"2"
}
}
});
async.series ([
cb => setTimeout (cb, 1000),
cb => tools.getQueueContents (mq, 'default', cb),
], (err, res) => {
if (err) return done (err);
res[1].should.eql([]);
done ();
});
});
app_http = app.listen (36677, () => {
request (cfg.aswh.base_url)
[verb](cfg.aswh.api_path)
.set ({
'x-queue-ns': mq,
'x-cb-url': 'http://tests:36677/cb',
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww',
a_a_a: '123',
b_b_b: 'qwe'
})
.expect (201)
.end ((err, res) => {
if (err) return done (err);
res.body.should.match ({
res: 'ok',
id: /.+/,
q: 'default',
ns: mq
});
});
});
}));
[
'post',

@@ -160,4 +228,75 @@ 'put',

[
'post',
'put',
'patch'
].forEach (verb => it (`forwards a text ${verb} ok, element does callback via __completed__cb__`, done => {
app = new express ();
app.use (bodyParser.json ());
app[verb] ('/this/is/the/path', (req, res) => {
res.status (200).send ('ok');
});
app.post ('/cb', (req, res) => {
res.send ('ok');
req.body.should.match ({
req: {
url:"http://tests:36677/this/is/the/path?a=1&bb=ww",
method: verb.toUpperCase (),
headers:{
a_a_a:"123",
b_b_b:"qwe"
},
body: 'qwertyuiop',
xtra:{},
cb_url:"http://tests:36677/cb"
},
res:{
status:200,
body:{},
text:"ok",
headers:{
"content-type":/.+/,
"content-length":"2"
}
}
});
async.series ([
cb => setTimeout (cb, 1000),
cb => tools.getQueueContents (mq, 'default', cb),
], (err, res) => {
if (err) return done (err);
res[1].should.eql([]);
done ();
});
});
app_http = app.listen (36677, () => {
request (cfg.aswh.base_url)
[verb] (cfg.aswh.api_path)
.set ({
'x-queue-ns': mq,
'x-cb-url': 'http://tests:36677/cb',
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww',
a_a_a: '123',
b_b_b: 'qwe'
})
.type ('text')
.send ('qwertyuiop')
.expect (201)
.end ((err, res) => {
if (err) return done (err);
res.body.should.match ({
res: 'ok',
id: /.+/,
q: 'default',
ns: mq
});
});
});
}));
});
});

@@ -113,2 +113,72 @@ const should = require ('should');

'delete'
].forEach (verb => it (`forwards a ${verb} ok, gets a 400, does not retry, element does callback via __failed__cb__`, done => {
app = new express ();
app.use (bodyParser.json ());
app[verb] ('/this/is/the/path', (req, res) => {
res.status (400).send ('ko');
});
app.post ('/cb', (req, res) => {
res.send ('ok');
req.body.should.match ({
req: {
url:"http://tests:36677/this/is/the/path?a=1&bb=ww",
method: verb.toUpperCase (),
headers:{
a_a_a:"123",
b_b_b:"qwe"
},
xtra:{},
cb_url:"http://tests:36677/cb"
},
res:{
status:400,
body:{},
text:"ko",
headers:{
"content-type":/.+/,
"content-length":"2"
}
}
});
async.series ([
cb => setTimeout (cb, 1000),
cb => tools.getQueueContents (mq, 'default', cb),
cb => tools.getQueueContents (mq, '__failed__', cb),
], (err, res) => {
if (err) return done (err);
res[1].should.eql([]);
res[2].should.eql([]);
done ();
});
});
app_http = app.listen (36677, () => {
request (cfg.aswh.base_url)
[verb](cfg.aswh.api_path)
.set ({
'x-queue-ns': mq,
'x-cb-url': 'http://tests:36677/cb',
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww',
a_a_a: '123',
b_b_b: 'qwe'
})
.expect (201)
.end ((err, res) => {
if (err) return done (err);
res.body.should.match ({
res: 'ok',
id: /.+/,
q: 'default',
ns: mq
});
});
});
}));
[
'get',
'delete'
].forEach (verb => it (`forwards a ${verb} ok, gets a 500, retries to deadletter`, done => {

@@ -263,3 +333,75 @@ let tries = 0;

[
'post',
'put',
'patch'
].forEach (verb => it (`forwards a text ${verb} ok, gets a 400, does not retry, element does callback via __failed__cb__`, done => {
app = new express ();
app.use (bodyParser.json ());
app[verb] ('/this/is/the/path', (req, res) => {
res.status (400).send ('ko');
});
app.post ('/cb', (req, res) => {
res.send ('ok');
req.body.should.match ({
req: {
url:"http://tests:36677/this/is/the/path?a=1&bb=ww",
method: verb.toUpperCase (),
headers:{
a_a_a:"123",
b_b_b:"qwe"
},
body: 'qwertyuiop',
xtra:{},
cb_url:"http://tests:36677/cb"
},
res:{
status:400,
body:{},
text:"ko",
headers:{
"content-type":/.+/,
"content-length":"2"
}
}
});
async.series ([
cb => setTimeout (cb, 1000),
cb => tools.getQueueContents (mq, 'default', cb),
cb => tools.getQueueContents (mq, '__failed__', cb),
], (err, res) => {
if (err) return done (err);
res[1].should.eql([]);
res[2].should.eql([]);
done ();
});
});
app_http = app.listen (36677, () => {
request (cfg.aswh.base_url)
[verb] (cfg.aswh.api_path)
.set ({
'x-queue-ns': mq,
'x-cb-url': 'http://tests:36677/cb',
'x-dest-url': 'http://tests:36677/this/is/the/path?a=1&bb=ww',
a_a_a: '123',
b_b_b: 'qwe'
})
.type ('text')
.send ('qwertyuiop')
.expect (201)
.end ((err, res) => {
if (err) return done (err);
res.body.should.match ({
res: 'ok',
id: /.+/,
q: 'default',
ns: mq
});
});
});
}));
[

@@ -266,0 +408,0 @@ 'post',

@@ -24,7 +24,13 @@ const Log = require ('winston-log-space');

_.each (context.components.Keuss.queues(), (q, qn) => {
const q_config = this._opts.keuss.queue_groups[q.ns()].queues[q.name()];
let q_config = this._opts.keuss.queue_groups[q.ns()].queues[q.name()];
if (!q_config) {
// not a consumable queue. __failed__, for example
return;
// special queues such as __*__cb__
if ((q.name() == '__failed__cb__') || (q.name() == '__completed__cb__')) {
q_config = {};
}
else {
// not a consumable queue. __failed__, for example
return;
}
}

@@ -43,6 +49,10 @@

const failed_q = context.components.Keuss.queues() [`__failed__@${q.ns()}`];
this._clients[qn] = new consumer (q, {failed_q}, this, opts);
// TODO not for __failed__, __failed__cb__ or __completed__cb__
const failed_q = context.components.Keuss.queues() [`__failed__@${q.ns()}`];
const failed_cb_q = context.components.Keuss.queues() [`__failed__cb__@${q.ns()}`];
const completed_cb_q = context.components.Keuss.queues() [`__completed__cb__@${q.ns()}`];
this._clients[qn] = new consumer (q, {failed_q, failed_cb_q, completed_cb_q}, this, opts);
log.info ('created consumer on queue %s@%s', q.name (), q.ns());
})
});

@@ -49,0 +59,0 @@ cb (null, this);

@@ -58,2 +58,18 @@ var _ = require('lodash');

});
tasks_q.push (cb => {
const failed_cb_queue = '__failed__cb__';
const fcbqn = `${failed_cb_queue}@${qg_name}`;
this._queues[fcbqn] = this._factories[qg_name].queue (failed_cb_queue, {});
log.info ('created *callback for failed* queue %s', fcbqn);
cb ();
});
tasks_q.push (cb => {
const completed_cb_queue = '__completed__cb__';
const ccbqn = `${completed_cb_queue}@${qg_name}`;
this._queues[ccbqn] = this._factories[qg_name].queue (completed_cb_queue, {});
log.info ('created *callback for completed* queue %s', ccbqn);
cb ();
});
});

@@ -60,0 +76,0 @@

@@ -9,2 +9,4 @@ const request = require ('superagent');

this._failed_q = env.failed_q;
this._failed_cb_q = env.failed_cb_q;
this._completed_cb_q = env.completed_cb_q;
this._consumer = consumer;

@@ -66,6 +68,6 @@ this._opts = opts || {};

switch (status_series) {
case 3: return cb (null, true); // http 3xx, not an error
case 4: return cb (false); // http 4xx, error, do not retry
case 5: return cb (true); // http 5xx, error, retry
default: return cb (true); // unknown http error, retry
case 3: return cb (null, true, res); // http 3xx, not an error
case 4: return cb (false, null, res); // http 4xx, error, do not retry
case 5: return cb (true, null, res); // http 5xx, error, retry
default: return cb (true, null, res); // unknown http error, retry
}

@@ -81,3 +83,3 @@ }

).observe (delta);
cb (null, true);
cb (null, true, res);
}

@@ -106,2 +108,3 @@ });

}
// error: log it, wait a bit and continue

@@ -117,3 +120,3 @@ this._log.error ('error while popping: %o', err);

this._do_http_call (elem, (retry, ok) => {
this._do_http_call (elem, (retry, ok, res) => {
this._w_size--;

@@ -129,33 +132,10 @@ this._log.verbose ('window released, occupation now (%d/%d)', this._w_size, this._w_max);

if (ok) {
this._log.verbose ('sent elem with id %s', elem._id);
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
this._manage_process_ok (elem, res);
}
else {
if (retry) {
const delay = this._get_delay (elem);
this._log.info ('not sent elem with id %s, retrying with %d ms delay...', elem._id, delay);
this._q.ko (elem, (new Date().getTime () + delay), err => {
if (err) this._log.error ('error while rolling-back, continuing anyway: %o', err);
else this._log.verbose ('rolled back elem %s', elem._id);
});
this._manage_process_retry (elem);
}
else {
this._log.warn ('not sent elem with id %s, NOT retrying and moving to queue %s', elem._id, this._failed_q.name ());
// insert element into __failed__ queue
this._failed_q.push (elem.payload, (err, id) => {
if (err) {
this._log.error ('can not insert failed element into %s queue. Error is %j, elem is %j', this._failed_q.name (), err, elem);
}
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
});
this._manage_process_ko (elem, res);
}

@@ -171,2 +151,107 @@ }

///////////////////////////////////////////////////////////////////
_manage_process_ok (elem, res) {
this._log.verbose ('sent elem with id %s', elem._id);
if (elem.payload.cb_url) {
const pl = {
url: elem.payload.cb_url,
method: 'POST',
headers: {},
body: {
req: elem.payload,
res: _.pick (res, 'status', 'body', 'text', 'headers')
},
xtra: {}
};
// ...and queue it
this._completed_cb_q.push (pl, {}, (err, id) => {
// error while queuing?
if (err) {
this._log.error ('error while pushing payload in completed_cb_q:', err);
}
else {
this._log.verbose ('inserted callback-completed call for %s', id);
}
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
});
}
else {
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
}
}
///////////////////////////////////////////////////////////////////
_manage_process_retry (elem) {
const delay = this._get_delay (elem);
this._log.info ('not sent elem with id %s, retrying with %d ms delay...', elem._id, delay);
this._q.ko (elem, (new Date().getTime () + delay), err => {
if (err) this._log.error ('error while rolling-back, continuing anyway: %o', err);
else this._log.verbose ('rolled back elem %s', elem._id);
});
}
///////////////////////////////////////////////////////////////////
_manage_process_ko (elem, res) {
this._log.verbose ('not-sent elem with id %s', elem._id);
if (elem.payload.cb_url) {
const pl = {
url: elem.payload.cb_url,
method: 'POST',
headers: {},
body: {
req: elem.payload,
res: _.pick (res, 'status', 'body', 'text', 'headers')
},
xtra: {}
};
// ...and queue it
this._failed_cb_q.push (pl, {}, (err, id) => {
// error while queuing?
if (err) {
this._log.error ('error while pushing payload in failed_cb_q:', err);
}
else {
this._log.verbose ('inserted callback-failed call for %s', id);
}
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
});
}
else {
this._log.warn ('not sent elem with id %s, NOT retrying and moving to queue %s', elem._id, this._failed_q.name ());
// insert element into __failed__ queue
elem.payload.xtra.res = _.pick (res, 'status', 'body', 'text', 'headers');
this._failed_q.push (elem.payload, (err, id) => {
if (err) {
this._log.error ('can not insert failed element into %s queue. Error is %j, elem is %j', this._failed_q.name (), err, elem);
}
this._q.ok (elem, err => {
if (err) this._log.error ('error while committing, continuing anyway: %o', err);
else this._log.verbose ('committed elem %s', elem._id);
});
});
}
}
///////////////////////////////////////////////////////////////////
run () {

@@ -173,0 +258,0 @@ this._in_consume_loop = true;

{
"name": "aswh",
"description": "Asynchronous WebHook delivery, or generic store-and-forward HTTP proxy",
"version": "1.1.4",
"version": "1.2.0",
"keywords": [

@@ -6,0 +6,0 @@ "webhook",

@@ -44,2 +44,12 @@ # aswh: Asynchronous WebHook delivery

### HTTP callbacks
`aswh` can produce an http callback for each webhook, when it is either completed ot failed: this is activated on a per-webhook basis, simply adding an extra header `x-cb-url`:
if the webhook is called successfuly or is rejected permanently, a post to the url at `x-cb-url` is done with a json body including both the webhook's request and its response
At this point, ther is no callback when an element is retried too many times: it will always go to `__deadletter__`
The callbacks are implemented also as webhooks, delayed HTTP calls queued on `__completed__cb__` (for successful webhooks) and `__failed__cb__` (for failed webhooks) queues,
which are pre-created by `aswh` on each queue group; you can in fact add configuration for them as if they were regular queues (which in fact are)
## Configuration

@@ -46,0 +56,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc