@flowfuse/device-agent
Advanced tools
Comparing version 2.2.0 to 2.3.0
@@ -0,1 +1,6 @@ | ||
#### 2.3.0: Release | ||
- Add support for device actions (#239) @Steve-Mcl | ||
- Save affinity cookie locally so it can be reused between restarts (#238) @knolleary | ||
#### 2.2.0: Release | ||
@@ -2,0 +7,0 @@ |
820
lib/agent.js
@@ -10,2 +10,3 @@ const { IntervalJitter } = require('./IntervalJitter') | ||
const utils = require('./utils.js') | ||
const { States, isTargetState, isValidState } = require('./states') | ||
@@ -28,2 +29,3 @@ const PROJECT_FILE = 'flowforge-project.json' | ||
this.currentMode = 'autonomous' | ||
this.targetState = config.targetState || States.RUNNING | ||
this.updating = false | ||
@@ -35,4 +37,5 @@ this.queuedUpdate = null | ||
// that the first MQTT check-in will trigger a response | ||
this.currentState = 'unknown' | ||
this.currentState = States.UNKNOWN | ||
this.editorToken = null | ||
this.editorAffinity = null | ||
// ensure licensed property is present (default to null) | ||
@@ -58,2 +61,3 @@ if (utils.hasProperty(this.config, 'licensed') === false) { | ||
this.editorToken = null | ||
this.editorAffinity = null | ||
} else { | ||
@@ -66,4 +70,6 @@ // New format | ||
this.currentMode = config.mode || 'autonomous' | ||
this.targetState = isTargetState(config.targetState) ? config.targetState : States.RUNNING | ||
this.config.licensed = config.licensed || null | ||
this.editorToken = config.editorToken || null | ||
this.editorAffinity = config.editorAffinity || null | ||
} | ||
@@ -91,2 +97,3 @@ this.printAgentStatus() | ||
info(` * Operation Mode : ${this.currentMode || 'unknown'}`) | ||
info(` * Target State : ${this.targetState || States.RUNNING}`) | ||
info(` * Licensed : ${this.config.licensed === null ? 'unknown' : this.config.licensed ? 'yes' : 'no'}`) | ||
@@ -115,4 +122,6 @@ if (typeof this.currentSettings?.env === 'object') { | ||
mode: this.currentMode, | ||
targetState: this.targetState, | ||
licensed: this.config.licensed, | ||
editorToken: this.editorToken | ||
editorToken: this.editorToken, | ||
editorAffinity: this.editorAffinity | ||
})) | ||
@@ -123,3 +132,3 @@ } | ||
if (this.config?.provisioningMode) { | ||
this.currentState = 'provisioning' | ||
this.currentState = States.PROVISIONING | ||
await this.httpClient.startPolling() | ||
@@ -129,2 +138,8 @@ } else { | ||
if (this.config?.brokerURL) { | ||
// ensure http comms are stopped if using MQTT | ||
this.httpClient.stopPolling() | ||
// ensure any existing MQTT comms are stopped before initiating new ones | ||
if (this.mqttClient) { | ||
this.mqttClient.stop() | ||
} | ||
// We have been provided a broker URL to use | ||
@@ -136,3 +151,7 @@ this.mqttClient = mqttClient.newMQTTClient(this, this.config) | ||
} else { | ||
this.currentState = 'stopped' | ||
// ensure MQTT comms are stopped if switching to HTTP | ||
if (this.mqttClient && this.config?.brokerURL) { | ||
this.mqttClient.stop() | ||
} | ||
this.currentState = States.STOPPED | ||
// Fallback to HTTP polling | ||
@@ -146,2 +165,11 @@ await this.httpClient.startPolling() | ||
async stop () { | ||
// Stop the launcher before stopping http/mqtt channels to permit | ||
// audit logging and status updates to the platform | ||
if (this.launcher) { | ||
// Stop the launcher using non std state 'shutdown' to indicate a shutdown. | ||
// This is mainly for consistent logging and preventing the auto restart | ||
// logic kicking in when the agent is stopped | ||
await this.launcher.stop(false, 'shutdown') | ||
this.launcher = undefined | ||
} | ||
await this.httpClient.stopPolling() | ||
@@ -151,7 +179,53 @@ if (this.mqttClient) { | ||
} | ||
} | ||
async restartNR () { | ||
this.currentState = States.RESTARTING | ||
this.retrySetState(false) // clear any retry timers | ||
if (this.launcher) { | ||
await this.launcher.stop() | ||
// Stop the launcher using the state 'restarting' | ||
// This will not be persisted to the targetState property | ||
// It indicates the launcher it should not attempt to auto restart | ||
// the NR process but permit the process to exit gracefully | ||
await this.launcher.stop(false, States.RESTARTING) | ||
this.launcher = undefined | ||
} | ||
await this.updateTargetState(States.RUNNING) | ||
await this.setState({ targetState: States.RUNNING }) | ||
return this.targetState === States.RUNNING | ||
} | ||
async startNR () { | ||
await this.updateTargetState(States.RUNNING) | ||
await this.setState({ targetState: States.RUNNING }) | ||
return this.targetState === States.RUNNING | ||
} | ||
async suspendNR () { | ||
this.retrySetState(false) // clear any retry timers | ||
// update the settings to indicate the device is suspended so that upon | ||
// a reboot the device agent will not start the launcher | ||
const result = await this.updateTargetState(States.SUSPENDED) | ||
if (this.launcher) { | ||
await this.launcher.stop(false, States.SUSPENDED) | ||
this.launcher = undefined | ||
this.currentState = States.SUSPENDED | ||
} | ||
return result && this.targetState === States.SUSPENDED | ||
} | ||
async updateTargetState (newState) { | ||
if (isTargetState(newState)) { | ||
const changed = this.targetState !== newState | ||
this.targetState = newState | ||
if (changed) { | ||
this.retrySetState(false) // clear any retry timers | ||
this.targetState = newState | ||
await this.saveProject() | ||
} | ||
return true | ||
} | ||
return false | ||
} | ||
async getCurrentPackage () { | ||
@@ -182,3 +256,3 @@ if (this.launcher) { | ||
} | ||
return { | ||
const state = { | ||
ownerType: this.currentOwnerType, | ||
@@ -191,2 +265,3 @@ project: this.currentProject || null, | ||
mode: this.currentMode, | ||
targetState: this.targetState, | ||
health: { | ||
@@ -199,2 +274,6 @@ uptime: Math.floor((Date.now() - this.startTime) / 1000), | ||
} | ||
if (this.currentMode === 'developer' && this.editorToken && this.editorAffinity) { | ||
state.affinity = this.editorAffinity | ||
} | ||
return state | ||
} | ||
@@ -226,372 +305,354 @@ | ||
async setState (newState) { | ||
debug(newState) | ||
debug(JSON.stringify(newState)) | ||
// If busy updating, queue the update | ||
if (this.updating) { | ||
const queuedUpdateIsTargetStateChange = this.queuedUpdate && typeof this.queuedUpdate === 'object' && utils.hasProperty(this.queuedUpdate, 'targetState') | ||
if (queuedUpdateIsTargetStateChange) { | ||
// the queued update is a target state change request, lets not overwrite it | ||
// unless the new state is also a target state change request | ||
const newStateIsTargetStateChange = typeof newState === 'object' && utils.hasProperty(newState, 'targetState') | ||
if (newStateIsTargetStateChange) { | ||
this.queuedUpdate = newState | ||
} | ||
return | ||
} | ||
this.queuedUpdate = newState | ||
return | ||
} | ||
this.updating = true | ||
// normalise the state object | ||
Agent.normaliseStateObject(newState) | ||
try { | ||
this.updating = true | ||
// normalise the state object | ||
Agent.normaliseStateObject(newState) | ||
// store license status - this property can be used for enabling EE features | ||
if (newState && utils.hasProperty(newState, 'licensed') && typeof newState.licensed === 'boolean') { | ||
const licenseChanged = newState.licensed !== this.config.licensed | ||
if (licenseChanged) { | ||
this.config.licensed = newState.licensed | ||
this.saveProject() // update project file | ||
if (this.config.licensed) { | ||
info('License enabled') | ||
// TODO: handle license change disabled -> enabled. Flag for reload? | ||
} else { | ||
info('License disabled') | ||
// TODO: handle license change enabled -> disabled. Flag for reload? | ||
// store license status - this property can be used for enabling EE features | ||
if (newState && utils.hasProperty(newState, 'licensed') && typeof newState.licensed === 'boolean') { | ||
const licenseChanged = newState.licensed !== this.config.licensed | ||
if (licenseChanged) { | ||
this.config.licensed = newState.licensed | ||
this.saveProject() // update project file | ||
if (this.config.licensed) { | ||
info('License enabled') | ||
// TODO: handle license change disabled -> enabled. Flag for reload? | ||
} else { | ||
info('License disabled') | ||
// TODO: handle license change enabled -> disabled. Flag for reload? | ||
} | ||
} | ||
} | ||
} | ||
/** `forgeSnapshot` will be set to the current snapshot in the forge platform *if required* */ | ||
let forgeSnapshot = null | ||
/** `forgeSnapshot` will be set to the current snapshot in the forge platform *if required* */ | ||
let forgeSnapshot = null | ||
// first, check if the new state indicates a change of operation mode from the current mode | ||
// When changing from developer mode to autonomous mode, we need to check if the flows/modules | ||
// for Node-RED were changed vs the current snapshot on the forge platform. | ||
// If they differ, we flag that a reload of the snapshot is required. | ||
if (newState !== null && newState.mode && newState.mode !== this.currentMode) { | ||
if (!['developer', 'autonomous'].includes(newState.mode)) { | ||
newState.mode = 'autonomous' | ||
} | ||
if (!this.currentMode) { | ||
this.currentMode = newState.mode | ||
} else if (this.currentMode !== newState.mode) { | ||
this.currentMode = newState.mode | ||
if (newState.mode === 'developer') { | ||
info('Enabling developer mode') | ||
// check to see if this is run state change request | ||
if (typeof newState === 'object' && utils.hasProperty(newState, 'targetState')) { | ||
if (isTargetState(newState.targetState)) { | ||
const changed = newState.targetState !== this.targetState | ||
this.targetState = newState.targetState | ||
await this.saveProject() | ||
} else { | ||
// exiting developer mode | ||
this.editorToken = null // clear the discarded token | ||
let _launcher = this.launcher | ||
if (!_launcher) { | ||
// create a temporary launcher to read the current snapshot on disk | ||
_launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
if (changed) { | ||
this.retrySetState(false) // since this is a target state change, cancel any retry timers | ||
} | ||
} | ||
delete newState.targetState | ||
} | ||
try { | ||
forgeSnapshot = await this.httpClient.getSnapshot() | ||
this.retrySetState(false) // success - stop retry timer | ||
} catch (err) { | ||
if (!this.retrySetStateTimer.isRunning) { | ||
this.currentState = 'error' | ||
warn(`Problem getting snapshot: ${err.toString()}`) | ||
debug(err) | ||
this.retrySetState(newState) | ||
// next, check if the new state indicates a change of operation mode from the current mode | ||
// When changing from developer mode to autonomous mode, we need to check if the flows/modules | ||
// for Node-RED were changed vs the current snapshot on the forge platform. | ||
// If they differ, we flag that a reload of the snapshot is required. | ||
if (newState !== null && newState.mode && newState.mode !== this.currentMode) { | ||
if (!['developer', 'autonomous'].includes(newState.mode)) { | ||
newState.mode = 'autonomous' | ||
} | ||
if (!this.currentMode) { | ||
this.currentMode = newState.mode | ||
} else if (this.currentMode !== newState.mode) { | ||
this.currentMode = newState.mode | ||
if (newState.mode === 'developer') { | ||
info('Enabling developer mode') | ||
await this.saveProject() | ||
} else { | ||
// exiting developer mode | ||
this.editorToken = null | ||
this.editorAffinity = null | ||
let _launcher = this.launcher | ||
if (!_launcher) { | ||
// create a temporary launcher to read the current snapshot on disk | ||
_launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
} | ||
this.updating = false | ||
return | ||
} | ||
// before checking for changed flows etc, check if the snapshot on disk is the same as the snapshot on the forge platform | ||
// if it has changed, we need to reload the snapshot from the forge platform | ||
if (forgeSnapshot?.id !== _launcher.snapshot?.id) { | ||
info('Local snapshot ID differs from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
} | ||
try { | ||
forgeSnapshot = await this.httpClient.getSnapshot() | ||
this.retrySetState(false) // success - stop retry timer | ||
} catch (err) { | ||
if (!this.retrySetStateTimer.isRunning) { | ||
this.currentState = States.ERROR | ||
warn(`Problem getting snapshot: ${err.toString()}`) | ||
debug(err) | ||
this.retrySetState(newState) | ||
} | ||
this.updating = false | ||
this.currentState = States.ERROR | ||
this.queuedUpdate = null // we are in error state, clear any queued updates, halt! | ||
return | ||
} | ||
// next check the key system environment variables match | ||
if (newState.reloadSnapshot !== true) { | ||
const checkMatch = (key) => { | ||
return (forgeSnapshot?.env[key] || null) === (_launcher?.snapshot?.env[key] || null) | ||
// before checking for changed flows etc, check if the snapshot on disk is the same as the snapshot on the forge platform | ||
// if it has changed, we need to reload the snapshot from the forge platform | ||
if (forgeSnapshot?.id !== _launcher.snapshot?.id) { | ||
info('Local snapshot ID differs from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
} | ||
if (newState.ownerType === 'application') { | ||
// TODO: Since this is an early MVP of devices at application level, we fake any updates to the snapshot | ||
// We DONT reload the snapshot from the forge platform because we don't want to overwrite any local | ||
// changes made to flows and modules. This is a temporary workaround until we have a better solution | ||
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') { | ||
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') && checkMatch('FF_APPLICATION_ID') && checkMatch('FF_APPLICATION_NAME') | ||
if (matchOk === false) { | ||
info('Local environment variables differ from the snapshot on the forge platform') | ||
// manually update the snapshot to match the snapshot from the forge platform | ||
// this is a temporary workaround until we have a better solution for devices at application level | ||
this.currentSnapshot.env.FF_SNAPSHOT_ID = forgeSnapshot.env.FF_SNAPSHOT_ID | ||
this.currentSnapshot.env.FF_SNAPSHOT_NAME = forgeSnapshot.env.FF_SNAPSHOT_NAME | ||
this.currentSnapshot.env.FF_DEVICE_ID = forgeSnapshot.env.FF_DEVICE_ID | ||
this.currentSnapshot.env.FF_DEVICE_NAME = forgeSnapshot.env.FF_DEVICE_NAME | ||
this.currentSnapshot.env.FF_DEVICE_TYPE = forgeSnapshot.env.FF_DEVICE_TYPE | ||
this.currentSnapshot.env.FF_APPLICATION_ID = forgeSnapshot.env.FF_APPLICATION_ID | ||
this.currentSnapshot.env.FF_APPLICATION_NAME = forgeSnapshot.env.FF_APPLICATION_NAME | ||
} | ||
// next check the key system environment variables match | ||
if (newState.reloadSnapshot !== true) { | ||
const checkMatch = (key) => { | ||
return (forgeSnapshot?.env[key] || null) === (_launcher?.snapshot?.env[key] || null) | ||
} | ||
} else { | ||
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') { | ||
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') | ||
if (matchOk === false) { | ||
info('Local environment variables differ from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
if (newState.ownerType === 'application') { | ||
// TODO: Since this is an early MVP of devices at application level, we fake any updates to the snapshot | ||
// We DONT reload the snapshot from the forge platform because we don't want to overwrite any local | ||
// changes made to flows and modules. This is a temporary workaround until we have a better solution | ||
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') { | ||
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') && checkMatch('FF_APPLICATION_ID') && checkMatch('FF_APPLICATION_NAME') | ||
if (matchOk === false) { | ||
info('Local environment variables differ from the snapshot on the forge platform') | ||
// manually update the snapshot to match the snapshot from the forge platform | ||
// this is a temporary workaround until we have a better solution for devices at application level | ||
this.currentSnapshot.env.FF_SNAPSHOT_ID = forgeSnapshot.env.FF_SNAPSHOT_ID | ||
this.currentSnapshot.env.FF_SNAPSHOT_NAME = forgeSnapshot.env.FF_SNAPSHOT_NAME | ||
this.currentSnapshot.env.FF_DEVICE_ID = forgeSnapshot.env.FF_DEVICE_ID | ||
this.currentSnapshot.env.FF_DEVICE_NAME = forgeSnapshot.env.FF_DEVICE_NAME | ||
this.currentSnapshot.env.FF_DEVICE_TYPE = forgeSnapshot.env.FF_DEVICE_TYPE | ||
this.currentSnapshot.env.FF_APPLICATION_ID = forgeSnapshot.env.FF_APPLICATION_ID | ||
this.currentSnapshot.env.FF_APPLICATION_NAME = forgeSnapshot.env.FF_APPLICATION_NAME | ||
} | ||
} | ||
} else { | ||
if (typeof forgeSnapshot?.env === 'object' && typeof _launcher.snapshot?.env === 'object') { | ||
const matchOk = checkMatch('FF_SNAPSHOT_ID') && checkMatch('FF_SNAPSHOT_NAME') && checkMatch('FF_DEVICE_ID') && checkMatch('FF_DEVICE_NAME') && checkMatch('FF_DEVICE_TYPE') | ||
if (matchOk === false) { | ||
info('Local environment variables differ from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
} | ||
} | ||
} | ||
} | ||
} | ||
// Do a full comparison if this is NOT an application with a "starter" snapshot ID of "0" | ||
const doFull = !(newState.ownerType === 'application' && newState.snapshot === '0') | ||
if (doFull && newState.reloadSnapshot !== true) { | ||
let diskSnapshot = { flows: [], modules: {} } | ||
try { | ||
const modules = (await _launcher.readPackage())?.modules | ||
const flows = await _launcher.readFlow() | ||
diskSnapshot = { flows, modules } | ||
} catch (error) { | ||
info('An error occurred while attempting to read flows & package file from disk') | ||
newState.reloadSnapshot = true | ||
// Do a full comparison if this is NOT an application with a "starter" snapshot ID of "0" | ||
const doFull = !(newState.ownerType === 'application' && newState.snapshot === '0') | ||
if (doFull && newState.reloadSnapshot !== true) { | ||
let diskSnapshot = { flows: [], modules: {} } | ||
try { | ||
const modules = (await _launcher.readPackage())?.modules | ||
const flows = await _launcher.readFlow() | ||
diskSnapshot = { flows, modules } | ||
} catch (error) { | ||
info('An error occurred while attempting to read flows & package file from disk') | ||
newState.reloadSnapshot = true | ||
} | ||
const changes = utils.compareNodeRedData(forgeSnapshot, diskSnapshot) === false | ||
if (changes) { | ||
info('Local flows differ from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
} | ||
} | ||
const changes = utils.compareNodeRedData(forgeSnapshot, diskSnapshot) === false | ||
if (changes) { | ||
info('Local flows differ from the snapshot on the forge platform') | ||
newState.reloadSnapshot = true | ||
if (newState.reloadSnapshot) { | ||
info('Local flows have changed. Restoring current snapshot') | ||
} else { | ||
// only save the project if the snapshot is not being reloaded | ||
// since the snapshot will be reloaded and the project will be saved then | ||
await this.saveProject() | ||
} | ||
info('Disabling developer mode') | ||
} | ||
if (newState.reloadSnapshot) { | ||
info('Local flows have changed. Restoring current snapshot') | ||
} else { | ||
// only save the project if the snapshot is not being reloaded | ||
// since the snapshot will be reloaded and the project will be saved then | ||
await this.saveProject() | ||
} | ||
info('Disabling developer mode') | ||
// report the new mode for more instantaneous feedback (improve the UX) | ||
this.checkIn(2) | ||
} | ||
// report the new mode for more instantaneous feedback (improve the UX) | ||
this.checkIn(2) | ||
} | ||
} | ||
/** A flag to inhibit updates if we are in developer mode */ | ||
const developerMode = this.currentMode === 'developer' | ||
/** A flag to inhibit updates if we are in developer mode */ | ||
const developerMode = this.currentMode === 'developer' | ||
/** A flag to indicate execution should skip to the update step */ | ||
const skipToUpdate = newState?.reloadSnapshot === true | ||
/** A flag to indicate execution should skip to the update step */ | ||
const skipToUpdate = newState?.reloadSnapshot === true | ||
if (newState === null) { | ||
// The agent should not be running (bad credentials/device details) | ||
// Wipe the local configuration | ||
if (developerMode === false) { | ||
this.stop() | ||
if (newState === null) { | ||
// The agent should not be running (bad credentials/device details) | ||
// Wipe the local configuration | ||
if (developerMode === false) { | ||
await this.stop() | ||
this.currentSnapshot = null | ||
this.currentApplication = null | ||
this.currentProject = null | ||
this.currentSettings = null | ||
this.currentMode = null | ||
this.editorToken = null | ||
this.editorAffinity = null | ||
await this.saveProject() | ||
this.currentState = States.STOPPED | ||
this.updating = false | ||
} | ||
} else if (!skipToUpdate && developerMode === false && newState.application === null && this.currentOwnerType === 'application') { | ||
if (this.currentApplication) { | ||
debug('Removed from application') | ||
} | ||
// Device unassigned from application | ||
if (this.mqttClient) { | ||
this.mqttClient.setApplication(null) | ||
} | ||
// Stop the device if running - with clean flag | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentApplication = null | ||
this.currentSnapshot = null | ||
this.currentApplication = null | ||
this.currentProject = null | ||
this.currentSettings = null | ||
this.currentMode = null | ||
this.editorToken = null | ||
// if new settings hash is explicitly null, clear the current settings | ||
// otherwise, if currentSettings.hash exists, see if it differs from the | ||
// new settings hash & update accordingly | ||
if (newState.settings === null) { | ||
this.currentSettings = null | ||
} else if (this.currentSettings?.hash) { | ||
if (this.currentSettings.hash !== newState.settings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
} | ||
} | ||
await this.saveProject() | ||
this.currentState = 'stopped' | ||
this.currentState = States.STOPPED | ||
this.updating = false | ||
} | ||
} else if (!skipToUpdate && developerMode === false && newState.application === null && this.currentOwnerType === 'application') { | ||
if (this.currentApplication) { | ||
debug('Removed from application') | ||
} | ||
// Device unassigned from application | ||
if (this.mqttClient) { | ||
this.mqttClient.setApplication(null) | ||
} | ||
// Stop the device if running - with clean flag | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentApplication = null | ||
this.currentSnapshot = null | ||
// if new settings hash is explicitly null, clear the current settings | ||
// otherwise, if currentSettings.hash exists, see if it differs from the | ||
// new settings hash & update accordingly | ||
if (newState.settings === null) { | ||
this.currentSettings = null | ||
} else if (this.currentSettings?.hash) { | ||
if (this.currentSettings.hash !== newState.settings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
} else if (!skipToUpdate && developerMode === false && newState.project === null && this.currentOwnerType === 'project') { | ||
if (this.currentProject) { | ||
debug('Removed from project') | ||
} | ||
} | ||
await this.saveProject() | ||
this.currentState = 'stopped' | ||
this.updating = false | ||
} else if (!skipToUpdate && developerMode === false && newState.project === null && this.currentOwnerType === 'project') { | ||
if (this.currentProject) { | ||
debug('Removed from project') | ||
} | ||
// Device unassigned from project | ||
if (this.mqttClient) { | ||
this.mqttClient.setProject(null) | ||
} | ||
// Stop the project if running - with clean flag | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentProject = null | ||
this.currentSnapshot = null | ||
// if new settings hash is explicitly null, clear the current settings | ||
// otherwise, if currentSettings.hash exists, see if it differs from the | ||
// new settings hash & update accordingly | ||
if (newState.settings === null) { | ||
this.currentSettings = null | ||
} else if (this.currentSettings?.hash) { | ||
if (this.currentSettings.hash !== newState.settings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
// Device unassigned from project | ||
if (this.mqttClient) { | ||
this.mqttClient.setProject(null) | ||
} | ||
} | ||
await this.saveProject() | ||
this.currentState = 'stopped' | ||
this.updating = false | ||
} else if (!skipToUpdate && developerMode === false && newState.snapshot === null) { | ||
// Snapshot removed, but project/application still set | ||
if (this.currentSnapshot) { | ||
debug('Active snapshot removed') | ||
// Stop the project if running - with clean flag | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentProject = null | ||
this.currentSnapshot = null | ||
// if new settings hash is explicitly null, clear the current settings | ||
// otherwise, if currentSettings.hash exists, see if it differs from the | ||
// new settings hash & update accordingly | ||
if (newState.settings === null) { | ||
this.currentSettings = null | ||
} else if (this.currentSettings?.hash) { | ||
if (this.currentSettings.hash !== newState.settings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
} | ||
} | ||
await this.saveProject() | ||
} | ||
let setApp = false | ||
let setProject = false | ||
if (utils.hasProperty(newState, 'application')) { | ||
if (newState.application !== this.currentApplication) { | ||
this.currentApplication = newState.application | ||
setApp = true | ||
this.currentState = States.STOPPED | ||
this.updating = false | ||
} else if (!skipToUpdate && developerMode === false && newState.snapshot === null) { | ||
// Snapshot removed, but project/application still set | ||
if (this.currentSnapshot) { | ||
debug('Active snapshot removed') | ||
this.currentSnapshot = null | ||
await this.saveProject() | ||
} | ||
} | ||
if (utils.hasProperty(newState, 'project')) { | ||
if (newState.project !== this.currentProject) { | ||
this.currentProject = newState.project | ||
setProject = true | ||
let setApp = false | ||
let setProject = false | ||
if (utils.hasProperty(newState, 'application')) { | ||
if (newState.application !== this.currentApplication) { | ||
this.currentApplication = newState.application | ||
setApp = true | ||
} | ||
} | ||
} | ||
if (utils.hasProperty(newState, 'project')) { | ||
if (newState.project !== this.currentProject) { | ||
this.currentProject = newState.project | ||
setProject = true | ||
} | ||
} | ||
if (this.mqttClient) { | ||
if (setApp) { | ||
this.mqttClient.setProject(null) | ||
this.mqttClient.setApplication(this.currentApplication) | ||
if (this.mqttClient) { | ||
if (setApp) { | ||
this.mqttClient.setProject(null) | ||
this.mqttClient.setApplication(this.currentApplication) | ||
} | ||
if (setProject) { | ||
this.mqttClient.setApplication(null) | ||
this.mqttClient.setProject(this.currentProject) | ||
} | ||
} | ||
if (setProject) { | ||
this.mqttClient.setApplication(null) | ||
this.mqttClient.setProject(this.currentProject) | ||
if (setApp || setProject) { | ||
await this.saveProject() | ||
this.checkIn(2) | ||
} | ||
} | ||
if (setApp || setProject) { | ||
await this.saveProject() | ||
this.checkIn(2) | ||
} | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentState = 'stopped' | ||
this.updating = false | ||
} else { | ||
// Check if any updates are needed | ||
let updateSnapshot = false | ||
let updateSettings = false | ||
const unknownOrStopped = (this.currentState === 'unknown' || this.currentState === 'stopped') | ||
const snapShotUpdatePending = !!(!this.currentSnapshot && newState.snapshot) | ||
const projectUpdatePending = !!(newState.ownerType === 'project' && !this.currentProject && newState.project) | ||
const applicationUpdatePending = !!(newState.ownerType === 'application' && !this.currentApplication && newState.application) | ||
if (unknownOrStopped && developerMode && snapShotUpdatePending && (projectUpdatePending || applicationUpdatePending)) { | ||
info('Developer Mode: no flows found - updating to latest snapshot') | ||
this.currentProject = newState.project | ||
this.currentApplication = newState.application | ||
updateSnapshot = true | ||
updateSettings = true | ||
} else if (developerMode === false) { | ||
if (utils.hasProperty(newState, 'project') && (!this.currentSnapshot || newState.project !== this.currentProject)) { | ||
info('New instance assigned') | ||
this.currentApplication = null | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
this.launcher = undefined | ||
} | ||
this.currentState = States.STOPPED | ||
this.updating = false | ||
} else { | ||
// Check if any updates are needed | ||
let updateSnapshot = false | ||
let updateSettings = false | ||
const unknownOrStopped = (this.currentState === States.UNKNOWN || this.currentState === States.STOPPED) | ||
const snapShotUpdatePending = !!(!this.currentSnapshot && newState.snapshot) | ||
const projectUpdatePending = !!(newState.ownerType === 'project' && !this.currentProject && newState.project) | ||
const applicationUpdatePending = !!(newState.ownerType === 'application' && !this.currentApplication && newState.application) | ||
if (unknownOrStopped && developerMode && snapShotUpdatePending && (projectUpdatePending || applicationUpdatePending)) { | ||
info('Developer Mode: no flows found - updating to latest snapshot') | ||
this.currentProject = newState.project | ||
// Update everything | ||
updateSnapshot = true | ||
updateSettings = true | ||
} else if (utils.hasProperty(newState, 'application') && (!this.currentSnapshot || newState.application !== this.currentApplication)) { | ||
info('New application assigned') | ||
this.currentProject = null | ||
this.currentApplication = newState.application | ||
// Update everything | ||
updateSnapshot = true | ||
updateSettings = true | ||
} else { | ||
if (utils.hasProperty(newState, 'snapshot') && (!this.currentSnapshot || newState.snapshot !== this.currentSnapshot.id)) { | ||
info('New snapshot available') | ||
} else if (developerMode === false) { | ||
if (utils.hasProperty(newState, 'project') && (!this.currentSnapshot || newState.project !== this.currentProject)) { | ||
info('New instance assigned') | ||
this.currentApplication = null | ||
this.currentProject = newState.project | ||
// Update everything | ||
updateSnapshot = true | ||
} | ||
// reloadSnapshot is a special case - it is used to force a reload of the current | ||
// snapshot following a change from autonomous to developer mode | ||
if (newState.reloadSnapshot === true && updateSnapshot === false) { | ||
info('Reload snapshot requested') | ||
updateSettings = true | ||
} else if (utils.hasProperty(newState, 'application') && (!this.currentSnapshot || newState.application !== this.currentApplication)) { | ||
info('New application assigned') | ||
this.currentProject = null | ||
this.currentApplication = newState.application | ||
// Update everything | ||
updateSnapshot = true | ||
} | ||
if (utils.hasProperty(newState, 'settings') && (!this.currentSettings || newState.settings !== this.currentSettings?.hash)) { | ||
info('New settings available') | ||
updateSettings = true | ||
} | ||
if (this.currentSettings === null) { | ||
updateSettings = true | ||
} | ||
// If the snapshot is to be updated, the settings must also be updated | ||
// this is because snapshot includes special, platform defined environment variables e.g. FF_SNAPSHOT_ID | ||
if (updateSnapshot === true) { | ||
updateSettings = true | ||
} | ||
} | ||
} | ||
if (!skipToUpdate && !updateSnapshot && !updateSettings) { | ||
// Nothing to update. | ||
// Start the launcher with the current config, Snapshot & settings | ||
if (!this.launcher && this.currentSnapshot) { | ||
this.launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
await this.launcher.start() | ||
if (this.mqttClient) { | ||
this.mqttClient.setProject(this.currentProject) | ||
this.mqttClient.setApplication(this.currentApplication) | ||
if (developerMode && this.editorToken) { | ||
this.mqttClient.startTunnel(this.editorToken) | ||
} else { | ||
if (utils.hasProperty(newState, 'snapshot') && (!this.currentSnapshot || newState.snapshot !== this.currentSnapshot.id)) { | ||
info('New snapshot available') | ||
updateSnapshot = true | ||
} | ||
} | ||
this.checkIn(2) | ||
this.currentState = 'stopped' | ||
} | ||
this.updating = false | ||
} else { | ||
// At this point of the state machine, we are to stop the launcher and update the snapshot and/or settings | ||
// then start the launcher with the new snapshot and/or settings | ||
// Stop the launcher if currently running | ||
this.currentState = 'updating' | ||
if (this.launcher) { | ||
info('Stopping current snapshot') | ||
await this.launcher.stop() | ||
this.launcher = undefined | ||
} | ||
if (updateSnapshot) { | ||
try { | ||
this.currentSnapshot = forgeSnapshot || await this.httpClient.getSnapshot() | ||
this.retrySetState(false) // success - stop retry timer | ||
} catch (err) { | ||
if (!this.retrySetStateTimer.isRunning) { | ||
this.currentState = 'error' | ||
warn(`Problem getting snapshot: ${err.toString()}`) | ||
debug(err) | ||
this.retrySetState(newState) | ||
// reloadSnapshot is a special case - it is used to force a reload of the current | ||
// snapshot following a change from autonomous to developer mode | ||
if (newState.reloadSnapshot === true && updateSnapshot === false) { | ||
info('Reload snapshot requested') | ||
updateSnapshot = true | ||
} | ||
this.updating = false | ||
return | ||
if (utils.hasProperty(newState, 'settings') && (!this.currentSettings || newState.settings !== this.currentSettings?.hash)) { | ||
info('New settings available') | ||
updateSettings = true | ||
} | ||
if (this.currentSettings === null) { | ||
updateSettings = true | ||
} | ||
// If the snapshot is to be updated, the settings must also be updated | ||
// this is because snapshot includes special, platform defined environment variables e.g. FF_SNAPSHOT_ID | ||
if (updateSnapshot === true) { | ||
updateSettings = true | ||
} | ||
} | ||
} | ||
if (updateSettings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
} | ||
if (this.currentSnapshot?.id) { | ||
try { | ||
await this.saveProject() | ||
this.printAgentStatus('Launching with new settings...') | ||
this.launcher = Launcher.newLauncher(this.config, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
await this.launcher.writeConfiguration({ updateSnapshot, updateSettings }) | ||
if (!skipToUpdate && !updateSnapshot && !updateSettings) { | ||
// Nothing to update. So long as the target state is not SUSPENDED, | ||
// start the launcher with the current config, Snapshot & settings | ||
if (!this.launcher && this.currentSnapshot && this.targetState !== States.SUSPENDED) { | ||
this.launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
await this.launcher.start() | ||
@@ -601,28 +662,104 @@ if (this.mqttClient) { | ||
this.mqttClient.setApplication(this.currentApplication) | ||
if (developerMode && this.editorToken) { | ||
this.mqttClient.startTunnel(this.editorToken) | ||
if (developerMode && this.editorToken && this.launcher) { | ||
this.mqttClient.startTunnel(this.editorToken, this.editorAffinity) | ||
} | ||
} | ||
this.currentState = this.launcher.state | ||
this.checkIn(2) | ||
} catch (err) { | ||
warn(`Error whilst starting Node-RED: ${err.toString()}`) | ||
if (this.launcher) { | ||
await this.launcher.stop(true) | ||
} | ||
} | ||
this.updating = false | ||
} else { | ||
// At this point of the state machine, we are to stop the launcher and update the snapshot and/or settings | ||
// then start the launcher with the new snapshot and/or settings | ||
// Stop the launcher if currently running | ||
this.currentState = States.UPDATING | ||
if (this.launcher) { | ||
info('Stopping current snapshot') | ||
await this.launcher.stop(false, States.UPDATING) | ||
this.launcher = undefined | ||
} | ||
if (updateSnapshot) { | ||
try { | ||
this.currentSnapshot = forgeSnapshot || await this.httpClient.getSnapshot() | ||
this.retrySetState(false) // success - stop retry timer | ||
} catch (err) { | ||
if (!this.retrySetStateTimer.isRunning) { | ||
this.currentState = States.ERROR | ||
warn(`Problem getting snapshot: ${err.toString()}`) | ||
debug(err) | ||
this.retrySetState(newState) | ||
} | ||
this.updating = false | ||
return | ||
} | ||
} | ||
if (updateSettings) { | ||
this.currentSettings = await this.httpClient.getSettings() | ||
} | ||
if (this.currentSnapshot?.id) { | ||
try { | ||
await this.saveProject() | ||
let optimisticState = States.STOPPED | ||
let performStart = true | ||
this.currentState = States.UPDATING | ||
if (this.targetState === States.SUSPENDED) { | ||
this.printAgentStatus('Applying new settings...') | ||
performStart = false | ||
optimisticState = States.SUSPENDED | ||
} else if (this.targetState === States.RUNNING) { | ||
this.printAgentStatus('Launching with new settings...') | ||
optimisticState = States.STARTING | ||
} | ||
this.launcher = Launcher.newLauncher(this, this.currentApplication, this.currentProject, this.currentSnapshot, this.currentSettings, this.currentMode) | ||
await this.launcher.writeConfiguration({ updateSnapshot, updateSettings }) | ||
if (performStart) { | ||
await this.launcher?.start() | ||
} else { | ||
this.launcher = undefined | ||
} | ||
if (this.mqttClient) { | ||
this.mqttClient.setProject(this.currentProject) | ||
this.mqttClient.setApplication(this.currentApplication) | ||
if (developerMode && this.editorToken && this.launcher) { | ||
this.mqttClient.startTunnel(this.editorToken, this.editorAffinity) | ||
} | ||
} | ||
this.currentState = optimisticState | ||
this.checkIn(2) | ||
} catch (err) { | ||
warn(`Error whilst starting Node-RED: ${err.toString()}`) | ||
if (this.launcher) { | ||
await this.launcher.stop(true, States.ERROR) | ||
} | ||
this.launcher = undefined | ||
this.currentState = States.ERROR | ||
this.queuedUpdate = null // we are in error state, clear any queued updates, halt! | ||
} | ||
} | ||
} | ||
} | ||
if (!this.launcher) { | ||
if (this.targetState === States.SUSPENDED) { | ||
this.currentState = States.SUSPENDED | ||
} else if (isValidState(this.currentState) === false) { | ||
this.currentState = States.STOPPED | ||
} | ||
} else { | ||
this.currentState = this.launcher?.state || States.RUNNING | ||
} | ||
} finally { | ||
this.updating = false | ||
if (this.queuedUpdate) { | ||
const update = this.queuedUpdate | ||
this.queuedUpdate = null | ||
this.setState(update).catch(err => { | ||
this.updating = false | ||
warn(`Error whilst processing queued update: ${err.toString()}`) | ||
debug(err) | ||
}) | ||
} | ||
} | ||
this.currentState = this.launcher ? 'running' : 'stopped' | ||
this.updating = false | ||
if (this.queuedUpdate) { | ||
const update = this.queuedUpdate | ||
this.queuedUpdate = null | ||
this.setState(update).catch(err => { | ||
this.updating = false | ||
warn(`Error whilst processing queued update: ${err.toString()}`) | ||
debug(err) | ||
}) | ||
} | ||
} | ||
@@ -709,5 +846,6 @@ | ||
async saveEditorToken (token) { | ||
const changed = this.editorToken !== token | ||
async saveEditorToken (token, affinity) { | ||
const changed = (this.editorToken !== token || this.editorAffinity !== affinity) | ||
this.editorToken = token | ||
this.editorAffinity = affinity | ||
if (changed) { | ||
@@ -714,0 +852,0 @@ await this.saveProject() |
@@ -25,3 +25,3 @@ const { info, warn, debug } = require('../logging/log') | ||
this.options = options || {} | ||
this.affinity = undefined | ||
this.affinity = this.options.affinity | ||
@@ -28,0 +28,0 @@ // How long to wait before attempting to reconnect. Start at 500ms - back |
@@ -67,2 +67,6 @@ const got = require('got').default | ||
isPolling () { | ||
return this.heartbeat.isRunning | ||
} | ||
async checkIn () { | ||
@@ -69,0 +73,0 @@ const payload = this.agent.getState() |
@@ -7,2 +7,3 @@ const childProcess = require('child_process') | ||
const { copyDir } = require('./utils') | ||
const { States } = require('./states') | ||
@@ -26,4 +27,4 @@ const MIN_RESTART_TIME = 10000 // 10 seconds | ||
class Launcher { | ||
constructor (config, application, project, snapshot, settings, mode) { | ||
this.config = config | ||
constructor (agent, application, project, snapshot, settings, mode) { | ||
this.config = agent?.config | ||
this.application = application | ||
@@ -36,3 +37,8 @@ this.project = project | ||
this.startTime = [] | ||
this.state = 'stopped' | ||
this.state = States.STOPPED | ||
this.stopReason = '' | ||
this.installProcess = null | ||
this.deferredStop = null | ||
/** @type {import('./agent.js').Agent */ | ||
this.agent = agent | ||
@@ -95,2 +101,3 @@ this.auditLogURL = `${this.config.forgeURL}/logging/device/${this.config.deviceId}/audit` | ||
info('Installing dependencies') | ||
this.state = States.UPDATING | ||
if (this.config.moduleCache) { | ||
@@ -112,3 +119,3 @@ info('Using module_cache') | ||
} else { | ||
return new Promise((resolve, reject) => { | ||
this.installProcess = new Promise((resolve, reject) => { | ||
childProcess.exec('npm install --production', { | ||
@@ -119,2 +126,3 @@ cwd: this.projectDir | ||
resolve() | ||
this.installProcess = null | ||
} else { | ||
@@ -124,5 +132,7 @@ warn('Install failed') | ||
reject(error) | ||
this.installProcess = null | ||
} | ||
}) | ||
}) | ||
return this.installProcess | ||
} | ||
@@ -287,2 +297,3 @@ } | ||
if (!options || options.updateSnapshot) { | ||
this.state = States.INSTALLING | ||
await this.writeNPMRCFile() | ||
@@ -323,3 +334,4 @@ await this.writePackage() | ||
} | ||
this.state = 'starting' | ||
this.state = States.STARTING | ||
if (!existsSync(this.projectDir) || | ||
@@ -368,2 +380,4 @@ !existsSync(this.files.flows) || | ||
info('Starting Node-RED') | ||
this.state = States.STARTING // state may have been changed by stop() or deferredStop or Installing | ||
this.stopReason = '' | ||
const appEnv = env | ||
@@ -401,8 +415,18 @@ const processArgs = [ | ||
} | ||
this.state = 'running' | ||
this.state = States.RUNNING | ||
}) | ||
this.proc.on('exit', async (code, signal) => { | ||
this.state = 'stopped' | ||
if (!this.shuttingDown) { | ||
// determine if Node-RED exited for an expected reason | ||
// if yes, don't restart it since it was specifically stopped (e.g. not crashed) | ||
const expected = ['shutdown', States.RESTARTING, States.UPDATING, States.SUSPENDED].includes(this.stopReason) | ||
if (expected) { | ||
this.state = States.STOPPED // assume stopped | ||
} else { | ||
this.state = States.CRASHED // assume crashed | ||
} | ||
if (this.exitCallback) { | ||
this.exitCallback() | ||
} | ||
if (!expected) { | ||
let restart = true | ||
@@ -418,4 +442,5 @@ if (this.startTime.length === MAX_RESTART_COUNT) { | ||
info('Node-RED restart loop detected - stopping') | ||
this.state = 'crashed' | ||
this.state = States.CRASHED | ||
restart = false | ||
await this.agent?.checkIn() | ||
} | ||
@@ -427,8 +452,2 @@ } | ||
} | ||
} else { | ||
// is really shutting down (i.e. was commanded by the | ||
// agent, so we won't be doing an auto restart) | ||
if (this.exitCallback) { | ||
this.exitCallback() | ||
} | ||
} | ||
@@ -455,4 +474,18 @@ }) | ||
async stop (clean) { | ||
info('Stopping Node-RED') | ||
async stop (clean, reason) { | ||
if (this.installProcess && this.state === States.INSTALLING) { | ||
// If the launcher is currently installing, we should try not to interrupt this | ||
// to avoid corruption (NPM can leave temporary directories preventing future installs) | ||
// We should wait for the install to finish before stopping | ||
// give it a few seconds to finish | ||
const timeout = new Promise(resolve => setTimeout(resolve, 10000)) | ||
await Promise.race([this.installProcess, timeout]) | ||
// now proceed with stopping, regardless of whether the install finished | ||
} | ||
let finalState = States.STOPPED | ||
this.stopReason = reason || 'shutdown' | ||
info('Stopping Node-RED. Reason: ' + this.stopReason) | ||
if (this.stopReason === States.SUSPENDED) { | ||
finalState = States.SUSPENDED | ||
} | ||
if (this.deferredStop) { | ||
@@ -472,6 +505,7 @@ // A stop request is already inflight - return the existing deferred object | ||
} | ||
info('Node-RED Stopped') | ||
await this.agent?.checkIn() // let FF know we've stopped | ||
} | ||
if (this.proc) { | ||
this.shuttingDown = true | ||
// Setup a promise that will resolve once the process has really exited | ||
@@ -501,7 +535,7 @@ this.deferredStop = new Promise((resolve, reject) => { | ||
this.proc.kill() | ||
this.state = 'stopped' | ||
this.state = finalState | ||
}) | ||
return this.deferredStop | ||
} else { | ||
this.state = 'stopped' | ||
this.state = finalState | ||
await postShutdownOps() | ||
@@ -513,4 +547,4 @@ } | ||
module.exports = { | ||
newLauncher: (config, application, project, snapshot, settings, mode) => new Launcher(config, application, project, snapshot, settings, mode), | ||
newLauncher: (agent, application, project, snapshot, settings, mode) => new Launcher(agent, application, project, snapshot, settings, mode), | ||
Launcher | ||
} |
@@ -92,3 +92,2 @@ const mqtt = require('mqtt') | ||
await this.agent.setState(msg) | ||
return | ||
} else if (msg.command === 'startLog') { | ||
@@ -99,10 +98,9 @@ if (!this.logEnabled) { | ||
this.logEnabled = true | ||
return | ||
} else if (msg.command === 'stopLog') { | ||
this.logEnabled = false | ||
return | ||
} else if (msg.command === 'startEditor') { | ||
await this.startTunnel(msg.payload?.token, msg) | ||
return | ||
await this.startTunnel(msg.payload?.token, this.agent.editorAffinity || null, msg) | ||
} else if (msg.command === 'stopEditor') { | ||
// Clear the saved token | ||
await this.saveEditorToken(null, null) | ||
if (this.tunnel) { | ||
@@ -113,3 +111,2 @@ info('Disabling remote editor access') | ||
} | ||
return | ||
} else if (msg.command === 'upload') { | ||
@@ -120,5 +117,7 @@ info('Capturing device snapshot') | ||
this.sendCommandResponse(msg, response) | ||
return | ||
} else if (msg.command === 'action') { | ||
await this.handleActionRequest(msg) | ||
} else { | ||
warn(`Unknown command type received from platform: ${msg.command}`) | ||
} | ||
warn(`Unknown command type received from platform: ${msg.command}`) | ||
} catch (err) { | ||
@@ -141,2 +140,47 @@ warn(err) | ||
/** | ||
* Perform a device action of starting, restarting or suspending the Node-RED instance | ||
* @param {Object} msg - the incoming message data | ||
*/ | ||
async handleActionRequest (msg) { | ||
const action = msg?.payload?.action || '' | ||
try { | ||
let result = false | ||
let error = null | ||
switch (action) { | ||
case 'start': | ||
info('Node-RED start requested') | ||
result = await this.agent.startNR() | ||
break | ||
case 'restart': | ||
info('Node-RED restart requested') | ||
result = await this.agent.restartNR() | ||
break | ||
case 'suspend': | ||
info('Node-RED suspend requested') | ||
result = await this.agent.suspendNR() | ||
break | ||
default: | ||
error = new Error(`Unsupported action requested: ${action}`) | ||
error.code = 'unsupported_action' | ||
throw error | ||
} | ||
if (result) { | ||
this.sendCommandResponse(msg, { success: result }) | ||
} else { | ||
throw new Error('Requested action ' + action + ' failed') | ||
} | ||
} catch (err) { | ||
warn(err.toString()) | ||
debug(err) | ||
const error = { | ||
message: err.toString(), | ||
code: err.code || 'unexpected_error', | ||
error: err.message || 'Unexpected error' | ||
} | ||
this.sendCommandResponse(msg, { success: false, error }) | ||
} | ||
this.agent.checkIn(3, 1000) // attempt a check in (3 retries, 1s interval) | ||
} | ||
stop () { | ||
@@ -149,3 +193,7 @@ if (this.heartbeat.isRunning) { | ||
setMQTT(undefined) | ||
this.client && this.client.end() | ||
if (this.client) { | ||
this.setApplication(null) // unsubscribe from application commands | ||
this.setProject(null) // unsubscribe from application commands | ||
this.client.end() | ||
} | ||
} | ||
@@ -275,3 +323,3 @@ | ||
async startTunnel (token, msg) { | ||
async startTunnel (token, affinity, msg) { | ||
info('Enabling remote editor access') | ||
@@ -292,3 +340,3 @@ try { | ||
// * Enable Device Editor (Step 6) - (forge:MQTT->device) Create the tunnel on the device | ||
this.tunnel = EditorTunnel.create(this.config, { token }) | ||
this.tunnel = EditorTunnel.create(this.config, { token, affinity }) | ||
@@ -299,7 +347,13 @@ // * Enable Device Editor (Step 7) - (device) Begin the device tunnel connect process | ||
// store the token for later use (i.e. device agent is restarted) | ||
await this.saveEditorToken(result ? token : null) | ||
if (result) { | ||
await this.saveEditorToken(token, this.tunnel.affinity) | ||
} else { | ||
// Failed to connect - clear the token/affinity so it can be | ||
// refreshed | ||
await this.saveEditorToken(null, null) | ||
} | ||
if (msg) { | ||
// * Enable Device Editor (Step 10) - (device->forge:MQTT) Send a response to the platform | ||
this.sendCommandResponse(msg, { connected: result, token }) | ||
this.sendCommandResponse(msg, { connected: result, token, affinity: this.tunnel.affinity }) | ||
} | ||
@@ -315,4 +369,4 @@ } catch (err) { | ||
async saveEditorToken (token) { | ||
await this.agent?.saveEditorToken(token) | ||
async saveEditorToken (token, affinity) { | ||
await this.agent?.saveEditorToken(token, affinity) | ||
} | ||
@@ -319,0 +373,0 @@ } |
{ | ||
"name": "@flowfuse/device-agent", | ||
"version": "2.2.0", | ||
"version": "2.3.0", | ||
"description": "An Edge Agent for running Node-RED instances deployed from the FlowFuse Platform", | ||
@@ -5,0 +5,0 @@ "exports": { |
279795
36
4352