elasticsearch-loop
Advanced tools
Comparing version 0.1.0 to 0.2.0
163
index.js
@@ -1,114 +0,105 @@ | ||
var elasticsearch = require('elasticsearch'); | ||
var connection; | ||
var total = 0; | ||
var dataAmount = 0; | ||
var lastDataAmount = 0; | ||
var duplicateCount = 0; | ||
var firstLoop = true; | ||
var debugMode = false; | ||
var elasticsearchLoop = { | ||
var elasticsearch = require('elasticsearch') | ||
connect: (connectionDetail) => { | ||
connection = new elasticsearch.Client(connectionDetail); | ||
}, | ||
class elasticsearchLoop { | ||
debugMode: () => { | ||
debugMode = true; | ||
}, | ||
constructor() { | ||
printText: (text, debugLog) => { | ||
if (debugLog == true && debugMode == true) { | ||
console.log(text); | ||
} | ||
this.initConstant() | ||
} | ||
if (!debugLog) { | ||
console.log(text); | ||
} | ||
initConstant() { | ||
this.connection | ||
this.total = 0 | ||
this.dataAmount = 0 | ||
this.lastDataAmount = 0 | ||
this.queryDsl = 0 | ||
this.duplicateCount = 0 | ||
this.firstLoop = true | ||
this.debugMode = false | ||
}, | ||
this.loopCallback = null | ||
this.successCallback = null | ||
this.errorCallback = null | ||
} | ||
nullFunction: () => { | ||
connect(connectionDetail) { | ||
this.initConstant() | ||
this.connection = new elasticsearch.Client(connectionDetail) | ||
} | ||
}, | ||
enableDebugMode() { | ||
this.debugMode = true | ||
} | ||
query: (query, loopCallback, successCallback, errorCallback, errorEndCallback) => { | ||
if (!connection) { | ||
throw Error('Elasticsearch Connection is not defind.'); | ||
printText(text, debugLog) { | ||
if (debugLog == true && this.debugMode == true) { | ||
console.log(text) | ||
} | ||
if (!query) { | ||
throw Error('Elasticsearch Query is not defind.'); | ||
if (!debugLog) { | ||
console.log(text) | ||
} | ||
if (!loopCallback) { | ||
loopCallback = elasticsearchLoop.printText; | ||
} | ||
} | ||
if (!successCallback) { | ||
successCallback = elasticsearchLoop.nullFunction | ||
} | ||
nullFunction() { | ||
if (!errorCallback) { | ||
successCallback = elasticsearchLoop.printText; | ||
} | ||
query(queryDsl, loopCallback, successCallback, errorCallback) { | ||
if (!this.connection) { | ||
throw Error('Elasticsearch Connection is not defind.') | ||
} | ||
if (!errorEndCallback) { | ||
errorEndCallback = elasticsearchLoop.nullFunction | ||
if (!queryDsl) { | ||
throw Error('Elasticsearch Query is not defind.') | ||
} | ||
elasticsearchLoop.getMessageLoop(query, loopCallback, successCallback, errorCallback, errorEndCallback); | ||
}, | ||
this.loopCallback = loopCallback || this.printText | ||
getMessageLoop: (query, loopCallback, successCallback, errorCallback, errorEndCallback,maxRetire) => { | ||
query.size = 100; | ||
query.scroll = '60m'; | ||
query.search_type = "scan" | ||
this.successCallback = successCallback || this.nullFunction | ||
if(maxRetire){ | ||
maxRetire = 1000; | ||
} | ||
this.errorCallback = errorCallback || this.printText | ||
this.queryDsl = queryDsl | ||
this.getMessageLoop() | ||
} | ||
elasticsearchLoop.printText("[Elasticsearch] Loop Start.", true); | ||
connection.search(query, function getMoreUntilDone(error, response) { | ||
getMessageLoop() { | ||
this.queryDsl.from = 0 | ||
this.queryDsl.size = 300 | ||
if (duplicateCount > maxRetire) { | ||
elasticsearchLoop.printText("[Elasticsearch] Loop Ended With Error.", true); | ||
errorEndCallback(); | ||
} | ||
this.printText("[Elasticsearch] Loop Start.", true) | ||
this.connection.search(this.queryDsl, this.getMoreUntilDone.bind(this)) | ||
} | ||
if (!error) { | ||
total = response.hits.total; | ||
scroll_id = response._scroll_id; | ||
if (!firstLoop) { | ||
elasticsearchLoop.printText("[Elasticsearch] Fetched " + dataAmount.toLocaleString() + " From " + total.toLocaleString(), true); | ||
} else { | ||
elasticsearchLoop.printText("[Elasticsearch] Total " + total.toLocaleString() + " Records", true); | ||
firstLoop = false; | ||
} | ||
response.hits.hits.forEach(function(hit) { | ||
loopCallback(hit); | ||
dataAmount++; | ||
}); | ||
} else { | ||
errorCallback(error) | ||
} | ||
getMoreUntilDone(error, response) { | ||
if (!error) { | ||
if (lastDataAmount == dataAmount) { | ||
duplicateCount++; | ||
this.total = response.hits.total | ||
if (!this.firstLoop) { | ||
this.printText("[Elasticsearch] Fetched " + this.dataAmount.toLocaleString() + " From " + this.total.toLocaleString(), true) | ||
} else { | ||
duplicateCount = 0; | ||
this.printText("[Elasticsearch] Total " + this.total.toLocaleString() + " Records", true) | ||
this.firstLoop = false | ||
} | ||
lastDataAmount = dataAmount; | ||
response.hits.hits.forEach((hit) => { | ||
this.loopCallback(hit) | ||
this.dataAmount++ | ||
}) | ||
if (total !== dataAmount) { | ||
connection.scroll({ | ||
scrollId: scroll_id, | ||
scroll: '60m' | ||
}, getMoreUntilDone); | ||
} else { | ||
elasticsearchLoop.printText("[Elasticsearch] Loop Ended.", true); | ||
successCallback(); | ||
} | ||
}); | ||
} else { | ||
this.errorCallback(error) | ||
} | ||
this.lastDataAmount = this.dataAmount | ||
if (this.total !== this.dataAmount) { | ||
this.queryDsl.from = this.queryDsl.from + this.queryDsl.size | ||
this.connection.search(this.queryDsl, this.getMoreUntilDone.bind(this)) | ||
} else { | ||
this.printText("[Elasticsearch] Loop Ended.", true) | ||
this.successCallback() | ||
} | ||
} | ||
@@ -118,2 +109,2 @@ | ||
module.exports = elasticsearchLoop; | ||
module.exports = new elasticsearchLoop() |
{ | ||
"name": "elasticsearch-loop", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -33,4 +33,2 @@ # Elasticsearch loop | ||
//What do you want to do when query is error. | ||
},function(data){ | ||
//What do you want to do when your loop is finish with error. | ||
}); | ||
@@ -37,0 +35,0 @@ |
4290
3
81
40