New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

elasticsearch-scroll-stream

Package Overview
Dependencies
Maintainers
2
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

elasticsearch-scroll-stream - npm Package Compare versions

Comparing version 1.1.5 to 1.2.0

31

index.js

@@ -6,6 +6,5 @@ /**

*/
var LibElasticalAdaptee = require("./lib/elastical-stream"),
LibElasticsearchAdaptee = require("./lib/elasticsearch-stream");
var LibElasticsearchAdaptee = require('./lib/elasticsearch-stream')
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'];
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits']

@@ -22,24 +21,20 @@ /**

var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) {
if (arguments.length == 1) throw new Error("ElasticsearchScrollStream: missing parameters");
if (!client) throw new Error("ElasticsearchScrollStream: client is ", client);
if (arguments.length == 1) throw new Error('ElasticsearchScrollStream: missing parameters')
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client)
optional_fields = (!!optional_fields) ? optional_fields : []
if (!Array.isArray(optional_fields)) throw new Error("ElasticsearchScrollStream: optional_fields must be an array", optional_fields);
optional_fields = !!optional_fields ? optional_fields : []
if (!Array.isArray(optional_fields))
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields)
optional_fields.forEach(function(entry) {
if (allowed_extrafields.indexOf(entry) == -1) {
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields");
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields")
}
});
})
stream_opts = (!!stream_opts) ? stream_opts : {};
stream_opts = !!stream_opts ? stream_opts : {}
if (!!client.nodes) {
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts);
} else {
return new LibElasticalAdaptee(client, query_opts, stream_opts);
}
};
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts)
}
module.exports = ElasticsearchScrollStream;
module.exports = ElasticsearchScrollStream

@@ -7,6 +7,5 @@ /**

*/
var Readable = require("stream").Readable,
util = require("util");
var Readable = require('stream').Readable,
util = require('util')
/**

@@ -23,27 +22,25 @@ * LibElasticsearchScrollStream

var LibElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) {
this._client = client
this._options = query_opts
this._extrafields = optional_fields
this._options.scroll = query_opts.scroll || '10m'
this._reading = false
this._counter = 0
this._total = 0
this._forceClose = false
Readable.call(this, stream_opts)
}
this._client = client;
this._options = query_opts;
this._extrafields = optional_fields;
this._options.scroll = query_opts.scroll || '10m';
this._reading = false;
this._counter = 0;
this._total = 0;
this._forceClose = false;
Readable.call(this, stream_opts);
};
util.inherits(LibElasticsearchScrollStream, Readable)
util.inherits(LibElasticsearchScrollStream, Readable);
LibElasticsearchScrollStream.prototype._read = function() {
if (this._reading) {
return false;
return false
}
this._reading = true;
var self = this;
this._reading = true
var self = this
this._client.search(this._options, function getMoreUntilDone(err, response) {
if (err) {
return self.emit("error", err);
return self.emit('error', err)
}

@@ -57,11 +54,12 @@

// }
self._total = (typeof response.hits.total === 'object') ? response.hits.total.value : response.hits.total;
var body = !!response.body ? response.body : response
self._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total
var objectMode = self._readableState.objectMode;
response.hits.hits.forEach(function(hit) {
var ref_results = {};
if(hit.fields) {
ref_results = hit.fields;
var objectMode = self._readableState.objectMode
body.hits.hits.forEach(function(hit) {
var ref_results = {}
if (hit.fields) {
ref_results = hit.fields
} else {
ref_results = hit._source;
ref_results = hit._source
}

@@ -71,35 +69,36 @@

self._extrafields.forEach(function(entry) {
ref_results[entry] = hit[entry];
});
ref_results[entry] = hit[entry]
})
self.push(objectMode ? ref_results : JSON.stringify(ref_results));
self._counter++;
});
self.push(objectMode ? ref_results : JSON.stringify(ref_results))
self._counter++
})
if ((self._total !== self._counter) && (!self._forceClose)) {
self._client.scroll({
scroll: self._options.scroll,
scroll_id: response._scroll_id
}, getMoreUntilDone);
if (self._total !== self._counter && !self._forceClose) {
self._client.scroll(
{
scroll: self._options.scroll,
scroll_id: body._scroll_id,
},
getMoreUntilDone
)
} else {
// trigger an asyncronous clearScroll for the current _scroll_id
self._client.clearScroll({ scrollId: [response._scroll_id] }, function(err, res) {});
// end correctly
return setImmediate(function() {
self._reading = false;
self._counter = 0;
self._forceClose = false;
self.push(null);
});
// clearScroll for the current _scroll_id
self._client.clearScroll({ scrollId: [body._scroll_id] }, function(err, res) {
// end correctly
return setImmediate(function() {
self._reading = false
self._counter = 0
self._forceClose = false
self.push(null)
})
})
}
})
}
});
};
LibElasticsearchScrollStream.prototype.close = function() {
return this._forceClose = true;
};
return (this._forceClose = true)
}
module.exports = LibElasticsearchScrollStream;
module.exports = LibElasticsearchScrollStream
{
"name": "elasticsearch-scroll-stream",
"version": "1.1.5",
"version": "1.2.0",
"description": "Elasticsearch Scroll query results as a Stream",

@@ -28,7 +28,6 @@ "repository": {

"devDependencies": {
"@elastic/elasticsearch": "^7.2.0",
"chai": "^4.2.0",
"elastical": "0.0.13",
"elasticsearch": "^16.0.0",
"mocha": "^5.2.0"
}
}

@@ -8,6 +8,6 @@

This module works with the following Elasticsearch nodejs clients:
This module works with the official Elasticsearch nodejs clients:
- [elasticsearch](https://www.npmjs.org/package/elasticsearch) (official Elasticsearch js API)
- [elastical](https://www.npmjs.org/package/elastical) (DEPRECATED - Discontinued support)
- [@elastic/elasticsearch](https://www.npmjs.com/package/@elastic/elasticsearch) (new Elasticsearch js API)
- [elasticsearch](https://www.npmjs.org/package/elasticsearch) (old Elasticsearch js API)

@@ -34,6 +34,6 @@

```js
var elasticsearch = require('elasticsearch');
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')
var client = new elasticsearch.Client();
const elasticsearch_client = new Client({ node: 'http://localhost:9200' })

@@ -44,3 +44,3 @@ // Create index and add documents here...

// as parameters in the constructor
var es_stream = new ElasticsearchScrollStream(client, {
const es_stream = new ElasticsearchScrollStream(client, {
index: 'elasticsearch-test-scroll-stream',

@@ -51,12 +51,11 @@ type: 'test-type',

_source: ['name'],
q: 'name:*'
});
q: 'name:*',
})
// Pipe the results to other writeble streams..
es_stream.pipe(process.stdout);
es_stream.pipe(process.stdout)
es_stream.on('end', function() {
console.log("End");
});
console.log('End')
})
```

@@ -69,6 +68,6 @@

```js
var elasticsearch = require('elasticsearch');
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')
var client = new elasticsearch.Client();
const elasticsearch_client = new Client({ node: 'http://localhost:9200' })

@@ -79,3 +78,3 @@ // Create index and add documents here...

// as parameters in the constructor
var es_stream = new ElasticsearchScrollStream(client, {
const es_stream = new ElasticsearchScrollStream(client, {
index: 'elasticsearch-test-scroll-stream',

@@ -90,6 +89,6 @@ type: 'test-type',

// Pipe the results to other writeble streams..
es_stream.pipe(process.stdout);
es_stream.pipe(process.stdout)
es_stream.on('end', function() {
console.log("End");
console.log("End")
});

@@ -102,6 +101,6 @@

```js
var elasticsearch = require('elasticsearch');
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')
var client = new elasticsearch.Client();
const elasticsearch_client = new Client({ node: 'http://localhost:9200' })

@@ -112,3 +111,3 @@ // Create index and add documents here...

// as parameters in the constructor
var es_stream = new ElasticsearchScrollStream(client, {
const es_stream = new ElasticsearchScrollStream(client, {
index: 'elasticsearch-test-scroll-stream',

@@ -136,6 +135,6 @@ type: 'test-type',

// Pipe the results to other writeble streams..
es_stream.pipe(process.stdout);
es_stream.pipe(process.stdout)
es_stream.on('end', function() {
console.log("End");
console.log("End")
});

@@ -148,17 +147,15 @@

```js
var elasticsearch = require('elasticsearch');
var ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
const { Client } = require('@elastic/elasticsearch')
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream')
// Create index and add documents here...
// You need to pass the client instance and the query object
// as parameters in the constructor
const pageSize = '5'
let stopCounterIndex = (parseInt(pageSize) + 1)
let counter = 0
let current_doc
const elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var pageSize = '5';
var stopCounterIndex = (parseInt(pageSize) + 1);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
const es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',

@@ -186,15 +183,15 @@ type: 'test-type',

es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
current_doc = JSON.parse(data.toString())
if (counter == stopCounterIndex) {
es_stream.close();
es_stream.close()
}
counter++;
counter++
});
es_stream.on('end', function() {
console.log(counter);
console.log(counter)
});
es_stream.on('error', function(err) {
console.log(err);
console.log(err)
});

@@ -201,0 +198,0 @@

@@ -5,14 +5,14 @@ /**

*/
var expect = require('chai').expect,
elasticsearch = require('elasticsearch');
var expect = require('chai').expect
var ElasticsearchScrollStream = require('../index.js');
var Client = require('@elastic/elasticsearch').Client
var ElasticsearchScrollStream = require('../index.js')
describe('elasticsearch_scroll_stream', function() {
it("Should stream correctly when '_source' property is specified", function(done) {
this.timeout(10000);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })

@@ -24,3 +24,3 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {

size: '50',
_source: ["name"],
_source: ['name'],
body: {

@@ -32,35 +32,34 @@ query: {

query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
});
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
})
es_stream.on('data', function(data) {
expect(es_stream._total).to.equal(20);
current_doc = JSON.parse(data.toString());
expect(current_doc.name).to.equal("third chunk name");
counter++;
});
expect(es_stream._total).to.equal(20)
current_doc = JSON.parse(data.toString())
expect(current_doc.name).to.equal('third chunk name')
counter++
})
es_stream.on('end', function() {
expect(counter).to.equal(20);
done();
});
expect(counter).to.equal(20)
done()
})
es_stream.on('error', function(err) {
done(err);
});
});
done(err)
})
})
it('Should explicitly clear the active search context when the scroll finishes', function(done) {
this.timeout(10000)
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
it("Should explicitly clear the active search context when the scroll finishes", function(done) {
this.timeout(10000);
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {

@@ -71,3 +70,3 @@ index: 'elasticsearch-test-scroll-stream',

size: '50',
_source: ["name"],
_source: ['name'],
body: {

@@ -79,38 +78,40 @@ query: {

query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
});
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
})
es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
expect(current_doc.name).to.equal("third chunk name");
});
current_doc = JSON.parse(data.toString())
expect(current_doc.name).to.equal('third chunk name')
})
es_stream.on('end', function() {
elasticsearch_client.indices.stats({
index: '_all',
metric: 'search'
}, function(err, res) {
expect(res._all.total.search.open_contexts).to.equal(0);
done();
});
});
elasticsearch_client.indices.stats(
{
index: '_all',
metric: 'search',
},
function(err, res) {
expect(res.body._all.total.search.open_contexts).to.equal(0)
done()
}
)
})
es_stream.on('error', function(err) {
done(err);
});
});
done(err)
})
})
it('Should stream correctly when no fields are specified (full _source)', function(done) {
this.timeout(10000);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })

@@ -128,231 +129,250 @@ var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {

query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
});
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
})
es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
expect(current_doc.name).to.equal("third chunk name");
counter++;
});
current_doc = JSON.parse(data.toString())
expect(current_doc.name).to.equal('third chunk name')
counter++
})
es_stream.on('end', function() {
expect(counter).to.equal(20);
done();
});
expect(counter).to.equal(20)
done()
})
es_stream.on('error', function(err) {
done(err);
});
});
done(err)
})
})
it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) {
this.timeout(10000);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ["name"],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
}, ['_id', '_score']);
var es_stream = new ElasticsearchScrollStream(
elasticsearch_client,
{
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ['name'],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
},
['_id', '_score']
)
es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
expect(current_doc.name).to.equal("third chunk name");
expect(current_doc).to.have.property("_id");
expect(current_doc).to.have.property("_score");
counter++;
});
current_doc = JSON.parse(data.toString())
expect(current_doc.name).to.equal('third chunk name')
expect(current_doc).to.have.property('_id')
expect(current_doc).to.have.property('_score')
counter++
})
es_stream.on('end', function() {
expect(counter).to.equal(20);
done();
});
expect(counter).to.equal(20)
done()
})
es_stream.on('error', function(err) {
done(err);
});
});
done(err)
})
})
it('Should throw error when optional_fields is not an array', function(done) {
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
it("Should throw error when optional_fields is not an array", function(done) {
var elasticsearch_client = new elasticsearch.Client();
expect(
ElasticsearchScrollStream.bind(
this,
elasticsearch_client,
{
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ['name'],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
},
'_id'
)
).to.throw(/optional_fields must be an array/)
done()
})
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ["name"],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
}, '_id')).to.throw(/optional_fields must be an array/);
done();
});
it('Should throw error when optional_fields does not contain an allowed value', function(done) {
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
expect(
ElasticsearchScrollStream.bind(
this,
elasticsearch_client,
{
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ['name'],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
},
['invalid_value']
)
).to.throw(/not allowed in optional_fields/)
done()
})
it("Should throw error when optional_fields does not contain an allowed value", function(done) {
var elasticsearch_client = new elasticsearch.Client();
expect(ElasticsearchScrollStream.bind(this, elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: '50',
_source: ["name"],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
}, ['invalid_value'])).to.throw(/not allowed in optional_fields/);
done();
});
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) {
this.timeout(10000);
var pageSize = '5';
var stopCounterIndex = (parseInt(pageSize) + 1);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
this.timeout(10000)
var pageSize = '5'
var stopCounterIndex = parseInt(pageSize) + 1
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: pageSize,
_source: ["name"],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
}, ['_id', '_score']);
var es_stream = new ElasticsearchScrollStream(
elasticsearch_client,
{
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: pageSize,
_source: ['name'],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
},
['_id', '_score']
)
es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
current_doc = JSON.parse(data.toString())
if (counter == stopCounterIndex) {
es_stream.close();
return;
es_stream.close()
return
}
counter++;
});
counter++
})
es_stream.on('end', function() {
expect(counter).to.equal(stopCounterIndex);
done();
});
expect(counter).to.equal(stopCounterIndex)
done()
})
es_stream.on('error', function(err) {
done(err);
});
});
done(err)
})
})
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) {
this.timeout(10000);
var pageSize = '5';
var stopCounterIndex = (parseInt(pageSize) + 1);
var counter = 0;
var current_doc;
var elasticsearch_client = new elasticsearch.Client();
this.timeout(10000)
var pageSize = '5'
var stopCounterIndex = parseInt(pageSize) + 1
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: pageSize,
_source: ["name"],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: "_all",
query: 'name:third*'
}
}
]
}
}
}
}, ['_id', '_score']);
var es_stream = new ElasticsearchScrollStream(
elasticsearch_client,
{
index: 'elasticsearch-test-scroll-stream',
type: 'test-type',
scroll: '10s',
size: pageSize,
_source: ['name'],
body: {
query: {
bool: {
must: [
{
query_string: {
default_field: '_all',
query: 'name:third*',
},
},
],
},
},
},
},
['_id', '_score']
)
es_stream.on('data', function(data) {
current_doc = JSON.parse(data.toString());
current_doc = JSON.parse(data.toString())
if (counter == stopCounterIndex) {
es_stream.close();
es_stream.close()
}
counter++;
});
counter++
})
es_stream.on('end', function() {
expect(counter).to.equal(parseInt(pageSize) * 2);
done();
});
expect(counter).to.equal(parseInt(pageSize) * 2)
done()
})
es_stream.on('error', function(err) {
done(err);
});
});
});
done(err)
})
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc