@flowfuse/nr-project-nodes
Advanced tools
Comparing version 0.6.2-ae6ae46-202403151658.0 to 0.6.2
@@ -0,1 +1,8 @@ | ||
### 0.6.2 | ||
- Fix Project Nodes multiple MQTT connections issue (#66) @Steve-Mcl | ||
- Display "feature not available" if feature flag is false (#63) @Steve-Mcl | ||
- Better determination of device owner (#65) @Steve-Mcl | ||
- Update npm-publish action version to v2 (#61) @ppawlowski | ||
### 0.6.1 | ||
@@ -2,0 +9,0 @@ |
@@ -23,2 +23,3 @@ module.exports = function (RED) { | ||
const OWNER_TYPE = RED.settings.flowforge.projectID ? 'instance' : 'application' | ||
const featureEnabled = RED.settings.flowforge.projectLink.featureEnabled !== false | ||
@@ -195,4 +196,6 @@ // #region JSDoc | ||
const allNodes = new Set() | ||
/** @type {MQTT.MqttClient} */ | ||
/** @type {MQTT.MqttClient} The Single MQTT client connection */ | ||
let client | ||
/** @type {MQTT.MqttClient[]} An array to track and auto kill multiple client connections */ | ||
const clients = [] | ||
let connected = false | ||
@@ -279,2 +282,3 @@ let connecting = false | ||
function onConnect (/** @type {MQTT.IConnackPacket} */ packet) { | ||
checkAndContain() // check and contain multiple clients | ||
connAck.properties = packet.properties | ||
@@ -294,3 +298,4 @@ connAck.reasonCode = packet.reasonCode | ||
function onReconnect () { | ||
RED.log.trace('Project Link nodes reconnecting') | ||
RED.log.info('Project Link nodes reconnecting') | ||
checkAndContain() // check and contain multiple clients | ||
allNodes.forEach(node => { | ||
@@ -312,2 +317,3 @@ try { | ||
} | ||
checkAndContain() // check and contain multiple clients | ||
connected = false | ||
@@ -325,2 +331,3 @@ connecting = false | ||
function onClose (err) { | ||
checkAndContain() // check and contain multiple clients | ||
if (err instanceof Error) { | ||
@@ -385,6 +392,39 @@ RED.log.trace(`Project link connection closed: ${err.message}`) | ||
if (handler && handler !== l.handler) { return true } | ||
client.removeListener(l.event, l.handler) | ||
client && client.removeListener(l.event, l.handler) | ||
return false // found and removed, filter out this one | ||
}) | ||
} | ||
/** | ||
* Check all clients and closes any that are NOT the current client (i.e. keeps a max of 1 client open at any time) | ||
* If the current client is closed/undefined, then any/all connections found in the clients array will be closed | ||
* Lastly, the clients array is re-synced to contain only the current client (if it exists) | ||
*/ | ||
function checkAndContain () { | ||
// There should only ever be one client, but due to async nature of | ||
// user operations, node-red on(...) events, mqtt auto reconnect, | ||
// network outages and other timing difficulties, it has proven possible to generate multiple clients. | ||
// Therefore, whenever we create a connection, we add it to the clients array (in the `connect()` function) | ||
// That way, we can proactively check all clients and close any that are not the current client | ||
// This is a low cost operation and will only be called when a connect/reconnect/disconnect callback is made | ||
for (let i = 0; i < clients.length; i++) { | ||
if (clients[i] && clients[i] !== client) { | ||
if (client) { | ||
// if we have a client, but this is not it, log the fact we caught a non-current client | ||
RED.log.warn('Project link nodes: cleaning up non current client') | ||
} | ||
try { | ||
clients[i].removeAllListeners() | ||
clients[i].end(true) | ||
} finally { | ||
clients[i] = null | ||
} | ||
} | ||
} | ||
// re-sync the clients array if required | ||
clients.length = 0 | ||
if (client) { | ||
clients.push(client) | ||
} | ||
} | ||
return { // public interface | ||
@@ -536,2 +576,13 @@ subscribe (node, topic, options, callback) { | ||
if (client) { | ||
// if client is something, force it to end unconditionally | ||
RED.log.trace('force end() project node client before new connect') | ||
try { | ||
client.removeAllListeners() | ||
client.end(true) | ||
} finally { | ||
client = null | ||
} | ||
} | ||
/** @type {MQTT.IClientOptions} */ | ||
@@ -587,2 +638,3 @@ const defaultOptions = { | ||
client = MQTT.connect(parsedURL, options) | ||
clients.push(client) // add to clients array for containment and auto cleanup of multiple clients | ||
on('connect', onConnect) | ||
@@ -600,3 +652,2 @@ on('error', onError) | ||
disconnect (done) { | ||
const closeMessage = null // FUTURE: Let broker/clients know of issue via close msg | ||
const _callback = function (err) { | ||
@@ -606,10 +657,23 @@ connecting = false | ||
closing = false | ||
if (err) { | ||
RED.log.warn(`Project Link nodes disconnect error: ${err.message}`) | ||
} | ||
// By this point, the client will mostly likely have been ended cleanly | ||
// however, there is no harm in forcing it to end here and so far, this | ||
// solves the majority of multiple connect/disconnect issues witnessed. | ||
if (client) { | ||
try { | ||
client.removeAllListeners() | ||
client.end(true) // force end, most likely already ended (cleanly) | ||
} catch (_err) { | ||
// do nothing | ||
} | ||
} | ||
done && typeof done === 'function' && done(err) | ||
} | ||
if (!client) { return _callback() } | ||
const waitEnd = (client, ms) => { | ||
return new Promise((resolve, reject) => { | ||
closing = true | ||
if (!client || !connected) { | ||
if (!client) { | ||
resolve() | ||
@@ -621,10 +685,12 @@ } else { | ||
} else { | ||
// clean end() has exceeded WAIT_END, lets force end! | ||
client && client.end(true) | ||
reject(new Error('timeout')) | ||
reject(new Error('timeout waiting for client clean end')) | ||
} | ||
}, ms) | ||
client.end(() => { | ||
client.end((err) => { | ||
clearTimeout(t) | ||
resolve() | ||
if (err) { | ||
reject(err) | ||
} else { | ||
resolve() | ||
} | ||
}) | ||
@@ -634,17 +700,8 @@ } | ||
} | ||
if (connected && closeMessage) { | ||
mqtt.publish(closeMessage, function (err) { | ||
waitEnd(client, 2000).then(() => { | ||
_callback(err) | ||
}).catch((e) => { | ||
_callback(e) | ||
}) | ||
}) | ||
} else { | ||
waitEnd(client, 2000).then(() => { | ||
_callback() | ||
}).catch((_e) => { | ||
_callback() | ||
}) | ||
} | ||
waitEnd(client, 2000).then(() => { | ||
_callback() | ||
}).catch((_e) => { | ||
_callback(_e) | ||
}) | ||
}, | ||
@@ -662,2 +719,3 @@ close (done) { | ||
client = null | ||
checkAndContain() | ||
done(err) | ||
@@ -703,4 +761,8 @@ }) | ||
let configOk = true | ||
if (node.broadcast !== true && OWNER_TYPE === 'application') { | ||
if (featureEnabled === false) { | ||
configOk = false | ||
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' }) | ||
node.warn('Project Link feature is not available for your current Team.') | ||
} else if (node.broadcast !== true && OWNER_TYPE === 'application') { | ||
configOk = false | ||
node.status({ fill: 'red', shape: 'dot', text: 'unsupported source option' }) | ||
@@ -784,5 +846,14 @@ node.warn('Receiving direct messages is not supported for application assigned devices. Please update the nodes source option to use "Listen for broadcast messages".') | ||
node.broadcast = n.broadcast === true || n.broadcast === 'true' | ||
mqtt.connect() | ||
mqtt.registerStatus(node) | ||
if (featureEnabled === false) { | ||
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' }) | ||
node.warn('Project Link feature is not available for your current Team.') | ||
} else { | ||
mqtt.connect() | ||
mqtt.registerStatus(node) | ||
} | ||
node.on('input', async function (msg, _send, done) { | ||
if (featureEnabled === false) { | ||
done() | ||
return | ||
} | ||
try { | ||
@@ -872,16 +943,24 @@ if (node.mode === 'return') { | ||
} | ||
if (featureEnabled === false) { | ||
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' }) | ||
node.warn('Project Link feature is not available for your current Team.') | ||
} else { | ||
mqtt.connect() | ||
mqtt.registerStatus(node) | ||
// ↓ Useful for debugging ↓ | ||
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`) | ||
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub) | ||
.then(_result => {}) | ||
.catch(err => { | ||
node.status({ fill: 'red', shape: 'dot', text: 'subscribe error' }) | ||
node.error(err) | ||
}) | ||
} | ||
mqtt.connect() | ||
mqtt.registerStatus(node) | ||
// ↓ Useful for debugging ↓ | ||
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`) | ||
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub) | ||
.then(_result => {}) | ||
.catch(err => { | ||
node.status({ fill: 'red', shape: 'dot', text: 'subscribe error' }) | ||
node.error(err) | ||
}) | ||
node.on('input', async function (msg, send, done) { | ||
try { | ||
if (featureEnabled === false) { | ||
done() | ||
return | ||
} | ||
const eventId = crypto.randomBytes(14).toString('hex') | ||
@@ -888,0 +967,0 @@ /** @type {MessageEvent} */ |
{ | ||
"name": "@flowfuse/nr-project-nodes", | ||
"version": "0.6.2-ae6ae46-202403151658.0", | ||
"version": "0.6.2", | ||
"description": "A collection of Node-RED nodes for easy communication between Node-RED instances running in the FlowFuse platform", | ||
@@ -5,0 +5,0 @@ "scripts": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
91167
1029