Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@petitchevalroux/http-download-stream

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@petitchevalroux/http-download-stream - npm Package Compare versions

Comparing version 1.1.1 to 1.2.0

.npmignore

2

package.json
{
"name": "@petitchevalroux/http-download-stream",
"version": "1.1.1",
"version": "1.2.0",
"description": "Node transform stream downloading url in input with retry and rate limit",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -7,10 +7,9 @@ "use strict";

path = require("path"),
HttpError = require(path.join(__dirname, "errors", "http")), {
RateLimiter
} = require("limiter"),
retry = require("retry");
urlModule = require("url"),
Promise = require("bluebird"),
Fetcher = require(path.join(__dirname, "fetcher"));
class HttpDownloadStream extends Transform {
constructor(options) {
options = Object.assign({
const instanceOptions = Object.assign({
"timeout": 5000,

@@ -23,10 +22,10 @@ "followRedirect": true,

"retries": 3,
"retryMinTimeout": 2500
"retryMinTimeout": 2500,
"maxParallelHosts": 10
}, options || {});
super(options);
if (typeof(options.httpClient) === "undefined") {
this.httpClient = request.defaults({
"timeout": options.timeout,
"followRedirect": options.followRedirect,
"maxRedirects": options.maxRedirects,
if (typeof(instanceOptions.httpClient) === "undefined") {
instanceOptions.httpClient = request.defaults({
"timeout": instanceOptions.timeout,
"followRedirect": instanceOptions.followRedirect,
"maxRedirects": instanceOptions.maxRedirects,
"gzip": true,

@@ -37,66 +36,93 @@ "headers": {

});
} else {
this.httpClient = options.httpClient;
}
this.limiter = new RateLimiter(options.rateCount, options.rateWindow);
this.retries = options.retries;
this.retryMinTimeout = options.retryMinTimeout;
super(instanceOptions);
this.hostFetchersCount = 0;
this.maxHostFetchers = instanceOptions.maxParallelHosts;
delete instanceOptions.maxParallelHosts;
this.options = instanceOptions;
this.buffer = [];
this.hostFetchers = {};
}
get(chunk, callback) {
_transform(chunk, encoding, callback) {
try {
const host = urlModule.parse(chunk.toString())
.hostname;
this.getHostFetcher(host)
.then((fetcher) => {
fetcher.lastUsed = new Date()
.getTime();
return fetcher
.fetch(chunk.toString());
})
.then((result) => {
return callback(null, result);
})
.catch((err) => {
callback(err);
});
} catch (e) {
callback(e);
}
}
getHostFetcher(host) {
if (this.hostFetchersCount > this.maxHostFetchers) {
const self = this;
return this
.deleteLeastRecentlyUsedFetcher()
.then(() => {
return self.getHostFetcher(host);
});
}
return typeof(this.hostFetchers[host]) !== "undefined" ?
Promise.resolve(this.hostFetchers[host]) :
this.createHostFetcher(host);
}
createHostFetcher(host) {
this.hostFetchersCount++;
this.hostFetchers[host] = new Fetcher(this.options);
return Promise.resolve(this.hostFetchers[host]);
}
deleteLeastRecentlyUsedFetcher() {
const self = this;
self.limiter.removeTokens(1, function(err) {
if (err) {
callback(err);
return;
}
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
self.httpClient.get(chunk, (err, response, body) => {
if (err) {
callback(new HttpError(err));
return;
return this
.getLeastRecentlyUsedFetcherHost()
.then((host) => {
if (!host || !self.hostFetchers[host]) {
throw new Error(
"Unable to find an host fetcher to delete");
}
callback(
null, {
"input": chunk,
"output": {
"headers": response.headers,
"statusCode": response.statusCode,
"body": body
}
}
);
delete self.hostFetchers[host];
self.hostFetchersCount--;
return host;
});
});
}
_transform(chunk, encoding, callback) {
const self = this;
const operation = retry.operation({
"minTimeout": this.retryMinTimeout,
"retries": this.retries
});
operation.attempt((attempt) => {
self.get(chunk, function(err, response) {
if (!err && response.output.statusCode >
499) {
if (attempt <= self.retries) {
err = 499;
}
getLeastRecentlyUsedFetcherHost() {
return new Promise((resolve) => {
const hosts = Object.getOwnPropertyNames(this.hostFetchers);
if (!hosts.length) {
return resolve(null);
}
const self = this;
hosts.sort((a, b) => {
if (self.hostFetchers[a].lastUsed === self.hostFetchers[
b].lastUsed) {
return 0;
}
if (operation.retry(err)) {
return;
}
if (response) {
response.attempt = attempt;
}
callback(err ? operation.mainError() : null,
response);
return (self.hostFetchers[a].lastUsed <
self.hostFetchers[
b].lastUsed) ?
-1 : 1;
});
return resolve(hosts[0]);
});
}
}
module.exports = HttpDownloadStream;

@@ -13,3 +13,2 @@ "use strict";

nock = require("nock"),
sinon = require("sinon"),
assert = require("assert");

@@ -23,5 +22,7 @@

});
sinon.stub(transform.httpClient, "get")
.callsFake((chunk, cb) => {
cb(new Error("dummy"));
nock("http://example.com")
.get("/error")
.replyWithError({
message: "something awful happened",
code: "AWFUL_ERROR"
});

@@ -32,5 +33,8 @@ const input = new PassThrough();

assert(err instanceof HttpError);
assert.equal(err.url,
"http://example.com/error"
);
resolve();
});
input.write("http://example.com");
input.write("http://example.com/error");
});

@@ -44,3 +48,3 @@

});
assert.equal(transform.httpClient, "foo");
assert.equal(transform.options.httpClient, "foo");
});

@@ -47,0 +51,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc