elasticsearch-scroll-stream
Advanced tools
Comparing version 1.0.7 to 1.1.0
@@ -29,2 +29,3 @@ /** | ||
this._counter = 0; | ||
this._forceClose = false; | ||
Readable.call(this, stream_opts); | ||
@@ -66,3 +67,3 @@ }; | ||
if (response.hits.total !== self._counter) { | ||
if ((response.hits.total !== self._counter) && (!self._forceClose)) { | ||
self._client.scroll({ | ||
@@ -76,2 +77,3 @@ scroll: self._options.scroll, | ||
self._counter = 0; | ||
self._forceClose = false; | ||
self.push(null); | ||
@@ -84,3 +86,7 @@ }); | ||
LibElasticasearchScrollStream.prototype.close = function() { | ||
return this._forceClose = true; | ||
}; | ||
module.exports = LibElasticasearchScrollStream; | ||
{ | ||
"name": "elasticsearch-scroll-stream", | ||
"version": "1.0.7", | ||
"version": "1.1.0", | ||
"description": "Elasticsearch Scroll query results as a Stream", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -14,2 +14,8 @@ | ||
## API | ||
`ElasticsearchScrollStream` is a Readable Stream, so it supports all the methods of a classic `Stream#Readable`. | ||
In addition it exposes a `#close()` method to force the stream to stop sourcing from Elasticsearch. | ||
## Installing | ||
@@ -129,2 +135,58 @@ | ||
Example of using the `close()` method. | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
// Create index and add documents here... | ||
// You need to pass the client instance and the query object | ||
// as parameters in the constructor | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
} | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
console.log(counter); | ||
}); | ||
es_stream.on('error', function(err) { | ||
console.log(err); | ||
}); | ||
``` | ||
See test files for more examples. | ||
@@ -131,0 +193,0 @@ |
@@ -55,3 +55,3 @@ /** | ||
it("Lib Elasticsearch: stream correctly when '_source' property is specified", function(done) { | ||
it("Should stream correctly when '_source' property is specified", function(done) { | ||
this.timeout(10000); | ||
@@ -101,3 +101,3 @@ var counter = 0; | ||
it('Lib Elasticsearch: stream correctly when no fields are specified (full _source)', function(done) { | ||
it('Should stream correctly when no fields are specified (full _source)', function(done) { | ||
this.timeout(10000); | ||
@@ -146,3 +146,3 @@ var counter = 0; | ||
it("Lib Elasticsearch: stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
this.timeout(10000); | ||
@@ -194,3 +194,3 @@ var counter = 0; | ||
it("Lib Elasticsearch: should throw error when optional_fields is not an array", function(done) { | ||
it("Should throw error when optional_fields is not an array", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
@@ -223,3 +223,3 @@ | ||
it("Lib Elasticsearch: should throw error when optional_fields does not contain an allowed value", function(done) { | ||
it("Should throw error when optional_fields does not contain an allowed value", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
@@ -250,3 +250,103 @@ | ||
}); | ||
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) { | ||
this.timeout(10000); | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
return; | ||
} | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(stopCounterIndex); | ||
done(); | ||
}); | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) { | ||
this.timeout(10000); | ||
var pageSize = '5'; | ||
var stopCounterIndex = (parseInt(pageSize) + 1); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: pageSize, | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
if (counter == stopCounterIndex) { | ||
es_stream.close(); | ||
} | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(parseInt(pageSize) * 2); | ||
done(); | ||
}); | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
24074
464
238