elasticsearch-scroll-stream
Advanced tools
Comparing version 1.0.4 to 1.0.5
20
index.js
@@ -12,15 +12,27 @@ /** | ||
* ElasticsearchScrollStream | ||
* @param `client` - elastical instance | ||
* @param `query_opts` - query object to be passed to elastical. | ||
* @param `client` - elasticsearch instance | ||
* @param `query_opts` - query object to be passed to elasticsearch | ||
* See [Elasticsearch API reference](http://www.elasticsearch.org/guide/en/elasticsearch/reference/1.x/search-request-body.html) | ||
* @param `optional_fields` - array of optional properties to include in the results. | ||
* Allowed values: '_id', '_score', '_type', '_index' | ||
* @param `stream_opts` - object to be passed to ReadableStream | ||
*/ | ||
var ElasticsearchScrollStream = function(client, query_opts, stream_opts) { | ||
var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
if (arguments.length == 1) throw new Error("ElasticsearchScrollStream: missing parameters"); | ||
if (!client) throw new Error("ElasticsearchScrollStream: client is ", client); | ||
optional_fields = (!!optional_fields) ? optional_fields : [] | ||
if (!Array.isArray(optional_fields)) throw new Error("ElasticsearchScrollStream: optional_fields must be an array", optional_fields); | ||
allowed_extrafields = ['_id', '_score', '_type', '_index']; | ||
optional_fields.forEach(function(entry) { | ||
if (allowed_extrafields.indexOf(entry) == -1) { | ||
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields"); | ||
} | ||
}); | ||
stream_opts = (!!stream_opts) ? stream_opts : {}; | ||
if (!!client.nodes) { | ||
return new LibElasticsearchAdaptee(client, query_opts, stream_opts); | ||
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts); | ||
} else { | ||
@@ -27,0 +39,0 @@ return new LibElasticalAdaptee(client, query_opts, stream_opts); |
@@ -15,10 +15,13 @@ /** | ||
* @param `client` - elasticsearch instance | ||
* @param `query_opts` - query object to be passed to elastical. | ||
* @param `query_opts` - query object to be passed to elasticsearch. | ||
* It contains the query | ||
* @param `optional_fields` - array of optional properties to include in the results. | ||
* Allowed values: '_id', '_score', '_type', '_index' | ||
* @param `stream_opts` - object to be passed to ReadableStream | ||
*/ | ||
var LibElasticasearchScrollStream = function(client, query_opts, stream_opts) { | ||
var LibElasticasearchScrollStream = function(client, query_opts, optional_fields, stream_opts) { | ||
this._client = client; | ||
this._options = query_opts; | ||
this._extrafields = optional_fields; | ||
this._options.scroll = query_opts.scroll || '10m'; | ||
@@ -46,8 +49,15 @@ this._reading = false; | ||
response.hits.hits.forEach(function(hit) { | ||
var ref_results = {}; | ||
if(hit.fields) { | ||
self.push(JSON.stringify(hit.fields)); | ||
ref_results = hit.fields; | ||
} else { | ||
self.push(JSON.stringify(hit._source)); | ||
ref_results = hit._source; | ||
} | ||
// populate extra fields | ||
self._extrafields.forEach(function(entry) { | ||
ref_results[entry] = hit[entry]; | ||
}) | ||
self.push(JSON.stringify(ref_results)); | ||
self._counter++; | ||
@@ -54,0 +64,0 @@ }); |
{ | ||
"name": "elasticsearch-scroll-stream", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "Elasticsearch Scroll query results as a Stream", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -23,3 +23,3 @@ | ||
Example with a [simple query strings](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-uri-request.html) query. | ||
Example with a [simple query strings](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-uri-request.html) query. | ||
@@ -42,3 +42,3 @@ ```js | ||
size: '50', | ||
fields: ['name'], | ||
_source: ['name'], // you can use fields:['name'] alternatively, or nothing at all for the full _source result | ||
q: 'name:*' | ||
@@ -56,2 +56,35 @@ }); | ||
Example with a [simple query strings](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-uri-request.html) query, | ||
and `optional_fields` specified (in this case we want `_id` and `_score` fields into results). | ||
```js | ||
var elasticsearch = require('elasticsearch'); | ||
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream'); | ||
var client = new elasticsearch.Client(); | ||
// Create index and add documents here... | ||
// You need to pass the client instance and the query object | ||
// as parameters in the constructor | ||
var es_stream = new ElasticsearchScrollStream(client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
search_type: 'scan', | ||
scroll: '10s', | ||
size: '50', | ||
fields: ['name'], | ||
q: 'name:*' | ||
}, ['_id', '_score']); // optional_fields parameter: allowed values are those supported by elasticsearch | ||
// Pipe the results to other writeble streams.. | ||
es_stream.pipe(process.stdout); | ||
es_stream.on('end', function() { | ||
console.log("End"); | ||
}); | ||
``` | ||
Example with a full request definition using the [Elasticsearch Query DSL](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html). | ||
@@ -58,0 +91,0 @@ |
@@ -13,3 +13,3 @@ /** | ||
it('Lib Elastical: should stream data correctly from elasticsearch', function(done) { | ||
it('Lib Elastical: stream data correctly from elasticsearch', function(done) { | ||
var counter = 0; | ||
@@ -57,3 +57,3 @@ var current_doc; | ||
it('Lib Elasticsearch: should stream data correctly when fields are specified', function(done) { | ||
it("Lib Elasticsearch: stream correctly when 'fields' property is specified", function(done) { | ||
this.timeout(10000); | ||
@@ -103,3 +103,3 @@ var counter = 0; | ||
it('Lib Elasticsearch: should stream data correctly when no fields are specified (full _source)', function(done) { | ||
it("Lib Elasticsearch: stream correctly when '_source' property is specified", function(done) { | ||
this.timeout(10000); | ||
@@ -115,2 +115,3 @@ var counter = 0; | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
@@ -147,3 +148,195 @@ query: { | ||
}); | ||
it('Lib Elasticsearch: stream correctly when no fields are specified (full _source)', function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
it("Lib Elasticsearch: stream correctly when 'fields' property is specified and optional_fields required", function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
fields: ['name'], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id']); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name[0]).to.equal("third chunk name"); | ||
expect(current_doc).to.have.property("_id"); | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
it("Lib Elasticsearch: stream correctly when '_source' property is specified and optional_fields required", function(done) { | ||
this.timeout(10000); | ||
var counter = 0; | ||
var current_doc; | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['_id', '_score']); | ||
es_stream.on('data', function(data) { | ||
current_doc = JSON.parse(data.toString()); | ||
expect(current_doc.name).to.equal("third chunk name"); | ||
expect(current_doc).to.have.property("_id"); | ||
expect(current_doc).to.have.property("_score"); | ||
counter++; | ||
}); | ||
es_stream.on('end', function() { | ||
expect(counter).to.equal(20); | ||
done(); | ||
}); | ||
es_stream.on('error', function(err) { | ||
done(err); | ||
}); | ||
}); | ||
it("Lib Elasticsearch: should throw error when optional_fields is not an array", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, '_id')).to.throw(/optional_fields must be an array/); | ||
done(); | ||
}); | ||
it("Lib Elasticsearch: should throw error when optional_fields does not contain an allowed value", function(done) { | ||
var elasticsearch_client = new elasticsearch.Client(); | ||
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, { | ||
index: 'elasticsearch-test-scroll-stream', | ||
type: 'test-type', | ||
scroll: '10s', | ||
size: '50', | ||
_source: ["name"], | ||
body: { | ||
query: { | ||
bool: { | ||
must: [ | ||
{ | ||
query_string: { | ||
default_field: "_all", | ||
query: 'name:third*' | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, ['invalid_value'])).to.throw(/not allowed in optional_fields/); | ||
done(); | ||
}); | ||
}); | ||
22275
450
179