elasticsearch-scroll-stream
Advanced tools
+13
-18
@@ -6,6 +6,5 @@ /** | ||
| */ | ||
| var LibElasticalAdaptee = require("./lib/elastical-stream"), | ||
| LibElasticsearchAdaptee = require("./lib/elasticsearch-stream"); | ||
| var LibElasticsearchAdaptee = require('./lib/elasticsearch-stream') | ||
| var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits']; | ||
| var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'] | ||
@@ -22,24 +21,20 @@ /** | ||
| var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
| if (arguments.length == 1) throw new Error("ElasticsearchScrollStream: missing parameters"); | ||
| if (!client) throw new Error("ElasticsearchScrollStream: client is ", client); | ||
| if (arguments.length == 1) throw new Error('ElasticsearchScrollStream: missing parameters') | ||
| if (!client) throw new Error('ElasticsearchScrollStream: client is ', client) | ||
| optional_fields = (!!optional_fields) ? optional_fields : [] | ||
| if (!Array.isArray(optional_fields)) throw new Error("ElasticsearchScrollStream: optional_fields must be an array", optional_fields); | ||
| optional_fields = !!optional_fields ? optional_fields : [] | ||
| if (!Array.isArray(optional_fields)) | ||
| throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields) | ||
| optional_fields.forEach(function(entry) { | ||
| if (allowed_extrafields.indexOf(entry) == -1) { | ||
| throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields"); | ||
| throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields") | ||
| } | ||
| }); | ||
| }) | ||
| stream_opts = (!!stream_opts) ? stream_opts : {}; | ||
| stream_opts = !!stream_opts ? stream_opts : {} | ||
| if (!!client.nodes) { | ||
| return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts); | ||
| } else { | ||
| return new LibElasticalAdaptee(client, query_opts, stream_opts); | ||
| } | ||
| }; | ||
| return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts) | ||
| } | ||
| module.exports = ElasticsearchScrollStream; | ||
| module.exports = ElasticsearchScrollStream |
@@ -7,6 +7,5 @@ /** | ||
| */ | ||
| var Readable = require("stream").Readable, | ||
| util = require("util"); | ||
| var Readable = require('stream').Readable, | ||
| util = require('util') | ||
| /** | ||
@@ -23,27 +22,25 @@ * LibElasticsearchScrollStream | ||
| var LibElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
| this._client = client | ||
| this._options = query_opts | ||
| this._extrafields = optional_fields | ||
| this._options.scroll = query_opts.scroll || '10m' | ||
| this._reading = false | ||
| this._counter = 0 | ||
| this._total = 0 | ||
| this._forceClose = false | ||
| Readable.call(this, stream_opts) | ||
| } | ||
| this._client = client; | ||
| this._options = query_opts; | ||
| this._extrafields = optional_fields; | ||
| this._options.scroll = query_opts.scroll || '10m'; | ||
| this._reading = false; | ||
| this._counter = 0; | ||
| this._total = 0; | ||
| this._forceClose = false; | ||
| Readable.call(this, stream_opts); | ||
| }; | ||
| util.inherits(LibElasticsearchScrollStream, Readable) | ||
| util.inherits(LibElasticsearchScrollStream, Readable); | ||
| LibElasticsearchScrollStream.prototype._read = function() { | ||
| if (this._reading) { | ||
| return false; | ||
| return false | ||
| } | ||
| this._reading = true; | ||
| var self = this; | ||
| this._reading = true | ||
| var self = this | ||
| this._client.search(this._options, function getMoreUntilDone(err, response) { | ||
| if (err) { | ||
| return self.emit("error", err); | ||
| return self.emit('error', err) | ||
| } | ||
@@ -57,11 +54,12 @@ | ||
| // } | ||
| self._total = (typeof response.hits.total === 'object') ? response.hits.total.value : response.hits.total; | ||
| var body = !!response.body ? response.body : response | ||
| self._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total | ||
| var objectMode = self._readableState.objectMode; | ||
| response.hits.hits.forEach(function(hit) { | ||
| var ref_results = {}; | ||
| if(hit.fields) { | ||
| ref_results = hit.fields; | ||
| var objectMode = self._readableState.objectMode | ||
| body.hits.hits.forEach(function(hit) { | ||
| var ref_results = {} | ||
| if (hit.fields) { | ||
| ref_results = hit.fields | ||
| } else { | ||
| ref_results = hit._source; | ||
| ref_results = hit._source | ||
| } | ||
@@ -71,35 +69,36 @@ | ||
| self._extrafields.forEach(function(entry) { | ||
| ref_results[entry] = hit[entry]; | ||
| }); | ||
| ref_results[entry] = hit[entry] | ||
| }) | ||
| self.push(objectMode ? ref_results : JSON.stringify(ref_results)); | ||
| self._counter++; | ||
| }); | ||
| self.push(objectMode ? ref_results : JSON.stringify(ref_results)) | ||
| self._counter++ | ||
| }) | ||
| if ((self._total !== self._counter) && (!self._forceClose)) { | ||
| self._client.scroll({ | ||
| scroll: self._options.scroll, | ||
| scroll_id: response._scroll_id | ||
| }, getMoreUntilDone); | ||
| if (self._total !== self._counter && !self._forceClose) { | ||
| self._client.scroll( | ||
| { | ||
| scroll: self._options.scroll, | ||
| scroll_id: body._scroll_id, | ||
| }, | ||
| getMoreUntilDone | ||
| ) | ||
| } else { | ||
| // trigger an asyncronous clearScroll for the current _scroll_id | ||
| self._client.clearScroll({ scrollId: [response._scroll_id] }, function(err, res) {}); | ||
| // end correctly | ||
| return setImmediate(function() { | ||
| self._reading = false; | ||
| self._counter = 0; | ||
| self._forceClose = false; | ||
| self.push(null); | ||
| }); | ||
| // clearScroll for the current _scroll_id | ||
| self._client.clearScroll({ scrollId: [body._scroll_id] }, function(err, res) { | ||
| // end correctly | ||
| return setImmediate(function() { | ||
| self._reading = false | ||
| self._counter = 0 | ||
| self._forceClose = false | ||
| self.push(null) | ||
| }) | ||
| }) | ||
| } | ||
| }) | ||
| } | ||
| }); | ||
| }; | ||
| LibElasticsearchScrollStream.prototype.close = function() { | ||
| return this._forceClose = true; | ||
| }; | ||
| return (this._forceClose = true) | ||
| } | ||
| module.exports = LibElasticsearchScrollStream; | ||
| module.exports = LibElasticsearchScrollStream |
+2
-3
| { | ||
| "name": "elasticsearch-scroll-stream", | ||
| "version": "1.1.5", | ||
| "version": "1.2.0", | ||
| "description": "Elasticsearch Scroll query results as a Stream", | ||
@@ -28,7 +28,6 @@ "repository": { | ||
| "devDependencies": { | ||
| "@elastic/elasticsearch": "^7.2.0", | ||
| "chai": "^4.2.0", | ||
| "elastical": "0.0.13", | ||
| "elasticsearch": "^16.0.0", | ||
| "mocha": "^5.2.0" | ||
| } | ||
| } |
+38
-41
@@ -8,6 +8,6 @@ | ||
| This module works with the following Elasticsearch nodejs clients: | ||
| This module works with the official Elasticsearch nodejs clients: | ||
| - [elasticsearch](https://www.npmjs.org/package/elasticsearch) (official Elasticsearch js API) | ||
| - [elastical](https://www.npmjs.org/package/elastical) (DEPRECATED - Discontinued support) | ||
| - [@elastic/elasticsearch](https://www.npmjs.com/package/@elastic/elasticsearch) (new Elasticsearch js API) | ||
| - [elasticsearch](https://www.npmjs.org/package/elasticsearch) (old Elasticsearch js API) | ||
@@ -34,6 +34,6 @@ | ||
| ```js | ||
| var elasticsearch = require('elasticsearch'); | ||
| var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
| const { Client } = require('@elastic/elasticsearch') | ||
| const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
| var client = new elasticsearch.Client(); | ||
| const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -44,3 +44,3 @@ // Create index and add documents here... | ||
| // as parameters in the constructor | ||
| var es_stream = new ElasticsearchScrollStream(client, { | ||
| const es_stream = new ElasticsearchScrollStream(client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
@@ -51,12 +51,11 @@ type: 'test-type', | ||
| _source: ['name'], | ||
| q: 'name:*' | ||
| }); | ||
| q: 'name:*', | ||
| }) | ||
| // Pipe the results to other writeble streams.. | ||
| es_stream.pipe(process.stdout); | ||
| es_stream.pipe(process.stdout) | ||
| es_stream.on('end', function() { | ||
| console.log("End"); | ||
| }); | ||
| console.log('End') | ||
| }) | ||
| ``` | ||
@@ -69,6 +68,6 @@ | ||
| ```js | ||
| var elasticsearch = require('elasticsearch'); | ||
| var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
| const { Client } = require('@elastic/elasticsearch') | ||
| const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
| var client = new elasticsearch.Client(); | ||
| const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -79,3 +78,3 @@ // Create index and add documents here... | ||
| // as parameters in the constructor | ||
| var es_stream = new ElasticsearchScrollStream(client, { | ||
| const es_stream = new ElasticsearchScrollStream(client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
@@ -90,6 +89,6 @@ type: 'test-type', | ||
| // Pipe the results to other writeble streams.. | ||
| es_stream.pipe(process.stdout); | ||
| es_stream.pipe(process.stdout) | ||
| es_stream.on('end', function() { | ||
| console.log("End"); | ||
| console.log("End") | ||
| }); | ||
@@ -102,6 +101,6 @@ | ||
| ```js | ||
| var elasticsearch = require('elasticsearch'); | ||
| var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
| const { Client } = require('@elastic/elasticsearch') | ||
| const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
| var client = new elasticsearch.Client(); | ||
| const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -112,3 +111,3 @@ // Create index and add documents here... | ||
| // as parameters in the constructor | ||
| var es_stream = new ElasticsearchScrollStream(client, { | ||
| const es_stream = new ElasticsearchScrollStream(client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
@@ -136,6 +135,6 @@ type: 'test-type', | ||
| // Pipe the results to other writeble streams.. | ||
| es_stream.pipe(process.stdout); | ||
| es_stream.pipe(process.stdout) | ||
| es_stream.on('end', function() { | ||
| console.log("End"); | ||
| console.log("End") | ||
| }); | ||
@@ -148,17 +147,15 @@ | ||
| ```js | ||
| var elasticsearch = require('elasticsearch'); | ||
| var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
| const { Client } = require('@elastic/elasticsearch') | ||
| const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
| // Create index and add documents here... | ||
| // You need to pass the client instance and the query object | ||
| // as parameters in the constructor | ||
| const pageSize = '5' | ||
| let stopCounterIndex = (parseInt(pageSize) + 1) | ||
| let counter = 0 | ||
| let current_doc | ||
| const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| var pageSize = '5'; | ||
| var stopCounterIndex = (parseInt(pageSize) + 1); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| const es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
@@ -186,15 +183,15 @@ type: 'test-type', | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| current_doc = JSON.parse(data.toString()) | ||
| if (counter == stopCounterIndex) { | ||
| es_stream.close(); | ||
| es_stream.close() | ||
| } | ||
| counter++; | ||
| counter++ | ||
| }); | ||
| es_stream.on('end', function() { | ||
| console.log(counter); | ||
| console.log(counter) | ||
| }); | ||
| es_stream.on('error', function(err) { | ||
| console.log(err); | ||
| console.log(err) | ||
| }); | ||
@@ -201,0 +198,0 @@ |
+272
-252
@@ -5,14 +5,14 @@ /** | ||
| */ | ||
| var expect = require('chai').expect, | ||
| elasticsearch = require('elasticsearch'); | ||
| var expect = require('chai').expect | ||
| var ElasticsearchScrollStream = require('../index.js'); | ||
| var Client = require('@elastic/elasticsearch').Client | ||
| var ElasticsearchScrollStream = require('../index.js') | ||
| describe('elasticsearch_scroll_stream', function() { | ||
| it("Should stream correctly when '_source' property is specified", function(done) { | ||
| this.timeout(10000); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| this.timeout(10000) | ||
| var counter = 0 | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -24,3 +24,3 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| size: '50', | ||
| _source: ["name"], | ||
| _source: ['name'], | ||
| body: { | ||
@@ -32,35 +32,34 @@ query: { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| es_stream.on('data', function(data) { | ||
| expect(es_stream._total).to.equal(20); | ||
| current_doc = JSON.parse(data.toString()); | ||
| expect(current_doc.name).to.equal("third chunk name"); | ||
| counter++; | ||
| }); | ||
| expect(es_stream._total).to.equal(20) | ||
| current_doc = JSON.parse(data.toString()) | ||
| expect(current_doc.name).to.equal('third chunk name') | ||
| counter++ | ||
| }) | ||
| es_stream.on('end', function() { | ||
| expect(counter).to.equal(20); | ||
| done(); | ||
| }); | ||
| expect(counter).to.equal(20) | ||
| done() | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| it('Should explicitly clear the active search context when the scroll finishes', function(done) { | ||
| this.timeout(10000) | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| it("Should explicitly clear the active search context when the scroll finishes", function(done) { | ||
| this.timeout(10000); | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
@@ -71,3 +70,3 @@ index: 'elasticsearch-test-scroll-stream', | ||
| size: '50', | ||
| _source: ["name"], | ||
| _source: ['name'], | ||
| body: { | ||
@@ -79,38 +78,40 @@ query: { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| expect(current_doc.name).to.equal("third chunk name"); | ||
| }); | ||
| current_doc = JSON.parse(data.toString()) | ||
| expect(current_doc.name).to.equal('third chunk name') | ||
| }) | ||
| es_stream.on('end', function() { | ||
| elasticsearch_client.indices.stats({ | ||
| index: '_all', | ||
| metric: 'search' | ||
| }, function(err, res) { | ||
| expect(res._all.total.search.open_contexts).to.equal(0); | ||
| done(); | ||
| }); | ||
| }); | ||
| elasticsearch_client.indices.stats( | ||
| { | ||
| index: '_all', | ||
| metric: 'search', | ||
| }, | ||
| function(err, res) { | ||
| expect(res.body._all.total.search.open_contexts).to.equal(0) | ||
| done() | ||
| } | ||
| ) | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| it('Should stream correctly when no fields are specified (full _source)', function(done) { | ||
| this.timeout(10000); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| this.timeout(10000) | ||
| var counter = 0 | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -128,231 +129,250 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| expect(current_doc.name).to.equal("third chunk name"); | ||
| counter++; | ||
| }); | ||
| current_doc = JSON.parse(data.toString()) | ||
| expect(current_doc.name).to.equal('third chunk name') | ||
| counter++ | ||
| }) | ||
| es_stream.on('end', function() { | ||
| expect(counter).to.equal(20); | ||
| done(); | ||
| }); | ||
| expect(counter).to.equal(20) | ||
| done() | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
| this.timeout(10000); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| this.timeout(10000) | ||
| var counter = 0 | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ["name"], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }, ['_id', '_score']); | ||
| var es_stream = new ElasticsearchScrollStream( | ||
| elasticsearch_client, | ||
| { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ['name'], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| ['_id', '_score'] | ||
| ) | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| expect(current_doc.name).to.equal("third chunk name"); | ||
| expect(current_doc).to.have.property("_id"); | ||
| expect(current_doc).to.have.property("_score"); | ||
| counter++; | ||
| }); | ||
| current_doc = JSON.parse(data.toString()) | ||
| expect(current_doc.name).to.equal('third chunk name') | ||
| expect(current_doc).to.have.property('_id') | ||
| expect(current_doc).to.have.property('_score') | ||
| counter++ | ||
| }) | ||
| es_stream.on('end', function() { | ||
| expect(counter).to.equal(20); | ||
| done(); | ||
| }); | ||
| expect(counter).to.equal(20) | ||
| done() | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| it('Should throw error when optional_fields is not an array', function(done) { | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| it("Should throw error when optional_fields is not an array", function(done) { | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| expect( | ||
| ElasticsearchScrollStream.bind( | ||
| this, | ||
| elasticsearch_client, | ||
| { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ['name'], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| '_id' | ||
| ) | ||
| ).to.throw(/optional_fields must be an array/) | ||
| done() | ||
| }) | ||
| expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ["name"], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }, '_id')).to.throw(/optional_fields must be an array/); | ||
| done(); | ||
| }); | ||
| it('Should throw error when optional_fields does not contain an allowed value', function(done) { | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| expect( | ||
| ElasticsearchScrollStream.bind( | ||
| this, | ||
| elasticsearch_client, | ||
| { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ['name'], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| ['invalid_value'] | ||
| ) | ||
| ).to.throw(/not allowed in optional_fields/) | ||
| done() | ||
| }) | ||
| it("Should throw error when optional_fields does not contain an allowed value", function(done) { | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: '50', | ||
| _source: ["name"], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }, ['invalid_value'])).to.throw(/not allowed in optional_fields/); | ||
| done(); | ||
| }); | ||
| it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) { | ||
| this.timeout(10000); | ||
| var pageSize = '5'; | ||
| var stopCounterIndex = (parseInt(pageSize) + 1); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| this.timeout(10000) | ||
| var pageSize = '5' | ||
| var stopCounterIndex = parseInt(pageSize) + 1 | ||
| var counter = 0 | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: pageSize, | ||
| _source: ["name"], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }, ['_id', '_score']); | ||
| var es_stream = new ElasticsearchScrollStream( | ||
| elasticsearch_client, | ||
| { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: pageSize, | ||
| _source: ['name'], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| ['_id', '_score'] | ||
| ) | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| current_doc = JSON.parse(data.toString()) | ||
| if (counter == stopCounterIndex) { | ||
| es_stream.close(); | ||
| return; | ||
| es_stream.close() | ||
| return | ||
| } | ||
| counter++; | ||
| }); | ||
| counter++ | ||
| }) | ||
| es_stream.on('end', function() { | ||
| expect(counter).to.equal(stopCounterIndex); | ||
| done(); | ||
| }); | ||
| expect(counter).to.equal(stopCounterIndex) | ||
| done() | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) { | ||
| this.timeout(10000); | ||
| var pageSize = '5'; | ||
| var stopCounterIndex = (parseInt(pageSize) + 1); | ||
| var counter = 0; | ||
| var current_doc; | ||
| var elasticsearch_client = new elasticsearch.Client(); | ||
| this.timeout(10000) | ||
| var pageSize = '5' | ||
| var stopCounterIndex = parseInt(pageSize) + 1 | ||
| var counter = 0 | ||
| var current_doc | ||
| var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
| var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: pageSize, | ||
| _source: ["name"], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: "_all", | ||
| query: 'name:third*' | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| }, ['_id', '_score']); | ||
| var es_stream = new ElasticsearchScrollStream( | ||
| elasticsearch_client, | ||
| { | ||
| index: 'elasticsearch-test-scroll-stream', | ||
| type: 'test-type', | ||
| scroll: '10s', | ||
| size: pageSize, | ||
| _source: ['name'], | ||
| body: { | ||
| query: { | ||
| bool: { | ||
| must: [ | ||
| { | ||
| query_string: { | ||
| default_field: '_all', | ||
| query: 'name:third*', | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| ['_id', '_score'] | ||
| ) | ||
| es_stream.on('data', function(data) { | ||
| current_doc = JSON.parse(data.toString()); | ||
| current_doc = JSON.parse(data.toString()) | ||
| if (counter == stopCounterIndex) { | ||
| es_stream.close(); | ||
| es_stream.close() | ||
| } | ||
| counter++; | ||
| }); | ||
| counter++ | ||
| }) | ||
| es_stream.on('end', function() { | ||
| expect(counter).to.equal(parseInt(pageSize) * 2); | ||
| done(); | ||
| }); | ||
| expect(counter).to.equal(parseInt(pageSize) * 2) | ||
| done() | ||
| }) | ||
| es_stream.on('error', function(err) { | ||
| done(err); | ||
| }); | ||
| }); | ||
| }); | ||
| done(err) | ||
| }) | ||
| }) | ||
| }) |
| /** | ||
| * Elastical Stream | ||
| * | ||
| * Create a ReadableStream from an elasticsearch scroll query. | ||
| * Assumptions: client library is of type [node-elastical](https://www.npmjs.org/package/elastical) | ||
| */ | ||
| var Readable = require("stream").Readable, | ||
| util = require("util"); | ||
| /** | ||
| * LibElasticalScrollStream | ||
| * | ||
| * @param `client` - elastical instance | ||
| * @param `query_opts` - query object to be passed to elastical. | ||
| * It contains the query | ||
| * @param `stream_opts` - object to be passed to ReadableStream | ||
| */ | ||
| var LibElasticalScrollStream = function(client, query_opts, stream_opts) { | ||
| this._client = client; | ||
| this._options = query_opts; | ||
| this._options.scroll = query_opts.scroll || '10m'; | ||
| this._reading = false; | ||
| this._counter = 0; | ||
| Readable.call(this, stream_opts); | ||
| }; | ||
| util.inherits(LibElasticalScrollStream, Readable); | ||
| LibElasticalScrollStream.prototype._read = function() { | ||
| if (this._reading) { | ||
| return false; | ||
| } | ||
| this._reading = true; | ||
| var self = this; | ||
| this._client.search(this._options, function getMoreUntilDone(err, results, _res) { | ||
| if (err) { | ||
| return self.emit("error", err); | ||
| } | ||
| var objectMode = self._readableState.objectMode; | ||
| results.hits.forEach(function(hit) { | ||
| self.push(objectMode ? hit.fields : JSON.stringify(hit.fields)); | ||
| self._counter++; | ||
| }); | ||
| if (results.total !== self._counter) { | ||
| self._client.search({ | ||
| scroll: self._options.scroll, | ||
| scroll_id: _res._scroll_id | ||
| }, getMoreUntilDone); | ||
| } else { | ||
| return setImmediate(function() { | ||
| self._reading = false; | ||
| self._counter = 0; | ||
| self.push(null); | ||
| }); | ||
| } | ||
| }); | ||
| }; | ||
| module.exports = LibElasticalScrollStream; | ||
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
3
-25%23921
-4.88%11
-8.33%456
-5.2%236
-1.26%1
Infinity%