@zingle/esbulker
Advanced tools
Comparing version 0.1.0-beta1 to 0.1.0-beta2
@@ -5,2 +5,3 @@ const {parse} = require("url"); | ||
const bytesized = require("bytesized"); | ||
const stringWriter = require("@zingle/string-writer"); | ||
const Queue = require("./queue"); | ||
@@ -156,2 +157,3 @@ | ||
const req = web.request(assign(parse(url), {method})); | ||
const date = new Date(); | ||
@@ -166,5 +168,23 @@ return new Promise((resolve, reject) => { | ||
resolve(res); | ||
log(); | ||
} else { | ||
reject(new Error(`unhandled HTTP status ${statusCode}`)); | ||
const output = stringWriter(); | ||
res.pipe(output).on("error", reject); | ||
output.on("finish", () => { | ||
reject(new Error(`unhandled HTTP status ${statusCode}`)); | ||
res.body = String(output); | ||
log(); | ||
}); | ||
} | ||
function log() { | ||
console.http(); | ||
console.http(`POST ${url}`); | ||
console.http(body.trim()); | ||
console.http(`${statusCode} ${http.STATUS_CODES[statusCode]}`); | ||
if (res.body) console.http(res.body.trim()); | ||
console.http(); | ||
} | ||
}); | ||
@@ -171,0 +191,0 @@ |
@@ -7,3 +7,3 @@ const http = require("http"); | ||
const bulk = require("./bulk") | ||
const {all: queues} = require("./queue"); | ||
const {calculateSize: calculateQueueSize} = require("./queue"); | ||
@@ -65,9 +65,2 @@ const MAX_QUEUE_SIZE = bytesized("250MiB"); | ||
/** | ||
* Calculate the total queue size. | ||
*/ | ||
function calculateQueueSize() { | ||
return Array.from(queues()).reduce((a,b) => a+b.size, 0); | ||
} | ||
/** | ||
* Create response body for successful PUT. | ||
@@ -74,0 +67,0 @@ * @param {Header} header |
const {assign, defineProperties} = Object; | ||
const queues = new Map(); | ||
const registry = new Map(); | ||
const internal = Symbol(); | ||
module.exports = assign(queue, {all}); | ||
module.exports = assign(queue, {calculateSize, internal, [internal]: {registry}}); | ||
@@ -15,10 +16,18 @@ /** | ||
if (!queues.has(url)) { | ||
queues.set(url, createQueue(esurl, uri)); | ||
if (!registry.has(url)) { | ||
registry.set(url, createQueue(esurl, uri)); | ||
} | ||
return queues.get(url); | ||
return registry.get(url); | ||
} | ||
/** | ||
* Calculate the total queue size. | ||
* @returns {number} | ||
*/ | ||
function calculateSize() { | ||
return Array.from(registry.values()).reduce((a,b) => a+b.size, 0); | ||
} | ||
/** | ||
* Create Elasticsearch bulk queue. | ||
@@ -75,10 +84,2 @@ * @param {string} esurl | ||
/** | ||
* Iterate over all known queues. | ||
* @yields {Queue} | ||
*/ | ||
function* all() { | ||
yield* queues.values(); | ||
} | ||
/** | ||
* Generate Elasticsearch index action for a bulk URI and index request header. | ||
@@ -85,0 +86,0 @@ * @param {string} uri |
{ | ||
"name": "@zingle/esbulker", | ||
"version": "0.1.0-beta1", | ||
"version": "0.1.0-beta2", | ||
"description": "Elasticsearch bulking proxy", | ||
@@ -34,2 +34,3 @@ "main": "index.js", | ||
"mocha-lcov-reporter": "^1.3.0", | ||
"nock": "^10.0.6", | ||
"nyc": "^14.1.1", | ||
@@ -36,0 +37,0 @@ "sinon": "^7.3.2" |
@@ -16,2 +16,5 @@ const http = require("http"); | ||
console.http = (...args) => console.debug(...args); | ||
if (options.verbosity < 3) console.http = () => {}; | ||
if (options.verbosity < 2) console.debug = () => {}; | ||
@@ -24,3 +27,3 @@ if (options.verbosity < 1) console.info = () => {}; | ||
const {address, port} = server.address(); | ||
console.debug(`listening on ${address}:${port}`); | ||
console.info(`listening on ${address}:${port}`); | ||
}); | ||
@@ -27,0 +30,0 @@ } catch (err) { |
@@ -9,8 +9,8 @@ const expect = require("expect.js"); | ||
expect(options.breakerDocuments).to.be.a("number"); | ||
expect(options.breakerSize).to.be.a("number"); | ||
expect(options.flushDocuments).to.be.a("number"); | ||
expect(options.flushSize).to.be.a("number"); | ||
expect(options.address).to.be(undefined); | ||
expect(options.port).to.be.a("number"); | ||
expect(options.retries).to.be.a("number"); | ||
expect(options.secure).to.be(false); | ||
expect(options.tls).to.be(null); | ||
expect(options.url).to.be.a("string"); | ||
expect(options.verbosity).to.be.a("number"); | ||
}); | ||
@@ -29,14 +29,5 @@ }); | ||
"node", "esbulker", | ||
"--break-docs=12", | ||
"--break-size=300", | ||
"--flush-docs=13", | ||
"--flush-size=10MiB", | ||
"--http-log=foo", | ||
"--es=http://localhost:9201", | ||
"--ip=::1", | ||
"--port=13", | ||
"--retry=2", | ||
"--slow=3", | ||
"--help", | ||
"--version", | ||
"http://localhost:9200" | ||
"--port=13" | ||
]; | ||
@@ -51,13 +42,9 @@ }); | ||
it("should recognize options", () => { | ||
console.log("ittt"); | ||
const options = readargs(args); | ||
delete console.log; | ||
expect(options.breakerDocuments).to.be(12); | ||
expect(options.breakerSize).to.be(300); | ||
expect(options.flushDocuments).to.be(13); | ||
expect(options.flushSize).to.be(10485760); | ||
expect(options.httpLog).to.be("foo"); | ||
expect(options.url).to.be("http://localhost:9201"); | ||
expect(options.address).to.be("::1"); | ||
expect(options.port).to.be(13); | ||
expect(options.retries).to.be(2); | ||
expect(options.slow).to.be(3); | ||
}); | ||
@@ -81,11 +68,5 @@ | ||
env = { | ||
BREAK_DOCS: "12", | ||
BREAK_SIZE: "300", | ||
FLUSH_DOCS: "13", | ||
FLUSH_SIZE: "10MiB", | ||
HTTP_LOG: "foo", | ||
ES_URL: "http://localhost:9201", | ||
LISTEN_ADDR: "::1", | ||
LISTEN_PORT: "13", | ||
REQ_RETRIES: 2, | ||
SLOW_INSERT: 3 | ||
LISTEN_PORT: "13" | ||
}; | ||
@@ -97,12 +78,6 @@ }); | ||
expect(options.breakerDocuments).to.be(12); | ||
expect(options.breakerSize).to.be(300); | ||
expect(options.flushDocuments).to.be(13); | ||
expect(options.flushSize).to.be(10485760); | ||
expect(options.httpLog).to.be("foo"); | ||
expect(options.url).to.be("http://localhost:9201"); | ||
expect(options.address).to.be("::1"); | ||
expect(options.port).to.be(13); | ||
expect(options.retries).to.be(2); | ||
expect(options.slow).to.be(3); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
6
61097
7
896