Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@flowfuse/device-agent

Package Overview
Dependencies
Maintainers
3
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@flowfuse/device-agent - npm Package Compare versions

Comparing version 2.2.0 to 2.3.0

lib/states.js

5

CHANGELOG.md

@@ -0,1 +1,6 @@

#### 2.3.0: Release
- Add support for device actions (#239) @Steve-Mcl
- Save affinity cookie locally so it can be reused between restarts (#238) @knolleary
#### 2.2.0: Release

@@ -2,0 +7,0 @@

820

lib/agent.js

@@ -10,2 +10,3 @@ const { IntervalJitter } = require('./IntervalJitter')

const utils = require('./utils.js')
const { States, isTargetState, isValidState } = require('./states')

@@ -28,2 +29,3 @@ const PROJECT_FILE = 'flowforge-project.json'

this.currentMode = 'autonomous'
this.targetState = config.targetState || States.RUNNING
this.updating = false

@@ -35,4 +37,5 @@ this.queuedUpdate = null

// that the first MQTT check-in will trigger a response
this.currentState = 'unknown'
this.currentState = States.UNKNOWN
this.editorToken = null
this.editorAffinity = null
// ensure licensed property is present (default to null)

@@ -58,2 +61,3 @@ if (utils.hasProperty(this.config, 'licensed') === false) {

this.editorToken = null
this.editorAffinity = null
} else {

@@ -66,4 +70,6 @@ // New format

this.currentMode = config.mode || 'autonomous'
this.targetState = isTargetState(config.targetState) ? config.targetState : States.RUNNING
this.config.licensed = config.licensed || null
this.editorToken = config.editorToken || null
this.editorAffinity = config.editorAffinity || null
}

@@ -91,2 +97,3 @@ this.printAgentStatus()

info(` * Operation Mode : ${this.currentMode || 'unknown'}`)
info(` * Target State : ${this.targetState || States.RUNNING}`)
info(` * Licensed : ${this.config.licensed === null ? 'unknown' : this.config.licensed ? 'yes' : 'no'}`)

@@ -115,4 +122,6 @@ if (typeof this.currentSettings?.env === 'object') {

mode: this.currentMode,
targetState: this.targetState,
licensed: this.config.licensed,
editorToken: this.editorToken
editorToken: this.editorToken,
editorAffinity: this.editorAffinity
}))

@@ -123,3 +132,3 @@ }

if (this.config?.provisioningMode) {
this.currentState = 'provisioning'
this.currentState = States.PROVISIONING
await this.httpClient.startPolling()

@@ -129,2 +138,8 @@ } else {

if (this.config?.brokerURL) {
// ensure http comms are stopped if using MQTT
this.httpClient.stopPolling()
// ensure any existing MQTT comms are stopped before initiating new ones
if (this.mqttClient) {
this.mqttClient.stop()
}
// We have been provided a broker URL to use

@@ -136,3 +151,7 @@ this.mqttClient = mqttClient.newMQTTClient(this, this.config)

} else {
this.currentState = 'stopped'
// ensure MQTT comms are stopped if switching to HTTP
if (this.mqttClient && this.config?.brokerURL) {
this.mqttClient.stop()
}
this.currentState = States.STOPPED
// Fallback to HTTP polling

@@ -146,2 +165,11 @@ await this.httpClient.startPolling()

async stop () {
// Stop the launcher before stopping http/mqtt channels to permit
// audit logging and status updates to the platform
if (this.launcher) {
// Stop the launcher using non std state 'shutdown' to indicate a shutdown.
// This is mainly for consistent logging and preventing the auto restart
// logic kicking in when the agent is stopped
await this.launcher.stop(false, 'shutdown')
this.launcher = undefined
}
await this.httpClient.stopPolling()

@@ -151,7 +179,53 @@ if (this.mqttClient) {

}
}
async restartNR () {
this.currentState = States.RESTARTING
this.retrySetState(false) // clear any retry timers
if (this.launcher) {
await this.launcher.stop()
// Stop the launcher using the state 'restarting'
// This will not be persisted to the targetState property
// It indicates the launcher it should not attempt to auto restart
// the NR process but permit the process to exit gracefully
await this.launcher.stop(false, States.RESTARTING)
this.launcher = undefined
}
await this.updateTargetState(States.RUNNING)
await this.setState({ targetState: States.RUNNING })
return this.targetState === States.RUNNING
}
async startNR () {
await this.updateTargetState(States.RUNNING)
await this.setState({ targetState: States.RUNNING })
return this.targetState === States.RUNNING
}
async suspendNR () {
this.retrySetState(false) // clear any retry timers
// update the settings to indicate the device is suspended so that upon
// a reboot the device agent will not start the launcher
const result = await this.updateTargetState(States.SUSPENDED)
if (this.launcher) {
await this.launcher.stop(false, States.SUSPENDED)
this.launcher = undefined
this.currentState = States.SUSPENDED
}
return result && this.targetState === States.SUSPENDED
}
async updateTargetState (newState) {
if (isTargetState(newState)) {
const changed = this.targetState !== newState
this.targetState = newState
if (changed) {
this.retrySetState(false) // clear any retry timers
this.targetState = newState
await this.saveProject()
}
return true
}
return false
}
async getCurrentPackage () {

@@ -182,3 +256,3 @@ if (this.launcher) {

}
return {
const state = {
ownerType: this.currentOwnerType,

@@ -191,2 +265,3 @@ project: this.currentProject || null,

mode: this.currentMode,
targetState: this.targetState,
health: {

@@ -199,2 +274,6 @@ uptime: Math.floor((Date.now() - this.startTime) / 1000),

}
if (this.currentMode === 'developer' && this.editorToken && this.editorAffinity) {
state.affinity = this.editorAffinity
}
return state
}

@@ -226,372 +305,354 @@

async setState (newState) {
debug(newState)
debug(JSON.stringify(newState))
// If busy updating, queue the update
if (this.updating) {
const queuedUpdateIsTargetStateChange = this.queuedUpdate && typeof this.queuedUpdate === 'object' && utils.hasProperty(this.queuedUpdate, 'targetState')
if (queuedUpdateIsTargetStateChange) {
// the queued update is a target state change request, lets not overwrite it
// unless the new state is also a target state change request
const newStateIsTargetStateChange = typeof newState === 'object' && utils.hasProperty(newState, 'targetState')
if (newStateIsTargetStateChange) {
this.queuedUpdate = newState
}
return
}
this.queuedUpdate = newState
return
}
this.updating = true
// normalise the state object
Agent.normaliseStateObject(newState)
try {
this.updating = true
// normalise the state object
Agent.normaliseStateObject(newState)
// store license status - this property can be used for enabling EE features
if (newState && utils.hasProperty(newState, 'licensed') && typeof newState.licensed === 'boolean') {
const licenseChanged = newState.licensed !== this.config.licensed
if (licenseChanged) {
this.config.licensed = newState.licensed
this.saveProject() // update project file
if (this.config.licensed) {
info('License enabled')
// TODO: handle license change disabled -> enabled. Flag for reload?
} else {
info('License disabled')
// TODO: handle license change enabled -> disabled. Flag for reload?
// store license status - this property can be used for enabling EE features
if (newState && utils.hasProperty(newState, 'licensed') && typeof newState.licensed === 'boolean') {
const licenseChanged = newState.licensed !== this.config.licensed
if (licenseChanged) {
this.config.licensed = newState.licensed
this.saveProject() // update project file
if (this.config.licensed) {
info('License enabled')
// TODO: handle license change disabled -> enabled. Flag for reload?
} else {
info('License disabled')
// TODO: handle license change enabled -> disabled. Flag for reload?
}
}
}
}
/** `forgeSnapshot` will be set to the current snapshot in the forge platform *if required* */
let forgeSnapshot = null
/** `forgeSnapshot` will be set to the current snapshot in the forge platform *if required* */
let forgeSnapshot = null
// first, check if the new state indicates a change of operation mode from the current mode
// When changing from developer mode to autonomous mode, we need to check if the flows/modules
// for Node-RED were changed vs the current snapshot on the forge platform.
// If they differ, we flag that a reload of the snapshot is required.
if (newState !== null && newState.mode && newState.mode !== this.currentMode) {
if (!['developer', 'autonomous'].includes(newState.mode)) {
newState.mode = 'autonomous'
}
if (!this.currentMode) {
this.currentMode = newState.mode
} else if (this.currentMode !== newState.mode) {
this.currentMode = newState.mode
if (newState.mode === 'developer') {
info('Enabling developer mode')
// check to see if this is run state change request
if (typeof newState === 'object' && utils.hasProperty(newState, 'targetState')) {
if (isTargetState(newState.targetState)) {
const changed = newState.targetState !== this.targetState
this.targetState = newState.targetState
await this.saveProject()
} else {
// exiting developer mode
this.editorToken = null // clear the discarded token
let _launcher = this.launcher
if (!_launcher) {
// create a temporary launcher to read the current snapshot on disk
_launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
if (changed) {
this.retrySetState(false) // since this is a target state change, cancel any retry timers
}
}
delete newState.targetState
}
try {
forgeSnapshot = await this.httpClient.getSnapshot()
this.retrySetState(false) // success - stop retry timer
} catch (err) {
if (!this.retrySetStateTimer.isRunning) {
this.currentState = 'error'
warn(`Problem getting snapshot: ${err.toString()}`)
debug(err)
this.retrySetState(newState)
// next, check if the new state indicates a change of operation mode from the current mode
// When changing from developer mode to autonomous mode, we need to check if the flows/modules
// for Node-RED were changed vs the current snapshot on the forge platform.
// If they differ, we flag that a reload of the snapshot is required.
if (newState !== null && newState.mode && newState.mode !== this.currentMode) {
if (!['developer', 'autonomous'].includes(newState.mode)) {
newState.mode = 'autonomous'
}
if (!this.currentMode) {
this.currentMode = newState.mode
} else if (this.currentMode !== newState.mode) {
this.currentMode = newState.mode
if (newState.mode === 'developer') {
info('Enabling developer mode')
await this.saveProject()
} else {
// exiting developer mode
this.editorToken = null
this.editorAffinity = null
let _launcher = this.launcher
if (!_launcher) {
// create a temporary launcher to read the current snapshot on disk
_launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
}
this.updating = false
return
}
// before checking for changed flows etc, check if the snapshot on disk is the same as the snapshot on the forge platform
// if it has changed, we need to reload the snapshot from the forge platform
if (forgeSnapshot?.id !== _launcher.snapshot?.id) {
info('Local snapshot ID differs from the snapshot on the forge platform')
newState.reloadSnapshot = true
}
try {
forgeSnapshot = await this.httpClient.getSnapshot()
this.retrySetState(false) // success - stop retry timer
} catch (err) {
if (!this.retrySetStateTimer.isRunning) {
this.currentState = States.ERROR
warn(`Problem getting snapshot: ${err.toString()}`)
debug(err)
this.retrySetState(newState)
}
this.updating = false
this.currentState = States.ERROR
this.queuedUpdate = null // we are in error state, clear any queued updates, halt!
return
}
// next check the key system environment variables match
if (newState.reloadSnapshot !== true) {
const checkMatch = (key) => {
return (forgeSnapshot?.env[key] || null) === (_launcher?.snapshot?.env[key] || null)
// before checking for changed flows etc, check if the snapshot on disk is the same as the snapshot on the forge platform
// if it has changed, we need to reload the snapshot from the forge platform
if (forgeSnapshot?.id !== _launcher.snapshot?.id) {
info('Local snapshot ID differs from the snapshot on the forge platform')
newState.reloadSnapshot = true
}
if (newState.ownerType === 'application') {
// TODO: Since this is an early MVP of devices at application level, we fake any updates to the snapshot
// We DONT reload the snapshot from the forge platform because we don't want to overwrite any local
// changes made to flows and modules. This is a temporary workaround until we have a better solution
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') {
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') && checkMatch('FF_APPLICATION_ID') && checkMatch('FF_APPLICATION_NAME')
if (matchOk === false) {
info('Local environment variables differ from the snapshot on the forge platform')
// manually update the snapshot to match the snapshot from the forge platform
// this is a temporary workaround until we have a better solution for devices at application level
this.currentSnapshot.env.FF_SNAPSHOT_ID = forgeSnapshot.env.FF_SNAPSHOT_ID
this.currentSnapshot.env.FF_SNAPSHOT_NAME = forgeSnapshot.env.FF_SNAPSHOT_NAME
this.currentSnapshot.env.FF_DEVICE_ID = forgeSnapshot.env.FF_DEVICE_ID
this.currentSnapshot.env.FF_DEVICE_NAME = forgeSnapshot.env.FF_DEVICE_NAME
this.currentSnapshot.env.FF_DEVICE_TYPE = forgeSnapshot.env.FF_DEVICE_TYPE
this.currentSnapshot.env.FF_APPLICATION_ID = forgeSnapshot.env.FF_APPLICATION_ID
this.currentSnapshot.env.FF_APPLICATION_NAME = forgeSnapshot.env.FF_APPLICATION_NAME
}
// next check the key system environment variables match
if (newState.reloadSnapshot !== true) {
const checkMatch = (key) => {
return (forgeSnapshot?.env[key] || null) === (_launcher?.snapshot?.env[key] || null)
}
} else {
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') {
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE')
if (matchOk === false) {
info('Local environment variables differ from the snapshot on the forge platform')
newState.reloadSnapshot = true
if (newState.ownerType === 'application') {
// TODO: Since this is an early MVP of devices at application level, we fake any updates to the snapshot
// We DONT reload the snapshot from the forge platform because we don't want to overwrite any local
// changes made to flows and modules. This is a temporary workaround until we have a better solution
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') {
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') && checkMatch('FF_APPLICATION_ID') && checkMatch('FF_APPLICATION_NAME')
if (matchOk === false) {
info('Local environment variables differ from the snapshot on the forge platform')
// manually update the snapshot to match the snapshot from the forge platform
// this is a temporary workaround until we have a better solution for devices at application level
this.currentSnapshot.env.FF_SNAPSHOT_ID = forgeSnapshot.env.FF_SNAPSHOT_ID
this.currentSnapshot.env.FF_SNAPSHOT_NAME = forgeSnapshot.env.FF_SNAPSHOT_NAME
this.currentSnapshot.env.FF_DEVICE_ID = forgeSnapshot.env.FF_DEVICE_ID
this.currentSnapshot.env.FF_DEVICE_NAME = forgeSnapshot.env.FF_DEVICE_NAME
this.currentSnapshot.env.FF_DEVICE_TYPE = forgeSnapshot.env.FF_DEVICE_TYPE
this.currentSnapshot.env.FF_APPLICATION_ID = forgeSnapshot.env.FF_APPLICATION_ID
this.currentSnapshot.env.FF_APPLICATION_NAME = forgeSnapshot.env.FF_APPLICATION_NAME
}
}
} else {
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') {
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE')
if (matchOk === false) {
info('Local environment variables differ from the snapshot on the forge platform')
newState.reloadSnapshot = true
}
}
}
}
}
// Do a full comparison if this is NOT an application with a "starter" snapshot ID of "0"
const doFull = !(newState.ownerType === 'application' && newState.snapshot === '0')
if (doFull && newState.reloadSnapshot !== true) {
let diskSnapshot = { flows: [], modules: {} }
try {
const modules = (await _launcher.readPackage())?.modules
const flows = await _launcher.readFlow()
diskSnapshot = { flows, modules }
} catch (error) {
info('An error occurred while attempting to read flows & package file from disk')
newState.reloadSnapshot = true
// Do a full comparison if this is NOT an application with a "starter" snapshot ID of "0"
const doFull = !(newState.ownerType === 'application' && newState.snapshot === '0')
if (doFull && newState.reloadSnapshot !== true) {
let diskSnapshot = { flows: [], modules: {} }
try {
const modules = (await _launcher.readPackage())?.modules
const flows = await _launcher.readFlow()
diskSnapshot = { flows, modules }
} catch (error) {
info('An error occurred while attempting to read flows & package file from disk')
newState.reloadSnapshot = true
}
const changes = utils.compareNodeRedData(forgeSnapshot, diskSnapshot) === false
if (changes) {
info('Local flows differ from the snapshot on the forge platform')
newState.reloadSnapshot = true
}
}
const changes = utils.compareNodeRedData(forgeSnapshot, diskSnapshot) === false
if (changes) {
info('Local flows differ from the snapshot on the forge platform')
newState.reloadSnapshot = true
if (newState.reloadSnapshot) {
info('Local flows have changed. Restoring current snapshot')
} else {
// only save the project if the snapshot is not being reloaded
// since the snapshot will be reloaded and the project will be saved then
await this.saveProject()
}
info('Disabling developer mode')
}
if (newState.reloadSnapshot) {
info('Local flows have changed. Restoring current snapshot')
} else {
// only save the project if the snapshot is not being reloaded
// since the snapshot will be reloaded and the project will be saved then
await this.saveProject()
}
info('Disabling developer mode')
// report the new mode for more instantaneous feedback (improve the UX)
this.checkIn(2)
}
// report the new mode for more instantaneous feedback (improve the UX)
this.checkIn(2)
}
}
/** A flag to inhibit updates if we are in developer mode */
const developerMode = this.currentMode === 'developer'
/** A flag to inhibit updates if we are in developer mode */
const developerMode = this.currentMode === 'developer'
/** A flag to indicate execution should skip to the update step */
const skipToUpdate = newState?.reloadSnapshot === true
/** A flag to indicate execution should skip to the update step */
const skipToUpdate = newState?.reloadSnapshot === true
if (newState === null) {
// The agent should not be running (bad credentials/device details)
// Wipe the local configuration
if (developerMode === false) {
this.stop()
if (newState === null) {
// The agent should not be running (bad credentials/device details)
// Wipe the local configuration
if (developerMode === false) {
await this.stop()
this.currentSnapshot = null
this.currentApplication = null
this.currentProject = null
this.currentSettings = null
this.currentMode = null
this.editorToken = null
this.editorAffinity = null
await this.saveProject()
this.currentState = States.STOPPED
this.updating = false
}
} else if (!skipToUpdate && developerMode === false && newState.application === null && this.currentOwnerType === 'application') {
if (this.currentApplication) {
debug('Removed from application')
}
// Device unassigned from application
if (this.mqttClient) {
this.mqttClient.setApplication(null)
}
// Stop the device if running - with clean flag
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentApplication = null
this.currentSnapshot = null
this.currentApplication = null
this.currentProject = null
this.currentSettings = null
this.currentMode = null
this.editorToken = null
// if new settings hash is explicitly null, clear the current settings
// otherwise, if currentSettings.hash exists, see if it differs from the
// new settings hash & update accordingly
if (newState.settings === null) {
this.currentSettings = null
} else if (this.currentSettings?.hash) {
if (this.currentSettings.hash !== newState.settings) {
this.currentSettings = await this.httpClient.getSettings()
}
}
await this.saveProject()
this.currentState = 'stopped'
this.currentState = States.STOPPED
this.updating = false
}
} else if (!skipToUpdate && developerMode === false && newState.application === null && this.currentOwnerType === 'application') {
if (this.currentApplication) {
debug('Removed from application')
}
// Device unassigned from application
if (this.mqttClient) {
this.mqttClient.setApplication(null)
}
// Stop the device if running - with clean flag
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentApplication = null
this.currentSnapshot = null
// if new settings hash is explicitly null, clear the current settings
// otherwise, if currentSettings.hash exists, see if it differs from the
// new settings hash & update accordingly
if (newState.settings === null) {
this.currentSettings = null
} else if (this.currentSettings?.hash) {
if (this.currentSettings.hash !== newState.settings) {
this.currentSettings = await this.httpClient.getSettings()
} else if (!skipToUpdate && developerMode === false && newState.project === null && this.currentOwnerType === 'project') {
if (this.currentProject) {
debug('Removed from project')
}
}
await this.saveProject()
this.currentState = 'stopped'
this.updating = false
} else if (!skipToUpdate && developerMode === false && newState.project === null && this.currentOwnerType === 'project') {
if (this.currentProject) {
debug('Removed from project')
}
// Device unassigned from project
if (this.mqttClient) {
this.mqttClient.setProject(null)
}
// Stop the project if running - with clean flag
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentProject = null
this.currentSnapshot = null
// if new settings hash is explicitly null, clear the current settings
// otherwise, if currentSettings.hash exists, see if it differs from the
// new settings hash & update accordingly
if (newState.settings === null) {
this.currentSettings = null
} else if (this.currentSettings?.hash) {
if (this.currentSettings.hash !== newState.settings) {
this.currentSettings = await this.httpClient.getSettings()
// Device unassigned from project
if (this.mqttClient) {
this.mqttClient.setProject(null)
}
}
await this.saveProject()
this.currentState = 'stopped'
this.updating = false
} else if (!skipToUpdate && developerMode === false && newState.snapshot === null) {
// Snapshot removed, but project/application still set
if (this.currentSnapshot) {
debug('Active snapshot removed')
// Stop the project if running - with clean flag
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentProject = null
this.currentSnapshot = null
// if new settings hash is explicitly null, clear the current settings
// otherwise, if currentSettings.hash exists, see if it differs from the
// new settings hash & update accordingly
if (newState.settings === null) {
this.currentSettings = null
} else if (this.currentSettings?.hash) {
if (this.currentSettings.hash !== newState.settings) {
this.currentSettings = await this.httpClient.getSettings()
}
}
await this.saveProject()
}
let setApp = false
let setProject = false
if (utils.hasProperty(newState, 'application')) {
if (newState.application !== this.currentApplication) {
this.currentApplication = newState.application
setApp = true
this.currentState = States.STOPPED
this.updating = false
} else if (!skipToUpdate && developerMode === false && newState.snapshot === null) {
// Snapshot removed, but project/application still set
if (this.currentSnapshot) {
debug('Active snapshot removed')
this.currentSnapshot = null
await this.saveProject()
}
}
if (utils.hasProperty(newState, 'project')) {
if (newState.project !== this.currentProject) {
this.currentProject = newState.project
setProject = true
let setApp = false
let setProject = false
if (utils.hasProperty(newState, 'application')) {
if (newState.application !== this.currentApplication) {
this.currentApplication = newState.application
setApp = true
}
}
}
if (utils.hasProperty(newState, 'project')) {
if (newState.project !== this.currentProject) {
this.currentProject = newState.project
setProject = true
}
}
if (this.mqttClient) {
if (setApp) {
this.mqttClient.setProject(null)
this.mqttClient.setApplication(this.currentApplication)
if (this.mqttClient) {
if (setApp) {
this.mqttClient.setProject(null)
this.mqttClient.setApplication(this.currentApplication)
}
if (setProject) {
this.mqttClient.setApplication(null)
this.mqttClient.setProject(this.currentProject)
}
}
if (setProject) {
this.mqttClient.setApplication(null)
this.mqttClient.setProject(this.currentProject)
if (setApp || setProject) {
await this.saveProject()
this.checkIn(2)
}
}
if (setApp || setProject) {
await this.saveProject()
this.checkIn(2)
}
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentState = 'stopped'
this.updating = false
} else {
// Check if any updates are needed
let updateSnapshot = false
let updateSettings = false
const unknownOrStopped = (this.currentState === 'unknown' || this.currentState === 'stopped')
const snapShotUpdatePending = !!(!this.currentSnapshot && newState.snapshot)
const projectUpdatePending = !!(newState.ownerType === 'project' && !this.currentProject && newState.project)
const applicationUpdatePending = !!(newState.ownerType === 'application' && !this.currentApplication && newState.application)
if (unknownOrStopped && developerMode && snapShotUpdatePending && (projectUpdatePending || applicationUpdatePending)) {
info('Developer Mode: no flows found - updating to latest snapshot')
this.currentProject = newState.project
this.currentApplication = newState.application
updateSnapshot = true
updateSettings = true
} else if (developerMode === false) {
if (utils.hasProperty(newState, 'project') && (!this.currentSnapshot || newState.project !== this.currentProject)) {
info('New instance assigned')
this.currentApplication = null
if (this.launcher) {
await this.launcher.stop(true)
this.launcher = undefined
}
this.currentState = States.STOPPED
this.updating = false
} else {
// Check if any updates are needed
let updateSnapshot = false
let updateSettings = false
const unknownOrStopped = (this.currentState === States.UNKNOWN || this.currentState === States.STOPPED)
const snapShotUpdatePending = !!(!this.currentSnapshot && newState.snapshot)
const projectUpdatePending = !!(newState.ownerType === 'project' && !this.currentProject && newState.project)
const applicationUpdatePending = !!(newState.ownerType === 'application' && !this.currentApplication && newState.application)
if (unknownOrStopped && developerMode && snapShotUpdatePending && (projectUpdatePending || applicationUpdatePending)) {
info('Developer Mode: no flows found - updating to latest snapshot')
this.currentProject = newState.project
// Update everything
updateSnapshot = true
updateSettings = true
} else if (utils.hasProperty(newState, 'application') && (!this.currentSnapshot || newState.application !== this.currentApplication)) {
info('New application assigned')
this.currentProject = null
this.currentApplication = newState.application
// Update everything
updateSnapshot = true
updateSettings = true
} else {
if (utils.hasProperty(newState, 'snapshot') && (!this.currentSnapshot || newState.snapshot !== this.currentSnapshot.id)) {
info('New snapshot available')
} else if (developerMode === false) {
if (utils.hasProperty(newState, 'project') && (!this.currentSnapshot || newState.project !== this.currentProject)) {
info('New instance assigned')
this.currentApplication = null
this.currentProject = newState.project
// Update everything
updateSnapshot = true
}
// reloadSnapshot is a special case - it is used to force a reload of the current
// snapshot following a change from autonomous to developer mode
if (newState.reloadSnapshot === true && updateSnapshot === false) {
info('Reload snapshot requested')
updateSettings = true
} else if (utils.hasProperty(newState, 'application') && (!this.currentSnapshot || newState.application !== this.currentApplication)) {
info('New application assigned')
this.currentProject = null
this.currentApplication = newState.application
// Update everything
updateSnapshot = true
}
if (utils.hasProperty(newState, 'settings') && (!this.currentSettings || newState.settings !== this.currentSettings?.hash)) {
info('New settings available')
updateSettings = true
}
if (this.currentSettings === null) {
updateSettings = true
}
// If the snapshot is to be updated, the settings must also be updated
// this is because snapshot includes special, platform defined environment variables e.g. FF_SNAPSHOT_ID
if (updateSnapshot === true) {
updateSettings = true
}
}
}
if (!skipToUpdate && !updateSnapshot && !updateSettings) {
// Nothing to update.
// Start the launcher with the current config, Snapshot & settings
if (!this.launcher && this.currentSnapshot) {
this.launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
await this.launcher.start()
if (this.mqttClient) {
this.mqttClient.setProject(this.currentProject)
this.mqttClient.setApplication(this.currentApplication)
if (developerMode && this.editorToken) {
this.mqttClient.startTunnel(this.editorToken)
} else {
if (utils.hasProperty(newState, 'snapshot') && (!this.currentSnapshot || newState.snapshot !== this.currentSnapshot.id)) {
info('New snapshot available')
updateSnapshot = true
}
}
this.checkIn(2)
this.currentState = 'stopped'
}
this.updating = false
} else {
// At this point of the state machine, we are to stop the launcher and update the snapshot and/or settings
// then start the launcher with the new snapshot and/or settings
// Stop the launcher if currently running
this.currentState = 'updating'
if (this.launcher) {
info('Stopping current snapshot')
await this.launcher.stop()
this.launcher = undefined
}
if (updateSnapshot) {
try {
this.currentSnapshot = forgeSnapshot || await this.httpClient.getSnapshot()
this.retrySetState(false) // success - stop retry timer
} catch (err) {
if (!this.retrySetStateTimer.isRunning) {
this.currentState = 'error'
warn(`Problem getting snapshot: ${err.toString()}`)
debug(err)
this.retrySetState(newState)
// reloadSnapshot is a special case - it is used to force a reload of the current
// snapshot following a change from autonomous to developer mode
if (newState.reloadSnapshot === true && updateSnapshot === false) {
info('Reload snapshot requested')
updateSnapshot = true
}
this.updating = false
return
if (utils.hasProperty(newState, 'settings') && (!this.currentSettings || newState.settings !== this.currentSettings?.hash)) {
info('New settings available')
updateSettings = true
}
if (this.currentSettings === null) {
updateSettings = true
}
// If the snapshot is to be updated, the settings must also be updated
// this is because snapshot includes special, platform defined environment variables e.g. FF_SNAPSHOT_ID
if (updateSnapshot === true) {
updateSettings = true
}
}
}
if (updateSettings) {
this.currentSettings = await this.httpClient.getSettings()
}
if (this.currentSnapshot?.id) {
try {
await this.saveProject()
this.printAgentStatus('Launching with new settings...')
this.launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
await this.launcher.writeConfiguration({ updateSnapshot, updateSettings })
if (!skipToUpdate && !updateSnapshot && !updateSettings) {
// Nothing to update. So long as the target state is not SUSPENDED,
// start the launcher with the current config, Snapshot & settings
if (!this.launcher && this.currentSnapshot && this.targetState !== States.SUSPENDED) {
this.launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
await this.launcher.start()

@@ -601,28 +662,104 @@ if (this.mqttClient) {

this.mqttClient.setApplication(this.currentApplication)
if (developerMode && this.editorToken) {
this.mqttClient.startTunnel(this.editorToken)
if (developerMode && this.editorToken && this.launcher) {
this.mqttClient.startTunnel(this.editorToken, this.editorAffinity)
}
}
this.currentState = this.launcher.state
this.checkIn(2)
} catch (err) {
warn(`Error whilst starting Node-RED: ${err.toString()}`)
if (this.launcher) {
await this.launcher.stop(true)
}
}
this.updating = false
} else {
// At this point of the state machine, we are to stop the launcher and update the snapshot and/or settings
// then start the launcher with the new snapshot and/or settings
// Stop the launcher if currently running
this.currentState = States.UPDATING
if (this.launcher) {
info('Stopping current snapshot')
await this.launcher.stop(false, States.UPDATING)
this.launcher = undefined
}
if (updateSnapshot) {
try {
this.currentSnapshot = forgeSnapshot || await this.httpClient.getSnapshot()
this.retrySetState(false) // success - stop retry timer
} catch (err) {
if (!this.retrySetStateTimer.isRunning) {
this.currentState = States.ERROR
warn(`Problem getting snapshot: ${err.toString()}`)
debug(err)
this.retrySetState(newState)
}
this.updating = false
return
}
}
if (updateSettings) {
this.currentSettings = await this.httpClient.getSettings()
}
if (this.currentSnapshot?.id) {
try {
await this.saveProject()
let optimisticState = States.STOPPED
let performStart = true
this.currentState = States.UPDATING
if (this.targetState === States.SUSPENDED) {
this.printAgentStatus('Applying new settings...')
performStart = false
optimisticState = States.SUSPENDED
} else if (this.targetState === States.RUNNING) {
this.printAgentStatus('Launching with new settings...')
optimisticState = States.STARTING
}
this.launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode)
await this.launcher.writeConfiguration({ updateSnapshot, updateSettings })
if (performStart) {
await this.launcher?.start()
} else {
this.launcher = undefined
}
if (this.mqttClient) {
this.mqttClient.setProject(this.currentProject)
this.mqttClient.setApplication(this.currentApplication)
if (developerMode && this.editorToken && this.launcher) {
this.mqttClient.startTunnel(this.editorToken, this.editorAffinity)
}
}
this.currentState = optimisticState
this.checkIn(2)
} catch (err) {
warn(`Error whilst starting Node-RED: ${err.toString()}`)
if (this.launcher) {
await this.launcher.stop(true, States.ERROR)
}
this.launcher = undefined
this.currentState = States.ERROR
this.queuedUpdate = null // we are in error state, clear any queued updates, halt!
}
}
}
}
if (!this.launcher) {
if (this.targetState === States.SUSPENDED) {
this.currentState = States.SUSPENDED
} else if (isValidState(this.currentState) === false) {
this.currentState = States.STOPPED
}
} else {
this.currentState = this.launcher?.state || States.RUNNING
}
} finally {
this.updating = false
if (this.queuedUpdate) {
const update = this.queuedUpdate
this.queuedUpdate = null
this.setState(update).catch(err => {
this.updating = false
warn(`Error whilst processing queued update: ${err.toString()}`)
debug(err)
})
}
}
this.currentState = this.launcher ? 'running' : 'stopped'
this.updating = false
if (this.queuedUpdate) {
const update = this.queuedUpdate
this.queuedUpdate = null
this.setState(update).catch(err => {
this.updating = false
warn(`Error whilst processing queued update: ${err.toString()}`)
debug(err)
})
}
}

@@ -709,5 +846,6 @@

async saveEditorToken (token) {
const changed = this.editorToken !== token
async saveEditorToken (token, affinity) {
const changed = (this.editorToken !== token || this.editorAffinity !== affinity)
this.editorToken = token
this.editorAffinity = affinity
if (changed) {

@@ -714,0 +852,0 @@ await this.saveProject()

@@ -25,3 +25,3 @@ const { info, warn, debug } = require('../logging/log')

this.options = options || {}
this.affinity = undefined
this.affinity = this.options.affinity

@@ -28,0 +28,0 @@ // How long to wait before attempting to reconnect. Start at 500ms - back

@@ -67,2 +67,6 @@ const got = require('got').default

isPolling () {
return this.heartbeat.isRunning
}
async checkIn () {

@@ -69,0 +73,0 @@ const payload = this.agent.getState()

@@ -7,2 +7,3 @@ const childProcess = require('child_process')

const { copyDir } = require('./utils')
const { States } = require('./states')

@@ -26,4 +27,4 @@ const MIN_RESTART_TIME = 10000 // 10 seconds

class Launcher {
constructor (config, application, project, snapshot, settings, mode) {
this.config = config
constructor (agent, application, project, snapshot, settings, mode) {
this.config = agent?.config
this.application = application

@@ -36,3 +37,8 @@ this.project = project

this.startTime = []
this.state = 'stopped'
this.state = States.STOPPED
this.stopReason = ''
this.installProcess = null
this.deferredStop = null
/** @type {import('./agent.js').Agent */
this.agent = agent

@@ -95,2 +101,3 @@ this.auditLogURL = `${this.config.forgeURL}/logging/device/${this.config.deviceId}/audit`

info('Installing dependencies')
this.state = States.UPDATING
if (this.config.moduleCache) {

@@ -112,3 +119,3 @@ info('Using module_cache')

} else {
return new Promise((resolve, reject) => {
this.installProcess = new Promise((resolve, reject) => {
childProcess.exec('npm install --production', {

@@ -119,2 +126,3 @@ cwd: this.projectDir

resolve()
this.installProcess = null
} else {

@@ -124,5 +132,7 @@ warn('Install failed')

reject(error)
this.installProcess = null
}
})
})
return this.installProcess
}

@@ -287,2 +297,3 @@ }

if (!options || options.updateSnapshot) {
this.state = States.INSTALLING
await this.writeNPMRCFile()

@@ -323,3 +334,4 @@ await this.writePackage()

}
this.state = 'starting'
this.state = States.STARTING
if (!existsSync(this.projectDir) ||

@@ -368,2 +380,4 @@ !existsSync(this.files.flows) ||

info('Starting Node-RED')
this.state = States.STARTING // state may have been changed by stop() or deferredStop or Installing
this.stopReason = ''
const appEnv = env

@@ -401,8 +415,18 @@ const processArgs = [

}
this.state = 'running'
this.state = States.RUNNING
})
this.proc.on('exit', async (code, signal) => {
this.state = 'stopped'
if (!this.shuttingDown) {
// determine if Node-RED exited for an expected reason
// if yes, don't restart it since it was specifically stopped (e.g. not crashed)
const expected = ['shutdown', States.RESTARTING, States.UPDATING, States.SUSPENDED].includes(this.stopReason)
if (expected) {
this.state = States.STOPPED // assume stopped
} else {
this.state = States.CRASHED // assume crashed
}
if (this.exitCallback) {
this.exitCallback()
}
if (!expected) {
let restart = true

@@ -418,4 +442,5 @@ if (this.startTime.length === MAX_RESTART_COUNT) {

info('Node-RED restart loop detected - stopping')
this.state = 'crashed'
this.state = States.CRASHED
restart = false
await this.agent?.checkIn()
}

@@ -427,8 +452,2 @@ }

}
} else {
// is really shutting down (i.e. was commanded by the
// agent, so we won't be doing an auto restart)
if (this.exitCallback) {
this.exitCallback()
}
}

@@ -455,4 +474,18 @@ })

async stop (clean) {
info('Stopping Node-RED')
async stop (clean, reason) {
if (this.installProcess && this.state === States.INSTALLING) {
// If the launcher is currently installing, we should try not to interrupt this
// to avoid corruption (NPM can leave temporary directories preventing future installs)
// We should wait for the install to finish before stopping
// give it a few seconds to finish
const timeout = new Promise(resolve => setTimeout(resolve, 10000))
await Promise.race([this.installProcess, timeout])
// now proceed with stopping, regardless of whether the install finished
}
let finalState = States.STOPPED
this.stopReason = reason || 'shutdown'
info('Stopping Node-RED. Reason: ' + this.stopReason)
if (this.stopReason === States.SUSPENDED) {
finalState = States.SUSPENDED
}
if (this.deferredStop) {

@@ -472,6 +505,7 @@ // A stop request is already inflight - return the existing deferred object

}
info('Node-RED Stopped')
await this.agent?.checkIn() // let FF know we've stopped
}
if (this.proc) {
this.shuttingDown = true
// Setup a promise that will resolve once the process has really exited

@@ -501,7 +535,7 @@ this.deferredStop = new Promise((resolve, reject) => {

this.proc.kill()
this.state = 'stopped'
this.state = finalState
})
return this.deferredStop
} else {
this.state = 'stopped'
this.state = finalState
await postShutdownOps()

@@ -513,4 +547,4 @@ }

module.exports = {
newLauncher: (config, application, project, snapshot, settings, mode) => new Launcher(config, application, project, snapshot, settings, mode),
newLauncher: (agent, application, project, snapshot, settings, mode) => new Launcher(agent, application, project, snapshot, settings, mode),
Launcher
}

@@ -92,3 +92,2 @@ const mqtt = require('mqtt')

await this.agent.setState(msg)
return
} else if (msg.command === 'startLog') {

@@ -99,10 +98,9 @@ if (!this.logEnabled) {

this.logEnabled = true
return
} else if (msg.command === 'stopLog') {
this.logEnabled = false
return
} else if (msg.command === 'startEditor') {
await this.startTunnel(msg.payload?.token, msg)
return
await this.startTunnel(msg.payload?.token, this.agent.editorAffinity || null, msg)
} else if (msg.command === 'stopEditor') {
// Clear the saved token
await this.saveEditorToken(null, null)
if (this.tunnel) {

@@ -113,3 +111,2 @@ info('Disabling remote editor access')

}
return
} else if (msg.command === 'upload') {

@@ -120,5 +117,7 @@ info('Capturing device snapshot')

this.sendCommandResponse(msg, response)
return
} else if (msg.command === 'action') {
await this.handleActionRequest(msg)
} else {
warn(`Unknown command type received from platform: ${msg.command}`)
}
warn(`Unknown command type received from platform: ${msg.command}`)
} catch (err) {

@@ -141,2 +140,47 @@ warn(err)

/**
* Perform a device action of starting, restarting or suspending the Node-RED instance
* @param {Object} msg - the incoming message data
*/
async handleActionRequest (msg) {
const action = msg?.payload?.action || ''
try {
let result = false
let error = null
switch (action) {
case 'start':
info('Node-RED start requested')
result = await this.agent.startNR()
break
case 'restart':
info('Node-RED restart requested')
result = await this.agent.restartNR()
break
case 'suspend':
info('Node-RED suspend requested')
result = await this.agent.suspendNR()
break
default:
error = new Error(`Unsupported action requested: ${action}`)
error.code = 'unsupported_action'
throw error
}
if (result) {
this.sendCommandResponse(msg, { success: result })
} else {
throw new Error('Requested action ' + action + ' failed')
}
} catch (err) {
warn(err.toString())
debug(err)
const error = {
message: err.toString(),
code: err.code || 'unexpected_error',
error: err.message || 'Unexpected error'
}
this.sendCommandResponse(msg, { success: false, error })
}
this.agent.checkIn(3, 1000) // attempt a check in (3 retries, 1s interval)
}
stop () {

@@ -149,3 +193,7 @@ if (this.heartbeat.isRunning) {

setMQTT(undefined)
this.client && this.client.end()
if (this.client) {
this.setApplication(null) // unsubscribe from application commands
this.setProject(null) // unsubscribe from application commands
this.client.end()
}
}

@@ -275,3 +323,3 @@

async startTunnel (token, msg) {
async startTunnel (token, affinity, msg) {
info('Enabling remote editor access')

@@ -292,3 +340,3 @@ try {

// * Enable Device Editor (Step 6) - (forge:MQTT->device) Create the tunnel on the device
this.tunnel = EditorTunnel.create(this.config, { token })
this.tunnel = EditorTunnel.create(this.config, { token, affinity })

@@ -299,7 +347,13 @@ // * Enable Device Editor (Step 7) - (device) Begin the device tunnel connect process

// store the token for later use (i.e. device agent is restarted)
await this.saveEditorToken(result ? token : null)
if (result) {
await this.saveEditorToken(token, this.tunnel.affinity)
} else {
// Failed to connect - clear the token/affinity so it can be
// refreshed
await this.saveEditorToken(null, null)
}
if (msg) {
// * Enable Device Editor (Step 10) - (device->forge:MQTT) Send a response to the platform
this.sendCommandResponse(msg, { connected: result, token })
this.sendCommandResponse(msg, { connected: result, token, affinity: this.tunnel.affinity })
}

@@ -315,4 +369,4 @@ } catch (err) {

async saveEditorToken (token) {
await this.agent?.saveEditorToken(token)
async saveEditorToken (token, affinity) {
await this.agent?.saveEditorToken(token, affinity)
}

@@ -319,0 +373,0 @@ }

{
"name": "@flowfuse/device-agent",
"version": "2.2.0",
"version": "2.3.0",
"description": "An Edge Agent for running Node-RED instances deployed from the FlowFuse Platform",

@@ -5,0 +5,0 @@ "exports": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc