elasticsearch-helper
A Node.js module that makes querying Elasticsearch clusters easier.
disclaimer
After running into many issues with the way Elasticsearch handles queries, I created this helper. It is currently used in production and has drastically improved the readability and flexibility of our code.
With this helper you can query your Elasticsearch clusters easily. Everything is chainable, and a query always returns a promise.
Even though we use this in production, we still find bugs and add improvements to the module's codebase. Feel free to fork it and modify it for your own needs.
installation
npm install --save elasticsearch-helper
usage
Add client
const ES = require("elasticsearch-helper")
ES.addClient("127.0.0.1:9200");
ES.addClient("client1","127.0.0.1:9200");
ES.addClient("client1","127.0.0.1:9200",true);
ES.AddClient(...)
NOTE: If the host is given without http or https, http is assumed.
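The behaviour described in the note can be pictured with a small sketch. `normalizeHost` is a hypothetical name used only for illustration; it is not part of the elasticsearch-helper API, and the module's real implementation may differ.

```javascript
// Hypothetical helper (not part of elasticsearch-helper):
// prepend "http://" when the host has no explicit http/https scheme.
function normalizeHost(host) {
  return /^https?:\/\//i.test(host) ? host : `http://${host}`;
}

console.log(normalizeHost("127.0.0.1:9200"));
console.log(normalizeHost("https://es.example.com:9243"));
```

A host that already carries a scheme is returned unchanged, so https endpoints keep working.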
Global functions
ES.indexes("[Client name]")
Use client
The client is chainable: you can call functions one after the other until you execute the query, which then returns a promise.
Initialise a query:
ES.query("Index1");
ES.query("Index*");
ES.query("Index1");
ES.query("Index1").use("Client1")
Error handling
You can register a method to handle errors (for logging or formatting, for example). This method runs as part of the promise chain and should return something if processing needs to continue.
Errors are always surfaced as promise rejections.
ES.onError(err => {
console.log("This message will appear after every error")
return err;
})
ES.query("Index1")
.onError(err => {
console.log("This message will appear after this query has an error")
return err;
})
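To make the flow concrete, here is a minimal sketch of an error hook sitting inside a promise chain. `withErrorHandler` is a hypothetical function, and the assumption that the handler's return value becomes the rejection reason is ours; the module's internals are not shown in this README.

```javascript
// Hypothetical wrapper (not the module's code). Assumption: whatever the
// handler returns is what the resulting promise rejects with.
function withErrorHandler(promise, handler) {
  return promise.catch(err =>
    Promise.resolve(handler(err)).then(handled => {
      throw handled; // still a rejection, as the README states
    })
  );
}

const failing = Promise.reject(new Error("index_not_found"));

withErrorHandler(failing, err => {
  console.log("handler saw:", err.message);
  return err; // returning nothing would reject with undefined
}).catch(err => {
  console.log("caller still gets a rejection:", err.message);
});
```

This shows why the handler should return the error: the caller's own `.catch` receives whatever the handler passed along.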
Events
Listeners can be added for specific events.
onUpserted
This event triggers every time a document is created or updated on the specified indexes.
If the 2nd argument is left empty it will check every index.
ES.onUpserted((indexName, typeName, documentId) => {
}, [])
ES.query("Index1")
.onUpserted((indexName, typeName, documentId) => {
})
onDocumentChanged
This event triggers every time a document effectively changes on the specified indexes.
If the 2nd argument is left empty it will check every index.
This event runs 2 queries (1 to retrieve the document before the write and 1 after the document is inserted) to check whether the document has actually changed.
ES.onDocumentChanged((beforeValue, afterValue) => {
}, [])
ES.query("Index1")
.onDocumentChanged((beforeValue, afterValue) => {
})
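The before/after comparison can be sketched with a deep equality check. `hasEffectiveChange` is a hypothetical helper, and the module may compare documents differently; the sketch only illustrates what "effective change" means.

```javascript
const { isDeepStrictEqual } = require("util");

// Hypothetical helper: true when the document read after the write
// differs from the document read before it.
function hasEffectiveChange(beforeValue, afterValue) {
  return !isDeepStrictEqual(beforeValue, afterValue);
}

// Rewriting identical content is not an effective change.
console.log(hasEffectiveChange({ name: "josh", age: 30 }, { name: "josh", age: 30 })); // false
// A modified field is.
console.log(hasEffectiveChange({ name: "josh", age: 30 }, { name: "josh", age: 31 })); // true
```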
Indexes
All index operations are under the index() method to avoid conflicts
All methods return a promise.
We implemented some helpers based on what we were using a lot.
New ones will be added over time.
mappings
Return mappings for specific index(es)
ES.query("Index1")
.index()
.mappings()
ES.query("Index1")
.mappings()
copyTo
Easily copy an index/type to another client/index/type using bulk inserts.
NOTE1: You can copy based on a query; see below for how to write queries.
NOTE2: If you want to copy millions of rows, remember to set size(); Elasticsearch-helper will create a scroll.
ES.query("Index1")
.index()
.copyTo(ES.query("Index2"));
ES.query("Index1")
.index()
.copyTo(ES.query("Index2").use("client2"));
ES.query("Index1")
.index()
.copyTo(ES.query("Index2"));
ES.query("Index1")
.index()
.copyTo(ES.query("Index2"));
ES.query("Index1")
.must(
ES.type.term("first_name","Josh"),
)
.index()
.copyTo(ES.query("Index2"));
ES.query("Index1")
.copyTo(ES.query("Index2"));
delete
Delete an index
WARNING: This operation is final and cannot be reverted unless you have a snapshot. Use at your own risk.
For security reasons you cannot delete multiple indexes at the same time.
ES.query("Index1")
.index()
.delete();
ES.query("Index1")
.use("client2")
.index()
.delete();
ES.query("Index1")
.deleteIndex();
exists
Check if an index exists.
ES.query("Index1")
.index()
.exists();
ES.query("Index1")
.use("client2")
.index()
.exists();
ES.query("Index1")
.exists();
touch
Create an empty index without any mappings
ES.query("Index1")
.index()
.touch();
ES.query("Index1")
.use("client2")
.index()
.touch();
storeDocumentHistory
This will automatically store a version of a document into an index.
The document body stored is stringified
ES.storeDocumentHistory([], historicalIndexName)
Documents
Running queries:
For these examples we will use the query variable 'q':
const q = ES.query("Index1");
Single Document
Retrieve
Retrieve a document with id 'ID'. Returns false if not found.
q.id("ID")
.run()
.then(hit => {
console.log(hit.id())
console.log(hit.index())
console.log(hit.type())
console.log(hit.data())
console.log(hit.source())
})
Delete
Delete document. Cannot be reversed.
q.id("ID")
.delete()
.then(success => {
})
Create/Overwrite
Overwrite the document if 'ID' exists. Create a new document otherwise.
q.id("ID")
.body({...})
.run()
.then(hit => {
})
Update
Update the document if it exists. Will return an error otherwise.
q.id("ID")
.update({...})
.run()
.then(hit => {
})
Upsert
Update the document if it exists. Will create a new document with 'ID' otherwise.
q.id("ID")
.upsert({...})
.run()
.then(hit => {
})
Multiple Documents
Types & search options
This helper includes the different search features of Elasticsearch such as must, must_not, etc.
GETs and DELETEs use the same methodology for building queries. Example:
q.must(
ES.type.term("fieldname","fieldvalue"),
ES.filter.should(
ES.type.terms("fieldname2","fieldvalues")
)
)
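For readers who know the raw query DSL, a chain like the one above corresponds roughly to an Elasticsearch `bool` query. The sketch below builds such a body with plain objects. The four builder functions are hypothetical stand-ins for `ES.type.*` and `ES.filter.*`, and the exact body the helper sends may differ.

```javascript
// Hypothetical stand-ins returning raw Elasticsearch query DSL.
const term = (field, value) => ({ term: { [field]: value } });
const terms = (field, values) => ({ terms: { [field]: values } });
const should = (...clauses) => ({ bool: { should: clauses } });
const must = (...clauses) => ({ bool: { must: clauses } });

// A must clause holding one term query and one nested should group.
const body = {
  query: must(
    term("fieldname", "fieldvalue"),
    should(terms("fieldname2", ["value1", "value2"]))
  )
};

console.log(JSON.stringify(body, null, 2));
```

Nesting a `should` group inside `must` produces a `bool` query inside a `bool` query, which is how Elasticsearch expresses "all of these, and at least one of those".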
Filter types
ES.filter.must();
ES.filter.must_not();
ES.filter.should();
ES.filter.filter();
Search types
NOTE: not all types are currently implemented. Others will be added over time.
ES.type.term("fieldkey","fieldvalue");
ES.type.term("name.first_name","josh");
ES.type.terms("fieldkey","fieldvalues as array");
ES.type.terms("name.first_name",["josh","alan","jack"]);
ES.type.exists("fieldkey");
ES.type.exists("name.first_name");
ES.type.range("fieldkey","range object options");
ES.type.range("age",{
gte: 10,
lte: 30
});
ES.type.wildcard("fieldkey","fieldvalue");
ES.type.wildcard("name.first_name","josh*");
ES.type.prefix("fieldkey","fieldvalue");
ES.type.prefix("name.first_name","josh");
More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
ES.type.query_string("text to search" [,"option object"]);
ES.type.query_string("*jabba*",{
"fields": [ "field1" ],
"analyze_wildcard": true
});
Nested is an advanced feature of Elasticsearch that allows queries on sub-documents such as an array of objects. This type requires a specific mapping to be set up. For more information: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-nested-query.html
In a nested query you always define the parent, and field names in the filters are always prefixed with the parent name. All filters are available.
This type can be combined with other types at any level and can contain sub nested queries.
ES.type.nested("parent","filter object");
ES.type.nested("name",ES.filter.must(
ES.type.term("name.first", "josh"),
ES.type.term("name.last", "wake")
));
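As a point of comparison, this is what the same nested query looks like in raw Elasticsearch DSL. `nested` and `term` here are hypothetical local functions, not the helper's own; the sketch only shows the `path` plus `query` structure that Elasticsearch expects.

```javascript
// Hypothetical builders returning raw Elasticsearch query DSL.
const term = (field, value) => ({ term: { [field]: value } });
const nested = (path, query) => ({ nested: { path, query } });

// Both term clauses must match within the same "name" sub-document.
const query = nested("name", {
  bool: {
    must: [term("name.first", "josh"), term("name.last", "wake")]
  }
});

console.log(JSON.stringify(query, null, 2));
```

Note that the field names keep the parent prefix (`name.first`, `name.last`) even though `path` already names the parent.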
Geo distance is an advanced feature that requires a specific mapping in your index. For more information:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html
Geo distance requires a few parameters:
ES.type.geo("fieldkey","origin latlon","distance"[,"calculation"]);
ES.type.geo("location.geo",{ "lat": 48,"lon": 2 },"120km","arc")
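In raw DSL, these parameters line up with Elasticsearch's `geo_distance` query. The sketch below assumes the optional "calculation" argument maps to `distance_type`; that mapping is our assumption, and `geoDistance` is a hypothetical function rather than the helper's code.

```javascript
// Hypothetical builder returning the raw geo_distance query DSL.
// Assumption: the helper's "calculation" argument corresponds to distance_type.
function geoDistance(field, origin, distance, distanceType = "arc") {
  return {
    geo_distance: {
      distance,
      distance_type: distanceType, // Elasticsearch accepts "arc" or "plane"
      [field]: origin
    }
  };
}

console.log(JSON.stringify(geoDistance("location.geo", { lat: 48, lon: 2 }, "120km")));
```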
Retrieve
q.must(
).run()
.then(hits => {
const total = hits.total
const length = hits.length
const hit = hits[0];
console.log(hit.id())
console.log(hit.index())
console.log(hit.type())
console.log(hit.data())
console.log(hit.source())
})
Delete
Delete by query is only available on Elasticsearch 5.x and later.
q.must(
).delete()
.then(success => {
})
Count
Count the documents
q.must(
).count()
.then(count => {
})
Update by Query
Create a query that modifies all matching documents with the same logic. If you need different values per document, please refer to Bulk.
NOTE: This is an advanced feature that can modify many documents at the same time. Make sure your query is selecting the right documents before executing it.
q.must(
)
.updateByQuery("ctx._source.someProperty=\"propertyvalue\"")
.run()
.then(numberOfDocsUpdated => {
})
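The script string above requires escaped quotes. In the raw Elasticsearch `_update_by_query` API, script parameters avoid that. The sketch below builds such a request body as a plain object. Whether `updateByQuery()` in this helper accepts parameters is not stated in this README, so treat `updateByQueryBody` as a hypothetical illustration of the underlying API only.

```javascript
// Hypothetical builder for a raw _update_by_query request body.
// The new value travels in script.params, so no quote escaping is needed.
function updateByQueryBody(query, value) {
  return {
    query,
    script: {
      lang: "painless",
      source: "ctx._source.someProperty = params.value",
      params: { value }
    }
  };
}

const body = updateByQueryBody(
  { term: { fieldname: "fieldvalue" } },
  "propertyvalue"
);

console.log(JSON.stringify(body, null, 2));
```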
Bulk
You can run bulk operations across documents using the bulkOperation method.
The bulkOperation operation can be one of: index, update, delete, create
q.bulk(
ES.bulkOperation("update", "documentId1", {
property1: "property1Value"
}),
ES.bulkOperation("update", "documentId2", {
property2: "property2Value"
}),
ES.bulkOperation("delete", "documentId3"),
ES.bulkOperation(operation, documentId, [data]),
)
.then(response => {
})
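For context, Elasticsearch's `_bulk` endpoint takes newline-delimited JSON: an action line, followed by a source line for every operation except delete. The sketch below shows how a list of operations could be serialized into that format. `toBulkBody` is a hypothetical function and the index name is an example; this is not the helper's implementation.

```javascript
// Hypothetical serializer producing the NDJSON body of a _bulk request.
function toBulkBody(index, operations) {
  const lines = [];
  for (const [operation, id, data] of operations) {
    // Action line: which operation, on which document.
    lines.push(JSON.stringify({ [operation]: { _index: index, _id: id } }));
    if (operation === "delete") continue; // delete has no source line
    // update wraps the partial document in "doc"; index and create send it as-is.
    lines.push(JSON.stringify(operation === "update" ? { doc: data } : data));
  }
  return lines.join("\n") + "\n"; // the bulk body must end with a newline
}

console.log(toBulkBody("index1", [
  ["update", "documentId1", { property1: "property1Value" }],
  ["delete", "documentId3"]
]));
```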
aggregations [BETA]
Elasticsearch has a very powerful aggregation system but the way to handle it can be tricky. I tried to solve this issue by wrapping it in what I think is the simplest way.
q.aggs(
ES.agg.date_histogram("created_date")("date_created","1d")
.aggs(
ES.agg.terms("first_name")("data.first_name")
)
)
.run()
.then(response => {
const arrayAggList = response.agg("created_date")
const arrayValues = arrayAggList.values()
const firstValue = arrayValues[0];
const valueID = firstValue.id();
const valueData = firstValue.data();
const arrayChildAggList = arrayAggList.agg("first_name");
for(let parentKeyvalue in arrayChildAggList){
arrayChildAggList[parentKeyvalue].values().forEach(value => {
console.log(parentKeyvalue, value.id(),value.data());
})
}
})
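It can help to see what the wrapper is hiding. A raw Elasticsearch response nests child buckets inside each parent bucket, and the sketch below flattens that structure into rows. The sample response is invented for illustration (only the bucket layout follows Elasticsearch's format), and `flattenBuckets` is a hypothetical function, not part of the helper.

```javascript
// Sample data shaped like Elasticsearch bucket aggregations.
// The values are invented for illustration.
const aggregations = {
  created_date: {
    buckets: [
      {
        key_as_string: "2020-01-01",
        doc_count: 3,
        first_name: { buckets: [{ key: "josh", doc_count: 2 }, { key: "alan", doc_count: 1 }] }
      },
      {
        key_as_string: "2020-01-02",
        doc_count: 1,
        first_name: { buckets: [{ key: "jack", doc_count: 1 }] }
      }
    ]
  }
};

// Hypothetical helper: one [parentKey, childKey, count] row per child bucket.
function flattenBuckets(aggs, parentName, childName) {
  const rows = [];
  for (const parent of aggs[parentName].buckets) {
    for (const child of parent[childName].buckets) {
      rows.push([parent.key_as_string, child.key, child.doc_count]);
    }
  }
  return rows;
}

console.log(flattenBuckets(aggregations, "created_date", "first_name"));
```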
Aggregation types
ES.agg.terms("aggregation name")("field to aggregate on"[,"options object"])
interval: string using a time unit
ES.agg.date_histogram("aggregation name")("field to aggregate on","interval")
ES.agg.average("aggregation name")("field to aggregate on")
NOTE: The aggregations below do not support sub aggregations. An error will be thrown if you add one.
ES.agg.cardinality("aggregation name")("field to aggregate on")
ES.agg.extended_stats("aggregation name")("field to aggregate on")
ES.agg.maximum("aggregation name")("field to aggregate on")
ES.agg.minimum("aggregation name")("field to aggregate on")
ES.agg.sum("aggregation name")("field to aggregate on")
ES.agg.value_count("aggregation name")("field to aggregate on")
Other options
q.size(1000)
q.from(10)
q.fields(["name","id"])
Documentation
q.sort([{ "post_date" : {"order" : "asc"}}, ...])
Examples
Query
const ES = require("elasticsearch-helper")
ES.AddClient("client1","127.0.0.1:9200");
ES.query("Index1","Type1")
.use("client1")
.size(10)
.must(
ES.type.term("name","John"),
ES.type.terms("lastname",["Smith","Wake"])
)
.must_not(
ES.type.range("age",{
gte:20,
lte:30
})
)
.run()
.then(hits => {
})
Query with aggregation
const ES = require("elasticsearch-helper")
ES.AddClient("client1","127.0.0.1:9200");
ES.query("user")
.size(1001)
.must(
ES.type.term("name","jacques"),
ES.type.range("age",{gt:20,lte:40}),
ES.filter.should(
ES.type.term("color","blue"),
ES.type.term("vehicle","car")
)
)
.aggs(
ES.agg.date_histogram("created_date")("date_created","1d")
.aggs(
ES.agg.terms("first_name")("data.first_name.raw")
)
)
.run()
.then(response => {
const arrayAggList = response.agg("created_date")
const arrayValues = arrayAggList.values()
const firstValue = arrayValues[0];
const valueID = firstValue.id();
const valueData = firstValue.data();
const arrayChildAggList = arrayAggList.agg("first_name");
for(let parentKeyvalue in arrayChildAggList){
arrayChildAggList[parentKeyvalue].values().forEach(value => {
console.log(parentKeyvalue, value.id(),value.data());
})
}
}).catch(err => {
console.log(err)
})