elasticsearch-scroll-stream
Advanced tools
Comparing version 1.2.2 to 1.3.0
@@ -12,3 +12,4 @@ import { Client } from "@elastic/elasticsearch"; | ||
| "_parent" | ||
| "_routing"; | ||
| "_routing" | ||
| "inner_hits"; | ||
@@ -15,0 +16,0 @@ declare class ElasticsearchScrollStream extends Readable { |
33
index.js
@@ -6,5 +6,5 @@ /** | ||
*/ | ||
var LibElasticsearchAdaptee = require('./lib/elasticsearch-stream') | ||
const LibElasticsearchAdaptee = require('./lib/elasticsearch-stream') | ||
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'] | ||
const allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'] | ||
@@ -17,24 +17,23 @@ /** | ||
* @param `optional_fields` - array of optional properties to include in the results. | ||
* Allowed values: '_id', '_score', '_type', '_index', '_parent', '_routing' | ||
* Allowed values: '_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits' | ||
* @param `stream_opts` - object to be passed to ReadableStream | ||
*/ | ||
var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
if (arguments.length == 1) throw new Error('ElasticsearchScrollStream: missing parameters') | ||
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client) | ||
class ElasticsearchScrollStream { | ||
constructor(client, query_opts = {}, optional_fields = [], stream_opts = {}) { | ||
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client) | ||
if (Object.keys(query_opts).length === 0) throw new Error('ElasticsearchScrollStream: missing parameters') | ||
optional_fields = !!optional_fields ? optional_fields : [] | ||
if (!Array.isArray(optional_fields)) | ||
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields) | ||
if (!Array.isArray(optional_fields)) | ||
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields) | ||
optional_fields.forEach(function(entry) { | ||
if (allowed_extrafields.indexOf(entry) == -1) { | ||
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields") | ||
} | ||
}) | ||
optional_fields.forEach(entry => { | ||
if (allowed_extrafields.indexOf(entry) === -1) { | ||
throw new Error(`ElasticsearchScrollStream: property '${entry}' not allowed in optional_fields`) | ||
} | ||
}) | ||
stream_opts = !!stream_opts ? stream_opts : {} | ||
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts) | ||
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts) | ||
} | ||
} | ||
module.exports = ElasticsearchScrollStream |
@@ -1,9 +0,2 @@ | ||
/** | ||
* Elasticsearch Stream | ||
* | ||
* Create a ReadableStream from an elasticsearch scroll query. | ||
* Assumptions: client library is of type [elasticsearch](https://www.npmjs.org/package/elasticsearch) | ||
*/ | ||
var Readable = require('stream').Readable, | ||
util = require('util') | ||
const { Readable } = require('stream') | ||
@@ -20,27 +13,21 @@ /** | ||
*/ | ||
var LibElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
this._client = client | ||
this._options = query_opts | ||
this._extrafields = optional_fields | ||
this._options.scroll = query_opts.scroll || '10m' | ||
this._reading = false | ||
this._counter = 0 | ||
this._total = 0 | ||
this._forceClose = false | ||
Readable.call(this, stream_opts) | ||
} | ||
class LibElasticsearchScrollStream extends Readable { | ||
constructor(client, query_opts, optional_fields, stream_opts) { | ||
super(stream_opts) | ||
util.inherits(LibElasticsearchScrollStream, Readable) | ||
LibElasticsearchScrollStream.prototype._read = function() { | ||
if (this._reading) { | ||
return false | ||
this._client = client | ||
this._options = query_opts | ||
this._extrafields = optional_fields | ||
this._options.scroll = query_opts.scroll || '10m' | ||
this._reading = false | ||
this._counter = 0 | ||
this._total = 0 | ||
this._forceClose = false | ||
} | ||
this._reading = true | ||
var self = this | ||
this._client.search(this._options, function getMoreUntilDone(err, response) { | ||
getMoreUntilDone = (err, response) => { | ||
if (err) { | ||
return self.emit('error', err) | ||
return this.emit('error', err) | ||
} | ||
let body = !!response.body ? response.body : response | ||
@@ -53,50 +40,46 @@ // Set the total matching documents | ||
// } | ||
var body = !!response.body ? response.body : response | ||
self._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total | ||
this._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total | ||
var objectMode = self._readableState.objectMode | ||
body.hits.hits.forEach(function(hit) { | ||
var ref_results = {} | ||
if (hit.fields) { | ||
ref_results = hit.fields | ||
} else { | ||
ref_results = hit._source | ||
} | ||
body.hits.hits.forEach(hit => { | ||
let ref_results = hit.fields ? hit.fields : hit._source | ||
// populate extra fields | ||
self._extrafields.forEach(function(entry) { | ||
this._extrafields.forEach(entry => { | ||
ref_results[entry] = hit[entry] | ||
}) | ||
self.push(objectMode ? ref_results : JSON.stringify(ref_results)) | ||
self._counter++ | ||
this.push(this._readableState.objectMode ? ref_results : JSON.stringify(ref_results)) | ||
this._counter++ | ||
}) | ||
if (self._total !== self._counter && !self._forceClose) { | ||
self._client.scroll( | ||
{ | ||
scroll: self._options.scroll, | ||
scroll_id: body._scroll_id, | ||
}, | ||
getMoreUntilDone | ||
) | ||
if (this._total !== this._counter && !this._forceClose) { | ||
this._client.scroll({ scroll: this._options.scroll, scroll_id: body._scroll_id }, this.getMoreUntilDone) | ||
} else { | ||
// clearScroll for the current _scroll_id | ||
self._client.clearScroll({ scrollId: [body._scroll_id] }, function(err, res) { | ||
this._client.clearScroll({ scrollId: [body._scroll_id] }, (err, res) => { | ||
// end correctly | ||
return setImmediate(function() { | ||
self._reading = false | ||
self._counter = 0 | ||
self._forceClose = false | ||
self.push(null) | ||
return setImmediate(() => { | ||
this._reading = false | ||
this._counter = 0 | ||
this._forceClose = false | ||
this.push(null) | ||
}) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
LibElasticsearchScrollStream.prototype.close = function() { | ||
return (this._forceClose = true) | ||
_read() { | ||
if (this._reading) { | ||
return false | ||
} | ||
this._reading = true | ||
this._client.search(this._options, this.getMoreUntilDone) | ||
} | ||
close() { | ||
return (this._forceClose = true) | ||
} | ||
} | ||
module.exports = LibElasticsearchScrollStream |
{ | ||
"name": "elasticsearch-scroll-stream", | ||
"version": "1.2.2", | ||
"version": "1.3.0", | ||
"description": "Elasticsearch Scroll query results as a Stream", | ||
@@ -25,9 +25,9 @@ "repository": { | ||
"engines": { | ||
"node": ">= 0.10.x" | ||
"node": ">=8" | ||
}, | ||
"devDependencies": { | ||
"@elastic/elasticsearch": "^7.6.1", | ||
"@elastic/elasticsearch": "^7.8.0", | ||
"chai": "^4.2.0", | ||
"mocha": "^7.1.1" | ||
"mocha": "^7.2.0" | ||
} | ||
} |
@@ -5,16 +5,16 @@ /** | ||
*/ | ||
var expect = require('chai').expect | ||
const { expect } = require('chai') | ||
var Client = require('@elastic/elasticsearch').Client | ||
const { Client } = require('@elastic/elasticsearch') | ||
var ElasticsearchScrollStream = require('../index.js') | ||
const ElasticsearchScrollStream = require('../index.js') | ||
describe('elasticsearch_scroll_stream', function() { | ||
it("Should stream correctly when '_source' property is specified", function(done) { | ||
describe('elasticsearch_scroll_stream', function () { | ||
it("Should stream correctly when '_source' property is specified", function (done) { | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let counter = 0 | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -41,3 +41,3 @@ type: 'test-type', | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
expect(es_stream._total).to.equal(20) | ||
@@ -49,3 +49,3 @@ current_doc = JSON.parse(data.toString()) | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
expect(counter).to.equal(20) | ||
@@ -55,3 +55,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -61,8 +61,8 @@ }) | ||
it('Should explicitly clear the active search context when the scroll finishes', function(done) { | ||
it('Should explicitly clear the active search context when the scroll finishes', function (done) { | ||
this.timeout(10000) | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -89,3 +89,3 @@ type: 'test-type', | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
current_doc = JSON.parse(data.toString()) | ||
@@ -95,3 +95,3 @@ expect(current_doc.name).to.equal('third chunk name') | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
elasticsearch_client.indices.stats( | ||
@@ -102,3 +102,3 @@ { | ||
}, | ||
function(err, res) { | ||
function (err, res) { | ||
expect(res.body._all.total.search.open_contexts).to.equal(0) | ||
@@ -110,3 +110,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -116,9 +116,9 @@ }) | ||
it('Should stream correctly when no fields are specified (full _source)', function(done) { | ||
it('Should stream correctly when no fields are specified (full _source)', function (done) { | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let counter = 0 | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -144,3 +144,3 @@ type: 'test-type', | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
current_doc = JSON.parse(data.toString()) | ||
@@ -151,3 +151,3 @@ expect(current_doc.name).to.equal('third chunk name') | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
expect(counter).to.equal(20) | ||
@@ -157,3 +157,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -163,9 +163,9 @@ }) | ||
it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
it("Should stream correctly when '_source' property is specified and optional_fields required", function (done) { | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let counter = 0 | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream( | ||
let es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
@@ -196,3 +196,3 @@ { | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
current_doc = JSON.parse(data.toString()) | ||
@@ -205,3 +205,3 @@ expect(current_doc.name).to.equal('third chunk name') | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
expect(counter).to.equal(20) | ||
@@ -211,3 +211,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -217,8 +217,7 @@ }) | ||
it('Should throw error when optional_fields is not an array', function(done) { | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
it('Should throw error when optional_fields is not an array', function (done) { | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
expect( | ||
ElasticsearchScrollStream.bind( | ||
this, | ||
expect(() => { | ||
new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
@@ -248,12 +247,11 @@ { | ||
) | ||
).to.throw(/optional_fields must be an array/) | ||
}).to.throw(/optional_fields must be an array/) | ||
done() | ||
}) | ||
it('Should throw error when optional_fields does not contain an allowed value', function(done) { | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
it('Should throw error when optional_fields does not contain an allowed value', function (done) { | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
expect( | ||
ElasticsearchScrollStream.bind( | ||
this, | ||
expect(() => { | ||
new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
@@ -283,15 +281,15 @@ { | ||
) | ||
).to.throw(/not allowed in optional_fields/) | ||
}).to.throw(/not allowed in optional_fields/) | ||
done() | ||
}) | ||
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) { | ||
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function (done) { | ||
this.timeout(10000) | ||
var pageSize = '5' | ||
var stopCounterIndex = parseInt(pageSize) + 1 | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let pageSize = '5' | ||
let stopCounterIndex = parseInt(pageSize) + 1 | ||
let counter = 0 | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream( | ||
let es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
@@ -322,3 +320,3 @@ { | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
current_doc = JSON.parse(data.toString()) | ||
@@ -332,3 +330,3 @@ if (counter == stopCounterIndex) { | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
expect(counter).to.equal(stopCounterIndex) | ||
@@ -338,3 +336,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -344,11 +342,11 @@ }) | ||
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) { | ||
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function (done) { | ||
this.timeout(10000) | ||
var pageSize = '5' | ||
var stopCounterIndex = parseInt(pageSize) + 1 | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
let pageSize = '5' | ||
let stopCounterIndex = parseInt(pageSize) + 1 | ||
let counter = 0 | ||
let current_doc | ||
let elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream( | ||
let es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
@@ -379,3 +377,3 @@ { | ||
es_stream.on('data', function(data) { | ||
es_stream.on('data', function (data) { | ||
current_doc = JSON.parse(data.toString()) | ||
@@ -388,3 +386,3 @@ if (counter == stopCounterIndex) { | ||
es_stream.on('end', function() { | ||
es_stream.on('end', function () { | ||
expect(counter).to.equal(parseInt(pageSize) * 2) | ||
@@ -394,3 +392,3 @@ done() | ||
es_stream.on('error', function(err) { | ||
es_stream.on('error', function (err) { | ||
done(err) | ||
@@ -397,0 +395,0 @@ }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23957
457