elasticsearch-scroll-stream
Comparing version 1.1.5 to 1.2.0
index.js
@@ -6,6 +6,5 @@ /** | ||
*/ | ||
var LibElasticalAdaptee = require("./lib/elastical-stream"), | ||
LibElasticsearchAdaptee = require("./lib/elasticsearch-stream"); | ||
var LibElasticsearchAdaptee = require('./lib/elasticsearch-stream') | ||
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits']; | ||
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'] | ||
@@ -22,24 +21,20 @@ /** | ||
var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
if (arguments.length == 1) throw new Error("ElasticsearchScrollStream: missing parameters"); | ||
if (!client) throw new Error("ElasticsearchScrollStream: client is ", client); | ||
if (arguments.length == 1) throw new Error('ElasticsearchScrollStream: missing parameters') | ||
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client) | ||
optional_fields = (!!optional_fields) ? optional_fields : [] | ||
if (!Array.isArray(optional_fields)) throw new Error("ElasticsearchScrollStream: optional_fields must be an array", optional_fields); | ||
optional_fields = !!optional_fields ? optional_fields : [] | ||
if (!Array.isArray(optional_fields)) | ||
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields) | ||
optional_fields.forEach(function(entry) { | ||
if (allowed_extrafields.indexOf(entry) == -1) { | ||
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields"); | ||
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields") | ||
} | ||
}); | ||
}) | ||
stream_opts = (!!stream_opts) ? stream_opts : {}; | ||
stream_opts = !!stream_opts ? stream_opts : {} | ||
if (!!client.nodes) { | ||
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts); | ||
} else { | ||
return new LibElasticalAdaptee(client, query_opts, stream_opts); | ||
} | ||
}; | ||
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts) | ||
} | ||
module.exports = ElasticsearchScrollStream; | ||
module.exports = ElasticsearchScrollStream |
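With the elastical adapter removed, every client now goes through lib/elasticsearch-stream, and the third argument is validated against the allowed_extrafields whitelist above. A minimal usage sketch under the 1.2.0 constructor (the index name is a placeholder; the node URL and query follow the README examples further down):

```js
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')

const client = new Client({ node: 'http://localhost:9200' })

// optional_fields (third argument) must be an array drawn from allowed_extrafields;
// each listed piece of hit metadata is merged into every emitted record.
const es_stream = new ElasticsearchScrollStream(
  client,
  { index: 'my-index', scroll: '10s', size: '50', q: 'name:*' }, // placeholder query options
  ['_id', '_score']
)

es_stream.pipe(process.stdout)
```

Passing anything outside the whitelist, or a non-array, throws at construction time, as the checks above show.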
lib/elasticsearch-stream.js
@@ -7,6 +7,5 @@ /** | ||
*/ | ||
var Readable = require("stream").Readable, | ||
util = require("util"); | ||
var Readable = require('stream').Readable, | ||
util = require('util') | ||
/** | ||
@@ -23,27 +22,25 @@ * LibElasticsearchScrollStream | ||
var LibElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
this._client = client | ||
this._options = query_opts | ||
this._extrafields = optional_fields | ||
this._options.scroll = query_opts.scroll || '10m' | ||
this._reading = false | ||
this._counter = 0 | ||
this._total = 0 | ||
this._forceClose = false | ||
Readable.call(this, stream_opts) | ||
} | ||
this._client = client; | ||
this._options = query_opts; | ||
this._extrafields = optional_fields; | ||
this._options.scroll = query_opts.scroll || '10m'; | ||
this._reading = false; | ||
this._counter = 0; | ||
this._total = 0; | ||
this._forceClose = false; | ||
Readable.call(this, stream_opts); | ||
}; | ||
util.inherits(LibElasticsearchScrollStream, Readable) | ||
util.inherits(LibElasticsearchScrollStream, Readable); | ||
LibElasticsearchScrollStream.prototype._read = function() { | ||
if (this._reading) { | ||
return false; | ||
return false | ||
} | ||
this._reading = true; | ||
var self = this; | ||
this._reading = true | ||
var self = this | ||
this._client.search(this._options, function getMoreUntilDone(err, response) { | ||
if (err) { | ||
return self.emit("error", err); | ||
return self.emit('error', err) | ||
} | ||
@@ -57,11 +54,12 @@ | ||
// } | ||
self._total = (typeof response.hits.total === 'object') ? response.hits.total.value : response.hits.total; | ||
var body = !!response.body ? response.body : response | ||
self._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total | ||
var objectMode = self._readableState.objectMode; | ||
response.hits.hits.forEach(function(hit) { | ||
var ref_results = {}; | ||
if(hit.fields) { | ||
ref_results = hit.fields; | ||
var objectMode = self._readableState.objectMode | ||
body.hits.hits.forEach(function(hit) { | ||
var ref_results = {} | ||
if (hit.fields) { | ||
ref_results = hit.fields | ||
} else { | ||
ref_results = hit._source; | ||
ref_results = hit._source | ||
} | ||
@@ -71,35 +69,36 @@ | ||
self._extrafields.forEach(function(entry) { | ||
ref_results[entry] = hit[entry]; | ||
}); | ||
ref_results[entry] = hit[entry] | ||
}) | ||
self.push(objectMode ? ref_results : JSON.stringify(ref_results)); | ||
self._counter++; | ||
}); | ||
self.push(objectMode ? ref_results : JSON.stringify(ref_results)) | ||
self._counter++ | ||
}) | ||
if ((self._total !== self._counter) && (!self._forceClose)) { | ||
self._client.scroll({ | ||
scroll: self._options.scroll, | ||
scroll_id: response._scroll_id | ||
}, getMoreUntilDone); | ||
if (self._total !== self._counter && !self._forceClose) { | ||
self._client.scroll( | ||
{ | ||
scroll: self._options.scroll, | ||
scroll_id: body._scroll_id, | ||
}, | ||
getMoreUntilDone | ||
) | ||
} else { | ||
// trigger an asyncronous clearScroll for the current _scroll_id | ||
self._client.clearScroll({ scrollId: [response._scroll_id] }, function(err, res) {}); | ||
// end correctly | ||
return setImmediate(function() { | ||
self._reading = false; | ||
self._counter = 0; | ||
self._forceClose = false; | ||
self.push(null); | ||
}); | ||
// clearScroll for the current _scroll_id | ||
self._client.clearScroll({ scrollId: [body._scroll_id] }, function(err, res) { | ||
// end correctly | ||
return setImmediate(function() { | ||
self._reading = false | ||
self._counter = 0 | ||
self._forceClose = false | ||
self.push(null) | ||
}) | ||
}) | ||
} | ||
}) | ||
} | ||
}); | ||
}; | ||
LibElasticsearchScrollStream.prototype.close = function() { | ||
return this._forceClose = true; | ||
}; | ||
return (this._forceClose = true) | ||
} | ||
module.exports = LibElasticsearchScrollStream; | ||
module.exports = LibElasticsearchScrollStream |
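The substantive change in this adapter is response handling: the legacy elasticsearch client passes the parsed body straight to the search/scroll callback, while @elastic/elasticsearch 7.x wraps it in a { body, statusCode, ... } envelope and reports hits.total as an object, hence the new `var body = !!response.body ? response.body : response` normalization. The stream also now waits for clearScroll to complete before pushing null, instead of firing it asynchronously and ending immediately. A sketch of the normalization in isolation (the sample payloads are illustrative, not captured client output):

```js
// Sketch of the body/total normalization introduced above (sample payloads are made up).
function normalizeResponse(response) {
  // @elastic/elasticsearch 7.x callbacks receive { body, statusCode, headers, ... };
  // the legacy client passes the parsed body itself, so fall back to the response.
  var body = !!response.body ? response.body : response
  // 7.x reports hits.total as { value, relation }; earlier versions report a plain number.
  var total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total
  return { total: total, hits: body.hits.hits, scroll_id: body._scroll_id }
}

// New-client shape:
console.log(normalizeResponse({ body: { hits: { total: { value: 2, relation: 'eq' }, hits: [] }, _scroll_id: 'c1' } }))
// Legacy-client shape:
console.log(normalizeResponse({ hits: { total: 2, hits: [] }, _scroll_id: 'c1' }))
// Both print { total: 2, hits: [], scroll_id: 'c1' }.
```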
package.json
{ | ||
"name": "elasticsearch-scroll-stream", | ||
"version": "1.1.5", | ||
"version": "1.2.0", | ||
"description": "Elasticsearch Scroll query results as a Stream", | ||
@@ -28,7 +28,6 @@ "repository": { | ||
"devDependencies": { | ||
"@elastic/elasticsearch": "^7.2.0", | ||
"chai": "^4.2.0", | ||
"elastical": "0.0.13", | ||
"elasticsearch": "^16.0.0", | ||
"mocha": "^5.2.0" | ||
} | ||
} |
README.md
@@ -8,6 +8,6 @@ | ||
This module works with the following Elasticsearch nodejs clients: | ||
This module works with the official Elasticsearch nodejs clients: | ||
- [elasticsearch](https://www.npmjs.org/package/elasticsearch) (official Elasticsearch js API) | ||
- [elastical](https://www.npmjs.org/package/elastical) (DEPRECATED - Discontinued support) | ||
- [@elastic/elasticsearch](https://www.npmjs.com/package/@elastic/elasticsearch) (new Elasticsearch js API) | ||
- [elasticsearch](https://www.npmjs.org/package/elasticsearch) (old Elasticsearch js API) | ||
@@ -34,6 +34,6 @@ | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
const { Client } = require('@elastic/elasticsearch') | ||
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
var client = new elasticsearch.Client(); | ||
const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -44,3 +44,3 @@ // Create index and add documents here... | ||
// as parameters in the constructor | ||
var es_stream = new ElasticsearchScrollStream(client, { | ||
const es_stream = new ElasticsearchScrollStream(client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -51,12 +51,11 @@ type: 'test-type', | ||
_source: ['name'], | ||
q: 'name:*' | ||
}); | ||
q: 'name:*', | ||
}) | ||
// Pipe the results to other writeble streams.. | ||
es_stream.pipe(process.stdout); | ||
es_stream.pipe(process.stdout) | ||
es_stream.on('end', function() { | ||
console.log("End"); | ||
}); | ||
console.log('End') | ||
}) | ||
``` | ||
@@ -69,6 +68,6 @@ | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
const { Client } = require('@elastic/elasticsearch') | ||
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
var client = new elasticsearch.Client(); | ||
const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -79,3 +78,3 @@ // Create index and add documents here... | ||
// as parameters in the constructor | ||
var es_stream = new ElasticsearchScrollStream(client, { | ||
const es_stream = new ElasticsearchScrollStream(client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -90,6 +89,6 @@ type: 'test-type', | ||
// Pipe the results to other writeble streams.. | ||
es_stream.pipe(process.stdout); | ||
es_stream.pipe(process.stdout) | ||
es_stream.on('end', function() { | ||
console.log("End"); | ||
console.log("End") | ||
}); | ||
@@ -102,6 +101,6 @@ | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
const { Client } = require('@elastic/elasticsearch') | ||
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
var client = new elasticsearch.Client(); | ||
const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -112,3 +111,3 @@ // Create index and add documents here... | ||
// as parameters in the constructor | ||
var es_stream = new ElasticsearchScrollStream(client, { | ||
const es_stream = new ElasticsearchScrollStream(client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -136,6 +135,6 @@ type: 'test-type', | ||
// Pipe the results to other writeble streams.. | ||
es_stream.pipe(process.stdout); | ||
es_stream.pipe(process.stdout) | ||
es_stream.on('end', function() { | ||
console.log("End"); | ||
console.log("End") | ||
}); | ||
@@ -148,17 +147,15 @@ | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
const { Client } = require('@elastic/elasticsearch') | ||
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream') | ||
// Create index and add documents here... | ||
// You need to pass the client instance and the query object | ||
// as parameters in the constructor | ||
const pageSize = '5' | ||
let stopCounterIndex = (parseInt(pageSize) + 1) | ||
let counter = 0 | ||
let current_doc | ||
const elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
const es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
@@ -186,15 +183,15 @@ type: 'test-type', | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
current_doc = JSON.parse(data.toString()) | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
es_stream.close() | ||
} | ||
counter++; | ||
counter++ | ||
}); | ||
es_stream.on('end', function() { | ||
console.log(counter); | ||
console.log(counter) | ||
}); | ||
es_stream.on('error', function(err) { | ||
console.log(err); | ||
console.log(err) | ||
}); | ||
@@ -201,0 +198,0 @@ |
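One thing the updated README still does not demonstrate: the fourth constructor argument (stream_opts) is forwarded to stream.Readable, and the adapter checks _readableState.objectMode before pushing, so object-mode streams are supported. A sketch inferred from that code path rather than from a documented example (index and query reuse the README placeholders):

```js
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')

const elasticsearch_client = new Client({ node: 'http://localhost:9200' })

// With objectMode the adapter pushes plain objects instead of JSON-stringified records.
const es_stream = new ElasticsearchScrollStream(
  elasticsearch_client,
  {
    index: 'elasticsearch-test-scroll-stream',
    scroll: '10s',
    size: '50',
    _source: ['name'],
    q: 'name:*',
  },
  [],
  { objectMode: true }
)

es_stream.on('data', function(doc) {
  console.log(doc.name) // already an object; no JSON.parse needed
})
es_stream.on('end', function() {
  console.log('End')
})
```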
@@ -5,14 +5,14 @@ /** | ||
*/ | ||
var expect = require('chai').expect, | ||
elasticsearch = require('elasticsearch'); | ||
var expect = require('chai').expect | ||
var ElasticsearchScrollStream = require('../index.js'); | ||
var Client = require('@elastic/elasticsearch').Client | ||
var ElasticsearchScrollStream = require('../index.js') | ||
describe('elasticsearch_scroll_stream', function() { | ||
it("Should stream correctly when '_source' property is specified", function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -24,3 +24,3 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
size: '50', | ||
_source: ["name"], | ||
_source: ['name'], | ||
body: { | ||
@@ -32,35 +32,34 @@ query: { | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}); | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}) | ||
es_stream.on('data', function(data) { | ||
expect(es_stream._total).to.equal(20); | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
counter++; | ||
}); | ||
expect(es_stream._total).to.equal(20) | ||
current_doc = JSON.parse(data.toString()) | ||
expect(current_doc.name).to.equal('third chunk name') | ||
counter++ | ||
}) | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
expect(counter).to.equal(20) | ||
done() | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
it('Should explicitly clear the active search context when the scroll finishes', function(done) { | ||
this.timeout(10000) | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
it("Should explicitly clear the active search context when the scroll finishes", function(done) { | ||
this.timeout(10000); | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
@@ -71,3 +70,3 @@ index: 'elasticsearch-test-scroll-stream', | ||
size: '50', | ||
_source: ["name"], | ||
_source: ['name'], | ||
body: { | ||
@@ -79,38 +78,40 @@ query: { | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}); | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}) | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
}); | ||
current_doc = JSON.parse(data.toString()) | ||
expect(current_doc.name).to.equal('third chunk name') | ||
}) | ||
es_stream.on('end', function() { | ||
elasticsearch_client.indices.stats({ | ||
index: '_all', | ||
metric: 'search' | ||
}, function(err, res) { | ||
expect(res._all.total.search.open_contexts).to.equal(0); | ||
done(); | ||
}); | ||
}); | ||
elasticsearch_client.indices.stats( | ||
{ | ||
index: '_all', | ||
metric: 'search', | ||
}, | ||
function(err, res) { | ||
expect(res.body._all.total.search.open_contexts).to.equal(0) | ||
done() | ||
} | ||
) | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
it('Should stream correctly when no fields are specified (full _source)', function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
@@ -128,231 +129,250 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}); | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}) | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
counter++; | ||
}); | ||
current_doc = JSON.parse(data.toString()) | ||
expect(current_doc.name).to.equal('third chunk name') | ||
counter++ | ||
}) | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
expect(counter).to.equal(20) | ||
done() | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
this.timeout(10000) | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
var es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
{ | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}, | ||
['_id', '_score'] | ||
) | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
expect(current_doc).to.have.property("_id"); | ||
expect(current_doc).to.have.property("_score"); | ||
counter++; | ||
}); | ||
current_doc = JSON.parse(data.toString()) | ||
expect(current_doc.name).to.equal('third chunk name') | ||
expect(current_doc).to.have.property('_id') | ||
expect(current_doc).to.have.property('_score') | ||
counter++ | ||
}) | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
expect(counter).to.equal(20) | ||
done() | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
it('Should throw error when optional_fields is not an array', function(done) { | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
it("Should throw error when optional_fields is not an array", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
expect( | ||
ElasticsearchScrollStream.bind( | ||
this, | ||
elasticsearch_client, | ||
{ | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}, | ||
'_id' | ||
) | ||
).to.throw(/optional_fields must be an array/) | ||
done() | ||
}) | ||
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, '_id')).to.throw(/optional_fields must be an array/); | ||
done(); | ||
}); | ||
it('Should throw error when optional_fields does not contain an allowed value', function(done) { | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
expect( | ||
ElasticsearchScrollStream.bind( | ||
this, | ||
elasticsearch_client, | ||
{ | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}, | ||
['invalid_value'] | ||
) | ||
).to.throw(/not allowed in optional_fields/) | ||
done() | ||
}) | ||
it("Should throw error when optional_fields does not contain an allowed value", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['invalid_value'])).to.throw(/not allowed in optional_fields/); | ||
done(); | ||
}); | ||
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) { | ||
this.timeout(10000); | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
this.timeout(10000) | ||
var pageSize = '5' | ||
var stopCounterIndex = parseInt(pageSize) + 1 | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
var es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
{ | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}, | ||
['_id', '_score'] | ||
) | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
current_doc = JSON.parse(data.toString()) | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
return; | ||
es_stream.close() | ||
return | ||
} | ||
counter++; | ||
}); | ||
counter++ | ||
}) | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(stopCounterIndex); | ||
done(); | ||
}); | ||
expect(counter).to.equal(stopCounterIndex) | ||
done() | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) { | ||
this.timeout(10000); | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
this.timeout(10000) | ||
var pageSize = '5' | ||
var stopCounterIndex = parseInt(pageSize) + 1 | ||
var counter = 0 | ||
var current_doc | ||
var elasticsearch_client = new Client({ node: 'http://localhost:9200' }) | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
var es_stream = new ElasticsearchScrollStream( | ||
elasticsearch_client, | ||
{ | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: '_all', | ||
query: 'name:third*', | ||
}, | ||
}, | ||
], | ||
}, | ||
}, | ||
}, | ||
}, | ||
['_id', '_score'] | ||
) | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
current_doc = JSON.parse(data.toString()) | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
es_stream.close() | ||
} | ||
counter++; | ||
}); | ||
counter++ | ||
}) | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(parseInt(pageSize) * 2); | ||
done(); | ||
}); | ||
expect(counter).to.equal(parseInt(pageSize) * 2) | ||
done() | ||
}) | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
done(err) | ||
}) | ||
}) | ||
}) |
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package