@zingle/esbulker
Advanced tools
Comparing version 0.0.4 to 0.1.0-beta1
@@ -12,11 +12,8 @@ const iteropt = require("iteropt"); | ||
return { | ||
address: undefined, // listen on any IP address | ||
breakerDocuments: Infinity, // unlimited queued documents without break | ||
breakerSize: Infinity, // unlimited queue size without break | ||
flushDocuments: Infinity, // unlimited documents per load | ||
flushSize: Infinity, // unlimited request size per load | ||
httpLog: false, // do not log HTTP errors | ||
port: 1374, // listen for requests on port 1374 | ||
retries: 0, // do not retry on problems connecting to ES | ||
slow: Infinity // unlimited time to insert | ||
address: undefined, // listen on any IP address | ||
port: 1374, // listen for requests on port 1374 | ||
secure: false, // do not use TLS | ||
tls: null, // TLS options if secure is true | ||
url: "http://localhost:9200", // default ES server | ||
verbosity: 0 // default log level | ||
}; | ||
@@ -46,25 +43,5 @@ } | ||
return options; | ||
case "--break-docs": | ||
if (isNaN(options.breakerDocuments = positiveInt(optval()))) { | ||
throw new CLIError(`option ${opt} expects positive integer`); | ||
} | ||
case "--es": | ||
options.url = optval(); | ||
break; | ||
case "--break-size": | ||
if (isNaN(options.breakerSize = size(optval()))) { | ||
throw new CLIError(`option ${opt} expects bytes`); | ||
} | ||
break; | ||
case "--flush-docs": | ||
if (isNaN(options.flushDocuments = positiveInt(optval()))) { | ||
throw new CLIError(`option ${opt} expects positive integer`); | ||
} | ||
break; | ||
case "--flush-size": | ||
if (isNaN(options.flushSize = size(optval()))) { | ||
throw new CLIError(`option ${opt} expects bytes`); | ||
} | ||
break; | ||
case "--http-log": | ||
options.httpLog = optval(); | ||
break; | ||
case "--ip": | ||
@@ -78,12 +55,17 @@ options.address = optval(); | ||
break; | ||
case "--retry": | ||
if (isNaN(options.retries = unnegativeInt(optval()))) { | ||
throw new CLIError(`option ${opt} expects positive integer or zero`); | ||
} | ||
case "-q": | ||
case "--quiet": | ||
options.verbosity = (options.verbosity || 0) - 1; | ||
break; | ||
case "--slow": | ||
if (isNaN(options.slow = positiveInt(optval()))) { | ||
throw new CLIError(`option ${opt} expects positive integer`); | ||
} | ||
case "-v": | ||
case "--verbose": | ||
options.verbosity = (options.verbosity || 0) + 1; | ||
break; | ||
case "--tls-cert": | ||
case "--tls-key": | ||
case "--tls-ca": | ||
// these are handled by tlsopt module; simply read value to ensure | ||
// options are parsed without error | ||
optval(); | ||
break; | ||
default: | ||
@@ -93,6 +75,4 @@ throw new CLIError(`option ${opt} is not recognized`); | ||
switch (args.length) { | ||
case 0: throw new CLIError(`missing required endpoint`); | ||
case 1: options.url = args.shift(); break; | ||
default: throw new CLIError(`unexpected argument`); | ||
if (args.length) { | ||
throw new CLIError(`unexpected argument`); | ||
} | ||
@@ -111,11 +91,5 @@ | ||
if (env.BREAK_DOCS) options.breakerDocuments = positiveInt(env.BREAK_DOCS); | ||
if (env.BREAK_SIZE) options.breakerSize = size(env.BREAK_SIZE); | ||
if (env.FLUSH_DOCS) options.flushDocuments = positiveInt(env.FLUSH_DOCS); | ||
if (env.FLUSH_SIZE) options.flushSize = size(env.FLUSH_SIZE); | ||
if (env.HTTP_LOG) options.httpLog = env.HTTP_LOG; | ||
if (env.ES_URL) options.url = env.ES_URL; | ||
if (env.LISTEN_ADDR) options.address = env.LISTEN_ADDR; | ||
if (env.LISTEN_PORT) options.port = positiveInt(env.LISTEN_PORT); | ||
if (env.REQ_RETRIES) options.retries = unnegativeInt(env.REQ_RETRIES); | ||
if (env.SLOW_INSERT) options.slow = positiveInt(env.SLOW_INSERT); | ||
@@ -150,36 +124,2 @@ // remove invalid values | ||
/** | ||
* Parse zero or positive integer. | ||
* @param {string} value | ||
* @returns {number} | ||
*/ | ||
function unnegativeInt(value) { | ||
if (value === "") { | ||
return 0; | ||
} else if (isNaN(value = parseInt(value))) { | ||
return value; | ||
} else if (value < 0) { | ||
return NaN; | ||
} else { | ||
return value; | ||
} | ||
} | ||
/** | ||
* Parse data size. Return Infinity on empty value. | ||
* @param {string} value | ||
* @returns {number} | ||
*/ | ||
function size(value) { | ||
if (value === "") { | ||
return Infinity; | ||
} else if (isNaN(value = bytesized(value))) { | ||
return value; | ||
} else if (value < 0) { | ||
return NaN; | ||
} else { | ||
return value; | ||
} | ||
} | ||
/** | ||
* Display help. | ||
@@ -190,19 +130,16 @@ */ | ||
`Usage: | ||
esbulker [<OPTIONS>] <endpoint> | ||
esbulker [-v|-q] [--es=<url>] [--ip=<addr>] [--port=<num>] | ||
esbulker --help | ||
esbulker --version | ||
Start elasticsearch bulk load proxy. | ||
ARGUMENTS | ||
endpoint URL of Elasticsearch server. | ||
OPTIONS | ||
--help Show this help. | ||
--es=<url> Elasticsearch server URL. | ||
--ip=<addr> IP address proxy listens on. | ||
--flush-documents=<num> Max documents loaded per request. | ||
--flush-size=<num> Max size of data per request. (e.g. 256kib, 2mb) | ||
--port=<num> Port proxy listens on. | ||
--slow=<num> Slow insert threshold in seconds. | ||
-q|--quiet Show less output. | ||
-v|--verbose Show more output. | ||
--version Display version information.` | ||
@@ -209,0 +146,0 @@ ); |
{ | ||
"name": "@zingle/esbulker", | ||
"version": "0.0.4", | ||
"version": "0.1.0-beta1", | ||
"description": "Elasticsearch bulking proxy", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
101
README.md
@@ -19,2 +19,23 @@ [![Build Status](https://travis-ci.com/Zingle/esbulker.svg?branch=master)](https://travis-ci.com/Zingle/esbulker) | ||
![Request Flow Diagram](doc/diagram.png) | ||
### Bulk Inserts | ||
Bulk inserts are limited to 2 MiB of data. This limit is checked *after* adding | ||
a new document to the queue, so the actual amount of data will probably be a bit | ||
more than this. | ||
### Backoff | ||
When the proxy can't connect to ES, or inserts become slow (>2s), or ES begins | ||
to report errors, the endpoint will be backed off. Backoff starts at 0.5s and | ||
increases up to 60s between attempts. | ||
While backed off, requests that have `?refresh=true` or `?refresh=wait_for` will | ||
begin to take a long time. Other requests will be accepted immediately and | ||
queued. | ||
### Circuit Breaker | ||
When the queue grows too large (over 250 MiB), the proxy will begin to return | ||
503 errors to the client. This is to ensure the client doesn't continue to | ||
load data while the proxy continues to fall further behind. | ||
Installation | ||
@@ -26,51 +47,2 @@ ------------ | ||
Configuration | ||
------------- | ||
Carefully consider how the proxy is configured to get the best use out of it. | ||
### Configure Backoff | ||
When Elasticsearch begins to become overloaded, slow inserts can be an early | ||
warning sign. Configuring a slow insert threshold will cause an Elasticsearch | ||
target endpoint to be paused. While paused, the proxy will continue to accept | ||
requests, but writing to the Elasticsearch endpoint will begin backing off | ||
between writes until the problem resolves. | ||
*The backoff on slow inserts is Fibonacci-based, and the corresponding | ||
rampup is geometric. This means recovery time will be impacted by how long the | ||
endpoint was paused.* | ||
When backoff is enabled, insert limits should also be configured to ensure | ||
inserts don't get larger and larger and slower during a backoff. | ||
Use the `--slow` option to configure backoff. | ||
### Configure Insert Limits | ||
By default, the proxy will attempt to load the entire queue into Elasticsearch | ||
each request. Under reasonable load and normal operation, this is probably ok, | ||
but if the connection to Elasticsearch is lost temporarily or Elasticsearch | ||
simply begins to fall behind, the queue may become very large. Configuring | ||
insert limits will force the proxy to break up the work into multiple inserts. | ||
Use the `--flush-docs` and/or `--flush-size` options to configure insert | ||
limits. | ||
### Configure Circuit Breaking | ||
Under normal operation, the proxy will queue requests and return a successful | ||
response to clients even if the target Elasticsearch server has gone down. This | ||
can hide issues and eventually lead to memory exhaustion, so it's a good idea to | ||
configure circuit breaking. When a circuit breaker is triggered, the proxy | ||
begins returning error responses to clients. The proxy will continue to attempt | ||
loading any documents remaining in the queue. Once the problem has resolved, | ||
the proxy will automatically start accepting requests again. | ||
Use the `--break-docs` and/or `--break-size` options to configure circuit | ||
breaking. | ||
### Logging Failed Requests | ||
For debugging, it can be helpful to see the full request and response when | ||
Elasticsearch returns an error. | ||
Use the `--http-log` option to configure the HTTP error log. Send the proxy a | ||
`HUP` signal to have it re-open the file. | ||
Usage | ||
@@ -80,32 +52,23 @@ ----- | ||
Usage: | ||
esbulker [<OPTIONS>] <endpoint> | ||
esbulker [-v|-q] [--es=<url>] [--ip=<addr>] [--port=<num>] | ||
esbulker --help | ||
esbulker --version | ||
Start elasticsearch bulk load proxy. | ||
ARGUMENTS | ||
endpoint URL of Elasticsearch server. | ||
OPTIONS | ||
--help Show this help. | ||
--break-docs=<docs> Max documents queued before breaking. | ||
--break-size=<bytes> Max size of queue before breaking. | ||
--flush-docs=<docs> Max documents loaded per insert. | ||
--flush-size=<bytes> Max size of data per insert. | ||
--http-log=<file> Path where failed HTTP requests are written. | ||
--retry=<num> Number of times to immediately retry before failing. | ||
--slow=<secs> Slow insert threshold. | ||
--version Print version number. | ||
--es=<url> Elasticsearch server URL. | ||
--ip=<addr> IP address proxy listens on. | ||
--port=<num> Port proxy listens on. | ||
-q|--quiet Show less output. | ||
-v|--verbose Show more output. | ||
--version Display version information. | ||
NOTE: options which accept bytes can have units such as "10mb" or "1GiB" | ||
ENVIRONMENT | ||
LISTEN_PORT Configure port which proxy listens on. (default 1374) | ||
SIGNALS | ||
HUP Re-open HTTP log file. | ||
ES_URL Elasticsearch server URL. | ||
LISTEN_ADDR IP address proxy listens on. | ||
LISTEN_PORT Port proxy listens on. | ||
``` | ||
@@ -112,0 +75,0 @@ |
125
server.js
@@ -5,124 +5,21 @@ const http = require("http"); | ||
const {CLIError} = require("iteropt"); | ||
const stringWriter = require("@zingle/string-writer"); | ||
const {BulkProxy} = require("./lib/bulk-proxy"); | ||
const handler = require("./lib/handler"); | ||
const {defaults, readenv, readargs} = require("./lib/config"); | ||
const recover = require("./lib/recover"); | ||
const {file: logfile, none: nolog} = require("./lib/console"); | ||
const {HTTPUnhandledStatusError} = require("./lib/http"); | ||
const {assign, entries} = Object; | ||
const {env, argv} = process; | ||
try { | ||
const {assign} = Object; | ||
const {env, argv} = process; | ||
const options = assign(defaults(), readenv(env), readargs(argv)); | ||
const httpConsole = options.httpLog ? logfile(options.httpLog) : nolog(); | ||
const proxy = new BulkProxy(options.url); | ||
const tls = tlsopt.readSync(); | ||
const secure = Boolean(tls.pfx || tls.cert); | ||
const server = secure | ||
? https.createServer(tls, proxy.handler()) | ||
: http.createServer(proxy.handler()) | ||
const web = options.secure ? https : http; | ||
const serverOpts = options.secure ? [options.tls] : []; | ||
const server = web.createServer(...serverOpts, handler(options.url)); | ||
if (options.breakerDocuments) { | ||
proxy.changeBreakerDocuments(options.breakerDocuments); | ||
} | ||
if (options.verbosity < 2) console.debug = () => {}; | ||
if (options.verbosity < 1) console.info = () => {}; | ||
if (options.verbosity < 0) console.warn = () => {}; | ||
if (options.verbosity < -1) console.error = () => {}; | ||
if (options.breakerSize) { | ||
proxy.changeBreakerSize(options.breakerSize); | ||
} | ||
if (options.flushDocuments) { | ||
proxy.changeFlushDocuments(options.flushDocuments); | ||
} | ||
if (options.flushSize) { | ||
proxy.changeFlushSize(options.flushSize); | ||
} | ||
if (options.retries) { | ||
proxy.changeRetries(Number(options.retries)); | ||
} | ||
if (options.slow) { | ||
proxy.changeSlowThreshold(options.slow); | ||
} | ||
proxy.on("paused", endpoint => { | ||
if (endpoint) { | ||
console.info(`writing to ${endpoint.url} has been paused`); | ||
} else { | ||
console.warn(`proxy has gone down`); | ||
} | ||
recover(endpoint || proxy); | ||
}); | ||
proxy.on("resumed", endpoint => { | ||
if (endpoint) { | ||
console.info(`writing to ${endpoint.url} has been resumed`); | ||
} else { | ||
console.info(`proxy has come back up`); | ||
} | ||
}); | ||
proxy.on("backoff", (ms, inserts, endpoint) => { | ||
const loading = inserts.length; | ||
const total = loading + endpoint.pending; | ||
console.info(`backoff ${ms/1000}s ${loading}/${total} document(s) [${endpoint.url}]`); | ||
}); | ||
proxy.on("inserted", (inserts, endpoint) => { | ||
console.info(`inserted ${inserts.length} document(s) [${endpoint.url}]`); | ||
}); | ||
proxy.on("failed", (inserts, endpoint) => { | ||
console.error(`failed to insert ${inserts.length} document(s) [${endpoint.url}]`); | ||
if (process.env.DUMP_LOST) { | ||
console.error(inserts.join("").trim()); | ||
} | ||
}); | ||
proxy.on("error", (err, endpoint) => { | ||
if (endpoint) console.error(`${err.message} [${endpoint.url}]`); | ||
else console.error(`${err.message}`); | ||
if (process.env.DEBUG) console.error(err.stack); | ||
if (err instanceof HTTPUnhandledStatusError) { | ||
const resbody = stringWriter("utf8"); | ||
const {req, res} = err; | ||
res.pipe(resbody).on("error", console.error).on("finish", () => { | ||
const protocol = req.connection.encrypted ? "https" : "http"; | ||
const {host} = req.getHeaders(); | ||
const header = `** UNEXPECTED HTTP STATUS **`; | ||
httpConsole.error(Array(header.length).fill("*").join("")); | ||
httpConsole.error(header); | ||
httpConsole.error(Array(header.length).fill("*").join("")); | ||
httpConsole.error(`${req.method} ${req.path} HTTP/1.1`); | ||
httpConsole.error(`Host: ${host}`); | ||
httpConsole.error(`Date: ${new Date()}`); | ||
httpConsole.error(`X-Forwarded-Proto: ${protocol}`); | ||
httpConsole.error(); | ||
httpConsole.error(req.body.trimRight()); | ||
httpConsole.error(Array(header.length).fill("-").join("")); | ||
httpConsole.error(`${res.statusCode} ${res.statusMessage}`); | ||
httpConsole.error(); | ||
httpConsole.error(String(resbody).trimRight()); | ||
httpConsole.error(Array(header.length).fill("*").join("")); | ||
}); | ||
} | ||
}); | ||
if (!secure) { | ||
console.warn("transport security not enabled"); | ||
} | ||
server.listen(...[options.port, options.address].filter(a => a), () => { | ||
const {address, port} = server.address(); | ||
console.info(`listening on ${address}:${port}`); | ||
console.debug(`listening on ${address}:${port}`); | ||
}); | ||
@@ -129,0 +26,0 @@ } catch (err) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
63228
2
18
933
79
7