Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

multifetch

Package Overview
Dependencies
Maintainers
7
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

multifetch - npm Package Compare versions

Comparing version 1.0.2 to 1.1.0

3

package.json
{
"name": "multifetch",
"version": "1.0.2",
"version": "1.1.0",
"repository": "git@github.com:e-conomic/multifetch.git",

@@ -11,2 +11,3 @@ "description": "Express middleware for performing internal batch requests",

"dependencies": {
"async": "~0.9.0",
"pump": "~1.0.0",

@@ -13,0 +14,0 @@ "extend": "~2.0.0"

@@ -126,2 +126,14 @@ multifetch [![Build Status](https://travis-ci.org/e-conomic/multifetch.png?branch=master)](https://travis-ci.org/e-conomic/multifetch)

### Concurrency
By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request.
If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests.
Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time:
```javascript
app.get('/api/multifetch', multifetch({ concurrency: 5 }));
```
In the above case, 5 requests would be routed through express concurrently, and the response of each is placed in a queue to be streamed out to the client sequentially.
License

@@ -128,0 +140,0 @@ -------

@@ -5,2 +5,3 @@ var url = require('url');

var extend = require('extend');
var async = require('async');

@@ -72,5 +73,10 @@ var JsonStream = require('./json');

var create = function(options, callback) {
if(!callback && typeof options === 'function') {
callback = options;
var endStream = function (jsonStream, error) {
jsonStream.writeObject('_error', error);
jsonStream.end();
};
var create = function(options, prefetch) {
if(!prefetch && typeof options === 'function') {
prefetch = options;
options = {};

@@ -80,9 +86,9 @@ }

options = options || {};
var ignore = options.ignore || [];
var headers = options.headers !== undefined ? options.headers : true;
var concurrency = options.concurrency || 1; // Defaults to sequential fetching
var fetch = headers ? fetchWithHeaders : fetchBare;
callback = callback || noopCallback;
prefetch = prefetch || noopCallback;

@@ -101,37 +107,59 @@ return function(request, response, next) {

(function loop() {
var key = keys.pop();
// Exit early if there is nothing to fetch.
if(keys.length === 0) {
return endStream(json, error);
}
if(!key) {
json.writeObject('_error', error);
return json.end();
}
// The resource queue processes resource streams sequentially.
var resourceQueue = async.queue(function worker(task, callback) {
pump(task.resource, json.createObjectStream(task.key), function(err) {
if(err) {
json.destroy();
return callback(err);
}
if(!(/2\d\d/).test(task.response.statusCode)) {
error = true;
}
callback();
});
}, 1);
// Asynchronously fetch the resource for a key and push the resulting
// stream into the resource queue.
var fetchResource = function(key, callback) {
var messages = createMessages(request, query[key]);
prefetch(request, messages.request, function(prevent) {
if (prevent) return callback();
var write = function(prevent) {
if(prevent) {
return loop();
}
var resource = fetch(messages.request, messages.response);
var task = {
resource: resource,
request: messages.request,
response: messages.response,
key: key
};
pump(resource, json.createObjectStream(key), function(err) {
if(err) {
return json.destroy();
}
if(!(/2\d\d/).test(messages.response.statusCode)) {
error = true;
}
loop();
});
app(messages.request, messages.response, function(err) {
app(messages.request, messages.response, function() {
resourceQueue.kill();
json.destroy();
});
};
callback(request, messages.request, write);
}());
// Callback is called once the stream for this resource has
// been fully piped out to the client.
resourceQueue.push(task, callback);
});
};
// Fire off all requests and push the resulting streams into a queue to
// be processed
async.eachLimit(keys, concurrency, fetchResource, function(err) {
if(resourceQueue.idle()) {
endStream(json, error);
} else {
// Called once all streams have been fully pumped out to the client.
resourceQueue.drain = function() {
endStream(json, error);
};
}
});
};

@@ -138,0 +166,0 @@ };

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc