bunyan-influxdb
Advanced tools
Comparing version 0.5.4 to 0.5.5
@@ -13,2 +13,3 @@ 'use strict'; | ||
this.options = options || {}; | ||
this.clients = {}; | ||
this.client = new Influx.InfluxDB(options.influx); | ||
@@ -31,27 +32,33 @@ | ||
} | ||
var transformed = this.options.transform(chunk); | ||
if (!transformed || transformed.length < 3) { | ||
return; | ||
var list = this.options.transform(chunk); | ||
if (list && list.length > 0 && list[0].splice) { | ||
} else { | ||
list = [list]; | ||
} | ||
var that = this; | ||
list.map(function (transformed) { | ||
if (!transformed || transformed.length < 3) { | ||
return; | ||
} | ||
if (typeof transformed[1] !== 'object') { | ||
transformed[1] = {value: transformed[1]} | ||
} | ||
if (typeof transformed[1] !== 'object') { | ||
transformed[1] = {value: transformed[1]} | ||
} | ||
var cargoItem = { | ||
measurement: transformed[0], | ||
fields: transformed[1], | ||
tags: transformed[2], | ||
db: transformed[2].appId || this.options.influx.database | ||
}; | ||
var cargoItem = { | ||
measurement: transformed[0], | ||
fields: transformed[1], | ||
tags: transformed[2], | ||
db: transformed[2].appId || that.options.influx.database | ||
}; | ||
if(transformed[3]) { | ||
cargoItem.timestamp = transformed[3]; | ||
} | ||
delete transformed[2].appId; | ||
if(transformed[3]) { | ||
cargoItem.timestamp = transformed[3]; | ||
} | ||
delete transformed[2].appId; | ||
this.cargo.push(cargoItem, function (err) { | ||
err && console.error('bunyan-influxdb ', err.message); | ||
that.cargo.push(cargoItem, function (err) { | ||
err && console.error('bunyan-influxdb ', err.message); | ||
}); | ||
}); | ||
}; | ||
@@ -61,4 +68,4 @@ | ||
InfluxDBStream.prototype.processCargo = function (tasks, callback) { | ||
var self = this; | ||
var dbs = {}; | ||
tasks.map(function (t) { | ||
@@ -75,27 +82,42 @@ var db_key = 'nodb'; | ||
Object.keys(dbs).map(function(db){ | ||
async.retry( | ||
{ | ||
times: self.options.tries || 1, | ||
interval: self.options.tryInterval || 5000 | ||
}, | ||
this.writePointsWithRetry(db, dbs[db], callback); | ||
}.bind(this)); | ||
}; | ||
function (clbk) { | ||
var options = {}; | ||
if (db !== 'nodb') { | ||
options.database = db; | ||
} | ||
self.client.writePoints(dbs[db], options) | ||
.then(clbk) | ||
.catch(function (err) { | ||
// err && console.error("Influxdb write error ", err.message); | ||
clbk(err); | ||
}) | ||
}, | ||
InfluxDBStream.prototype.writePointsWithRetry = function (db, points, callback) { | ||
async.retry( | ||
{ | ||
times: this.options.tries || 1, | ||
interval: this.options.tryInterval || 5000 | ||
}, | ||
callback | ||
); | ||
}); | ||
function (clbk) { | ||
var options = {}; | ||
if (db !== 'nodb') { | ||
options.database = db; | ||
} | ||
this | ||
.getClient(db) | ||
.writePoints(points, options) | ||
.then(clbk) | ||
.catch(clbk) | ||
}.bind(this), | ||
callback | ||
); | ||
}; | ||
InfluxDBStream.prototype.getClient = function (db) { | ||
// if (!db || db === 'nodb') { | ||
// return this.client; | ||
// } | ||
// if (!this.clients[db]) { | ||
// this.clients[db] = new Influx.InfluxDB(this.options.influx); | ||
// } | ||
// return this.clients[db]; | ||
return this.client; | ||
}; | ||
module.exports = function (options) { | ||
@@ -102,0 +124,0 @@ if (!options) { |
{ | ||
"name": "bunyan-influxdb", | ||
"version": "0.5.4", | ||
"version": "0.5.5", | ||
"description": "Bunyan InfluxDB Stream", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4411
102