happner-elastic-dataprovider
Advanced tools
Comparing version 2.1.1-issue-3-0.0.0 to 2.1.1-performance-bulk-2.1.2
287
index.js
@@ -38,2 +38,8 @@ var mongoToElastic = require('./lib/mongo-to-elastic') | ||
ElasticProvider.prototype.UPSERT_TYPE = { | ||
insert:0, | ||
upsert:1, | ||
bulk:2 | ||
}; | ||
/* initialize and stop */ | ||
@@ -84,3 +90,3 @@ { | ||
/* upsert, update and remove */ | ||
/* upsert, insert, update and remove */ | ||
{ | ||
@@ -92,2 +98,4 @@ | ||
//[start:{"key":"upsert", "self":"_this"}:start] | ||
var modifiedOn = Date.now(); | ||
@@ -97,70 +105,143 @@ | ||
if (!options) options = {}; | ||
if (options.refresh == null) options.refresh = true; //slow but reliable | ||
if (options.upsertType == null) options.upsertType = _this.UPSERT_TYPE.upsert; | ||
if (options.retries == null) options.retries = 20; | ||
this.__getRoute(path, setData.data, "upsert", function (e, route, routeData) { | ||
//[end:{"key":"upsert", "self":"_this", "error":"e"}:end] | ||
if (e) return callback(e); | ||
setData.data = routeData; | ||
try{ | ||
var index = route.index; | ||
setData.data = routeData; | ||
var elasticMessage = { | ||
"index": index, | ||
"type": route.type, | ||
id: path, | ||
body: { | ||
doc: { | ||
modified: modifiedOn, | ||
timestamp: timestamp, | ||
path: path, | ||
data: setData.data | ||
}, | ||
upsert: { | ||
created: modifiedOn, | ||
modified: modifiedOn, | ||
timestamp: timestamp, | ||
path: path, | ||
data: setData.data | ||
} | ||
}, | ||
_source: true, | ||
refresh: true, | ||
retry_on_conflict: 20 | ||
}; | ||
var index = route.index; | ||
if (options.modifiedBy) { | ||
if (options.upsertType == _this.UPSERT_TYPE.insert) | ||
elasticMessage.body.upsert.modifiedBy = options.modifiedBy; | ||
elasticMessage.body.doc.modifiedBy = options.modifiedBy; | ||
elasticMessage.body.upsert.createdBy = options.modifiedBy; | ||
} | ||
return _this.__insert(path, setData, options, index, route, timestamp, modifiedOn, callback); | ||
if (setData._tag) { | ||
elasticMessage.body.doc._tag = setData._tag; | ||
elasticMessage.body.upsert._tag = setData._tag; | ||
if (options.upsertType == _this.UPSERT_TYPE.bulk) | ||
return _this.__bulk(path, setData, options, index, route, timestamp, modifiedOn, callback); | ||
_this.__update(path, setData, options, index, route, timestamp, modifiedOn, callback); | ||
}catch(e){ | ||
callback(e); | ||
} | ||
}); | ||
}; | ||
if (!options) options = {}; | ||
ElasticProvider.prototype.__bulk = function (path, setData, options, index, route, timestamp, modifiedOn, callback) { | ||
callback(new Error('bulk not implemented yet')); | ||
}; | ||
options.upsert = true; | ||
ElasticProvider.prototype.__insert = function (path, setData, options, index, route, timestamp, modifiedOn, callback) { | ||
_this.db.update(elasticMessage, function (e, response) { | ||
var _this = this; | ||
if (e) return callback(e); | ||
//[start:{"key":"__insert", "self":"_this"}:start] | ||
var data = response.get._source; | ||
var createObj = { | ||
modified: timestamp, | ||
timestamp: timestamp, | ||
path: path, | ||
data: setData.data, | ||
created: timestamp | ||
}; | ||
var created = null; | ||
var elasticMessage = { | ||
"index": index, | ||
"type": route.type, | ||
id: path, | ||
body: createObj, | ||
refresh: options.refresh | ||
}; | ||
if (response.result == 'created') created = _this.__partialTransform(response.get, index, route.type); | ||
//e, response, created, upsert, meta | ||
if (options.modifiedBy) { | ||
callback(null, data, created, true, _this.__getMeta(response.get._source)); | ||
}); | ||
elasticMessage.body.modifiedBy = options.modifiedBy; | ||
elasticMessage.body.createdBy = options.modifiedBy; | ||
} | ||
if (setData._tag) elasticMessage.body._tag = setData._tag; | ||
_this.db.create(elasticMessage, function (e, response) { | ||
//[end:{"key":"__insert", "self":"_this", "error":"e"}:end] | ||
if (e) return callback(e); | ||
//e, response, created, upsert, meta | ||
callback(null, createObj, _this.__partialInsertTransform(createObj, response), true, _this.__getMeta(createObj)); | ||
}); | ||
}; | ||
ElasticProvider.prototype.update = function (criteria, data, options, callback) { | ||
ElasticProvider.prototype.__update = function (path, setData, options, index, route, timestamp, modifiedOn, callback) { | ||
return this.db.update(criteria, data, options, callback); | ||
var _this = this; | ||
//[start:{"key":"__update", "self":"_this"}:start] | ||
var elasticMessage = { | ||
"index": index, | ||
"type": route.type, | ||
id: path, | ||
body: { | ||
doc: { | ||
modified: modifiedOn, | ||
timestamp: timestamp, | ||
path: path, | ||
data: setData.data | ||
}, | ||
upsert: { | ||
created: modifiedOn, | ||
modified: modifiedOn, | ||
timestamp: timestamp, | ||
path: path, | ||
data: setData.data | ||
} | ||
}, | ||
_source: true, | ||
refresh: options.refresh, | ||
retry_on_conflict: options.retries | ||
}; | ||
if (options.modifiedBy) { | ||
elasticMessage.body.upsert.modifiedBy = options.modifiedBy; | ||
elasticMessage.body.doc.modifiedBy = options.modifiedBy; | ||
elasticMessage.body.upsert.createdBy = options.modifiedBy; | ||
} | ||
if (setData._tag) { | ||
elasticMessage.body.doc._tag = setData._tag; | ||
elasticMessage.body.upsert._tag = setData._tag; | ||
} | ||
_this.db.update(elasticMessage, function (e, response) { | ||
//[end:{"key":"__update", "self":"_this", "error":"e"}:end] | ||
if (e) return callback(e); | ||
var data = response.get._source; | ||
var created = null; | ||
if (response.result == 'created') created = _this.__partialTransform(response.get, index, route.type); | ||
callback(null, data, created, true, _this.__getMeta(response.get._source)); | ||
}); | ||
}; | ||
@@ -172,2 +253,4 @@ | ||
//[start:{"key":"remove", "self":"_this"}:start] | ||
var multiple = path.indexOf('*') > -1; | ||
@@ -183,2 +266,4 @@ | ||
//[end:{"key":"remove", "self":"_this", "error":"e"}:end] | ||
if (e) return callback(e); | ||
@@ -244,2 +329,4 @@ | ||
//[start:{"key":"find", "self":"_this"}:start] | ||
_this.__getRoute(searchPath, null, 'find', function (e, route) { | ||
@@ -249,3 +336,6 @@ | ||
if (route.noIndexYet) return callback(null, []); | ||
if (route.noIndexYet) { | ||
//[end:{"key":"find", "self":"_this"}:end] | ||
return callback(null, []); | ||
} | ||
@@ -293,2 +383,4 @@ var elasticMessage = { | ||
//[end:{"key":"find", "self":"_this"}:end] | ||
if (resp.hits && resp.hits.hits && resp.hits.hits.length > 0) { | ||
@@ -311,2 +403,6 @@ | ||
var _this = this; | ||
//[start:{"key":"findOne", "self":"_this"}:start] | ||
var path = criteria.path; | ||
@@ -316,4 +412,6 @@ | ||
this.find(path, {options: fields, criteria: criteria}, function (e, results) { | ||
_this.find(path, {options: fields, criteria: criteria}, function (e, results) { | ||
//[end:{"key":"findOne", "self":"_this"}:end] | ||
if (e) return callback(e); | ||
@@ -331,2 +429,6 @@ | ||
var _this = this; | ||
//[start:{"key":"count", "self":"_this"}:start] | ||
var countMessage = { | ||
@@ -347,4 +449,6 @@ index: message.index, | ||
this.db.count(countMessage, function (e, response) { | ||
_this.db.count(countMessage, function (e, response) { | ||
//[end:{"key":"count", "self":"_this"}:end] | ||
if (e) return callback(e); | ||
@@ -375,2 +479,3 @@ | ||
_score: dataItem._score, | ||
_version: dataItem._version, | ||
_tag: dataItem._source._tag, | ||
@@ -387,4 +492,24 @@ created: dataItem._source.created, | ||
ElasticProvider.prototype.__partialInsertTransform = function (createdObj, response) { | ||
return { | ||
_id: response._id, | ||
_index: response._index, | ||
_type: response._type, | ||
_version: response._version, | ||
_tag: createdObj._tag, | ||
created: createdObj.created, | ||
deleted: createdObj.deleted, | ||
modified: createdObj.modified, | ||
createdBy: createdObj.createdBy, | ||
modifiedBy: createdObj.modifiedBy, | ||
deletedBy: createdObj.deletedBy, | ||
data: createdObj.data | ||
}; | ||
}; | ||
ElasticProvider.prototype.transform = function (dataObj, meta) { | ||
//[start:{"key":"transform", "self":"this"}:start] | ||
var transformed = {data: dataObj.data}; | ||
@@ -414,2 +539,4 @@ | ||
//[end:{"key":"transform", "self":"this"}:end] | ||
return transformed; | ||
@@ -430,2 +557,4 @@ }; | ||
//[start:{"key":"__parseFields", "self":"this"}:start] | ||
traverse(fields).forEach(function (value) { | ||
@@ -482,2 +611,4 @@ | ||
//[end:{"key":"__parseFields", "self":"this"}:end] | ||
return fields; | ||
@@ -519,2 +650,4 @@ }; | ||
//[start:{"key":"preparePath", "self":"this"}:start] | ||
if (!path) return '*'; | ||
@@ -535,2 +668,4 @@ | ||
//[end:{"key":"preparePath", "self":"this"}:end] | ||
return prepared; | ||
@@ -695,2 +830,4 @@ }; | ||
//[start:{"key":"__getRoute", "self":"_this"}:start] | ||
var route = null; | ||
@@ -714,3 +851,6 @@ | ||
if (!route) return callback(new Error('route for path ' + path + ' does not exist')); | ||
if (!route) { | ||
//[end:{"key":"__getRoute", "self":"_this"}:end] | ||
return callback(new Error('route for path ' + path + ' does not exist')); | ||
} | ||
@@ -721,2 +861,4 @@ if (route.dynamic) | ||
//[end:{"key":"__getRoute", "self":"_this"}:end] | ||
if (e) return callback(e); | ||
@@ -727,2 +869,3 @@ | ||
//[end:{"key":"__getRoute", "self":"_this"}:end] | ||
return callback(null, {index: route.index, type: _this.config.defaultType}, data); | ||
@@ -735,2 +878,4 @@ }; | ||
//[start:{"key":"__createDynamicObject", "self":"this"}:start] | ||
Object.keys(dynamicParts.values).forEach(function (dynamicValueKey) { | ||
@@ -761,2 +906,4 @@ | ||
//[end:{"key":"__createDynamicObject", "self":"this"}:end] | ||
return data; | ||
@@ -766,2 +913,3 @@ | ||
//[end:{"key":"__createDynamicObject", "self":"this", "error":"e"}:end] | ||
throw new Error('failed to generate dynamic object', e); | ||
@@ -775,2 +923,4 @@ } | ||
//[start:{"key":"__createDynamicIndex", "self":"_this"}:start] | ||
if (/[A-Z-+_*$@#$%&!]/.test(dynamicParts.index)) return callback(new Error('bad dynamic index name: ' + dynamicParts.index + ' must be lowercase with no special characterssuch as A-Z-+_*$@#$%&!')); | ||
@@ -789,2 +939,4 @@ | ||
//[end:{"key":"__createDynamicIndex", "self":"_this"}:end] | ||
_this.__createIndex(dynamicParts.index, indexJSON, callback); | ||
@@ -797,8 +949,15 @@ }; | ||
//[start:{"key":"__cacheRoute", "self":"_this"}:start] | ||
var cacheKey = parts.index + '_' + parts.type; | ||
if (_this.routeCache) return _this.routeCache.set(cacheKey, parts, callback); | ||
if (_this.routeCache) { | ||
//[end:{"key":"__cacheRoute", "self":"_this"}:end] | ||
return _this.routeCache.set(cacheKey, parts, callback); | ||
} | ||
_this.__dynamicRoutes[cacheKey] = parts; | ||
//[end:{"key":"__cacheRoute", "self":"_this"}:end] | ||
callback(); | ||
@@ -811,6 +970,13 @@ }; | ||
//[start:{"key":"__getRouteFromCache", "self":"_this"}:start] | ||
var cacheKey = parts.index + '_' + parts.type; | ||
if (_this.routeCache) return _this.routeCache.get(cacheKey, callback); | ||
if (_this.routeCache) { | ||
//[end:{"key":"__getRouteFromCache", "self":"_this"}:end] | ||
return _this.routeCache.get(cacheKey, callback); | ||
} | ||
//[end:{"key":"__getRouteFromCache", "self":"_this"}:end] | ||
callback(null, this.__dynamicRoutes[cacheKey]); | ||
@@ -823,4 +989,8 @@ }; | ||
//[start:{"key":"__routeCreatedAlready", "self":"_this"}:start] | ||
_this.__getRouteFromCache(parts, function (e, route) { | ||
//[end:{"key":"__routeCreatedAlready", "self":"_this"}:end] | ||
if (e) return callback(e); | ||
@@ -839,2 +1009,4 @@ | ||
//[start:{"key":"__prepareDynamicIndex", "self":"_this"}:start] | ||
var indexSegments = dataStoreRoute.pattern.split('/'); | ||
@@ -853,2 +1025,3 @@ | ||
var fieldSegment = segment.replace("{{", "").replace("}}", ""); | ||
if (fieldSegment == 'index') { | ||
@@ -882,2 +1055,4 @@ | ||
//[end:{"key":"__prepareDynamicIndex", "self":"_this"}:end] | ||
if (e) return callback(e); | ||
@@ -908,3 +1083,2 @@ | ||
ElasticProvider.prototype.__createIndex = function (index, indexConfig, callback) { | ||
@@ -914,2 +1088,4 @@ | ||
//[start:{"key":"__prepareDynamicIndex", "self":"_this"}:start] | ||
_this.db.indices.create(indexConfig, function (e, response) { | ||
@@ -919,2 +1095,3 @@ | ||
//[end:{"key":"__prepareDynamicIndex", "self":"_this"}:end] | ||
return callback(new Error('failed creating index ' + index + ':' + e.toString(), e)); | ||
@@ -931,2 +1108,4 @@ } | ||
//[start:{"key":"__buildIndexObj", "self":"_this"}:start] | ||
if (indexConfig.index == null) indexConfig.index = _this.config.defaultIndex; | ||
@@ -969,2 +1148,4 @@ | ||
//[end:{"key":"__buildIndexObj", "self":"_this"}:end] | ||
return indexJSON; | ||
@@ -971,0 +1152,0 @@ }; |
{ | ||
"name": "happner-elastic-dataprovider", | ||
"version": "2.1.1-issue-3-0.0.0", | ||
"version": "2.1.1-performance-bulk-2.1.2", | ||
"scripts": { | ||
"test": "mocha test" | ||
"test": "mocha test", | ||
"test-perf": "mocha test/perf.js" | ||
}, | ||
@@ -14,2 +15,3 @@ "devDependencies": { | ||
"happner-2": "*", | ||
"happner-profane": "^1.0.0", | ||
"json-from-schema": "1.10.0", | ||
@@ -16,0 +18,0 @@ "mocha": "*", |
Sorry, the diff of this file is not supported yet
121406
16
3061
11