@petitchevalroux/http-download-stream
Advanced tools
Comparing version 1.1.1 to 1.2.0
{ | ||
"name": "@petitchevalroux/http-download-stream", | ||
"version": "1.1.1", | ||
"version": "1.2.0", | ||
"description": "Node transform stream downloading url in input with retry and rate limit", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -7,10 +7,9 @@ "use strict"; | ||
path = require("path"), | ||
HttpError = require(path.join(__dirname, "errors", "http")), { | ||
RateLimiter | ||
} = require("limiter"), | ||
retry = require("retry"); | ||
urlModule = require("url"), | ||
Promise = require("bluebird"), | ||
Fetcher = require(path.join(__dirname, "fetcher")); | ||
class HttpDownloadStream extends Transform { | ||
constructor(options) { | ||
options = Object.assign({ | ||
const instanceOptions = Object.assign({ | ||
"timeout": 5000, | ||
@@ -23,10 +22,10 @@ "followRedirect": true, | ||
"retries": 3, | ||
"retryMinTimeout": 2500 | ||
"retryMinTimeout": 2500, | ||
"maxParallelHosts": 10 | ||
}, options || {}); | ||
super(options); | ||
if (typeof(options.httpClient) === "undefined") { | ||
this.httpClient = request.defaults({ | ||
"timeout": options.timeout, | ||
"followRedirect": options.followRedirect, | ||
"maxRedirects": options.maxRedirects, | ||
if (typeof(instanceOptions.httpClient) === "undefined") { | ||
instanceOptions.httpClient = request.defaults({ | ||
"timeout": instanceOptions.timeout, | ||
"followRedirect": instanceOptions.followRedirect, | ||
"maxRedirects": instanceOptions.maxRedirects, | ||
"gzip": true, | ||
@@ -37,66 +36,93 @@ "headers": { | ||
}); | ||
} else { | ||
this.httpClient = options.httpClient; | ||
} | ||
this.limiter = new RateLimiter(options.rateCount, options.rateWindow); | ||
this.retries = options.retries; | ||
this.retryMinTimeout = options.retryMinTimeout; | ||
super(instanceOptions); | ||
this.hostFetchersCount = 0; | ||
this.maxHostFetchers = instanceOptions.maxParallelHosts; | ||
delete instanceOptions.maxParallelHosts; | ||
this.options = instanceOptions; | ||
this.buffer = []; | ||
this.hostFetchers = {}; | ||
} | ||
get(chunk, callback) { | ||
_transform(chunk, encoding, callback) { | ||
try { | ||
const host = urlModule.parse(chunk.toString()) | ||
.hostname; | ||
this.getHostFetcher(host) | ||
.then((fetcher) => { | ||
fetcher.lastUsed = new Date() | ||
.getTime(); | ||
return fetcher | ||
.fetch(chunk.toString()); | ||
}) | ||
.then((result) => { | ||
return callback(null, result); | ||
}) | ||
.catch((err) => { | ||
callback(err); | ||
}); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
} | ||
getHostFetcher(host) { | ||
if (this.hostFetchersCount > this.maxHostFetchers) { | ||
const self = this; | ||
return this | ||
.deleteLeastRecentlyUsedFetcher() | ||
.then(() => { | ||
return self.getHostFetcher(host); | ||
}); | ||
} | ||
return typeof(this.hostFetchers[host]) !== "undefined" ? | ||
Promise.resolve(this.hostFetchers[host]) : | ||
this.createHostFetcher(host); | ||
} | ||
createHostFetcher(host) { | ||
this.hostFetchersCount++; | ||
this.hostFetchers[host] = new Fetcher(this.options); | ||
return Promise.resolve(this.hostFetchers[host]); | ||
} | ||
deleteLeastRecentlyUsedFetcher() { | ||
const self = this; | ||
self.limiter.removeTokens(1, function(err) { | ||
if (err) { | ||
callback(err); | ||
return; | ||
} | ||
if (Buffer.isBuffer(chunk)) { | ||
chunk = chunk.toString(); | ||
} | ||
self.httpClient.get(chunk, (err, response, body) => { | ||
if (err) { | ||
callback(new HttpError(err)); | ||
return; | ||
return this | ||
.getLeastRecentlyUsedFetcherHost() | ||
.then((host) => { | ||
if (!host || !self.hostFetchers[host]) { | ||
throw new Error( | ||
"Unable to find an host fetcher to delete"); | ||
} | ||
callback( | ||
null, { | ||
"input": chunk, | ||
"output": { | ||
"headers": response.headers, | ||
"statusCode": response.statusCode, | ||
"body": body | ||
} | ||
} | ||
); | ||
delete self.hostFetchers[host]; | ||
self.hostFetchersCount--; | ||
return host; | ||
}); | ||
}); | ||
} | ||
_transform(chunk, encoding, callback) { | ||
const self = this; | ||
const operation = retry.operation({ | ||
"minTimeout": this.retryMinTimeout, | ||
"retries": this.retries | ||
}); | ||
operation.attempt((attempt) => { | ||
self.get(chunk, function(err, response) { | ||
if (!err && response.output.statusCode > | ||
499) { | ||
if (attempt <= self.retries) { | ||
err = 499; | ||
} | ||
getLeastRecentlyUsedFetcherHost() { | ||
return new Promise((resolve) => { | ||
const hosts = Object.getOwnPropertyNames(this.hostFetchers); | ||
if (!hosts.length) { | ||
return resolve(null); | ||
} | ||
const self = this; | ||
hosts.sort((a, b) => { | ||
if (self.hostFetchers[a].lastUsed === self.hostFetchers[ | ||
b].lastUsed) { | ||
return 0; | ||
} | ||
if (operation.retry(err)) { | ||
return; | ||
} | ||
if (response) { | ||
response.attempt = attempt; | ||
} | ||
callback(err ? operation.mainError() : null, | ||
response); | ||
return (self.hostFetchers[a].lastUsed < | ||
self.hostFetchers[ | ||
b].lastUsed) ? | ||
-1 : 1; | ||
}); | ||
return resolve(hosts[0]); | ||
}); | ||
} | ||
} | ||
module.exports = HttpDownloadStream; |
@@ -13,3 +13,2 @@ "use strict"; | ||
nock = require("nock"), | ||
sinon = require("sinon"), | ||
assert = require("assert"); | ||
@@ -23,5 +22,7 @@ | ||
}); | ||
sinon.stub(transform.httpClient, "get") | ||
.callsFake((chunk, cb) => { | ||
cb(new Error("dummy")); | ||
nock("http://example.com") | ||
.get("/error") | ||
.replyWithError({ | ||
message: "something awful happened", | ||
code: "AWFUL_ERROR" | ||
}); | ||
@@ -32,5 +33,8 @@ const input = new PassThrough(); | ||
assert(err instanceof HttpError); | ||
assert.equal(err.url, | ||
"http://example.com/error" | ||
); | ||
resolve(); | ||
}); | ||
input.write("http://example.com"); | ||
input.write("http://example.com/error"); | ||
}); | ||
@@ -44,3 +48,3 @@ | ||
}); | ||
assert.equal(transform.httpClient, "foo"); | ||
assert.equal(transform.options.httpClient, "foo"); | ||
}); | ||
@@ -47,0 +51,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
75110
15
468
6