Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

node-red-contrib-influxdb

Package Overview
Dependencies
Maintainers
2
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-red-contrib-influxdb - npm Package Compare versions

Comparing version 0.5.4 to 0.6.0

168

influxdb.js

@@ -44,3 +44,2 @@ var _ = require('lodash');

}
this.client = new Influx.InfluxDB({

@@ -57,21 +56,15 @@ hosts: [{

});
} else if (n.influxdbVersion === VERSION_18_FLUX) {
var token = `${this.credentials.username}:${this.credentials.password}`;
clientOptions = {
url: n.url,
rejectUnauthorized: n.rejectUnauthorized,
token: token
}
} else if (n.influxdbVersion === VERSION_18_FLUX || n.influxdbVersion === VERSION_20) {
this.client = new InfluxDB(clientOptions);
} else if (n.influxdbVersion === VERSION_20) {
const token = n.influxdbVersion === VERSION_18_FLUX ?
`${this.credentials.username}:${this.credentials.password}` :
this.credentials.token;
clientOptions = {
url: n.url,
rejectUnauthorized: n.rejectUnauthorized,
token: this.credentials.token
token
}
this.client = new InfluxDB(clientOptions);
}
this.influxdbVersion = n.influxdbVersion;

@@ -92,4 +85,12 @@ }

function setFieldIntegers(fields) {
for (const prop in fields) {
const value = fields[prop];
if (isIntegerString(value)) {
fields[prop] = parseInt(value.substring(0,value.length-1));
}
}
}
function addFieldToPoint(point, name, value) {
if (name === 'time') {

@@ -121,3 +122,2 @@ point.timestamp(value);

function writePoints(msg, node, done) {
node.client.closed == false ? null : node.client.closed = false;
var measurement = msg.hasOwnProperty('measurement') ? msg.measurement : node.measurement;

@@ -129,10 +129,8 @@ if (!measurement) {

if (_.isArray(msg.payload) && msg.payload.length > 0) {
// array of arrays
// array of arrays: multiple points with fields and tags
if (_.isArray(msg.payload[0]) && msg.payload[0].length > 0) {
msg.payload.forEach(element => {
let point = new Point(measurement);
let fields = element[0];
addFieldsToPoint(point, fields);
let tags = element[1];

@@ -145,8 +143,6 @@ for (const prop in tags) {

} else {
// array of non-arrays, assume one point with both fields and tags
// array of non-arrays: one point with both fields and tags
let point = new Point(measurement);
let fields = msg.payload[0];
addFieldsToPoint(point, fields);
const tags = msg.payload[1];

@@ -156,6 +152,6 @@ for (const prop in tags) {

}
node.client.writePoint(point)
}
} else {
// single object: fields only
if (_.isPlainObject(msg.payload)) {

@@ -175,9 +171,5 @@ let point = new Point(measurement);

// actual write happens here
node.client
.close()
.then(() => {
node.client.flush(true).then(() => {
done();
})
.catch(error => {
}).catch(error => {
msg.influx_error = {

@@ -197,3 +189,3 @@ errorMessage: error

/**
* Output node to write to an influxdb measurement
* Output node to write to a single influxdb measurement
*/

@@ -204,5 +196,7 @@ function InfluxOutNode(n) {

this.influxdb = n.influxdb;
this.influxdbConfig = RED.nodes.getNode(this.influxdb);
this.precision = n.precision;
this.retentionPolicy = n.retentionPolicy;
this.influxdbConfig = RED.nodes.getNode(this.influxdb);
// 1.8 and 2.0 only
this.database = n.database;

@@ -214,11 +208,2 @@ this.precisionV18FluxV20 = n.precisionV18FluxV20;

function setFieldIntegers(fields) {
for (const prop in fields) {
const value = fields[prop];
if (isIntegerString(value)) {
fields[prop] = parseInt(value.substring(0,value.length-1));
}
}
}
if (!this.influxdbConfig) {

@@ -230,4 +215,5 @@ this.error(RED._("influxdb.errors.missingconfig"));

var node = this;
if (version === VERSION_1X) {
var node = this;
var client = this.influxdbConfig.client;

@@ -290,2 +276,3 @@

} else {
// fields only
if (_.isPlainObject(msg.payload)) {

@@ -313,4 +300,3 @@ let fields = _.clone(msg.payload)

client.writePoints(points, writeOptions)
.then(() => {
client.writePoints(points, writeOptions).then(() => {
done();

@@ -333,3 +319,2 @@ }).catch(function (err) {

this.client = this.influxdbConfig.client.getWriteApi(org, bucket, this.precisionV18FluxV20);
var node = this;

@@ -345,3 +330,3 @@ node.on("input", function (msg, send, done) {

/**
* Output node to write batches of points to influxdb
* Output node to write to multiple InfluxDb measurements
*/

@@ -351,6 +336,14 @@ function InfluxBatchNode(n) {

this.influxdb = n.influxdb;
this.influxdbConfig = RED.nodes.getNode(this.influxdb);
this.precision = n.precision;
this.retentionPolicy = n.retentionPolicy;
this.influxdbConfig = RED.nodes.getNode(this.influxdb);
// 1.8 and 2.0
this.database = n.database;
this.precisionV18FluxV20 = n.precisionV18FluxV20;
this.retentionPolicyV18Flux = n.retentionPolicyV18Flux;
this.org = n.org;
this.bucket = n.bucket;
if (!this.influxdbConfig) {

@@ -360,31 +353,72 @@ this.error(RED._("influxdb.errors.missingconfig"));

}
if (this.influxdbConfig.influxdbVersion !== VERSION_1X) {
this.error(RED._("influxdb.errors.invalidconfig"));
return;
}
let version = this.influxdbConfig.influxdbVersion;
var node = this;
var client = this.influxdbConfig.client;
node.on("input", function (msg, send, done) {
var writeOptions = {};
var precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision;
var retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy;
if (version === VERSION_1X) {
var client = this.influxdbConfig.client;
if (precision) {
writeOptions.precision = precision;
}
node.on("input", function (msg, send, done) {
var writeOptions = {};
var precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision;
var retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy;
if (retentionPolicy) {
writeOptions.retentionPolicy = retentionPolicy;
}
if (precision) {
writeOptions.precision = precision;
}
client.writePoints(msg.payload, writeOptions).then(() => {
done();
}).catch(function (err) {
msg.influx_error = {
statusCode: err.res ? err.res.statusCode : 503
if (retentionPolicy) {
writeOptions.retentionPolicy = retentionPolicy;
}
done(err);
client.writePoints(msg.payload, writeOptions).then(() => {
done();
}).catch(function (err) {
msg.influx_error = {
statusCode: err.res ? err.res.statusCode : 503
}
done(err);
});
});
});
} else if (version === VERSION_18_FLUX || version === VERSION_20) {
let bucket = node.bucket;
if (version === VERSION_18_FLUX) {
let retentionPolicy = this.retentionPolicyV18Flux ? this.retentionPolicyV18Flux : 'autogen';
bucket = `${this.database}/${retentionPolicy}`;
}
let org = version === VERSION_18_FLUX ? '' : this.org;
var client = this.influxdbConfig.client.getWriteApi(org, bucket, this.precisionV18FluxV20);
node.on("input", function (msg, send, done) {
msg.payload.forEach(element => {
let point = new Point(element.measurement);
// time is reserved as a field name still! will be overridden by the timestamp below.
addFieldsToPoint(point, element.fields);
let tags = element.tags;
if (tags) {
for (const prop in tags) {
point.tag(prop, tags[prop]);
}
}
if (element.timestamp) {
point.timestamp(element.timestamp);
}
client.writePoint(point);
});
// ensure we write everything including scheduled retries
client.flush(true).then(() => {
done();
}).catch(error => {
msg.influx_error = {
errorMessage: error
};
done(error);
});
});
}
}

@@ -391,0 +425,0 @@

{
"name": "node-red-contrib-influxdb",
"version": "0.5.4",
"version": "0.6.0",
"description": "Node-RED nodes to save and query data from an influxdb time series database",

@@ -5,0 +5,0 @@ "main": "influxdb.js",

@@ -27,3 +27,3 @@ # node-red-contrib-influxdb

### Input Node (InfluxDb 1.x and 2.0)
### Input Node

@@ -51,3 +51,3 @@ Queries one or more measurements in an influxdb database. The query is specified in the node configuration or in the ***msg.query*** property. Setting it in the node will override the ***msg.query***. The result is returned in ***msg.payload***.

### Output Node (InfluxDb 1.x and 2.0)
### Output Node

@@ -141,7 +141,7 @@ Writes one or more points (fields and tags) to a measurement.

### The Batch Output Node (InfluxDb 1.x Only)
### The Batch Output Node
The batch output node (influx batch) sends a list of *points* together in a batch to InfluxDB in a slightly different format from the output node, more in line with the underlying node.js [influx library version 5.x](https://www.npmjs.com/package/influx). In each point you must specify the measurement name to write into as well as a list of tag and field values. Optionally, you can specify the time to tag that point at, defaulting to the current time.
The batch output node (influx batch) sends a list of *points* together in a batch to InfluxDB in a slightly different format from the output node. Using the batch node you must specify the measurement name to write into as well as a list of tag and field values. Optionally, you can specify the timestamp for the point, defaulting to the current time.
Under the hood we are calling the node influxdb 5.x library **writePoints()** call as documented [here](https://node-influx.github.io/class/src/index.js~InfluxDB.html#instance-method-writePoints).
>Note: Javascript numbers are *always* written as a float. As in the output node, when using the 1.8-flux or 2.0 configuration, you can explicitly write an integer using a number in a string with an 'i' suffix, for example, to write the integer `1234` use the string `'1234i'`. This is *not* supported using 1.x configurations; all numbers are written as float values.

@@ -148,0 +148,0 @@ By default the node will write timestamps using ms precision since that's what JavaScript gives us. if you specify the timestamp as a Date object, we'll convert it to milliseconds.

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc