Comparing version 1.0.2 to 1.1.0
 {
   "name": "multifetch",
-  "version": "1.0.2",
+  "version": "1.1.0",
   "repository": "git@github.com:e-conomic/multifetch.git",
@@ -11,2 +11,3 @@ "description": "Express middleware for performing internal batch requests",
   "dependencies": {
+    "async": "~0.9.0",
     "pump": "~1.0.0",
     "extend": "~2.0.0"
@@ -126,2 +126,14 @@ multifetch [![Build Status](https://travis-ci.org/e-conomic/multifetch.png?branch=master)](https://travis-ci.org/e-conomic/multifetch)
+### Concurrency
+
+By default, `multifetch` processes requests sequentially, waiting until a request has been processed and fully piped out before it starts the next one.
+
+If each response is small but takes a long time to produce (e.g. heavy database queries), `multifetch` can process requests concurrently. Pass `concurrency: N` as an option to control how many requests are processed at any one time:
+
+```javascript
+app.get('/api/multifetch', multifetch({ concurrency: 5 }));
+```
+
+In the above case, up to 5 requests are routed through Express concurrently, and each response is placed in a queue and streamed out to the client sequentially.
+
 License
 -------
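For reference, a minimal sketch of what a batched call might look like with the new option enabled. It assumes the query-parameter API described earlier in the README (each query key names an internal resource path, and the response is a single JSON object keyed by those names, plus the `_error` flag written by `endStream` in the source diff below); the routes and handlers here are illustrative, not taken from this diff:

```javascript
var express = require('express');
var multifetch = require('multifetch');

var app = express();

// Batch endpoint, processing up to 5 internal requests concurrently.
app.get('/api/multifetch', multifetch({ concurrency: 5 }));

// Two illustrative resources, each slow to respond.
app.get('/api/user', function(request, response) {
	setTimeout(function() {
		response.json({ name: 'user_1' });
	}, 500);
});

app.get('/api/album', function(request, response) {
	setTimeout(function() {
		response.json({ title: 'album_1' });
	}, 500);
});

app.listen(8080);

// GET /api/multifetch?user=/api/user&album=/api/album
// With concurrency enabled, both resources are fetched in parallel
// (~500ms total instead of ~1s), and the combined body looks like:
// { "user": { ... }, "album": { ... }, "_error": false }
```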
@@ -5,2 +5,3 @@ var url = require('url');
 var extend = require('extend');
+var async = require('async');
@@ -72,5 +73,10 @@ var JsonStream = require('./json');
-var create = function(options, callback) {
-	if(!callback && typeof options === 'function') {
-		callback = options;
+var endStream = function(jsonStream, error) {
+	jsonStream.writeObject('_error', error);
+	jsonStream.end();
+};
+
+var create = function(options, prefetch) {
+	if(!prefetch && typeof options === 'function') {
+		prefetch = options;
 		options = {};
@@ -80,9 +86,9 @@ }
 	options = options || {};

 	var ignore = options.ignore || [];
 	var headers = options.headers !== undefined ? options.headers : true;
+	var concurrency = options.concurrency || 1; // Defaults to sequential fetching
 	var fetch = headers ? fetchWithHeaders : fetchBare;

-	callback = callback || noopCallback;
+	prefetch = prefetch || noopCallback;
@@ -101,37 +107,59 @@ return function(request, response, next) {
-		(function loop() {
-			var key = keys.pop();
-
-			if(!key) {
-				json.writeObject('_error', error);
-				return json.end();
-			}
+		// Exit early if there is nothing to fetch.
+		if(keys.length === 0) {
+			return endStream(json, error);
+		}
+
+		// The resource queue processes resource streams sequentially.
+		var resourceQueue = async.queue(function worker(task, callback) {
+			pump(task.resource, json.createObjectStream(task.key), function(err) {
+				if(err) {
+					json.destroy();
+					return callback(err);
+				}
+				if(!(/2\d\d/).test(task.response.statusCode)) {
+					error = true;
+				}
+
+				callback();
+			});
+		}, 1);
+
+		// Asynchronously fetch the resource for a key and push the resulting
+		// stream into the resource queue.
+		var fetchResource = function(key, callback) {
 			var messages = createMessages(request, query[key]);

-			var write = function(prevent) {
-				if(prevent) {
-					return loop();
-				}
+			prefetch(request, messages.request, function(prevent) {
+				if(prevent) return callback();

 				var resource = fetch(messages.request, messages.response);
+				var task = {
+					resource: resource,
+					request: messages.request,
+					response: messages.response,
+					key: key
+				};

-				pump(resource, json.createObjectStream(key), function(err) {
-					if(err) {
-						return json.destroy();
-					}
-					if(!(/2\d\d/).test(messages.response.statusCode)) {
-						error = true;
-					}
-
-					loop();
-				});
-
-				app(messages.request, messages.response, function(err) {
+				app(messages.request, messages.response, function() {
+					resourceQueue.kill();
 					json.destroy();
 				});
-			};

-			callback(request, messages.request, write);
-		}());
+				// Callback is called once the stream for this resource has
+				// been fully piped out to the client.
+				resourceQueue.push(task, callback);
+			});
+		};
+
+		// Fire off all requests and push the resulting streams into a queue to
+		// be processed.
+		async.eachLimit(keys, concurrency, fetchResource, function(err) {
+			if(resourceQueue.idle()) {
+				endStream(json, error);
+			} else {
+				// Called once all streams have been fully pumped out to the client.
+				resourceQueue.drain = function() {
+					endStream(json, error);
+				};
+			}
+		});
 	};
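The heart of the change is the fan-out/fan-in pattern above: `async.eachLimit` keeps up to `concurrency` internal fetches in flight, while an `async.queue` with concurrency `1` drains the resulting streams to the client one at a time, so the JSON output stays well-formed. A standalone sketch of that pattern, with timers standing in for the internal Express requests (the task names are illustrative, not the module's internals):

```javascript
var async = require('async');

// Writer queue with concurrency 1: output is serialized even though
// fetches overlap, mirroring resourceQueue in the diff above.
var outputQueue = async.queue(function(task, done) {
	console.log('streaming out:', task.key);
	done();
}, 1);

var fetchResource = function(key, done) {
	// Stand-in for an internal Express request; all fetches within the
	// concurrency limit run in parallel.
	setTimeout(function() {
		// done() fires only once this task has been fully written out,
		// just as resourceQueue.push(task, callback) reports completion.
		outputQueue.push({ key: key }, done);
	}, 100);
};

// Up to 3 fetches in flight at once; writes still happen one at a time.
async.eachLimit(['a', 'b', 'c', 'd'], 3, fetchResource, function() {
	// Same idle()/drain check as the diff: finish once the queue is empty.
	if(outputQueue.idle()) return console.log('done');
	outputQueue.drain = function() {
		console.log('done');
	};
});
```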
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
+ Added async@~0.9.0
+ Added async@0.9.2 (transitive)