@node-red/nodes
Advanced tools
Comparing version 2.2.2 to 2.2.3
@@ -112,5 +112,6 @@ /** | ||
if (valueType === "jsonata") { | ||
if (p.exp) { | ||
if (p.v) { | ||
try { | ||
var val = RED.util.evaluateJSONataExpression(p.exp, msg); | ||
var exp = RED.util.prepareJSONataExpression(p.v, node); | ||
var val = RED.util.evaluateJSONataExpression(exp, msg); | ||
RED.util.setMessageProperty(msg, property, val, true); | ||
@@ -117,0 +118,0 @@ } |
@@ -111,3 +111,5 @@ module.exports = function(RED) { | ||
this.on("input", function(msg, send, done) { | ||
if (hasOwnProperty.call(msg, "status") && hasOwnProperty.call(msg.status, "source") && hasOwnProperty.call(msg.status.source, "id") && (msg.status.source.id === node.id)) { | ||
if (hasOwnProperty.call(msg, "status") && msg.status && | ||
hasOwnProperty.call(msg.status, "source") && msg.status.source && | ||
hasOwnProperty.call(msg.status.source, "id") && (msg.status.source.id === node.id)) { | ||
done(); | ||
@@ -133,3 +135,4 @@ return; | ||
} | ||
if (hasOwnProperty.call(msg, "status")) { | ||
if (hasOwnProperty.call(msg, "status") && | ||
msg.status) { | ||
fill = msg.status.fill || "grey"; | ||
@@ -136,0 +139,0 @@ shape = msg.status.shape || "ring"; |
@@ -171,5 +171,5 @@ /** | ||
var contextKey = RED.util.parseContextStore(rule.from); | ||
if (/\[msg\./.test(context.key)) { | ||
if (/\[msg\./.test(contextKey.key)) { | ||
// The key has a nest msg. reference to evaluate first | ||
context.key = RED.util.normalisePropertyExpression(contextKey.key,msg,true); | ||
contextKey.key = RED.util.normalisePropertyExpression(contextKey.key,msg,true); | ||
} | ||
@@ -176,0 +176,0 @@ node.context()[rule.fromt].get(contextKey.key, contextKey.store, (err,fromValue) => { |
@@ -278,14 +278,18 @@ /** | ||
if (typeof(msg.flush) == 'number') { len = Math.min(Math.floor(msg.flush),len); } | ||
while (len > 0) { | ||
const msgInfo = node.buffer.shift(); | ||
if (Object.keys(msgInfo.msg).length > 1) { | ||
node.send(msgInfo.msg); | ||
msgInfo.done(); | ||
} | ||
len = len - 1; | ||
} | ||
if (node.buffer.length === 0) { | ||
if (len === 0) { | ||
clearInterval(node.intervalID); | ||
node.intervalID = -1; | ||
} | ||
else { | ||
while (len > 0) { | ||
const msgInfo = node.buffer.shift(); | ||
if (Object.keys(msgInfo.msg).length > 1) { | ||
node.send(msgInfo.msg); | ||
msgInfo.done(); | ||
} | ||
len = len - 1; | ||
} | ||
clearInterval(node.intervalID); | ||
node.intervalID = setInterval(sendMsgFromBuffer, node.rate); | ||
} | ||
node.status({fill:"blue",shape:"dot",text:node.buffer.length}); | ||
@@ -292,0 +296,0 @@ done(); |
@@ -71,3 +71,3 @@ /** | ||
/** | ||
* Test a topic string is valid | ||
* Test a topic string is valid for subscription | ||
* @param {string} topic | ||
@@ -77,6 +77,15 @@ * @returns `true` if it is a valid topic | ||
function isValidSubscriptionTopic(topic) { | ||
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) | ||
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic); | ||
} | ||
/** | ||
* Test a topic string is valid for publishing | ||
* @param {string} topic | ||
* @returns `true` if it is a valid topic | ||
*/ | ||
function isValidPublishTopic(topic) { | ||
return !/[\+#\b\f\n\r\t\v\0]/.test(topic); | ||
} | ||
/** | ||
* Helper function for setting string property values in the MQTT V5 properties object | ||
@@ -108,3 +117,3 @@ * @param {object} src Source object containing properties | ||
} else if(src[propName] === "false" || src[propName] === false) { | ||
dst[propName] = true; | ||
dst[propName] = false; | ||
} | ||
@@ -294,3 +303,3 @@ } else { | ||
} | ||
topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); | ||
topicOK = topicOK && isValidPublishTopic(msg.topic); | ||
@@ -398,2 +407,3 @@ if (topicOK) { | ||
node.subscriptions = {}; | ||
node.clientListeners = [] | ||
/** @type {mqtt.MqttClient}*/ this.client; | ||
@@ -423,4 +433,8 @@ node.setOptions = function(opts, init) { | ||
setIfHasProperty(opts, node, "receiveMaximum", init); | ||
setIfHasProperty(opts, node, "userProperties", init);//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116 | ||
setIfHasProperty(opts, node, "userPropertiesType", init); | ||
//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116 | ||
if (hasProperty(opts, "userProperties")) { | ||
node.userProperties = opts.userProperties; | ||
} else if (hasProperty(opts, "userProps")) { | ||
node.userProperties = opts.userProps; | ||
} | ||
@@ -474,3 +488,3 @@ function createLWT(topic, payload, qos, retain, v5opts, v5SubPropName) { | ||
//will v5 properties must be set in the "properties" sub object | ||
node.options.will = createLWT(opts.willTopic, opts.willPayload, opts.willQos, opts.willRetain, opts.willMsg, "properies"); | ||
node.options.will = createLWT(opts.willTopic, opts.willPayload, opts.willQos, opts.willRetain, opts.willMsg, "properties"); | ||
}; | ||
@@ -535,3 +549,3 @@ } else { | ||
// check if proxy is set in env | ||
let prox, noprox; | ||
let prox, noprox, noproxy; | ||
if (process.env.http_proxy) { prox = process.env.http_proxy; } | ||
@@ -663,7 +677,12 @@ if (process.env.HTTP_PROXY) { prox = process.env.HTTP_PROXY; } | ||
node.serverProperties = {}; | ||
if(node.client) { | ||
//belt and braces to avoid left over clients | ||
node.client.end(true); | ||
node._clientRemoveListeners(); | ||
} | ||
node.client = mqtt.connect(node.brokerurl, node.options); | ||
node.client.setMaxListeners(0); | ||
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times | ||
let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times | ||
// Register successful connect or reconnect handler | ||
node.client.on('connect', function (connack) { | ||
node._clientOn('connect', function (connack) { | ||
node.closing = false; | ||
@@ -700,3 +719,3 @@ node.connecting = false; | ||
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection | ||
node.client.removeAllListeners('message'); | ||
node._clientRemoveListeners('message'); | ||
@@ -713,3 +732,3 @@ // Re-subscribe to stored topics | ||
_options = node.subscriptions[s][r].options; | ||
node.client.on('message',node.subscriptions[s][r].handler); | ||
node._clientOn('message',node.subscriptions[s][r].handler); | ||
} | ||
@@ -727,7 +746,7 @@ } | ||
}); | ||
node.client.on("reconnect", function() { | ||
node._clientOn("reconnect", function() { | ||
setStatusConnecting(node, true); | ||
}); | ||
//Broker Disconnect - V5 event | ||
node.client.on("disconnect", function(packet) { | ||
node._clientOn("disconnect", function(packet) { | ||
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. | ||
@@ -746,3 +765,3 @@ const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; | ||
// Register disconnect handlers | ||
node.client.on('close', function () { | ||
node._clientOn('close', function () { | ||
if (node.connected) { | ||
@@ -759,3 +778,3 @@ node.connected = false; | ||
// The client's own reconnect logic will take care of errors | ||
node.client.on('error', function (error) { | ||
node._clientOn('error', function (error) { | ||
}); | ||
@@ -767,28 +786,48 @@ }catch(err) { | ||
}; | ||
node.disconnect = function (callback) { | ||
const _callback = function (resetNodeConnectedState) { | ||
setStatusDisconnected(node, true); | ||
if(resetNodeConnectedState) { | ||
node.closing = true; | ||
node.connecting = false; | ||
node.connected = false; | ||
const _callback = function () { | ||
if(node.connected || node.connecting) { | ||
setStatusDisconnected(node, true); | ||
} | ||
if(node.client) { node._clientRemoveListeners(); } | ||
node.connecting = false; | ||
node.connected = false; | ||
callback && typeof callback == "function" && callback(); | ||
}; | ||
if(!node.client) { return _callback(); } | ||
if(node.closing) { return _callback(); } | ||
if(node.closing) { | ||
return _callback(false); | ||
} | ||
var endCallBack = function endCallBack() { | ||
} | ||
let waitEnd = (client, ms) => { | ||
return new Promise( (resolve, reject) => { | ||
node.closing = true; | ||
if(!client) { | ||
resolve(); | ||
} else { | ||
const t = setTimeout(() => { | ||
//clean end() has exceeded WAIT_END, lets force end! | ||
client && client.end(true); | ||
reject(); | ||
}, ms); | ||
client.end(() => { | ||
clearTimeout(t); | ||
resolve() | ||
}); | ||
} | ||
}); | ||
}; | ||
if(node.connected && node.closeMessage) { | ||
node.publish(node.closeMessage, function (err) { | ||
node.client.end(endCallBack); | ||
_callback(true); | ||
waitEnd(node.client, 2000).then(() => { | ||
_callback(); | ||
}).catch((e) => { | ||
_callback(); | ||
}) | ||
}); | ||
} else if(node.connected) { | ||
node.client.end(endCallBack); | ||
_callback(true); | ||
} else { | ||
_callback(false); | ||
waitEnd(node.client, 2000).then(() => { | ||
_callback(); | ||
}).catch((e) => { | ||
_callback(); | ||
}) | ||
} | ||
@@ -830,3 +869,3 @@ } | ||
if (node.connected) { | ||
node.client.on('message',sub.handler); | ||
node._clientOn('message',sub.handler); | ||
node.client.subscribe(topic, options); | ||
@@ -842,3 +881,3 @@ } | ||
if(node.client) { | ||
node.client.removeListener('message',sub[ref].handler); | ||
node._clientRemoveListeners('message',sub[ref].handler); | ||
} | ||
@@ -877,4 +916,14 @@ delete sub[ref]; | ||
}; | ||
let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic)); | ||
//https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback | ||
if(node.options.protocolVersion == 5) { | ||
const bsp = node.serverProperties || {}; | ||
if (msg.userProperties && typeof msg.userProperties !== "object") { | ||
delete msg.userProperties; | ||
} | ||
if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) { | ||
msg.topicAlias = parseInt(msg.topicAlias); | ||
} else { | ||
delete msg.topicAlias; | ||
} | ||
options.properties = options.properties || {}; | ||
@@ -889,4 +938,7 @@ setStrProp(msg, options.properties, "responseTopic"); | ||
//FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); | ||
if (options.properties.topicAlias) { | ||
if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { | ||
//check & sanitise topic | ||
if (topicOK && options.properties.topicAlias) { | ||
let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias); | ||
if (!aliasValid) { | ||
done("Invalid topicAlias"); | ||
@@ -896,13 +948,23 @@ return | ||
if (node.topicAliases[options.properties.topicAlias] === msg.topic) { | ||
msg.topic = "" | ||
msg.topic = ""; | ||
} else { | ||
node.topicAliases[options.properties.topicAlias] = msg.topic | ||
node.topicAliases[options.properties.topicAlias] = msg.topic; | ||
} | ||
} else if (!msg.topic && options.properties.responseTopic) { | ||
msg.topic = msg.responseTopic; | ||
topicOK = isValidPublishTopic(msg.topic); | ||
delete msg.responseTopic; //prevent responseTopic being resent? | ||
} | ||
} | ||
node.client.publish(msg.topic, msg.payload, options, function(err) { | ||
done && done(err); | ||
return | ||
}); | ||
if (topicOK) { | ||
node.client.publish(msg.topic, msg.payload, options, function(err) { | ||
done && done(err); | ||
return | ||
}); | ||
} else { | ||
const error = new Error(RED._("mqtt.errors.invalid-topic")); | ||
error.warn = true; | ||
done(error); | ||
} | ||
} | ||
@@ -912,6 +974,37 @@ }; | ||
node.on('close', function(done) { | ||
node.closing = true; | ||
node.disconnect(done); | ||
node.disconnect(function() { | ||
done(); | ||
}); | ||
}); | ||
/** | ||
* Add event handlers to the MQTT.js client and track them so that | ||
* we do not remove any handlers that the MQTT client uses internally. | ||
* Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers | ||
* @param {string} event The name of the event | ||
* @param {function} handler The handler for this event | ||
*/ | ||
node._clientOn = function(event, handler) { | ||
node.clientListeners.push({event, handler}) | ||
node.client.on(event, handler) | ||
} | ||
/** | ||
* Remove event handlers from the MQTT.js client & only the events | ||
* that we attached in {@link node._clientOn `node._clientOn`}. | ||
* * If `event` is omitted, then all events matching `handler` are removed | ||
* * If `handler` is omitted, then all events named `event` are removed | ||
* * If both parameters are omitted, then all events are removed | ||
* @param {string} [event] The name of the event (optional) | ||
* @param {function} [handler] The handler for this event (optional) | ||
*/ | ||
node._clientRemoveListeners = function(event, handler) { | ||
node.clientListeners = node.clientListeners.filter((l) => { | ||
if (event && event !== l.event) { return true; } | ||
if (handler && handler !== l.handler) { return true; } | ||
node.client.removeListener(l.event, l.handler) | ||
return false; //found and removed, filter out this one | ||
}) | ||
} | ||
} | ||
@@ -1091,2 +1184,3 @@ | ||
node.brokerConn.deregister(node, done); | ||
node.brokerConn = null; | ||
} else { | ||
@@ -1156,2 +1250,3 @@ done(); | ||
node.brokerConn.deregister(node,done); | ||
node.brokerConn = null; | ||
} else { | ||
@@ -1158,0 +1253,0 @@ done(); |
@@ -217,2 +217,6 @@ /** | ||
}) | ||
if (node.insecureHTTPParser) { | ||
options.insecureHTTPParser = true | ||
} | ||
} | ||
@@ -219,0 +223,0 @@ ], |
@@ -38,5 +38,3 @@ /** | ||
var listenerNodes = {}; | ||
var activeListenerNodes = 0; | ||
// A node red node that sets up a local websocket server | ||
@@ -170,3 +168,2 @@ function WebSocketListenerNode(n) { | ||
if (node.isServer) { | ||
activeListenerNodes++; | ||
if (!serverUpgradeAdded) { | ||
@@ -215,3 +212,3 @@ RED.server.on('upgrade', handleServerUpgrade); | ||
node.on("close", function() { | ||
node.on("close", function(done) { | ||
if (node.heartbeatInterval) { | ||
@@ -224,7 +221,2 @@ clearInterval(node.heartbeatInterval); | ||
node._inputNodes = []; | ||
activeListenerNodes--; | ||
// if (activeListenerNodes === 0 && serverUpgradeAdded) { | ||
// RED.server.removeListener('upgrade', handleServerUpgrade); | ||
// serverUpgradeAdded = false; | ||
// } | ||
} | ||
@@ -234,6 +226,17 @@ else { | ||
node.server.close(); | ||
if (node.tout) { | ||
clearTimeout(node.tout); | ||
node.tout = null; | ||
} | ||
//wait 20*50 (1000ms max) for ws to close. | ||
//call done when readyState === ws.CLOSED (or 1000ms, whichever comes fist) | ||
const closeMonitorInterval = 20; | ||
let closeMonitorCount = 50; | ||
let si = setInterval(() => { | ||
if(node.server.readyState === ws.CLOSED || closeMonitorCount <= 0) { | ||
if (node.tout) { | ||
clearTimeout(node.tout); | ||
node.tout = null; | ||
} | ||
clearInterval(si); | ||
return done(); | ||
} | ||
closeMonitorCount--; | ||
}, closeMonitorInterval); | ||
} | ||
@@ -240,0 +243,0 @@ }); |
@@ -138,3 +138,3 @@ /** | ||
for (var i = 0; i<parts.length-1; i+=1) { | ||
msg = {topic:node.topic, payload:parts[i] + node.newline.trimEnd()}; | ||
msg = {topic:node.topic, payload:parts[i]}; | ||
msg._session = {type:"tcp",id:id}; | ||
@@ -233,3 +233,3 @@ node.send(msg); | ||
for (var i = 0; i<parts.length-1; i+=1) { | ||
msg = {topic:node.topic, payload:parts[i] + node.newline.trimEnd(), ip:socket.remoteAddress, port:socket.remotePort}; | ||
msg = {topic:node.topic, payload:parts[i], ip:socket.remoteAddress, port:socket.remotePort}; | ||
msg._session = {type:"tcp",id:id}; | ||
@@ -437,3 +437,3 @@ node.send(msg); | ||
else { | ||
var connectedSockets = []; | ||
const connectedSockets = new Set(); | ||
node.status({text:RED._("tcpin.status.connections",{count:0})}); | ||
@@ -459,12 +459,12 @@ let srv = net; | ||
node.log(RED._("tcpin.status.connection-closed",{host:socket.remoteAddress, port:socket.remotePort})); | ||
connectedSockets.splice(connectedSockets.indexOf(socket),1); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})}); | ||
connectedSockets.delete(socket); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})}); | ||
}); | ||
socket.on('error',function() { | ||
node.log(RED._("tcpin.errors.socket-error",{host:socket.remoteAddress, port:socket.remotePort})); | ||
connectedSockets.splice(connectedSockets.indexOf(socket),1); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})}); | ||
connectedSockets.delete(socket); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})}); | ||
}); | ||
connectedSockets.push(socket); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.length})}); | ||
connectedSockets.add(socket); | ||
node.status({text:RED._("tcpin.status.connections",{count:connectedSockets.size})}); | ||
}); | ||
@@ -482,6 +482,6 @@ | ||
} | ||
for (var i = 0; i < connectedSockets.length; i += 1) { | ||
if (node.doend === true) { connectedSockets[i].end(buffer); } | ||
else { connectedSockets[i].write(buffer); } | ||
} | ||
connectedSockets.forEach(soc => { | ||
if (node.doend === true) { soc.end(buffer); } | ||
else { soc.write(buffer); } | ||
}) | ||
} | ||
@@ -503,8 +503,6 @@ nodeDone(); | ||
node.on('close', function() { | ||
for (var c in connectedSockets) { | ||
if (connectedSockets.hasOwnProperty(c)) { | ||
connectedSockets[c].end(); | ||
connectedSockets[c].unref(); | ||
} | ||
} | ||
connectedSockets.forEach(soc => { | ||
soc.end(); | ||
soc.unref(); | ||
}) | ||
server.close(); | ||
@@ -511,0 +509,0 @@ node.log(RED._("tcpin.status.stopped-listening",{port:node.port})); |
@@ -92,2 +92,5 @@ /** | ||
} | ||
else if (msg.payload[s][t].toString().indexOf("\n") !== -1) { // add quotes if any "\n" | ||
msg.payload[s][t] = node.quo + msg.payload[s][t].toString() + node.quo; | ||
} | ||
} | ||
@@ -116,3 +119,3 @@ ou += msg.payload[s].join(node.sep) + node.ret; | ||
} | ||
else if (q.indexOf(node.sep) !== -1) { // add quotes if any "commas" | ||
else if (q.indexOf(node.sep) !== -1 || p.indexOf("\n") !== -1) { // add quotes if any "commas" or "\n" | ||
ou += node.quo + q + node.quo + node.sep; | ||
@@ -139,3 +142,3 @@ } | ||
} | ||
else if (p.indexOf(node.sep) !== -1) { // add quotes if any "commas" | ||
else if (p.indexOf(node.sep) !== -1 || p.indexOf("\n") !== -1) { // add quotes if any "commas" or "\n" | ||
ou += node.quo + p + node.quo + node.sep; | ||
@@ -142,0 +145,0 @@ } |
@@ -317,7 +317,9 @@ /** | ||
} | ||
msgInfo.send({payload: result}); | ||
msgInfo.msg.payload = result; | ||
msgInfo.send(msgInfo.msg); | ||
done(); | ||
}); | ||
} else { | ||
msgInfo.send({payload: result}); | ||
msgInfo.msg.payload = result; | ||
msgInfo.send(msgInfo.msg); | ||
done(); | ||
@@ -324,0 +326,0 @@ } |
@@ -528,3 +528,4 @@ { | ||
"requesting": "requesting" | ||
} | ||
}, | ||
"insecureHTTPParser": "Disable strict HTTP parsing" | ||
}, | ||
@@ -531,0 +532,0 @@ "websocket": { |
{ | ||
"name": "@node-red/nodes", | ||
"version": "2.2.2", | ||
"version": "2.2.3", | ||
"license": "Apache-2.0", | ||
@@ -5,0 +5,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
2195077
489
22809