Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

elasticsearch-scroll-stream

Package Overview
Dependencies
Maintainers
2
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

elasticsearch-scroll-stream - npm Package Compare versions

Comparing version 1.2.2 to 1.3.0

3

index.d.ts

@@ -12,3 +12,4 @@ import { Client } from "@elastic/elasticsearch";

| "_parent"
| "_routing";
| "_routing"
| "inner_hits";

@@ -15,0 +16,0 @@ declare class ElasticsearchScrollStream extends Readable {

@@ -6,5 +6,5 @@ /**

*/
var LibElasticsearchAdaptee = require('./lib/elasticsearch-stream')
const LibElasticsearchAdaptee = require('./lib/elasticsearch-stream')
var allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits']
const allowed_extrafields = ['_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits']

@@ -17,24 +17,23 @@ /**

* @param `optional_fields` - array of optional properties to include in the results.
* Allowed values: '_id', '_score', '_type', '_index', '_parent', '_routing'
* Allowed values: '_id', '_score', '_type', '_index', '_parent', '_routing', 'inner_hits'
* @param `stream_opts` - object to be passed to ReadableStream
*/
var ElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) {
if (arguments.length == 1) throw new Error('ElasticsearchScrollStream: missing parameters')
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client)
class ElasticsearchScrollStream {
constructor(client, query_opts = {}, optional_fields = [], stream_opts = {}) {
if (!client) throw new Error('ElasticsearchScrollStream: client is ', client)
if (Object.keys(query_opts).length === 0) throw new Error('ElasticsearchScrollStream: missing parameters')
optional_fields = !!optional_fields ? optional_fields : []
if (!Array.isArray(optional_fields))
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields)
if (!Array.isArray(optional_fields))
throw new Error('ElasticsearchScrollStream: optional_fields must be an array', optional_fields)
optional_fields.forEach(function(entry) {
if (allowed_extrafields.indexOf(entry) == -1) {
throw new Error("ElasticsearchScrollStream: property '" + entry + "' not allowed in optional_fields")
}
})
optional_fields.forEach(entry => {
if (allowed_extrafields.indexOf(entry) === -1) {
throw new Error(`ElasticsearchScrollStream: property '${entry}' not allowed in optional_fields`)
}
})
stream_opts = !!stream_opts ? stream_opts : {}
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts)
return new LibElasticsearchAdaptee(client, query_opts, optional_fields, stream_opts)
}
}
module.exports = ElasticsearchScrollStream

@@ -1,9 +0,2 @@

/**
* Elasticsearch Stream
*
* Create a ReadableStream from an elasticsearch scroll query.
* Assumptions: client library is of type [elasticsearch](https://www.npmjs.org/package/elasticsearch)
*/
var Readable = require('stream').Readable,
util = require('util')
const { Readable } = require('stream')

@@ -20,27 +13,21 @@ /**

*/
var LibElasticsearchScrollStream = function(client, query_opts, optional_fields, stream_opts) {
this._client = client
this._options = query_opts
this._extrafields = optional_fields
this._options.scroll = query_opts.scroll || '10m'
this._reading = false
this._counter = 0
this._total = 0
this._forceClose = false
Readable.call(this, stream_opts)
}
class LibElasticsearchScrollStream extends Readable {
constructor(client, query_opts, optional_fields, stream_opts) {
super(stream_opts)
util.inherits(LibElasticsearchScrollStream, Readable)
LibElasticsearchScrollStream.prototype._read = function() {
if (this._reading) {
return false
this._client = client
this._options = query_opts
this._extrafields = optional_fields
this._options.scroll = query_opts.scroll || '10m'
this._reading = false
this._counter = 0
this._total = 0
this._forceClose = false
}
this._reading = true
var self = this
this._client.search(this._options, function getMoreUntilDone(err, response) {
getMoreUntilDone = (err, response) => {
if (err) {
return self.emit('error', err)
return this.emit('error', err)
}
let body = !!response.body ? response.body : response

@@ -53,50 +40,46 @@ // Set the total matching documents

// }
var body = !!response.body ? response.body : response
self._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total
this._total = typeof body.hits.total === 'object' ? body.hits.total.value : body.hits.total
var objectMode = self._readableState.objectMode
body.hits.hits.forEach(function(hit) {
var ref_results = {}
if (hit.fields) {
ref_results = hit.fields
} else {
ref_results = hit._source
}
body.hits.hits.forEach(hit => {
let ref_results = hit.fields ? hit.fields : hit._source
// populate extra fields
self._extrafields.forEach(function(entry) {
this._extrafields.forEach(entry => {
ref_results[entry] = hit[entry]
})
self.push(objectMode ? ref_results : JSON.stringify(ref_results))
self._counter++
this.push(this._readableState.objectMode ? ref_results : JSON.stringify(ref_results))
this._counter++
})
if (self._total !== self._counter && !self._forceClose) {
self._client.scroll(
{
scroll: self._options.scroll,
scroll_id: body._scroll_id,
},
getMoreUntilDone
)
if (this._total !== this._counter && !this._forceClose) {
this._client.scroll({ scroll: this._options.scroll, scroll_id: body._scroll_id }, this.getMoreUntilDone)
} else {
// clearScroll for the current _scroll_id
self._client.clearScroll({ scrollId: [body._scroll_id] }, function(err, res) {
this._client.clearScroll({ scrollId: [body._scroll_id] }, (err, res) => {
// end correctly
return setImmediate(function() {
self._reading = false
self._counter = 0
self._forceClose = false
self.push(null)
return setImmediate(() => {
this._reading = false
this._counter = 0
this._forceClose = false
this.push(null)
})
})
}
})
}
}
LibElasticsearchScrollStream.prototype.close = function() {
return (this._forceClose = true)
_read() {
if (this._reading) {
return false
}
this._reading = true
this._client.search(this._options, this.getMoreUntilDone)
}
close() {
return (this._forceClose = true)
}
}
module.exports = LibElasticsearchScrollStream
{
"name": "elasticsearch-scroll-stream",
"version": "1.2.2",
"version": "1.3.0",
"description": "Elasticsearch Scroll query results as a Stream",

@@ -25,9 +25,9 @@ "repository": {

"engines": {
"node": ">= 0.10.x"
"node": ">=8"
},
"devDependencies": {
"@elastic/elasticsearch": "^7.6.1",
"@elastic/elasticsearch": "^7.8.0",
"chai": "^4.2.0",
"mocha": "^7.1.1"
"mocha": "^7.2.0"
}
}

@@ -5,16 +5,16 @@ /**

*/
var expect = require('chai').expect
const { expect } = require('chai')
var Client = require('@elastic/elasticsearch').Client
const { Client } = require('@elastic/elasticsearch')
var ElasticsearchScrollStream = require('../index.js')
const ElasticsearchScrollStream = require('../index.js')
describe('elasticsearch_scroll_stream', function() {
it("Should stream correctly when '_source' property is specified", function(done) {
describe('elasticsearch_scroll_stream', function () {
it("Should stream correctly when '_source' property is specified", function (done) {
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let counter = 0
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',

@@ -41,3 +41,3 @@ type: 'test-type',

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
expect(es_stream._total).to.equal(20)

@@ -49,3 +49,3 @@ current_doc = JSON.parse(data.toString())

es_stream.on('end', function() {
es_stream.on('end', function () {
expect(counter).to.equal(20)

@@ -55,3 +55,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -61,8 +61,8 @@ })

it('Should explicitly clear the active search context when the scroll finishes', function(done) {
it('Should explicitly clear the active search context when the scroll finishes', function (done) {
this.timeout(10000)
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',

@@ -89,3 +89,3 @@ type: 'test-type',

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
current_doc = JSON.parse(data.toString())

@@ -95,3 +95,3 @@ expect(current_doc.name).to.equal('third chunk name')

es_stream.on('end', function() {
es_stream.on('end', function () {
elasticsearch_client.indices.stats(

@@ -102,3 +102,3 @@ {

},
function(err, res) {
function (err, res) {
expect(res.body._all.total.search.open_contexts).to.equal(0)

@@ -110,3 +110,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -116,9 +116,9 @@ })

it('Should stream correctly when no fields are specified (full _source)', function(done) {
it('Should stream correctly when no fields are specified (full _source)', function (done) {
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let counter = 0
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
let es_stream = new ElasticsearchScrollStream(elasticsearch_client, {
index: 'elasticsearch-test-scroll-stream',

@@ -144,3 +144,3 @@ type: 'test-type',

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
current_doc = JSON.parse(data.toString())

@@ -151,3 +151,3 @@ expect(current_doc.name).to.equal('third chunk name')

es_stream.on('end', function() {
es_stream.on('end', function () {
expect(counter).to.equal(20)

@@ -157,3 +157,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -163,9 +163,9 @@ })

it("Should stream correctly when '_source' property is specified and optional_fields required", function(done) {
it("Should stream correctly when '_source' property is specified and optional_fields required", function (done) {
this.timeout(10000)
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let counter = 0
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(
let es_stream = new ElasticsearchScrollStream(
elasticsearch_client,

@@ -196,3 +196,3 @@ {

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
current_doc = JSON.parse(data.toString())

@@ -205,3 +205,3 @@ expect(current_doc.name).to.equal('third chunk name')

es_stream.on('end', function() {
es_stream.on('end', function () {
expect(counter).to.equal(20)

@@ -211,3 +211,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -217,8 +217,7 @@ })

it('Should throw error when optional_fields is not an array', function(done) {
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
it('Should throw error when optional_fields is not an array', function (done) {
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
expect(
ElasticsearchScrollStream.bind(
this,
expect(() => {
new ElasticsearchScrollStream(
elasticsearch_client,

@@ -248,12 +247,11 @@ {

)
).to.throw(/optional_fields must be an array/)
}).to.throw(/optional_fields must be an array/)
done()
})
it('Should throw error when optional_fields does not contain an allowed value', function(done) {
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
it('Should throw error when optional_fields does not contain an allowed value', function (done) {
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
expect(
ElasticsearchScrollStream.bind(
this,
expect(() => {
new ElasticsearchScrollStream(
elasticsearch_client,

@@ -283,15 +281,15 @@ {

)
).to.throw(/not allowed in optional_fields/)
}).to.throw(/not allowed in optional_fields/)
done()
})
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function(done) {
it("Should correctly close the stream when #close() method is called (using 'return' in 'data' handler)", function (done) {
this.timeout(10000)
var pageSize = '5'
var stopCounterIndex = parseInt(pageSize) + 1
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let pageSize = '5'
let stopCounterIndex = parseInt(pageSize) + 1
let counter = 0
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(
let es_stream = new ElasticsearchScrollStream(
elasticsearch_client,

@@ -322,3 +320,3 @@ {

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
current_doc = JSON.parse(data.toString())

@@ -332,3 +330,3 @@ if (counter == stopCounterIndex) {

es_stream.on('end', function() {
es_stream.on('end', function () {
expect(counter).to.equal(stopCounterIndex)

@@ -338,3 +336,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -344,11 +342,11 @@ })

it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function(done) {
it("Should correctly close the stream when #close() method is called (without 'return' into 'data' handler)", function (done) {
this.timeout(10000)
var pageSize = '5'
var stopCounterIndex = parseInt(pageSize) + 1
var counter = 0
var current_doc
var elasticsearch_client = new Client({ node: 'http://localhost:9200' })
let pageSize = '5'
let stopCounterIndex = parseInt(pageSize) + 1
let counter = 0
let current_doc
let elasticsearch_client = new Client({ node: 'http://localhost:9200' })
var es_stream = new ElasticsearchScrollStream(
let es_stream = new ElasticsearchScrollStream(
elasticsearch_client,

@@ -379,3 +377,3 @@ {

es_stream.on('data', function(data) {
es_stream.on('data', function (data) {
current_doc = JSON.parse(data.toString())

@@ -388,3 +386,3 @@ if (counter == stopCounterIndex) {

es_stream.on('end', function() {
es_stream.on('end', function () {
expect(counter).to.equal(parseInt(pageSize) * 2)

@@ -394,3 +392,3 @@ done()

es_stream.on('error', function(err) {
es_stream.on('error', function (err) {
done(err)

@@ -397,0 +395,0 @@ })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc