Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@node-red/nodes

Package Overview
Dependencies
Maintainers
2
Versions
116
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-red/nodes - npm Package Compare versions

Comparing version 2.2.2 to 2.2.3

examples/function/delay/06 - Simple Queue with release

5

core/common/20-inject.js

@@ -112,5 +112,6 @@ /**

if (valueType === "jsonata") {
if (p.exp) {
if (p.v) {
try {
var val = RED.util.evaluateJSONataExpression(p.exp, msg);
var exp = RED.util.prepareJSONataExpression(p.v, node);
var val = RED.util.evaluateJSONataExpression(exp, msg);
RED.util.setMessageProperty(msg, property, val, true);

@@ -117,0 +118,0 @@ }

7

core/common/21-debug.js

@@ -111,3 +111,5 @@ module.exports = function(RED) {

this.on("input", function(msg, send, done) {
if (hasOwnProperty.call(msg, "status") && hasOwnProperty.call(msg.status, "source") && hasOwnProperty.call(msg.status.source, "id") && (msg.status.source.id === node.id)) {
if (hasOwnProperty.call(msg, "status") && msg.status &&
hasOwnProperty.call(msg.status, "source") && msg.status.source &&
hasOwnProperty.call(msg.status.source, "id") && (msg.status.source.id === node.id)) {
done();

@@ -133,3 +135,4 @@ return;

}
if (hasOwnProperty.call(msg, "status")) {
if (hasOwnProperty.call(msg, "status") &&
msg.status) {
fill = msg.status.fill || "grey";

@@ -136,0 +139,0 @@ shape = msg.status.shape || "ring";

@@ -171,5 +171,5 @@ /**

var contextKey = RED.util.parseContextStore(rule.from);
if (/\[msg\./.test(context.key)) {
if (/\[msg\./.test(contextKey.key)) {
// The key has a nest msg. reference to evaluate first
context.key = RED.util.normalisePropertyExpression(contextKey.key,msg,true);
contextKey.key = RED.util.normalisePropertyExpression(contextKey.key,msg,true);
}

@@ -176,0 +176,0 @@ node.context()[rule.fromt].get(contextKey.key, contextKey.store, (err,fromValue) => {

@@ -278,14 +278,18 @@ /**

if (typeof(msg.flush) == 'number') { len = Math.min(Math.floor(msg.flush),len); }
while (len > 0) {
const msgInfo = node.buffer.shift();
if (Object.keys(msgInfo.msg).length > 1) {
node.send(msgInfo.msg);
msgInfo.done();
}
len = len - 1;
}
if (node.buffer.length === 0) {
if (len === 0) {
clearInterval(node.intervalID);
node.intervalID = -1;
}
else {
while (len > 0) {
const msgInfo = node.buffer.shift();
if (Object.keys(msgInfo.msg).length > 1) {
node.send(msgInfo.msg);
msgInfo.done();
}
len = len - 1;
}
clearInterval(node.intervalID);
node.intervalID = setInterval(sendMsgFromBuffer, node.rate);
}
node.status({fill:"blue",shape:"dot",text:node.buffer.length});

@@ -292,0 +296,0 @@ done();

@@ -71,3 +71,3 @@ /**

/**
* Test a topic string is valid
* Test a topic string is valid for subscription
* @param {string} topic

@@ -77,6 +77,15 @@ * @returns `true` if it is a valid topic

function isValidSubscriptionTopic(topic) {
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic)
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic);
}
/**
* Test a topic string is valid for publishing
* @param {string} topic
* @returns `true` if it is a valid topic
*/
function isValidPublishTopic(topic) {
return !/[\+#\b\f\n\r\t\v\0]/.test(topic);
}
/**
* Helper function for setting string property values in the MQTT V5 properties object

@@ -108,3 +117,3 @@ * @param {object} src Source object containing properties

} else if(src[propName] === "false" || src[propName] === false) {
dst[propName] = true;
dst[propName] = false;
}

@@ -294,3 +303,3 @@ } else {

}
topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic);
topicOK = topicOK && isValidPublishTopic(msg.topic);

@@ -398,2 +407,3 @@ if (topicOK) {

node.subscriptions = {};
node.clientListeners = []
/** @type {mqtt.MqttClient}*/ this.client;

@@ -423,4 +433,8 @@ node.setOptions = function(opts, init) {

setIfHasProperty(opts, node, "receiveMaximum", init);
setIfHasProperty(opts, node, "userProperties", init);//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
setIfHasProperty(opts, node, "userPropertiesType", init);
//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
if (hasProperty(opts, "userProperties")) {
node.userProperties = opts.userProperties;
} else if (hasProperty(opts, "userProps")) {
node.userProperties = opts.userProps;
}

@@ -474,3 +488,3 @@ function createLWT(topic, payload, qos, retain, v5opts, v5SubPropName) {

//will v5 properties must be set in the "properties" sub object
node.options.will = createLWT(opts.willTopic, opts.willPayload, opts.willQos, opts.willRetain, opts.willMsg, "properies");
node.options.will = createLWT(opts.willTopic, opts.willPayload, opts.willQos, opts.willRetain, opts.willMsg, "properties");
};

@@ -535,3 +549,3 @@ } else {

// check if proxy is set in env
let prox, noprox;
let prox, noprox, noproxy;
if (process.env.http_proxy) { prox = process.env.http_proxy; }

@@ -663,7 +677,12 @@ if (process.env.HTTP_PROXY) { prox = process.env.HTTP_PROXY; }

node.serverProperties = {};
if(node.client) {
//belt and braces to avoid left over clients
node.client.end(true);
node._clientRemoveListeners();
}
node.client = mqtt.connect(node.brokerurl, node.options);
node.client.setMaxListeners(0);
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times
// Register successful connect or reconnect handler
node.client.on('connect', function (connack) {
node._clientOn('connect', function (connack) {
node.closing = false;

@@ -700,3 +719,3 @@ node.connecting = false;

// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node.client.removeAllListeners('message');
node._clientRemoveListeners('message');

@@ -713,3 +732,3 @@ // Re-subscribe to stored topics

_options = node.subscriptions[s][r].options;
node.client.on('message',node.subscriptions[s][r].handler);
node._clientOn('message',node.subscriptions[s][r].handler);
}

@@ -727,7 +746,7 @@ }

});
node.client.on("reconnect", function() {
node._clientOn("reconnect", function() {
setStatusConnecting(node, true);
});
//Broker Disconnect - V5 event
node.client.on("disconnect", function(packet) {
node._clientOn("disconnect", function(packet) {
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.

@@ -746,3 +765,3 @@ const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;

// Register disconnect handlers
node.client.on('close', function () {
node._clientOn('close', function () {
if (node.connected) {

@@ -759,3 +778,3 @@ node.connected = false;

// The client's own reconnect logic will take care of errors
node.client.on('error', function (error) {
node._clientOn('error', function (error) {
});

@@ -767,28 +786,48 @@ }catch(err) {

};
node.disconnect = function (callback) {
const _callback = function (resetNodeConnectedState) {
setStatusDisconnected(node, true);
if(resetNodeConnectedState) {
node.closing = true;
node.connecting = false;
node.connected = false;
const _callback = function () {
if(node.connected || node.connecting) {
setStatusDisconnected(node, true);
}
if(node.client) { node._clientRemoveListeners(); }
node.connecting = false;
node.connected = false;
callback && typeof callback == "function" && callback();
};
if(!node.client) { return _callback(); }
if(node.closing) { return _callback(); }
if(node.closing) {
return _callback(false);
}
var endCallBack = function endCallBack() {
}
let waitEnd = (client, ms) => {
return new Promise( (resolve, reject) => {
node.closing = true;
if(!client) {
resolve();
} else {
const t = setTimeout(() => {
//clean end() has exceeded WAIT_END, lets force end!
client && client.end(true);
reject();
}, ms);
client.end(() => {
clearTimeout(t);
resolve()
});
}
});
};
if(node.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) {
node.client.end(endCallBack);
_callback(true);
waitEnd(node.client, 2000).then(() => {
_callback();
}).catch((e) => {
_callback();
})
});
} else if(node.connected) {
node.client.end(endCallBack);
_callback(true);
} else {
_callback(false);
waitEnd(node.client, 2000).then(() => {
_callback();
}).catch((e) => {
_callback();
})
}

@@ -830,3 +869,3 @@ }

if (node.connected) {
node.client.on('message',sub.handler);
node._clientOn('message',sub.handler);
node.client.subscribe(topic, options);

@@ -842,3 +881,3 @@ }

if(node.client) {
node.client.removeListener('message',sub[ref].handler);
node._clientRemoveListeners('message',sub[ref].handler);
}

@@ -877,4 +916,14 @@ delete sub[ref];

};
let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic));
//https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
if(node.options.protocolVersion == 5) {
const bsp = node.serverProperties || {};
if (msg.userProperties && typeof msg.userProperties !== "object") {
delete msg.userProperties;
}
if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) {
msg.topicAlias = parseInt(msg.topicAlias);
} else {
delete msg.topicAlias;
}
options.properties = options.properties || {};

@@ -889,4 +938,7 @@ setStrProp(msg, options.properties, "responseTopic");

//FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
if (options.properties.topicAlias) {
if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") {
//check & sanitise topic
if (topicOK && options.properties.topicAlias) {
let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias);
if (!aliasValid) {
done("Invalid topicAlias");

@@ -896,13 +948,23 @@ return

if (node.topicAliases[options.properties.topicAlias] === msg.topic) {
msg.topic = ""
msg.topic = "";
} else {
node.topicAliases[options.properties.topicAlias] = msg.topic
node.topicAliases[options.properties.topicAlias] = msg.topic;
}
} else if (!msg.topic && options.properties.responseTopic) {
msg.topic = msg.responseTopic;
topicOK = isValidPublishTopic(msg.topic);
delete msg.responseTopic; //prevent responseTopic being resent?
}
}
node.client.publish(msg.topic, msg.payload, options, function(err) {
done && done(err);
return
});
if (topicOK) {
node.client.publish(msg.topic, msg.payload, options, function(err) {
done && done(err);
return
});
} else {
const error = new Error(RED._("mqtt.errors.invalid-topic"));
error.warn = true;
done(error);
}
}

@@ -912,6 +974,37 @@ };

node.on('close', function(done) {
node.closing = true;
node.disconnect(done);
node.disconnect(function() {
done();
});
});
/**
* Add event handlers to the MQTT.js client and track them so that
* we do not remove any handlers that the MQTT client uses internally.
* Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers
* @param {string} event The name of the event
* @param {function} handler The handler for this event
*/
node._clientOn = function(event, handler) {
node.clientListeners.push({event, handler})
node.client.on(event, handler)
}
/**
* Remove event handlers from the MQTT.js client & only the events
* that we attached in {@link node._clientOn `node._clientOn`}.
* * If `event` is omitted, then all events matching `handler` are removed
* * If `handler` is omitted, then all events named `event` are removed
* * If both parameters are omitted, then all events are removed
* @param {string} [event] The name of the event (optional)
* @param {function} [handler] The handler for this event (optional)
*/
node._clientRemoveListeners = function(event, handler) {
node.clientListeners = node.clientListeners.filter((l) => {
if (event && event !== l.event) { return true; }
if (handler && handler !== l.handler) { return true; }
node.client.removeListener(l.event, l.handler)
return false; //found and removed, filter out this one
})
}
}

@@ -1091,2 +1184,3 @@

node.brokerConn.deregister(node, done);
node.brokerConn = null;
} else {

@@ -1156,2 +1250,3 @@ done();

node.brokerConn.deregister(node,done);
node.brokerConn = null;
} else {

@@ -1158,0 +1253,0 @@ done();

@@ -217,2 +217,6 @@ /**

})
if (node.insecureHTTPParser) {
options.insecureHTTPParser = true
}
}

@@ -219,0 +223,0 @@ ],

@@ -38,5 +38,3 @@ /**

var listenerNodes = {};
var activeListenerNodes = 0;
// A node red node that sets up a local websocket server

@@ -170,3 +168,2 @@ function WebSocketListenerNode(n) {

if (node.isServer) {
activeListenerNodes++;
if (!serverUpgradeAdded) {

@@ -215,3 +212,3 @@ RED.server.on('upgrade', handleServerUpgrade);

node.on("close", function() {
node.on("close", function(done) {
if (node.heartbeatInterval) {

@@ -224,7 +221,2 @@ clearInterval(node.heartbeatInterval);

node._inputNodes = [];
activeListenerNodes--;
// if (activeListenerNodes === 0 && serverUpgradeAdded) {
// RED.server.removeListener('upgrade', handleServerUpgrade);
// serverUpgradeAdded = false;
// }
}

@@ -234,6 +226,17 @@ else {

node.server.close();
if (node.tout) {
clearTimeout(node.tout);
node.tout = null;
}
//wait 20*50 (1000ms max) for ws to close.
//call done when readyState === ws.CLOSED (or 1000ms, whichever comes fist)
const closeMonitorInterval = 20;
let closeMonitorCount = 50;
let si = setInterval(() => {
if(node.server.readyState === ws.CLOSED || closeMonitorCount <= 0) {
if (node.tout) {
clearTimeout(node.tout);
node.tout = null;
}
clearInterval(si);
return done();
}
closeMonitorCount--;
}, closeMonitorInterval);
}

@@ -240,0 +243,0 @@ });

@@ -138,3 +138,3 @@ /**

for (var i = 0; i<parts.length-1; i+=1) {
msg = {topic:node.topic, payload:parts[i] + node.newline.trimEnd()};
msg = {topic:node.topic, payload:parts[i]};
msg._session = {type:"tcp",id:id};

@@ -233,3 +233,3 @@ node.send(msg);

for (var i = 0; i<parts.length-1; i+=1) {
msg = {topic:node.topic, payload:parts[i] + node.newline.trimEnd(), ip:socket.remoteAddress, port:socket.remotePort};
msg = {topic:node.topic, payload:parts[i], ip:socket.remoteAddress, port:socket.remotePort};
msg._session = {type:"tcp",id:id};

@@ -437,3 +437,3 @@ node.send(msg);

else {
var connectedSockets = [];
const connectedSockets = new Set();
node.status({text:RED._("tcpin.status.connections",{count:0})});

@@ -459,12 +459,12 @@ let srv = net;

node.log(RED._("tcpin.status.connection-closed",{host:socket.remoteAddress, port:socket.remotePort}));
connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})});
connectedSockets.delete(socket);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})});
});
socket.on('error',function() {
node.log(RED._("tcpin.errors.socket-error",{host:socket.remoteAddress, port:socket.remotePort}));
connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})});
connectedSockets.delete(socket);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})});
});
connectedSockets.push(socket);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})});
connectedSockets.add(socket);
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})});
});

@@ -482,6 +482,6 @@

}
for (var i = 0; i < connectedSockets.length; i += 1) {
if (node.doend === true) { connectedSockets[i].end(buffer); }
else { connectedSockets[i].write(buffer); }
}
connectedSockets.forEach(soc => {
if (node.doend === true) { soc.end(buffer); }
else { soc.write(buffer); }
})
}

@@ -503,8 +503,6 @@ nodeDone();

node.on('close', function() {
for (var c in connectedSockets) {
if (connectedSockets.hasOwnProperty(c)) {
connectedSockets[c].end();
connectedSockets[c].unref();
}
}
connectedSockets.forEach(soc => {
soc.end();
soc.unref();
})
server.close();

@@ -511,0 +509,0 @@ node.log(RED._("tcpin.status.stopped-listening",{port:node.port}));

@@ -92,2 +92,5 @@ /**

}
else if (msg.payload[s][t].toString().indexOf("\n") !== -1) { // add quotes if any "\n"
msg.payload[s][t] = node.quo + msg.payload[s][t].toString() + node.quo;
}
}

@@ -116,3 +119,3 @@ ou += msg.payload[s].join(node.sep) + node.ret;

}
else if (q.indexOf(node.sep) !== -1) { // add quotes if any "commas"
else if (q.indexOf(node.sep) !== -1 || p.indexOf("\n") !== -1) { // add quotes if any "commas" or "\n"
ou += node.quo + q + node.quo + node.sep;

@@ -139,3 +142,3 @@ }

}
else if (p.indexOf(node.sep) !== -1) { // add quotes if any "commas"
else if (p.indexOf(node.sep) !== -1 || p.indexOf("\n") !== -1) { // add quotes if any "commas" or "\n"
ou += node.quo + p + node.quo + node.sep;

@@ -142,0 +145,0 @@ }

@@ -317,7 +317,9 @@ /**

}
msgInfo.send({payload: result});
msgInfo.msg.payload = result;
msgInfo.send(msgInfo.msg);
done();
});
} else {
msgInfo.send({payload: result});
msgInfo.msg.payload = result;
msgInfo.send(msgInfo.msg);
done();

@@ -324,0 +326,0 @@ }

@@ -528,3 +528,4 @@ {

"requesting": "requesting"
}
},
"insecureHTTPParser": "Disable strict HTTP parsing"
},

@@ -531,0 +532,0 @@ "websocket": {

{
"name": "@node-red/nodes",
"version": "2.2.2",
"version": "2.2.3",
"license": "Apache-2.0",

@@ -5,0 +5,0 @@ "repository": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc