Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@flowfuse/nr-project-nodes

Package Overview
Dependencies
Maintainers
3
Versions
61
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@flowfuse/nr-project-nodes - npm Package Compare versions

Comparing version 0.6.2-ae6ae46-202403151658.0 to 0.6.2

7

CHANGELOG.md

@@ -0,1 +1,8 @@

### 0.6.2
- Fix Project Nodes multiple MQTT connections issue (#66) @Steve-Mcl
- Display "feature not available" if feature flag is false (#63) @Steve-Mcl
- Better determination of device owner (#65) @Steve-Mcl
- Update npm-publish action version to v2 (#61) @ppawlowski
### 0.6.1

@@ -2,0 +9,0 @@

159

nodes/project-link.js

@@ -23,2 +23,3 @@ module.exports = function (RED) {

const OWNER_TYPE = RED.settings.flowforge.projectID ? 'instance' : 'application'
const featureEnabled = RED.settings.flowforge.projectLink.featureEnabled !== false

@@ -195,4 +196,6 @@ // #region JSDoc

const allNodes = new Set()
/** @type {MQTT.MqttClient} */
/** @type {MQTT.MqttClient} The Single MQTT client connection */
let client
/** @type {MQTT.MqttClient[]} An array to track and auto kill multiple client connections */
const clients = []
let connected = false

@@ -279,2 +282,3 @@ let connecting = false

function onConnect (/** @type {MQTT.IConnackPacket} */ packet) {
checkAndContain() // check and contain multiple clients
connAck.properties = packet.properties

@@ -294,3 +298,4 @@ connAck.reasonCode = packet.reasonCode

function onReconnect () {
RED.log.trace('Project Link nodes reconnecting')
RED.log.info('Project Link nodes reconnecting')
checkAndContain() // check and contain multiple clients
allNodes.forEach(node => {

@@ -312,2 +317,3 @@ try {

}
checkAndContain() // check and contain multiple clients
connected = false

@@ -325,2 +331,3 @@ connecting = false

function onClose (err) {
checkAndContain() // check and contain multiple clients
if (err instanceof Error) {

@@ -385,6 +392,39 @@ RED.log.trace(`Project link connection closed: ${err.message}`)

if (handler && handler !== l.handler) { return true }
client.removeListener(l.event, l.handler)
client && client.removeListener(l.event, l.handler)
return false // found and removed, filter out this one
})
}
/**
* Check all clients and closes any that are NOT the current client (i.e. keeps a max of 1 client open at any time)
* If the current client is closed/undefined, then any/all connections found in the clients array will be closed
* Lastly, the clients array is re-synced to contain only the current client (if it exists)
*/
function checkAndContain () {
// There should only ever be one client, but due to async nature of
// user operations, node-red on(...) events, mqtt auto reconnect,
// network outages and other timing difficulties, it has proven possible to generate multiple clients.
// Therefore, whenever we create a connection, we add it to the clients array (in the `connect()` function)
// That way, we can proactively check all clients and close any that are not the current client
// This is a low cost operation and will only be called when a connect/reconnect/disconnect callback is made
for (let i = 0; i < clients.length; i++) {
if (clients[i] && clients[i] !== client) {
if (client) {
// if we have a client, but this is not it, log the fact we caught a non-current client
RED.log.warn('Project link nodes: cleaning up non current client')
}
try {
clients[i].removeAllListeners()
clients[i].end(true)
} finally {
clients[i] = null
}
}
}
// re-sync the clients array if required
clients.length = 0
if (client) {
clients.push(client)
}
}
return { // public interface

@@ -536,2 +576,13 @@ subscribe (node, topic, options, callback) {

if (client) {
// if client is something, force it to end unconditionally
RED.log.trace('force end() project node client before new connect')
try {
client.removeAllListeners()
client.end(true)
} finally {
client = null
}
}
/** @type {MQTT.IClientOptions} */

@@ -587,2 +638,3 @@ const defaultOptions = {

client = MQTT.connect(parsedURL, options)
clients.push(client) // add to clients array for containment and auto cleanup of multiple clients
on('connect', onConnect)

@@ -600,3 +652,2 @@ on('error', onError)

disconnect (done) {
const closeMessage = null // FUTURE: Let broker/clients know of issue via close msg
const _callback = function (err) {

@@ -606,10 +657,23 @@ connecting = false

closing = false
if (err) {
RED.log.warn(`Project Link nodes disconnect error: ${err.message}`)
}
// By this point, the client will mostly likely have been ended cleanly
// however, there is no harm in forcing it to end here and so far, this
// solves the majority of multiple connect/disconnect issues witnessed.
if (client) {
try {
client.removeAllListeners()
client.end(true) // force end, most likely already ended (cleanly)
} catch (_err) {
// do nothing
}
}
done && typeof done === 'function' && done(err)
}
if (!client) { return _callback() }
const waitEnd = (client, ms) => {
return new Promise((resolve, reject) => {
closing = true
if (!client || !connected) {
if (!client) {
resolve()

@@ -621,10 +685,12 @@ } else {

} else {
// clean end() has exceeded WAIT_END, lets force end!
client && client.end(true)
reject(new Error('timeout'))
reject(new Error('timeout waiting for client clean end'))
}
}, ms)
client.end(() => {
client.end((err) => {
clearTimeout(t)
resolve()
if (err) {
reject(err)
} else {
resolve()
}
})

@@ -634,17 +700,8 @@ }

}
if (connected && closeMessage) {
mqtt.publish(closeMessage, function (err) {
waitEnd(client, 2000).then(() => {
_callback(err)
}).catch((e) => {
_callback(e)
})
})
} else {
waitEnd(client, 2000).then(() => {
_callback()
}).catch((_e) => {
_callback()
})
}
waitEnd(client, 2000).then(() => {
_callback()
}).catch((_e) => {
_callback(_e)
})
},

@@ -662,2 +719,3 @@ close (done) {

client = null
checkAndContain()
done(err)

@@ -703,4 +761,8 @@ })

let configOk = true
if (node.broadcast !== true && OWNER_TYPE === 'application') {
if (featureEnabled === false) {
configOk = false
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' })
node.warn('Project Link feature is not available for your current Team.')
} else if (node.broadcast !== true && OWNER_TYPE === 'application') {
configOk = false
node.status({ fill: 'red', shape: 'dot', text: 'unsupported source option' })

@@ -784,5 +846,14 @@ node.warn('Receiving direct messages is not supported for application assigned devices. Please update the nodes source option to use "Listen for broadcast messages".')

node.broadcast = n.broadcast === true || n.broadcast === 'true'
mqtt.connect()
mqtt.registerStatus(node)
if (featureEnabled === false) {
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' })
node.warn('Project Link feature is not available for your current Team.')
} else {
mqtt.connect()
mqtt.registerStatus(node)
}
node.on('input', async function (msg, _send, done) {
if (featureEnabled === false) {
done()
return
}
try {

@@ -872,16 +943,24 @@ if (node.mode === 'return') {

}
if (featureEnabled === false) {
node.status({ fill: 'red', shape: 'dot', text: 'feature not available' })
node.warn('Project Link feature is not available for your current Team.')
} else {
mqtt.connect()
mqtt.registerStatus(node)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`)
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
node.status({ fill: 'red', shape: 'dot', text: 'subscribe error' })
node.error(err)
})
}
mqtt.connect()
mqtt.registerStatus(node)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`)
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
node.status({ fill: 'red', shape: 'dot', text: 'subscribe error' })
node.error(err)
})
node.on('input', async function (msg, send, done) {
try {
if (featureEnabled === false) {
done()
return
}
const eventId = crypto.randomBytes(14).toString('hex')

@@ -888,0 +967,0 @@ /** @type {MessageEvent} */

{
"name": "@flowfuse/nr-project-nodes",
"version": "0.6.2-ae6ae46-202403151658.0",
"version": "0.6.2",
"description": "A collection of Node-RED nodes for easy communication between Node-RED instances running in the FlowFuse platform",

@@ -5,0 +5,0 @@ "scripts": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc