node-red-contrib-influxdb
Advanced tools
Comparing version 0.5.4 to 0.6.0
168
influxdb.js
@@ -44,3 +44,2 @@ var _ = require('lodash'); | ||
} | ||
this.client = new Influx.InfluxDB({ | ||
@@ -57,21 +56,15 @@ hosts: [{ | ||
}); | ||
} else if (n.influxdbVersion === VERSION_18_FLUX) { | ||
var token = `${this.credentials.username}:${this.credentials.password}`; | ||
clientOptions = { | ||
url: n.url, | ||
rejectUnauthorized: n.rejectUnauthorized, | ||
token: token | ||
} | ||
} else if (n.influxdbVersion === VERSION_18_FLUX || n.influxdbVersion === VERSION_20) { | ||
this.client = new InfluxDB(clientOptions); | ||
} else if (n.influxdbVersion === VERSION_20) { | ||
const token = n.influxdbVersion === VERSION_18_FLUX ? | ||
`${this.credentials.username}:${this.credentials.password}` : | ||
this.credentials.token; | ||
clientOptions = { | ||
url: n.url, | ||
rejectUnauthorized: n.rejectUnauthorized, | ||
token: this.credentials.token | ||
token | ||
} | ||
this.client = new InfluxDB(clientOptions); | ||
} | ||
this.influxdbVersion = n.influxdbVersion; | ||
@@ -92,4 +85,12 @@ } | ||
function setFieldIntegers(fields) { | ||
for (const prop in fields) { | ||
const value = fields[prop]; | ||
if (isIntegerString(value)) { | ||
fields[prop] = parseInt(value.substring(0,value.length-1)); | ||
} | ||
} | ||
} | ||
function addFieldToPoint(point, name, value) { | ||
if (name === 'time') { | ||
@@ -121,3 +122,2 @@ point.timestamp(value); | ||
function writePoints(msg, node, done) { | ||
node.client.closed == false ? null : node.client.closed = false; | ||
var measurement = msg.hasOwnProperty('measurement') ? msg.measurement : node.measurement; | ||
@@ -129,10 +129,8 @@ if (!measurement) { | ||
if (_.isArray(msg.payload) && msg.payload.length > 0) { | ||
// array of arrays | ||
// array of arrays: multiple points with fields and tags | ||
if (_.isArray(msg.payload[0]) && msg.payload[0].length > 0) { | ||
msg.payload.forEach(element => { | ||
let point = new Point(measurement); | ||
let fields = element[0]; | ||
addFieldsToPoint(point, fields); | ||
let tags = element[1]; | ||
@@ -145,8 +143,6 @@ for (const prop in tags) { | ||
} else { | ||
// array of non-arrays, assume one point with both fields and tags | ||
// array of non-arrays: one point with both fields and tags | ||
let point = new Point(measurement); | ||
let fields = msg.payload[0]; | ||
addFieldsToPoint(point, fields); | ||
const tags = msg.payload[1]; | ||
@@ -156,6 +152,6 @@ for (const prop in tags) { | ||
} | ||
node.client.writePoint(point) | ||
} | ||
} else { | ||
// single object: fields only | ||
if (_.isPlainObject(msg.payload)) { | ||
@@ -175,9 +171,5 @@ let point = new Point(measurement); | ||
// actual write happens here | ||
node.client | ||
.close() | ||
.then(() => { | ||
node.client.flush(true).then(() => { | ||
done(); | ||
}) | ||
.catch(error => { | ||
}).catch(error => { | ||
msg.influx_error = { | ||
@@ -197,3 +189,3 @@ errorMessage: error | ||
/** | ||
* Output node to write to an influxdb measurement | ||
* Output node to write to a single influxdb measurement | ||
*/ | ||
@@ -204,5 +196,7 @@ function InfluxOutNode(n) { | ||
this.influxdb = n.influxdb; | ||
this.influxdbConfig = RED.nodes.getNode(this.influxdb); | ||
this.precision = n.precision; | ||
this.retentionPolicy = n.retentionPolicy; | ||
this.influxdbConfig = RED.nodes.getNode(this.influxdb); | ||
// 1.8 and 2.0 only | ||
this.database = n.database; | ||
@@ -214,11 +208,2 @@ this.precisionV18FluxV20 = n.precisionV18FluxV20; | ||
function setFieldIntegers(fields) { | ||
for (const prop in fields) { | ||
const value = fields[prop]; | ||
if (isIntegerString(value)) { | ||
fields[prop] = parseInt(value.substring(0,value.length-1)); | ||
} | ||
} | ||
} | ||
if (!this.influxdbConfig) { | ||
@@ -230,4 +215,5 @@ this.error(RED._("influxdb.errors.missingconfig")); | ||
var node = this; | ||
if (version === VERSION_1X) { | ||
var node = this; | ||
var client = this.influxdbConfig.client; | ||
@@ -290,2 +276,3 @@ | ||
} else { | ||
// fields only | ||
if (_.isPlainObject(msg.payload)) { | ||
@@ -313,4 +300,3 @@ let fields = _.clone(msg.payload) | ||
client.writePoints(points, writeOptions) | ||
.then(() => { | ||
client.writePoints(points, writeOptions).then(() => { | ||
done(); | ||
@@ -333,3 +319,2 @@ }).catch(function (err) { | ||
this.client = this.influxdbConfig.client.getWriteApi(org, bucket, this.precisionV18FluxV20); | ||
var node = this; | ||
@@ -345,3 +330,3 @@ node.on("input", function (msg, send, done) { | ||
/** | ||
* Output node to write batches of points to influxdb | ||
* Output node to write to multiple InfluxDb measurements | ||
*/ | ||
@@ -351,6 +336,14 @@ function InfluxBatchNode(n) { | ||
this.influxdb = n.influxdb; | ||
this.influxdbConfig = RED.nodes.getNode(this.influxdb); | ||
this.precision = n.precision; | ||
this.retentionPolicy = n.retentionPolicy; | ||
this.influxdbConfig = RED.nodes.getNode(this.influxdb); | ||
// 1.8 and 2.0 | ||
this.database = n.database; | ||
this.precisionV18FluxV20 = n.precisionV18FluxV20; | ||
this.retentionPolicyV18Flux = n.retentionPolicyV18Flux; | ||
this.org = n.org; | ||
this.bucket = n.bucket; | ||
if (!this.influxdbConfig) { | ||
@@ -360,31 +353,72 @@ this.error(RED._("influxdb.errors.missingconfig")); | ||
} | ||
if (this.influxdbConfig.influxdbVersion !== VERSION_1X) { | ||
this.error(RED._("influxdb.errors.invalidconfig")); | ||
return; | ||
} | ||
let version = this.influxdbConfig.influxdbVersion; | ||
var node = this; | ||
var client = this.influxdbConfig.client; | ||
node.on("input", function (msg, send, done) { | ||
var writeOptions = {}; | ||
var precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision; | ||
var retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy; | ||
if (version === VERSION_1X) { | ||
var client = this.influxdbConfig.client; | ||
if (precision) { | ||
writeOptions.precision = precision; | ||
} | ||
node.on("input", function (msg, send, done) { | ||
var writeOptions = {}; | ||
var precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision; | ||
var retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy; | ||
if (retentionPolicy) { | ||
writeOptions.retentionPolicy = retentionPolicy; | ||
} | ||
if (precision) { | ||
writeOptions.precision = precision; | ||
} | ||
client.writePoints(msg.payload, writeOptions).then(() => { | ||
done(); | ||
}).catch(function (err) { | ||
msg.influx_error = { | ||
statusCode: err.res ? err.res.statusCode : 503 | ||
if (retentionPolicy) { | ||
writeOptions.retentionPolicy = retentionPolicy; | ||
} | ||
done(err); | ||
client.writePoints(msg.payload, writeOptions).then(() => { | ||
done(); | ||
}).catch(function (err) { | ||
msg.influx_error = { | ||
statusCode: err.res ? err.res.statusCode : 503 | ||
} | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
} else if (version === VERSION_18_FLUX || version === VERSION_20) { | ||
let bucket = node.bucket; | ||
if (version === VERSION_18_FLUX) { | ||
let retentionPolicy = this.retentionPolicyV18Flux ? this.retentionPolicyV18Flux : 'autogen'; | ||
bucket = `${this.database}/${retentionPolicy}`; | ||
} | ||
let org = version === VERSION_18_FLUX ? '' : this.org; | ||
var client = this.influxdbConfig.client.getWriteApi(org, bucket, this.precisionV18FluxV20); | ||
node.on("input", function (msg, send, done) { | ||
msg.payload.forEach(element => { | ||
let point = new Point(element.measurement); | ||
// time is reserved as a field name still! will be overridden by the timestamp below. | ||
addFieldsToPoint(point, element.fields); | ||
let tags = element.tags; | ||
if (tags) { | ||
for (const prop in tags) { | ||
point.tag(prop, tags[prop]); | ||
} | ||
} | ||
if (element.timestamp) { | ||
point.timestamp(element.timestamp); | ||
} | ||
client.writePoint(point); | ||
}); | ||
// ensure we write everything including scheduled retries | ||
client.flush(true).then(() => { | ||
done(); | ||
}).catch(error => { | ||
msg.influx_error = { | ||
errorMessage: error | ||
}; | ||
done(error); | ||
}); | ||
}); | ||
} | ||
} | ||
@@ -391,0 +425,0 @@ |
{ | ||
"name": "node-red-contrib-influxdb", | ||
"version": "0.5.4", | ||
"version": "0.6.0", | ||
"description": "Node-RED nodes to save and query data from an influxdb time series database", | ||
@@ -5,0 +5,0 @@ "main": "influxdb.js", |
@@ -27,3 +27,3 @@ # node-red-contrib-influxdb | ||
### Input Node (InfluxDb 1.x and 2.0) | ||
### Input Node | ||
@@ -51,3 +51,3 @@ Queries one or more measurements in an influxdb database. The query is specified in the node configuration or in the ***msg.query*** property. Setting it in the node will override the ***msg.query***. The result is returned in ***msg.payload***. | ||
### Output Node (InfluxDb 1.x and 2.0) | ||
### Output Node | ||
@@ -141,7 +141,7 @@ Writes one or more points (fields and tags) to a measurement. | ||
### The Batch Output Node (InfluxDb 1.x Only) | ||
### The Batch Output Node | ||
The batch output node (influx batch) sends a list of *points* together in a batch to InfluxDB in a slightly different format from the output node, more in line with the underlying node.js [influx library version 5.x](https://www.npmjs.com/package/influx). In each point you must specify the measurement name to write into as well as a list of tag and field values. Optionally, you can specify the time to tag that point at, defaulting to the current time. | ||
The batch output node (influx batch) sends a list of *points* together in a batch to InfluxDB in a slightly different format from the output node. Using the batch node you must specify the measurement name to write into as well as a list of tag and field values. Optionally, you can specify the timestamp for the point, defaulting to the current time. | ||
Under the hood we are calling the node influxdb 5.x library **writePoints()** call as documented [here](https://node-influx.github.io/class/src/index.js~InfluxDB.html#instance-method-writePoints). | ||
>Note: Javascript numbers are *always* written as a float. As in the output node, when using the 1.8-flux or 2.0 configuration, you can explicitly write an integer using a number in a string with an 'i' suffix, for example, to write the integer `1234` use the string `'1234i'`. This is *not* supported using 1.x configurations; all numbers are written as float values. | ||
@@ -148,0 +148,0 @@ By default the node will write timestamps using ms precision since that's what JavaScript gives us. if you specify the timestamp as a Date object, we'll convert it to milliseconds. |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
80850
519
0