aws-scatter-gather
An NPM package that facilitates the scatter gather design pattern using AWS SNS Topics. This model requires two components: aggregators that make the request, and responders that respond to the request.
Similar examples can be found in the examples directory that is included as part of this package.
The aggregator has the role of initiating a request. It sends the request data out to the specified SNS Topic and it gathers responses back that are intended for it alone.
Define an Aggregator
Creating an aggregator gives you a function that, when called, sends the request and gathers the responses.
examples/with-lambda/aggregator/index.js
const AWS = require('aws-sdk');
const Scather = require('aws-scatter-gather');
exports.greetings = Scather.aggregator({
// transform each response - it's also possible to do this with "each" property by modifying the received object's data.
composer: function(responses) {
const str = Object.keys(responses)
.map(function(language) {
return language + ': ' + responses[language];
})
.join('\n\t');
return 'Greetings in multiple languages: \n\t' + str;
},
// in this example the each function does the same thing as the expects property
each: function(received, state, done) {
if (state.requested && received.name === 'english') {
done();
}
},
// expecting a response with the name of english
expects: ['english'],
// wait at least 0 milliseconds and at most 2500 milliseconds
maxWait: 2500,
minWait: 0,
// provide the SNS object to use and specify the SNS ARN for sending and receiving
responseArn: 'arn:aws:sns:us-west-2:064824991063:ResponseTopic',
sns: new AWS.SNS({ region: 'us-west-2' }),
topicArn: 'arn:aws:sns:us-west-2:064824991063:RequestTopic'
});
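The way expects, minWait, and maxWait interact can be pictured with a small standalone sketch. This is not the library's implementation. The completion rule used here (finish once maxWait has elapsed, or once minWait has elapsed and no expected response is missing) is an assumption inferred from the comments in the example above, and shouldFinish is a hypothetical helper.

```javascript
'use strict';

// Hypothetical helper, not part of aws-scatter-gather.
// Assumed rule: finish when maxWait has elapsed, or when minWait has elapsed
// and every name listed in `expects` has responded.
function shouldFinish(config, receivedNames, elapsedMs) {
    const missing = config.expects.filter(function(name) {
        return receivedNames.indexOf(name) === -1;
    });
    if (elapsedMs >= config.maxWait) return true;
    return elapsedMs >= config.minWait && missing.length === 0;
}

const config = { expects: ['english'], minWait: 0, maxWait: 2500 };

console.log(shouldFinish(config, [], 100));           // false
console.log(shouldFinish(config, ['english'], 100));  // true
console.log(shouldFinish(config, [], 2500));          // true
```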
Aggregator using Each
const AWS = require('aws-sdk');
const Scather = require('aws-scatter-gather');
exports.greetings = Scather.aggregator({
each: function(response, state, done) {
console.log(response);
},
minWait: 5000,
responseArn: 'arn:aws:sns:us-west-2:064824991063:ResponseTopic',
sns: new AWS.SNS({ region: 'us-west-2' }),
topicArn: 'arn:aws:sns:us-west-2:064824991063:RequestTopic'
});
Unit Testing
You can test that your aggregator is running as expected without having to send anything across the network. This is helpful for debugging and development.
examples/with-lambda/aggregator/test.js
mock allows you to provide the response functions as the second parameter.
const aggregators = require('./index');
const english = require('../lambdas/english/index').english;
// run mock aggregation - using a callback paradigm
aggregators.greetings.mock('James', [ english ], function(result) {
console.log(result);
});
// run mock aggregation - using a promise paradigm
aggregators.greetings.mock('James', [ english ])
.then(function(result) {
console.log(result);
});
Integration
To communicate over SNS itself, you will need a server that is subscribed to the SNS Topic.
examples/with-lambda/aggregator/server.js
const aggregators = require('./index.js');
const express = require('express');
const Scather = require('aws-scatter-gather');
// create an express app and add the scather sns middleware
const app = express();
app.use(Scather.middleware({
endpoint: 'https://url-to-this-server.com',
server: app,
topics: ['arn:aws:sns:us-west-2:064824991063:ResponseTopic']
}));
// start the server listening on port 3000
app.listen(3000, function() {
// aggregate results through the SNS Topics - using callback paradigm
aggregators.greetings('James', function(err, data) {
console.log(data);
});
// aggregate results through the SNS Topics - using promise paradigm
aggregators.greetings('James')
.then(function(data) {
console.log(data);
});
});
Define a Lambda
examples/with-lambda/lambdas/english/index.js
Scather.response takes a named function, in this case english. The Scather.lambda function does the work of receiving SNS Topic Notifications, calling its associated Scather responder function, and sending the response back to the aggregator.
const Scather = require('aws-scatter-gather');
// callback paradigm
exports.response = Scather.response(function english(data, callback) {
callback(null, 'Hello, ' + data);
});
// promise paradigm (an alternative to the definition above; keep only one of the two)
exports.response = Scather.response(function english(data) {
return 'Hello, ' + data;
});
exports.handler = Scather.lambda(exports.response);
Unit Testing
examples/with-lambda/lambdas/english/test.js
const lambda = require('./index');
// callback paradigm
lambda.response('James', function(err, data) {
console.log(data);
});
// promise paradigm
lambda.response('James')
.then(function(data) {
console.log(data);
});
Response functions are subscribed to SNS events if you are running your own server. If you are using a lambda to capture SNS events then look to the Lambdas section for details.
examples/with-server/server/server.js
const AWS = require('aws-sdk');
const express = require('express');
const Scather = require('aws-scatter-gather');
// create the sns instance
const sns = new AWS.SNS({ region: 'us-west-2' });
// create an express app and add the scather sns middleware
const app = express();
app.use(Scather.middleware({
endpoint: 'http://url-to-this-server.com',
server: app,
sns: sns,
topics: ['arn:aws:sns:us-west-2:064824991063:ResponseTopic']
}));
// using a handler function as input and using a callback paradigm
Scather.response(function English(data, callback) {
callback(null, 'Hello, ' + data);
});
// using a configuration as input and using a promise paradigm
Scather.response({
name: 'Chinese',
sns: sns,
handler: function (data) { // promise paradigm because no second parameter is specified
return 'Ni hao, ' + data;
}
});
// start the server listening on port 3001
app.listen(3001, function() {
console.log('Server listening on port 3001');
});
If a responder depends on an upstream API, an aggregator may be configured with a circuit breaker that will suspend the responder if the upstream API goes down.
examples/with-circuitbreaker/aggregator/server.js
const AWS = require('aws-sdk');
const express = require('express');
const Scather = require('aws-scatter-gather');
// create an express app
const app = express();
// create a circuitbreaker
const circuitbreaker = Scather.circuitbreaker.config({
// trip for 1 minute
timeout: 1000 * 60,
// trip if errors exceed 10% of requests
errorThreshold: 0.1,
// don't trip breaker on first fault if less than 10 requests per window
lowLoadThreshold: 10,
// Ten minute window
windowSize: 1000 * 60 * 10
});
// add the scather sns middleware
app.use(Scather.middleware({
endpoint: 'http://url-to-this-server.com',
server: app,
sns: new AWS.SNS({ region: 'us-west-2' }),
topics: ['arn:aws:sns:us-west-2:064824991063:ResponseTopic']
}));
const echoes = Scather.aggregator({
composer: function(responses) {
const str = Object.keys(responses)
.map(function(source) {
return source + ': ' + responses[source];
})
.join('\n\t');
return 'Echo from multiple sources: \n\t' + str;
},
expects: ['service'],
maxWait: 2500,
minWait: 0,
responseArn: 'arn:aws:sns:us-west-2:064824991063:ResponseTopic',
topicArn: 'arn:aws:sns:us-west-2:064824991063:RequestTopic',
circuitbreaker: circuitbreaker
});
// start the server listening on port 3000
app.listen(3000, function() {
console.log('Server listening on port 3000');
// aggregate results through the SNS Topics - using callback paradigm
echoes('EchoThisBack', function(err, data) {
if(err) {
console.error(JSON.stringify(err));
return;
}
console.log(JSON.stringify(data));
});
});
When the responder detects a fault in the upstream API, it should return an error with the attribute circuitbreakerFault set. In order to bypass a request once the circuit breaker has tripped, the response must be configured with a name, a handler function, and a bypass function:
examples/with-circuitbreaker/lambdas/service/index.js
'use strict';
const Scather = require('aws-scatter-gather');
const request = require('request');
const snsArn = 'arn:aws:sns:us-west-2:064824991063:ResponseTopic';
function service(data) {
const url = `http://echo.jsontest.com/data/${data}`;
return new Promise(function(resolve, reject) {
request(url, function(error, response, body) {
if(error) {
return reject(error);
}
if(response.statusCode !== 200) {
return reject({
circuitbreakerFault: true,
statusCode: response.statusCode
});
}
return resolve(body);
});
});
}
function bypass(data) {
return JSON.stringify({
data: 'Bypassed by circuit breaker'
});
}
exports.response = Scather.response({
name: 'service',
handler: service,
bypass: bypass
});
exports.handler = Scather.lambda(exports.response);
Produce an aggregator function that can be called to make a request and aggregate responses.
Parameters
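- *each* - A function called for each response, with the parameters received, state, and done. The response data is available as received.data. The state has the form { active: boolean, minWaitReached: boolean, missing: Array.<string> }. Active tells whether the aggregator will continue to process additional responses.
- *expects* - Defaults to [].
- *maxWait* - Defaults to 2500.
- *minWait* - Defaults to 0.
Returns a function that can take one or two arguments. The first argument is the data to send with the request. The second argument is an optional callback function. If the callback is omitted then a Promise will be returned.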
Access the event interface. You probably don't need to do this unless you're building some sort of plugin. It's not really worth the effort to document this, yet.
If the lambda function is invoked by an SNS Topic then the handler will be called with the relevant data. Once the handler completes the lambda function will route the response back to the aggregator.
Parameters
Returns a function that is intended to be invoked by an SNS Topic event.
Parameters
Produce a response function.
Parameters
handler - A function to call with request data or an object containing the handler. The handler function will be called with its defined signature (either using callbacks or promises). Note that when the handler is invoked, the invocation signature does not need to match the calling signature. The calling signature can use promises or callbacks, independent of the handler signature.
If an object is used it takes these parameters:
- *name* - The name of the function.
- *sns* - The SNS instance to use. This cannot be defined if passing in a function instead of an object for the response parameter.
- *handler* - The handler function to call.
- *bypass* - If a circuit breaker is being used, the function to call if the upstream API is down.
Returns a function that can take one or two arguments. The first argument is the data for the response to process. The second argument is an optional callback function. If the callback is omitted then a Promise will be returned.
Example of Free Form Paradigm Execution
// define response using the promise paradigm
const resP = Scather.response(function(data) {
return 'Hello, ' + data;
});
// define a response using the callback paradigm
const resC = Scather.response(function(data, callback) {
callback(null, 'Hello, ' + data);
});
// call resP using promise paradigm
resP('James')
.then(function(data) {
console.log(data);
});
// call resP using callback paradigm
resP('James', function(err, data) {
console.log(data);
});
// call resC using promise paradigm
resC('James')
.then(function(data) {
console.log(data);
});
// call resC using callback paradigm
resC('James', function(err, data) {
console.log(data);
});
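One way such free-form behavior could be built is sketched below. It is not the library's code. freeForm is a hypothetical helper, and it assumes that a handler declaring a second parameter uses callbacks, which matches the comment in the server example ("promise paradigm because no second parameter is specified").

```javascript
'use strict';

// Hypothetical helper, not part of aws-scatter-gather.
function freeForm(handler) {
    // assumption: a handler that declares a second parameter uses callbacks
    const usesCallback = handler.length >= 2;

    return function(data, callback) {
        const promise = new Promise(function(resolve, reject) {
            if (usesCallback) {
                handler(data, function(err, result) {
                    if (err) return reject(err);
                    resolve(result);
                });
            } else {
                // resolve() accepts a plain value or a promise
                resolve(handler(data));
            }
        });

        // no callback supplied: hand the promise back to the caller
        if (typeof callback !== 'function') return promise;

        // callback supplied: report the outcome in (err, data) form
        promise.then(
            function(result) { callback(null, result); },
            function(err) { callback(err); }
        );
    };
}

const hello = freeForm(function(data) {
    return 'Hello, ' + data;
});

// the same wrapped function can be called either way
hello('James').then(function(data) { console.log(data); });
hello('James', function(err, data) { console.log(data); });
```

Normalizing every handler to a promise internally keeps the wrapper small: the calling style is decided only at the end, by whether a callback was passed.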
Produce a circuitbreaker object. The circuit breaker will keep track of each request, and whether a successful response or a faulty response was the result. If enough responses are faulty within the request window, the circuit breaker will trip, bypassing requests for a time (the state will change from closed to open). Once the timeout has been reached, the state will change to indeterminate, where another faulty response will immediately change the state back to open. If a successful response is recorded instead, the state will be reset to closed and requests will operate normally.
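The state transitions described above can be sketched as a small state machine. This is a hypothetical illustration, not the aws-scatter-gather implementation. createBreaker, record, and state are made-up names, the clock is passed in as a number of milliseconds so the behavior is easy to follow, and it is an assumption that the error threshold only applies once the window holds at least lowLoadThreshold requests.

```javascript
'use strict';

// Hypothetical sketch, not the aws-scatter-gather implementation.
function createBreaker(config) {
    let state = 'closed';
    let openedAt = 0;
    let events = [];   // { time, fault } for each recorded response

    function trip(now) {
        state = 'open';
        openedAt = now;
        events = [];
    }

    // an open breaker becomes indeterminate once the timeout has passed
    function currentState(now) {
        if (state === 'open' && now - openedAt >= config.timeout) {
            state = 'indeterminate';
        }
        return state;
    }

    // record the outcome of one request at time `now`
    function record(fault, now) {
        const current = currentState(now);
        if (current === 'open') return;      // requests are being bypassed
        if (current === 'indeterminate') {
            if (fault) trip(now);            // one more fault reopens the breaker
            else state = 'closed';           // a success resets it
            return;
        }
        // closed: keep only the events that fall inside the window
        events = events.filter(function(e) {
            return now - e.time < config.windowSize;
        });
        events.push({ time: now, fault: fault });
        const faults = events.filter(function(e) { return e.fault; }).length;
        if (events.length >= config.lowLoadThreshold &&
            faults / events.length > config.errorThreshold) {
            trip(now);
        }
    }

    return { record: record, state: currentState };
}

const breaker = createBreaker({
    timeout: 1000 * 60,
    errorThreshold: 0.1,
    lowLoadThreshold: 10,
    windowSize: 1000 * 60 * 10
});

// eight successes followed by two faults: 2 of 10 exceeds the 10% threshold
for (let t = 0; t < 8; t++) breaker.record(false, t);
breaker.record(true, 8);
breaker.record(true, 9);
console.log(breaker.state(10));      // open
console.log(breaker.state(60009));   // indeterminate
```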
Parameters
Returns a circuit breaker object with the following methods:
A scatter-gather design pattern sends a single request to many servers and then aggregates the responses of those servers. Because this module uses AWS SNS Topics for communication it is using an asynchronous messaging pattern.
An AWS SNS Topic is a messaging channel that any number of servers can subscribe to. When the SNS Topic receives an event it pushes that event to all servers that are subscribed to the topic.
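The pattern can be tried out without AWS by using an in-memory stand-in for a topic. The sketch below is only an illustration: createTopic, addResponder, and aggregate are hypothetical names, and delivery here is synchronous, whereas real SNS delivery is asynchronous and goes over the network.

```javascript
'use strict';

// Hypothetical in-memory stand-in for an SNS Topic: publish() pushes the
// event to every subscriber.
function createTopic() {
    const subscribers = [];
    return {
        subscribe: function(fn) { subscribers.push(fn); },
        publish: function(event) {
            subscribers.forEach(function(fn) { fn(event); });
        }
    };
}

const requestTopic = createTopic();
const responseTopic = createTopic();

// responders listen for requests and publish a named response
function addResponder(name, handler) {
    requestTopic.subscribe(function(request) {
        responseTopic.publish({ name: name, data: handler(request.data) });
    });
}
addResponder('english', function(data) { return 'Hello, ' + data; });
addResponder('chinese', function(data) { return 'Ni hao, ' + data; });

// the aggregator scatters one request and gathers every response by name
function aggregate(data) {
    const responses = {};
    responseTopic.subscribe(function(response) {
        responses[response.name] = response.data;
    });
    requestTopic.publish({ data: data });
    return responses;
}

console.log(aggregate('James'));
// { english: 'Hello, James', chinese: 'Ni hao, James' }
```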
HTTP is commonly used to make synchronous requests:
For asynchronous requests:
* I am aware that the server is now the client and the client the server, but to keep things simple I kept their names the same.
Option 1
The below image more accurately portrays messaging when using a single SNS Topic:
Using a single topic reduces initial setup, but it increases network traffic and the number of times requests are processed. The number of additional requests that would be made in this situation can be calculated using this formula:
N x N + 1
where N is the number of servers responding to the request.
So, if you have 5 servers responding to requests from the aggregator, there will be a total of 26 additional requests that did not need to be made. Every request consumes both network bandwidth and processing time.
Option 2
An alternative is to use two SNS Topics:
Using this method no unneeded requests will be made.
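The difference between the two options can be checked by counting deliveries. The sketch below assumes one aggregator and N responders, treats a request as useful only to responders and a response as useful only to the aggregator, and counts every other delivery as unneeded. countUnneeded is a hypothetical helper written for this illustration.

```javascript
'use strict';

// Each subscriber is labeled 'aggregator' or 'responder'.
// n is the number of responders, each of which publishes one response.
function countUnneeded(requestTopicSubs, responseTopicSubs, n) {
    // the single request is only useful to responders
    const fromRequest = requestTopicSubs.filter(function(role) {
        return role !== 'responder';
    }).length;
    // each response is only useful to the aggregator
    const perResponse = responseTopicSubs.filter(function(role) {
        return role !== 'aggregator';
    }).length;
    return fromRequest + n * perResponse;
}

const n = 5;
const responders = new Array(n).fill('responder');
const everyone = ['aggregator'].concat(responders);

// Option 1: everyone subscribes to the same topic
console.log(countUnneeded(everyone, everyone, n));           // 26
// Option 2: responders on the request topic, aggregator on the response topic
console.log(countUnneeded(responders, ['aggregator'], n));   // 0
```

With a single topic the aggregator receives its own request (1 delivery) and each of the N responses also reaches all N responders (N x N deliveries), which gives N x N + 1.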