@zingle/esbulker - npm Package Compare versions

Comparing version 0.0.4 to 0.1.0-beta1

doc/diagram.png


lib/config.js

@@ -12,11 +12,8 @@ const iteropt = require("iteropt");

return {
address: undefined, // listen on any IP address
breakerDocuments: Infinity, // unlimited queued documents without break
breakerSize: Infinity, // unlimited queue size without break
flushDocuments: Infinity, // unlimited documents per load
flushSize: Infinity, // unlimited request size per load
httpLog: false, // do not log HTTP errors
port: 1374, // listen for requests on port 1374
retries: 0, // do not retry on problems connecting to ES
slow: Infinity // unlimited time to insert
address: undefined, // listen on any IP address
port: 1374, // listen for requests on port 1374
secure: false, // do not use TLS
tls: null, // TLS options if secure is true
url: "http://localhost:9200", // default ES server
verbosity: 0 // default log level
};

@@ -46,25 +43,5 @@ }

return options;
case "--break-docs":
if (isNaN(options.breakerDocuments = positiveInt(optval()))) {
throw new CLIError(`option ${opt} expects positive integer`);
}
case "--es":
options.url = optval();
break;
case "--break-size":
if (isNaN(options.breakerSize = size(optval()))) {
throw new CLIError(`option ${opt} expects bytes`);
}
break;
case "--flush-docs":
if (isNaN(options.flushDocuments = positiveInt(optval()))) {
throw new CLIError(`option ${opt} expects positive integer`);
}
break;
case "--flush-size":
if (isNaN(options.flushSize = size(optval()))) {
throw new CLIError(`option ${opt} expects bytes`);
}
break;
case "--http-log":
options.httpLog = optval();
break;
case "--ip":

@@ -78,12 +55,17 @@ options.address = optval();

break;
case "--retry":
if (isNaN(options.retries = unnegativeInt(optval()))) {
throw new CLIError(`option ${opt} expects positive integer or zero`);
}
case "-q":
case "--quiet":
options.verbosity = (options.verbosity || 0) - 1;
break;
case "--slow":
if (isNaN(options.slow = positiveInt(optval()))) {
throw new CLIError(`option ${opt} expects positive integer`);
}
case "-v":
case "--verbose":
options.verbosity = (options.verbosity || 0) + 1;
break;
case "--tls-cert":
case "--tls-key":
case "--tls-ca":
// these are handled by tlsopt module; simply read value to ensure
// options are parsed without error
optval();
break;
default:

@@ -93,6 +75,4 @@ throw new CLIError(`option ${opt} is not recognized`);

switch (args.length) {
case 0: throw new CLIError(`missing required endpoint`);
case 1: options.url = args.shift(); break;
default: throw new CLIError(`unexpected argument`);
if (args.length) {
throw new CLIError(`unexpected argument`);
}

@@ -111,11 +91,5 @@

if (env.BREAK_DOCS) options.breakerDocuments = positiveInt(env.BREAK_DOCS);
if (env.BREAK_SIZE) options.breakerSize = size(env.BREAK_SIZE);
if (env.FLUSH_DOCS) options.flushDocuments = positiveInt(env.FLUSH_DOCS);
if (env.FLUSH_SIZE) options.flushSize = size(env.FLUSH_SIZE);
if (env.HTTP_LOG) options.httpLog = env.HTTP_LOG;
if (env.ES_URL) options.url = env.ES_URL;
if (env.LISTEN_ADDR) options.address = env.LISTEN_ADDR;
if (env.LISTEN_PORT) options.port = positiveInt(env.LISTEN_PORT);
if (env.REQ_RETRIES) options.retries = unnegativeInt(env.REQ_RETRIES);
if (env.SLOW_INSERT) options.slow = positiveInt(env.SLOW_INSERT);

@@ -150,36 +124,2 @@ // remove invalid values

/**
* Parse zero or positive integer.
* @param {string} value
* @returns {number}
*/
function unnegativeInt(value) {
if (value === "") {
return 0;
} else if (isNaN(value = parseInt(value))) {
return value;
} else if (value < 0) {
return NaN;
} else {
return value;
}
}
/**
* Parse data size. Return Infinity on empty value.
* @param {string} value
* @returns {number}
*/
function size(value) {
if (value === "") {
return Infinity;
} else if (isNaN(value = bytesized(value))) {
return value;
} else if (value < 0) {
return NaN;
} else {
return value;
}
}
/**
* Display help.

@@ -190,19 +130,16 @@ */

`Usage:
esbulker [<OPTIONS>] <endpoint>
esbulker [-v|-q] [--es=<url>] [--ip=<addr>] [--port=<num>]
esbulker --help
esbulker --version
Start elasticsearch bulk load proxy.
ARGUMENTS
endpoint URL of Elasticsearch server.
OPTIONS
--help Show this help.
--es=<url> Elasticsearch server URL.
--ip=<addr> IP address proxy listens on.
--flush-documents=<num> Max documents loaded per request.
--flush-size=<num> Max size of data per request. (e.g. 256kib, 2mb)
--port=<num> Port proxy listens on.
--slow=<num> Slow insert threshold in seconds.
-q|--quiet Show less output.
-v|--verbose Show more output.
--version Display version information.`

@@ -209,0 +146,0 @@ );

package.json

{
"name": "@zingle/esbulker",
"version": "0.0.4",
"version": "0.1.0-beta1",
"description": "Elasticsearch bulking proxy",

@@ -5,0 +5,0 @@ "main": "index.js",

README.md

@@ -19,2 +19,23 @@ [![Build Status](https://travis-ci.com/Zingle/esbulker.svg?branch=master)](https://travis-ci.com/Zingle/esbulker)

![Request Flow Diagram](doc/diagram.png)
### Bulk Inserts
Bulk inserts are limited to 2 MiB of data. This limit is checked *after* adding
a new document to the queue, so the actual amount of data will probably be a bit
more than this.
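A minimal sketch of this check-after-add behaviour (illustrative only; `enqueue`, `flush`, and `FLUSH_SIZE` are hypothetical names, not the proxy's actual internals):

```js
// Sketch: the size limit is evaluated only after a document is queued,
// so the flushed payload can end up slightly larger than the limit.
const FLUSH_SIZE = 2 * 1024 * 1024;   // 2 MiB, per the README

const queue = [];
let queuedBytes = 0;

function enqueue(documentLine) {
  queue.push(documentLine);
  queuedBytes += Buffer.byteLength(documentLine);

  // only checked once the document is already in the queue
  if (queuedBytes >= FLUSH_SIZE) flush();
}

function flush() {
  const payload = queue.splice(0).join("");
  queuedBytes = 0;
  console.log(`flushing ${Buffer.byteLength(payload)} bytes to the _bulk API`);
  // ...POST the payload to Elasticsearch's _bulk endpoint here...
}
```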
### Backoff
When the proxy can't connect to ES, or inserts become slow (>2s), or ES begins
to report errors, the endpoint will be backed off. Backoff starts at 0.5s and
increases up to 60s between attempts.
While backed off, requests that have `?refresh=true` or `?refresh=wait_for` will
begin to take a long time. Other requests will be accepted immediately and
queued.
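To make the schedule above concrete, here is a sketch of how the delay between attempts might grow, assuming the Fibonacci-style progression the (pre-0.1.0) Configuration notes further down describe; it is not the proxy's actual timing code:

```js
// Sketch: backoff delays starting at 0.5s and capped at 60s between attempts.
function* backoffDelays(startMs = 500, capMs = 60000) {
  let prev = 0;
  let curr = startMs;
  while (true) {
    yield Math.min(curr, capMs);
    [prev, curr] = [curr, prev + curr];   // Fibonacci-style growth (assumption)
  }
}

const delays = backoffDelays();
const sample = Array.from({length: 10}, () => delays.next().value / 1000);
console.log(sample.join("s, ") + "s");
// 0.5s, 0.5s, 1s, 1.5s, 2.5s, 4s, 6.5s, 10.5s, 17s, 27.5s
```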
### Circuit Breaker
When the queue grows too large (over 250 MiB), the proxy will begin to return
503 errors to the client. This is to ensure the client doesn't keep loading
data while the proxy falls further behind.
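A rough sketch of that circuit-breaker check (illustrative; the queue accounting and response details here are assumptions, and the real handler lives in the package's own code):

```js
const http = require("http");

const BREAKER_SIZE = 250 * 1024 * 1024;   // 250 MiB, per the README
let queuedBytes = 0;                      // maintained by the (omitted) queue logic

const server = http.createServer((req, res) => {
  // once the queue is too large, shed load rather than fall further behind
  if (queuedBytes > BREAKER_SIZE) {
    res.writeHead(503, {"Content-Type": "text/plain"});
    res.end("bulk queue is full; retry later\n");
    return;
  }

  // ...otherwise accept the request and append its body to the queue...
  res.writeHead(202);
  res.end();
});

server.listen(1374);   // the proxy's default port
```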
Installation

@@ -26,51 +47,2 @@ ------------

Configuration
-------------
Carefully consider how the proxy is configured to get the best use out of it.
### Configure Backoff
When Elasticsearch starts to become overloaded, slow inserts can be an early
warning sign. Configuring a slow insert threshold will cause the target
Elasticsearch endpoint to be paused. While paused, the proxy will continue to
accept requests, but writes to the Elasticsearch endpoint will be backed off
with increasing delays until the problem resolves.
*The backoff on slow inserts is Fibonacci-based, and the corresponding
ramp-up is geometric. This means recovery time depends on how long the
endpoint was paused.*
When backoff is enabled, insert limits should also be configured so that
inserts don't grow ever larger and slower during a backoff.
Use the `--slow` option to configure backoff.
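Purely as an illustration of the Fibonacci/geometric relationship described above (the threshold, halving factor, and function name are assumptions, not values taken from the proxy):

```js
// Sketch: slow inserts push the delay up a Fibonacci-style curve; fast inserts
// walk it back down geometrically, so a long pause takes a while to recover from.
const SLOW_MS = 2000;                 // example slow-insert threshold (--slow=2)
const MIN_MS = 500, MAX_MS = 60000;

let prev = 0;
let delay = MIN_MS;

function afterInsert(tookMs) {
  if (tookMs > SLOW_MS) {
    [prev, delay] = [delay, Math.min(prev + delay, MAX_MS)];   // back off further
  } else {
    delay = Math.max(Math.floor(delay / 2), MIN_MS);           // geometric ramp-up
    prev = 0;
  }
  return delay;   // wait this long before the next bulk insert
}
```

For example, `--slow=2` pauses an endpoint once individual inserts take longer than two seconds.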
### Configure Insert Limits
By default, the proxy will attempt to load the entire queue into Elasticsearch
on each request. Under reasonable load and normal operation this is usually fine,
but if the connection to Elasticsearch is lost temporarily or Elasticsearch
simply begins to fall behind, the queue may become very large. Configuring
insert limits forces the proxy to break the work up into multiple inserts.
Use the `--flush-docs` and/or `--flush-size` options to configure insert
limits.
### Configure Circuit Breaking
Under normal operation, the proxy will queue requests and return a successful
response to clients even if the target Elasticsearch server has gone down. This
can hide issues and eventually lead to memory exhaustion, so it's a good idea to
configure circuit breaking. When a circuit breaker is triggered, the proxy
begins returning error responses to clients. The proxy will continue to attempt
loading any documents remaining in the queue. Once the problem has resolved,
the proxy will automatically start accepting requests again.
Use the `--break-docs` and/or `--break-size` options to configure circuit
breaking.
### Logging Failed Requests
For debugging, it can be helpful to see the full request and response when
Elasticsearch returns an error.
Use the `--http-log` option to configure the HTTP error log. Send the proxy a
`HUP` signal to have it re-open the file.
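The re-open-on-`HUP` pattern works along these lines (a generic Node sketch assuming a plain append-only file stream; the log path is only an example, not a default):

```js
const fs = require("fs");

const LOG_PATH = "/var/log/esbulker-http.log";   // example path (assumption)
let log = fs.createWriteStream(LOG_PATH, {flags: "a"});

// After logrotate (or an operator) moves the file aside, a HUP tells the
// process to close the old stream and open a fresh file at the same path.
process.on("SIGHUP", () => {
  log.end();
  log = fs.createWriteStream(LOG_PATH, {flags: "a"});
});
```

In practice this means the log file can be rotated and the proxy sent a `HUP` without restarting it.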
Usage

@@ -80,32 +52,23 @@ -----

```
Usage:
esbulker [<OPTIONS>] <endpoint>
esbulker [-v|-q] [--es=<url>] [--ip=<addr>] [--port=<num>]
esbulker --help
esbulker --version
Start elasticsearch bulk load proxy.
ARGUMENTS
endpoint URL of Elasticsearch server.
OPTIONS
--help Show this help.
--break-docs=<docs> Max documents queued before breaking.
--break-size=<bytes> Max size of queue before breaking.
--flush-docs=<docs> Max documents loaded per insert.
--flush-size=<bytes> Max size of data per insert.
--http-log=<file> Path where failed HTTP requests are written.
--retry=<num> Number of times to immediately retry before failing.
--slow=<secs> Slow insert threshold.
--version Print version number.
--es=<url> Elasticsearch server URL.
--ip=<addr> IP address proxy listens on.
--port=<num> Port proxy listens on.
-q|--quiet Show less output.
-v|--verbose Show more output.
--version Display version information.`
NOTE: options which accept bytes can have units such as "10mb" or "1GiB"
ENVIRONMENT
LISTEN_PORT Configure port which proxy listens on. (default 1374)
SIGNALS
HUP Re-open HTTP log file.
ES_URL Elasticsearch server URL.
LISTEN_ADDR IP address proxy listens on.
LISTEN_PORT Port proxy listens on.
```

@@ -112,0 +75,0 @@

index.js

@@ -5,124 +5,21 @@ const http = require("http");

const {CLIError} = require("iteropt");
const stringWriter = require("@zingle/string-writer");
const {BulkProxy} = require("./lib/bulk-proxy");
const handler = require("./lib/handler");
const {defaults, readenv, readargs} = require("./lib/config");
const recover = require("./lib/recover");
const {file: logfile, none: nolog} = require("./lib/console");
const {HTTPUnhandledStatusError} = require("./lib/http");
const {assign, entries} = Object;
const {env, argv} = process;
try {
const {assign} = Object;
const {env, argv} = process;
const options = assign(defaults(), readenv(env), readargs(argv));
const httpConsole = options.httpLog ? logfile(options.httpLog) : nolog();
const proxy = new BulkProxy(options.url);
const tls = tlsopt.readSync();
const secure = Boolean(tls.pfx || tls.cert);
const server = secure
? https.createServer(tls, proxy.handler())
: http.createServer(proxy.handler())
const web = options.secure ? https : http;
const serverOpts = options.secure ? [options.tls] : [];
const server = web.createServer(...serverOpts, handler(options.url));
if (options.breakerDocuments) {
proxy.changeBreakerDocuments(options.breakerDocuments);
}
if (options.verbosity < 2) console.debug = () => {};
if (options.verbosity < 1) console.info = () => {};
if (options.verbosity < 0) console.warn = () => {};
if (options.verbosity < -1) console.error = () => {};
if (options.breakerSize) {
proxy.changeBreakerSize(options.breakerSize);
}
if (options.flushDocuments) {
proxy.changeFlushDocuments(options.flushDocuments);
}
if (options.flushSize) {
proxy.changeFlushSize(options.flushSize);
}
if (options.retries) {
proxy.changeRetries(Number(options.retries));
}
if (options.slow) {
proxy.changeSlowThreshold(options.slow);
}
proxy.on("paused", endpoint => {
if (endpoint) {
console.info(`writing to ${endpoint.url} has been paused`);
} else {
console.warn(`proxy has gone down`);
}
recover(endpoint || proxy);
});
proxy.on("resumed", endpoint => {
if (endpoint) {
console.info(`writing to ${endpoint.url} has been resumed`);
} else {
console.info(`proxy has come back up`);
}
});
proxy.on("backoff", (ms, inserts, endpoint) => {
const loading = inserts.length;
const total = loading + endpoint.pending;
console.info(`backoff ${ms/1000}s ${loading}/${total} document(s) [${endpoint.url}]`);
});
proxy.on("inserted", (inserts, endpoint) => {
console.info(`inserted ${inserts.length} document(s) [${endpoint.url}]`);
});
proxy.on("failed", (inserts, endpoint) => {
console.error(`failed to insert ${inserts.length} document(s) [${endpoint.url}]`);
if (process.env.DUMP_LOST) {
console.error(inserts.join("").trim());
}
});
proxy.on("error", (err, endpoint) => {
if (endpoint) console.error(`${err.message} [${endpoint.url}]`);
else console.error(`${err.message}`);
if (process.env.DEBUG) console.error(err.stack);
if (err instanceof HTTPUnhandledStatusError) {
const resbody = stringWriter("utf8");
const {req, res} = err;
res.pipe(resbody).on("error", console.error).on("finish", () => {
const protocol = req.connection.encrypted ? "https" : "http";
const {host} = req.getHeaders();
const header = `** UNEXPECTED HTTP STATUS **`;
httpConsole.error(Array(header.length).fill("*").join(""));
httpConsole.error(header);
httpConsole.error(Array(header.length).fill("*").join(""));
httpConsole.error(`${req.method} ${req.path} HTTP/1.1`);
httpConsole.error(`Host: ${host}`);
httpConsole.error(`Date: ${new Date()}`);
httpConsole.error(`X-Forwarded-Proto: ${protocol}`);
httpConsole.error();
httpConsole.error(req.body.trimRight());
httpConsole.error(Array(header.length).fill("-").join(""));
httpConsole.error(`${res.statusCode} ${res.statusMessage}`);
httpConsole.error();
httpConsole.error(String(resbody).trimRight());
httpConsole.error(Array(header.length).fill("*").join(""));
});
}
});
if (!secure) {
console.warn("transport security not enabled");
}
server.listen(...[options.port, options.address].filter(a => a), () => {
const {address, port} = server.address();
console.info(`listening on ${address}:${port}`);
console.debug(`listening on ${address}:${port}`);
});

@@ -129,0 +26,0 @@ } catch (err) {
