@node-red/runtime
Advanced tools
Comparing version 0.20.0-beta.3 to 0.20.0-beta.4
@@ -17,34 +17,106 @@ /** | ||
var when = require("when"); | ||
var clone = require("clone"); | ||
var typeRegistry = require("@node-red/registry"); | ||
var Log; | ||
var redUtil = require("@node-red/util").util; | ||
var flowUtil = require("./util"); | ||
var Node; | ||
var events = require("../../events"); | ||
var Subflow; | ||
var Log; | ||
var nodeCloseTimeout = 15000; | ||
function Flow(global,flow) { | ||
if (typeof flow === 'undefined') { | ||
flow = global; | ||
/** | ||
* This class represents a flow within the runtime. It is responsible for | ||
* creating, starting and stopping all nodes within the flow. | ||
*/ | ||
class Flow { | ||
/** | ||
* Create a Flow object. | ||
* @param {[type]} parent The parent flow | ||
* @param {[type]} globalFlow The global flow definition | ||
* @param {[type]} flow This flow's definition | ||
*/ | ||
constructor(parent,globalFlow,flow) { | ||
this.TYPE = 'flow'; | ||
this.parent = parent; | ||
this.global = globalFlow; | ||
if (typeof flow === 'undefined') { | ||
this.flow = globalFlow; | ||
this.isGlobalFlow = true; | ||
} else { | ||
this.flow = flow; | ||
this.isGlobalFlow = false; | ||
} | ||
this.id = this.flow.id || "global"; | ||
this.activeNodes = {}; | ||
this.subflowInstanceNodes = {}; | ||
this.catchNodes = []; | ||
this.statusNodes = []; | ||
} | ||
var activeNodes = {}; | ||
var subflowInstanceNodes = {}; | ||
var catchNodeMap = {}; | ||
var statusNodeMap = {}; | ||
this.start = function(diff) { | ||
/** | ||
* Log a debug-level message from this flow | ||
* @param {[type]} msg [description] | ||
* @return {[type]} [description] | ||
*/ | ||
debug(msg) { | ||
Log.log({ | ||
id: this.id||"global", | ||
level: Log.DEBUG, | ||
type:this.TYPE, | ||
msg:msg | ||
}) | ||
} | ||
/** | ||
* Log a info-level message from this flow | ||
* @param {[type]} msg [description] | ||
* @return {[type]} [description] | ||
*/ | ||
log(msg) { | ||
Log.log({ | ||
id: this.id||"global", | ||
level: Log.INFO, | ||
type:this.TYPE, | ||
msg:msg | ||
}) | ||
} | ||
/** | ||
* Log a trace-level message from this flow | ||
* @param {[type]} msg [description] | ||
* @return {[type]} [description] | ||
*/ | ||
trace(msg) { | ||
Log.log({ | ||
id: this.id||"global", | ||
level: Log.TRACE, | ||
type:this.TYPE, | ||
msg:msg | ||
}) | ||
} | ||
/** | ||
* Start this flow. | ||
* The `diff` argument helps define what needs to be started in the case | ||
* of a modified-nodes/flows type deploy. | ||
* @param {[type]} msg [description] | ||
* @return {[type]} [description] | ||
*/ | ||
start(diff) { | ||
this.trace("start "+this.TYPE); | ||
var node; | ||
var newNode; | ||
var id; | ||
catchNodeMap = {}; | ||
statusNodeMap = {}; | ||
this.catchNodes = []; | ||
this.statusNodes = []; | ||
var configNodes = Object.keys(flow.configs); | ||
var configNodes = Object.keys(this.flow.configs); | ||
var configNodeAttempts = {}; | ||
while (configNodes.length > 0) { | ||
id = configNodes.shift(); | ||
node = flow.configs[id]; | ||
if (!activeNodes[id]) { | ||
node = this.flow.configs[id]; | ||
if (!this.activeNodes[id]) { | ||
var readyToCreate = true; | ||
@@ -54,4 +126,4 @@ // This node doesn't exist. | ||
for (var prop in node) { | ||
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && flow.configs[node[prop]]) { | ||
if (!activeNodes[node[prop]]) { | ||
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && this.flow.configs[node[prop]]) { | ||
if (!this.activeNodes[node[prop]]) { | ||
// References a non-existent config node | ||
@@ -70,5 +142,5 @@ // Add it to the back of the list to try again later | ||
if (readyToCreate) { | ||
newNode = createNode(node.type,node); | ||
newNode = flowUtil.createNode(this,node); | ||
if (newNode) { | ||
activeNodes[id] = newNode; | ||
this.activeNodes[id] = newNode; | ||
} | ||
@@ -81,5 +153,5 @@ } | ||
for (var j=0;j<diff.rewired.length;j++) { | ||
var rewireNode = activeNodes[diff.rewired[j]]; | ||
var rewireNode = this.activeNodes[diff.rewired[j]]; | ||
if (rewireNode) { | ||
rewireNode.updateWires(flow.nodes[rewireNode.id].wires); | ||
rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires); | ||
} | ||
@@ -89,22 +161,34 @@ } | ||
for (id in flow.nodes) { | ||
if (flow.nodes.hasOwnProperty(id)) { | ||
node = flow.nodes[id]; | ||
for (id in this.flow.nodes) { | ||
if (this.flow.nodes.hasOwnProperty(id)) { | ||
node = this.flow.nodes[id]; | ||
if (!node.subflow) { | ||
if (!activeNodes[id]) { | ||
newNode = createNode(node.type,node); | ||
if (!this.activeNodes[id]) { | ||
newNode = flowUtil.createNode(this,node); | ||
if (newNode) { | ||
activeNodes[id] = newNode; | ||
this.activeNodes[id] = newNode; | ||
} | ||
} | ||
} else { | ||
if (!subflowInstanceNodes[id]) { | ||
if (!this.subflowInstanceNodes[id]) { | ||
try { | ||
var nodes = createSubflow(flow.subflows[node.subflow]||global.subflows[node.subflow],node,flow.subflows,global.subflows,activeNodes); | ||
subflowInstanceNodes[id] = nodes.map(function(n) { return n.id}); | ||
for (var i=0;i<nodes.length;i++) { | ||
if (nodes[i]) { | ||
activeNodes[nodes[i].id] = nodes[i]; | ||
} | ||
} | ||
var subflowDefinition = this.flow.subflows[node.subflow]||this.global.subflows[node.subflow] | ||
// console.log("NEED TO CREATE A SUBFLOW",id,node.subflow); | ||
this.subflowInstanceNodes[id] = true; | ||
var subflow = Subflow.create( | ||
this, | ||
this.global, | ||
subflowDefinition, | ||
node | ||
); | ||
subflow.start(); | ||
this.activeNodes[id] = subflow.node; | ||
this.subflowInstanceNodes[id] = subflow; | ||
// this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id}); | ||
// for (var i=0;i<nodes.length;i++) { | ||
// if (nodes[i]) { | ||
// this.activeNodes[nodes[i].id] = nodes[i]; | ||
// } | ||
// } | ||
} catch(err) { | ||
@@ -118,69 +202,64 @@ console.log(err.stack) | ||
for (id in activeNodes) { | ||
if (activeNodes.hasOwnProperty(id)) { | ||
node = activeNodes[id]; | ||
var activeCount = Object.keys(this.activeNodes).length; | ||
if (activeCount > 0) { | ||
this.trace("------------------|--------------|-----------------"); | ||
this.trace(" id | type | alias"); | ||
this.trace("------------------|--------------|-----------------"); | ||
} | ||
// Build the map of catch/status nodes. | ||
for (id in this.activeNodes) { | ||
if (this.activeNodes.hasOwnProperty(id)) { | ||
node = this.activeNodes[id]; | ||
this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")); | ||
if (node.type === "catch") { | ||
catchNodeMap[node.z] = catchNodeMap[node.z] || []; | ||
catchNodeMap[node.z].push(node); | ||
this.catchNodes.push(node); | ||
} else if (node.type === "status") { | ||
statusNodeMap[node.z] = statusNodeMap[node.z] || []; | ||
statusNodeMap[node.z].push(node); | ||
this.statusNodes.push(node); | ||
} | ||
} | ||
} | ||
if (activeCount > 0) { | ||
this.trace("------------------|--------------|-----------------"); | ||
} | ||
// this.dump(); | ||
} | ||
this.stop = function(stopList, removedList) { | ||
return when.promise(function(resolve) { | ||
var i; | ||
if (stopList) { | ||
for (i=0;i<stopList.length;i++) { | ||
if (subflowInstanceNodes[stopList[i]]) { | ||
// The first in the list is the instance node we already | ||
// know about | ||
stopList = stopList.concat(subflowInstanceNodes[stopList[i]].slice(1)) | ||
} | ||
} | ||
} else { | ||
stopList = Object.keys(activeNodes); | ||
} | ||
// Convert the list to a map to avoid multiple scans of the list | ||
var removedMap = {}; | ||
removedList = removedList || []; | ||
removedList.forEach(function(id) { | ||
removedMap[id] = true; | ||
}); | ||
/** | ||
* Stop this flow. | ||
* The `stopList` argument helps define what needs to be stopped in the case | ||
* of a modified-nodes/flows type deploy. | ||
* @param {[type]} stopList [description] | ||
* @param {[type]} removedList [description] | ||
* @return {[type]} [description] | ||
*/ | ||
stop(stopList, removedList) { | ||
this.trace("stop "+this.TYPE); | ||
var i; | ||
if (!stopList) { | ||
stopList = Object.keys(this.activeNodes); | ||
} | ||
// Convert the list to a map to avoid multiple scans of the list | ||
var removedMap = {}; | ||
removedList = removedList || []; | ||
removedList.forEach(function(id) { | ||
removedMap[id] = true; | ||
}); | ||
var promises = []; | ||
for (i=0;i<stopList.length;i++) { | ||
var node = activeNodes[stopList[i]]; | ||
if (node) { | ||
delete activeNodes[stopList[i]]; | ||
if (subflowInstanceNodes[stopList[i]]) { | ||
delete subflowInstanceNodes[stopList[i]]; | ||
var promises = []; | ||
for (i=0;i<stopList.length;i++) { | ||
var node = this.activeNodes[stopList[i]]; | ||
if (node) { | ||
delete this.activeNodes[stopList[i]]; | ||
if (this.subflowInstanceNodes[stopList[i]]) { | ||
try { | ||
var subflow = this.subflowInstanceNodes[stopList[i]]; | ||
promises.push(stopNode(node,false).then(() => { subflow.stop() })); | ||
} catch(err) { | ||
node.error(err); | ||
} | ||
delete this.subflowInstanceNodes[stopList[i]]; | ||
} else { | ||
try { | ||
var removed = removedMap[stopList[i]]; | ||
promises.push( | ||
when.promise(function(resolve, reject) { | ||
var start; | ||
var nt = node.type; | ||
var nid = node.id; | ||
var n = node; | ||
when.promise(function(resolve) { | ||
Log.trace("Stopping node "+nt+":"+nid+(removed?" removed":"")); | ||
start = Date.now(); | ||
resolve(n.close(removed)); | ||
}).timeout(nodeCloseTimeout).then(function(){ | ||
var delta = Date.now() - start; | ||
Log.trace("Stopped node "+nt+":"+nid+" ("+delta+"ms)" ); | ||
resolve(delta); | ||
},function(err) { | ||
var delta = Date.now() - start; | ||
n.error(Log._("nodes.flows.stopping-error",{message:err})); | ||
Log.debug(err.stack); | ||
reject(err); | ||
}); | ||
}) | ||
); | ||
promises.push(stopNode(node,removed).catch(()=>{})); | ||
} catch(err) { | ||
@@ -191,56 +270,132 @@ node.error(err); | ||
} | ||
when.settle(promises).then(function(results) { | ||
resolve(); | ||
}); | ||
}); | ||
} | ||
return Promise.all(promises); | ||
} | ||
this.update = function(_global,_flow) { | ||
global = _global; | ||
flow = _flow; | ||
/** | ||
* Update the flow definition. This doesn't change anything that is running. | ||
* This should be called after `stop` and before `start`. | ||
* @param {[type]} _global [description] | ||
* @param {[type]} _flow [description] | ||
* @return {[type]} [description] | ||
*/ | ||
update(_global,_flow) { | ||
this.global = _global; | ||
this.flow = _flow; | ||
} | ||
this.getNode = function(id) { | ||
return activeNodes[id]; | ||
/** | ||
* Get a node instance from this flow. If the node is not known to this | ||
* flow, pass the request up to the parent. | ||
* @param {[type]} id [description] | ||
* @return {[type]} [description] | ||
*/ | ||
getNode(id) { | ||
// console.log('getNode',id,!!this.activeNodes[id]) | ||
if (!id) { | ||
return undefined; | ||
} | ||
// console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n")) | ||
if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id])) { | ||
// This is a node owned by this flow, so return whatever we have got | ||
// During a stop/restart, activeNodes could be null for this id | ||
return this.activeNodes[id]; | ||
} else if (this.activeNodes[id]) { | ||
// TEMP: this is a subflow internal node within this flow | ||
return this.activeNodes[id]; | ||
} | ||
return this.parent.getNode(id); | ||
} | ||
this.getActiveNodes = function() { | ||
return activeNodes; | ||
/** | ||
* Get all of the nodes instantiated within this flow | ||
* @return {[type]} [description] | ||
*/ | ||
getActiveNodes() { | ||
return this.activeNodes; | ||
} | ||
this.handleStatus = function(node,statusMessage) { | ||
var targetStatusNodes = null; | ||
var reportingNode = node; | ||
var handled = false; | ||
while (reportingNode && !handled) { | ||
targetStatusNodes = statusNodeMap[reportingNode.z]; | ||
if (targetStatusNodes) { | ||
targetStatusNodes.forEach(function(targetStatusNode) { | ||
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) { | ||
return; | ||
} | ||
var message = { | ||
status: { | ||
text: "", | ||
source: { | ||
id: node.id, | ||
type: node.type, | ||
name: node.name | ||
} | ||
/** | ||
* Get a flow setting value. This currently automatically defers to the parent | ||
* flow which, as defined in ./index.js returns `process.env[key]`. | ||
* This lays the groundwork for Subflow to have instance-specific settings | ||
* @param {[type]} key [description] | ||
* @return {[type]} [description] | ||
*/ | ||
getSetting(key) { | ||
return this.parent.getSetting(key); | ||
} | ||
/** | ||
* Handle a status event from a node within this flow. | ||
* @param {Node} node The original node that triggered the event | ||
* @param {Object} statusMessage The status object | ||
* @param {Node} reportingNode The node emitting the status event. | ||
* This could be a subflow instance node when the status | ||
* is being delegated up. | ||
* @param {boolean} muteStatusEvent Whether to emit the status event | ||
* @return {[type]} [description] | ||
*/ | ||
handleStatus(node,statusMessage,reportingNode,muteStatusEvent) { | ||
if (!reportingNode) { | ||
reportingNode = node; | ||
} | ||
if (!muteStatusEvent) { | ||
events.emit("node-status",{ | ||
id: node.id, | ||
status:statusMessage | ||
}); | ||
} | ||
let handled = false; | ||
if (this.id === 'global' && node.users) { | ||
// This is a global config node | ||
// Delegate status to any nodes using this config node | ||
for (let userNode in node.users) { | ||
if (node.users.hasOwnProperty(userNode)) { | ||
node.users[userNode]._flow.handleStatus(node,statusMessage,node.users[userNode],true); | ||
} | ||
} | ||
handled = true; | ||
} else { | ||
this.statusNodes.forEach(function(targetStatusNode) { | ||
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(reportingNode.id) === -1) { | ||
return; | ||
} | ||
var message = { | ||
status: { | ||
text: "", | ||
source: { | ||
id: node.id, | ||
type: node.type, | ||
name: node.name | ||
} | ||
}; | ||
if (statusMessage.hasOwnProperty("text")) { | ||
message.status.text = statusMessage.text.toString(); | ||
} | ||
targetStatusNode.receive(message); | ||
handled = true; | ||
}); | ||
} | ||
if (!handled) { | ||
reportingNode = activeNodes[reportingNode.z]; | ||
} | ||
}; | ||
if (statusMessage.hasOwnProperty("text")) { | ||
message.status.text = statusMessage.text.toString(); | ||
} | ||
targetStatusNode.receive(message); | ||
handled = true; | ||
}); | ||
} | ||
return handled; | ||
} | ||
this.handleError = function(node,logMessage,msg) { | ||
/** | ||
* Handle an error event from a node within this flow. If there are no Catch | ||
* nodes within this flow, pass the event to the parent flow. | ||
* @param {[type]} node [description] | ||
* @param {[type]} logMessage [description] | ||
* @param {[type]} msg [description] | ||
* @param {[type]} reportingNode [description] | ||
* @return {[type]} [description] | ||
*/ | ||
handleError(node,logMessage,msg,reportingNode) { | ||
if (!reportingNode) { | ||
reportingNode = node; | ||
} | ||
// console.log("HE",logMessage); | ||
var count = 1; | ||
@@ -258,250 +413,85 @@ if (msg && msg.hasOwnProperty("error") && msg.error !== null) { | ||
} | ||
var targetCatchNodes = null; | ||
var throwingNode = node; | ||
var handled = false; | ||
while (throwingNode && !handled) { | ||
targetCatchNodes = catchNodeMap[throwingNode.z]; | ||
if (targetCatchNodes) { | ||
targetCatchNodes.forEach(function(targetCatchNode) { | ||
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) { | ||
return; | ||
} | ||
var errorMessage; | ||
if (msg) { | ||
errorMessage = redUtil.cloneMessage(msg); | ||
} else { | ||
errorMessage = {}; | ||
} | ||
if (errorMessage.hasOwnProperty("error")) { | ||
errorMessage._error = errorMessage.error; | ||
} | ||
errorMessage.error = { | ||
message: logMessage.toString(), | ||
source: { | ||
id: node.id, | ||
type: node.type, | ||
name: node.name, | ||
count: count | ||
} | ||
}; | ||
if (logMessage.hasOwnProperty('stack')) { | ||
errorMessage.error.stack = logMessage.stack; | ||
} | ||
targetCatchNode.receive(errorMessage); | ||
handled = true; | ||
}); | ||
} | ||
if (!handled) { | ||
throwingNode = activeNodes[throwingNode.z]; | ||
} | ||
} | ||
return handled; | ||
} | ||
} | ||
let handled = false; | ||
function createNode(type,config) { | ||
var nn = null; | ||
try { | ||
var nt = typeRegistry.get(type); | ||
if (nt) { | ||
var conf = clone(config); | ||
delete conf.credentials; | ||
for (var p in conf) { | ||
if (conf.hasOwnProperty(p)) { | ||
flowUtil.mapEnvVarProperties(conf,p); | ||
} | ||
} | ||
try { | ||
nn = new nt(conf); | ||
} | ||
catch (err) { | ||
Log.log({ | ||
level: Log.ERROR, | ||
id:conf.id, | ||
type: type, | ||
msg: err | ||
}); | ||
} | ||
} else { | ||
Log.error(Log._("nodes.flow.unknown-type", {type:type})); | ||
} | ||
} catch(err) { | ||
Log.error(err); | ||
} | ||
return nn; | ||
} | ||
function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) { | ||
//console.log("CREATE SUBFLOW",sf.id,sfn.id); | ||
var nodes = []; | ||
var node_map = {}; | ||
var newNodes = []; | ||
var node; | ||
var wires; | ||
var i,j,k; | ||
var createNodeInSubflow = function(def) { | ||
node = clone(def); | ||
var nid = redUtil.generateId(); | ||
node_map[node.id] = node; | ||
node._alias = node.id; | ||
node.id = nid; | ||
node.z = sfn.id; | ||
newNodes.push(node); | ||
} | ||
// Clone all of the subflow node definitions and give them new IDs | ||
for (i in sf.configs) { | ||
if (sf.configs.hasOwnProperty(i)) { | ||
createNodeInSubflow(sf.configs[i]); | ||
} | ||
} | ||
// Clone all of the subflow node definitions and give them new IDs | ||
for (i in sf.nodes) { | ||
if (sf.nodes.hasOwnProperty(i)) { | ||
createNodeInSubflow(sf.nodes[i]); | ||
} | ||
} | ||
// Look for any catch/status nodes and update their scope ids | ||
// Update all subflow interior wiring to reflect new node IDs | ||
for (i=0;i<newNodes.length;i++) { | ||
node = newNodes[i]; | ||
if (node.wires) { | ||
var outputs = node.wires; | ||
for (j=0;j<outputs.length;j++) { | ||
wires = outputs[j]; | ||
for (k=0;k<wires.length;k++) { | ||
outputs[j][k] = node_map[outputs[j][k]].id | ||
if (this.id === 'global' && node.users) { | ||
// This is a global config node | ||
// Delegate status to any nodes using this config node | ||
for (let userNode in node.users) { | ||
if (node.users.hasOwnProperty(userNode)) { | ||
node.users[userNode]._flow.handleError(node,logMessage,msg,node.users[userNode]); | ||
} | ||
} | ||
if ((node.type === 'catch' || node.type === 'status') && node.scope) { | ||
node.scope = node.scope.map(function(id) { | ||
return node_map[id]?node_map[id].id:"" | ||
}) | ||
} else { | ||
for (var prop in node) { | ||
if (node.hasOwnProperty(prop) && prop !== '_alias') { | ||
if (node_map[node[prop]]) { | ||
//console.log("Mapped",node.type,node.id,prop,node_map[node[prop]].id); | ||
node[prop] = node_map[node[prop]].id; | ||
} | ||
} | ||
handled = true; | ||
} else { | ||
this.catchNodes.forEach(function(targetCatchNode) { | ||
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(reportingNode.id) === -1) { | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
// Create a subflow node to accept inbound messages and route appropriately | ||
var Node = require("../Node"); | ||
var subflowInstance = { | ||
id: sfn.id, | ||
type: sfn.type, | ||
z: sfn.z, | ||
name: sfn.name, | ||
wires: [] | ||
} | ||
if (sf.in) { | ||
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})}) | ||
subflowInstance._originalWires = clone(subflowInstance.wires); | ||
} | ||
var subflowNode = new Node(subflowInstance); | ||
subflowNode.on("input", function(msg) { this.send(msg);}); | ||
subflowNode._updateWires = subflowNode.updateWires; | ||
subflowNode.updateWires = function(newWires) { | ||
// Wire the subflow outputs | ||
if (sf.out) { | ||
var node,wires,i,j; | ||
// Restore the original wiring to the internal nodes | ||
subflowInstance.wires = clone(subflowInstance._originalWires); | ||
for (i=0;i<sf.out.length;i++) { | ||
wires = sf.out[i].wires; | ||
for (j=0;j<wires.length;j++) { | ||
if (wires[j].id != sf.id) { | ||
node = node_map[wires[j].id]; | ||
if (node._originalWires) { | ||
node.wires = clone(node._originalWires); | ||
} | ||
} | ||
var errorMessage; | ||
if (msg) { | ||
errorMessage = redUtil.cloneMessage(msg); | ||
} else { | ||
errorMessage = {}; | ||
} | ||
} | ||
var modifiedNodes = {}; | ||
var subflowInstanceModified = false; | ||
for (i=0;i<sf.out.length;i++) { | ||
wires = sf.out[i].wires; | ||
for (j=0;j<wires.length;j++) { | ||
if (wires[j].id === sf.id) { | ||
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]); | ||
subflowInstanceModified = true; | ||
} else { | ||
node = node_map[wires[j].id]; | ||
node.wires[wires[j].port] = node.wires[wires[j].port].concat(newWires[i]); | ||
modifiedNodes[node.id] = node; | ||
if (errorMessage.hasOwnProperty("error")) { | ||
errorMessage._error = errorMessage.error; | ||
} | ||
errorMessage.error = { | ||
message: logMessage.toString(), | ||
source: { | ||
id: node.id, | ||
type: node.type, | ||
name: node.name, | ||
count: count | ||
} | ||
}; | ||
if (logMessage.hasOwnProperty('stack')) { | ||
errorMessage.error.stack = logMessage.stack; | ||
} | ||
} | ||
Object.keys(modifiedNodes).forEach(function(id) { | ||
var node = modifiedNodes[id]; | ||
subflowNode.instanceNodes[id].updateWires(node.wires); | ||
targetCatchNode.receive(errorMessage); | ||
handled = true; | ||
}); | ||
if (subflowInstanceModified) { | ||
subflowNode._updateWires(subflowInstance.wires); | ||
} | ||
} | ||
return handled; | ||
} | ||
nodes.push(subflowNode); | ||
// Wire the subflow outputs | ||
if (sf.out) { | ||
var modifiedNodes = {}; | ||
for (i=0;i<sf.out.length;i++) { | ||
wires = sf.out[i].wires; | ||
for (j=0;j<wires.length;j++) { | ||
if (wires[j].id === sf.id) { | ||
// A subflow input wired straight to a subflow output | ||
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i]) | ||
subflowNode._updateWires(subflowInstance.wires); | ||
} else { | ||
node = node_map[wires[j].id]; | ||
modifiedNodes[node.id] = node; | ||
if (!node._originalWires) { | ||
node._originalWires = clone(node.wires); | ||
} | ||
node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(sfn.wires[i]); | ||
dump() { | ||
console.log("==================") | ||
console.log(this.TYPE, this.id); | ||
for (var id in this.activeNodes) { | ||
if (this.activeNodes.hasOwnProperty(id)) { | ||
var node = this.activeNodes[id]; | ||
console.log(" ",id.padEnd(16),node.type) | ||
if (node.wires) { | ||
console.log(" -> ",node.wires) | ||
} | ||
} | ||
} | ||
console.log("==================") | ||
} | ||
} | ||
// Instantiate the nodes | ||
for (i=0;i<newNodes.length;i++) { | ||
node = newNodes[i]; | ||
var type = node.type; | ||
var m = /^subflow:(.+)$/.exec(type); | ||
if (!m) { | ||
var newNode = createNode(type,node); | ||
if (newNode) { | ||
activeNodes[node.id] = newNode; | ||
nodes.push(newNode); | ||
} | ||
} else { | ||
var subflowId = m[1]; | ||
nodes = nodes.concat(createSubflow(subflows[subflowId]||globalSubflows[subflowId],node,subflows,globalSubflows,activeNodes)); | ||
} | ||
} | ||
subflowNode.instanceNodes = {}; | ||
nodes.forEach(function(node) { | ||
subflowNode.instanceNodes[node.id] = node; | ||
/** | ||
* Stop an individual node within this flow. | ||
* | ||
* @param {[type]} node [description] | ||
* @param {[type]} removed [description] | ||
* @return {[type]} [description] | ||
*/ | ||
function stopNode(node,removed) { | ||
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":"")); | ||
const start = Date.now(); | ||
const closePromise = node.close(removed); | ||
const closeTimeout = new Promise((resolve,reject) => { | ||
setTimeout(() => { | ||
reject("Close timed out"); | ||
}, nodeCloseTimeout); | ||
}); | ||
return nodes; | ||
return Promise.race([closePromise,closeTimeout]).then(() => { | ||
var delta = Date.now() - start; | ||
Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" ); | ||
}).catch(err => { | ||
node.error(Log._("nodes.flows.stopping-error",{message:err})); | ||
Log.debug(err.stack); | ||
}) | ||
} | ||
@@ -514,7 +504,9 @@ | ||
Log = runtime.log; | ||
Node = require("../Node"); | ||
Subflow = require("./Subflow"); | ||
Subflow.init(runtime); | ||
}, | ||
create: function(global,conf) { | ||
return new Flow(global,conf); | ||
} | ||
create: function(parent,global,conf) { | ||
return new Flow(parent,global,conf); | ||
}, | ||
Flow: Flow | ||
} |
@@ -45,3 +45,2 @@ /** | ||
var activeNodesToFlow = {}; | ||
var subflowInstanceNodeMap = {}; | ||
@@ -145,4 +144,7 @@ var typeEventRegistered = false; | ||
} else { | ||
// Clone the provided config so it can be manipulated | ||
config = clone(_config); | ||
// Parse the configuration | ||
newFlowConfig = flowUtil.parseConfig(clone(config)); | ||
// Generate a diff to identify what has changed | ||
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig); | ||
@@ -158,4 +160,10 @@ | ||
// Allow the credential store to remove anything no longer needed | ||
credentials.clean(config); | ||
// Remember whether credentials need saving or not | ||
var credsDirty = credentials.dirty(); | ||
// Get the latest credentials and ask storage to save them (if needed) | ||
// as well as the new flow configuration. | ||
configSavePromise = credentials.export().then(function(creds) { | ||
@@ -182,11 +190,16 @@ var saveConfig = { | ||
if (forceStart || started) { | ||
return stop(type,diff,muteLog).then(function() { | ||
return context.clean(activeFlowConfig).then(function() { | ||
start(type,diff,muteLog).then(function() { | ||
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true}); | ||
}); | ||
return flowRevision; | ||
// Flows are running (or should be) | ||
// Stop the active flows (according to deploy type and the diff) | ||
return stop(type,diff,muteLog).then(() => { | ||
// Once stopped, allow context to remove anything no longer needed | ||
return context.clean(activeFlowConfig) | ||
}).then(() => { | ||
// Start the active flows | ||
start(type,diff,muteLog).then(() => { | ||
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true}); | ||
}); | ||
}).catch(function(err) { | ||
}) | ||
// Return the new revision asynchronously to the actual start | ||
return flowRevision; | ||
}).catch(function(err) { }) | ||
} else { | ||
@@ -226,60 +239,7 @@ events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true}); | ||
function delegateError(node,logMessage,msg) { | ||
var handled = false; | ||
if (activeFlows[node.z]) { | ||
handled = activeFlows[node.z].handleError(node,logMessage,msg); | ||
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) { | ||
handled = activeFlows[activeNodesToFlow[node.z]].handleError(node,logMessage,msg); | ||
} else if (activeFlowConfig.subflows[node.z] && subflowInstanceNodeMap[node.id]) { | ||
subflowInstanceNodeMap[node.id].forEach(function(n) { | ||
handled = handled || delegateError(getNode(n),logMessage,msg); | ||
}); | ||
} | ||
return handled; | ||
} | ||
function handleError(node,logMessage,msg) { | ||
var handled = false; | ||
if (node.z) { | ||
handled = delegateError(node,logMessage,msg); | ||
} else { | ||
if (activeFlowConfig.configs[node.id]) { | ||
activeFlowConfig.configs[node.id]._users.forEach(function(id) { | ||
var userNode = activeFlowConfig.allNodes[id]; | ||
handled = handled || delegateError(userNode,logMessage,msg); | ||
}) | ||
} | ||
} | ||
return handled; | ||
} | ||
function delegateStatus(node,statusMessage) { | ||
if (activeFlows[node.z]) { | ||
activeFlows[node.z].handleStatus(node,statusMessage); | ||
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) { | ||
activeFlows[activeNodesToFlow[node.z]].handleStatus(node,statusMessage); | ||
} | ||
} | ||
function handleStatus(node,statusMessage) { | ||
events.emit("node-status",{ | ||
id: node.id, | ||
status:statusMessage | ||
}); | ||
if (node.z) { | ||
delegateStatus(node,statusMessage); | ||
} else { | ||
if (activeFlowConfig.configs[node.id]) { | ||
activeFlowConfig.configs[node.id]._users.forEach(function(id) { | ||
var userNode = activeFlowConfig.allNodes[id]; | ||
delegateStatus(userNode,statusMessage); | ||
}) | ||
} | ||
} | ||
} | ||
function start(type,diff,muteLog) { | ||
//dumpActiveNodes(); | ||
type = type||"full"; | ||
started = true; | ||
var i; | ||
// If there are missing types, report them, emit the necessary runtime event and return | ||
if (activeFlowConfig.missingTypes.length > 0) { | ||
@@ -305,4 +265,6 @@ log.info(log._("nodes.flows.missing-types")); | ||
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true}); | ||
return when.resolve(); | ||
return Promise.resolve(); | ||
} | ||
// In safe mode, don't actually start anything, emit the necessary runtime event and return | ||
if (settings.safeMode) { | ||
@@ -315,2 +277,3 @@ log.info("*****************************************************************") | ||
} | ||
if (!muteLog) { | ||
@@ -323,12 +286,19 @@ if (type !== "full") { | ||
} | ||
var id; | ||
if (type === "full") { | ||
// A full start means everything should | ||
// Check the 'global' flow is running | ||
if (!activeFlows['global']) { | ||
log.debug("red/nodes/flows.start : starting flow : global"); | ||
activeFlows['global'] = Flow.create(activeFlowConfig); | ||
activeFlows['global'] = Flow.create(flowAPI,activeFlowConfig); | ||
} | ||
// Check each flow in the active configuration | ||
for (id in activeFlowConfig.flows) { | ||
if (activeFlowConfig.flows.hasOwnProperty(id)) { | ||
if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) { | ||
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]); | ||
// This flow is not disabled, nor is it currently active, so create it | ||
activeFlows[id] = Flow.create(flowAPI,activeFlowConfig,activeFlowConfig.flows[id]); | ||
log.debug("red/nodes/flows.start : starting flow : "+id); | ||
@@ -341,2 +311,5 @@ } else { | ||
} else { | ||
// A modified-type deploy means restarting things that have changed | ||
// Update the global flow | ||
activeFlows['global'].update(activeFlowConfig,activeFlowConfig); | ||
@@ -347,5 +320,7 @@ for (id in activeFlowConfig.flows) { | ||
if (activeFlows[id]) { | ||
// This flow exists and is not disabled, so update it | ||
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]); | ||
} else { | ||
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]); | ||
// This flow didn't previously exist, so create it | ||
activeFlows[id] = Flow.create(flowAPI,activeFlowConfig,activeFlowConfig.flows[id]); | ||
log.debug("red/nodes/flows.start : starting flow : "+id); | ||
@@ -359,15 +334,16 @@ } | ||
} | ||
// Having created or updated all flows, now start them. | ||
for (id in activeFlows) { | ||
if (activeFlows.hasOwnProperty(id)) { | ||
activeFlows[id].start(diff); | ||
var activeNodes = activeFlows[id].getActiveNodes(); | ||
Object.keys(activeNodes).forEach(function(nid) { | ||
activeNodesToFlow[nid] = id; | ||
if (activeNodes[nid]._alias) { | ||
subflowInstanceNodeMap[activeNodes[nid]._alias] = subflowInstanceNodeMap[activeNodes[nid]._alias] || []; | ||
subflowInstanceNodeMap[activeNodes[nid]._alias].push(nid); | ||
} | ||
}); | ||
try { | ||
activeFlows[id].start(diff); | ||
// Create a map of node id to flow id and also a subflowInstance lookup map | ||
var activeNodes = activeFlows[id].getActiveNodes(); | ||
Object.keys(activeNodes).forEach(function(nid) { | ||
activeNodesToFlow[nid] = id; | ||
}); | ||
} catch(err) { | ||
console.log(err.stack); | ||
} | ||
} | ||
@@ -390,3 +366,3 @@ } | ||
} | ||
return when.resolve(); | ||
return Promise.resolve(); | ||
} | ||
@@ -396,3 +372,3 @@ | ||
if (!started) { | ||
return when.resolve(); | ||
return Promise.resolve(); | ||
} | ||
@@ -435,30 +411,23 @@ type = type||"full"; | ||
return when.promise(function(resolve,reject) { | ||
when.settle(promises).then(function() { | ||
for (id in activeNodesToFlow) { | ||
if (activeNodesToFlow.hasOwnProperty(id)) { | ||
if (!activeFlows[activeNodesToFlow[id]]) { | ||
delete activeNodesToFlow[id]; | ||
} | ||
return Promise.resolve(promises).then(function() { | ||
for (id in activeNodesToFlow) { | ||
if (activeNodesToFlow.hasOwnProperty(id)) { | ||
if (!activeFlows[activeNodesToFlow[id]]) { | ||
delete activeNodesToFlow[id]; | ||
} | ||
} | ||
if (stopList) { | ||
stopList.forEach(function(id) { | ||
delete activeNodesToFlow[id]; | ||
}); | ||
} | ||
if (stopList) { | ||
stopList.forEach(function(id) { | ||
delete activeNodesToFlow[id]; | ||
}); | ||
} | ||
if (!muteLog) { | ||
if (type !== "full") { | ||
log.info(log._("nodes.flows.stopped-modified-"+type)); | ||
} else { | ||
log.info(log._("nodes.flows.stopped-flows")); | ||
} | ||
// Ideally we'd prune just what got stopped - but mapping stopList | ||
// id to the list of subflow instance nodes is something only Flow | ||
// can do... so cheat by wiping the map knowing it'll be rebuilt | ||
// in start() | ||
subflowInstanceNodeMap = {}; | ||
if (!muteLog) { | ||
if (type !== "full") { | ||
log.info(log._("nodes.flows.stopped-modified-"+type)); | ||
} else { | ||
log.info(log._("nodes.flows.stopped-flows")); | ||
} | ||
} | ||
resolve(); | ||
}); | ||
} | ||
events.emit("nodes-stopped"); | ||
}); | ||
@@ -720,2 +689,10 @@ } | ||
const flowAPI = { | ||
getNode: getNode, | ||
handleError: () => false, | ||
handleStatus: () => false, | ||
getSetting: k => process.env[k] | ||
} | ||
module.exports = { | ||
@@ -759,4 +736,4 @@ init: init, | ||
handleError: handleError, | ||
handleStatus: handleStatus, | ||
// handleError: handleError, | ||
// handleStatus: handleStatus, | ||
@@ -763,0 +740,0 @@ checkTypeInUse: checkTypeInUse, |
@@ -18,2 +18,3 @@ /** | ||
var redUtil = require("@node-red/util").util; | ||
var Log = require("@node-red/util").log; | ||
var subflowInstanceRE = /^subflow:(.+)$/; | ||
@@ -44,18 +45,24 @@ var typeRegistry = require("@node-red/registry"); | ||
function mapEnvVarProperties(obj,prop) { | ||
if (Buffer.isBuffer(obj[prop])) { | ||
function mapEnvVarProperties(obj,prop,flow) { | ||
var v = obj[prop]; | ||
if (Buffer.isBuffer(v)) { | ||
return; | ||
} else if (Array.isArray(obj[prop])) { | ||
for (var i=0;i<obj[prop].length;i++) { | ||
mapEnvVarProperties(obj[prop],i); | ||
} else if (Array.isArray(v)) { | ||
for (var i=0;i<v.length;i++) { | ||
mapEnvVarProperties(v,i,flow); | ||
} | ||
} else if (typeof obj[prop] === 'string') { | ||
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(obj[prop]) || EnvVarPropertyRE.test(obj[prop])) ) { | ||
var envVar = obj[prop].substring(2,obj[prop].length-1); | ||
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:obj[prop]; | ||
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(v) || EnvVarPropertyRE.test(v)) ) { | ||
var envVar = v.substring(2,v.length-1); | ||
if (!flow) { | ||
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:v; | ||
} else { | ||
var r = flow.getSetting(envVar); | ||
obj[prop] = r!==undefined?r:obj[prop]; | ||
} | ||
} | ||
} else { | ||
for (var p in obj[prop]) { | ||
if (obj[prop].hasOwnProperty(p)) { | ||
mapEnvVarProperties(obj[prop],p); | ||
for (var p in v) { | ||
if (v.hasOwnProperty(p)) { | ||
mapEnvVarProperties(v,p,flow); | ||
} | ||
@@ -432,5 +439,6 @@ } | ||
// console.log( | ||
// (added[id]?"+":(changed[id]?"!":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"~":" "), | ||
// id, | ||
// newConfig.allNodes[id].type, | ||
// (added[id]?"a":(changed[id]?"c":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"l":" "), | ||
// newConfig.allNodes[id].type.padEnd(10), | ||
// id.padEnd(16), | ||
// (newConfig.allNodes[id].z||"").padEnd(16), | ||
// newConfig.allNodes[id].name||newConfig.allNodes[id].label||"" | ||
@@ -449,3 +457,42 @@ // ); | ||
return diff; | ||
}, | ||
/** | ||
* Create a new instance of a node | ||
* @param {Flow} flow The containing flow | ||
* @param {object} config The node configuration object | ||
* @return {Node} The instance of the node | ||
*/ | ||
createNode: function(flow,config) { | ||
var newNode = null; | ||
var type = config.type; | ||
try { | ||
var nodeTypeConstructor = typeRegistry.get(type); | ||
if (nodeTypeConstructor) { | ||
var conf = clone(config); | ||
delete conf.credentials; | ||
for (var p in conf) { | ||
if (conf.hasOwnProperty(p)) { | ||
mapEnvVarProperties(conf,p,flow); | ||
} | ||
} | ||
try { | ||
conf._flow = flow; | ||
newNode = new nodeTypeConstructor(conf); | ||
} catch (err) { | ||
Log.log({ | ||
level: Log.ERROR, | ||
id:conf.id, | ||
type: type, | ||
msg: err | ||
}); | ||
} | ||
} else { | ||
Log.error(Log._("nodes.flow.unknown-type", {type:type})); | ||
} | ||
} catch(err) { | ||
Log.error(err); | ||
} | ||
return newNode; | ||
} | ||
} |
@@ -107,3 +107,3 @@ /** | ||
if (creds.hasOwnProperty(p)) { | ||
flowUtil.mapEnvVarProperties(creds,p); | ||
flowUtil.mapEnvVarProperties(creds,p,node._flow); | ||
} | ||
@@ -110,0 +110,0 @@ } |
@@ -19,3 +19,2 @@ /** | ||
var EventEmitter = require("events").EventEmitter; | ||
var when = require("when"); | ||
@@ -39,2 +38,8 @@ var redUtil = require("@node-red/util").util; | ||
} | ||
if (n._flow) { | ||
// Make this a non-enumerable property as it may cause | ||
// circular references. Any existing code that tries to JSON serialise | ||
// the object (such as dashboard) will not like circular refs | ||
Object.defineProperty(this,'_flow', {value: n._flow, }) | ||
} | ||
this.updateWires(n.wires); | ||
@@ -94,17 +99,30 @@ } | ||
promises.push( | ||
when.promise(function(resolve) { | ||
var args = []; | ||
if (callback.length === 2) { | ||
args.push(!!removed); | ||
new Promise((resolve) => { | ||
try { | ||
var args = []; | ||
if (callback.length === 2) { | ||
args.push(!!removed); | ||
} | ||
args.push(() => { | ||
resolve(); | ||
}); | ||
callback.apply(node, args); | ||
} catch(err) { | ||
// TODO: error thrown in node async close callback | ||
// We've never logged this properly. | ||
resolve(); | ||
} | ||
args.push(resolve); | ||
callback.apply(node, args); | ||
}) | ||
); | ||
} else { | ||
callback.call(node); | ||
try { | ||
callback.call(node); | ||
} catch(err) { | ||
// TODO: error thrown in node sync close callback | ||
// We've never logged this properly. | ||
} | ||
} | ||
} | ||
if (promises.length > 0) { | ||
return when.settle(promises).then(function() { | ||
return Promise.all(promises).then(function() { | ||
if (this._context) { | ||
@@ -118,3 +136,3 @@ return context.delete(this._alias||this.id,this.z); | ||
} | ||
return; | ||
return Promise.resolve(); | ||
} | ||
@@ -138,3 +156,3 @@ }; | ||
this.metric("send",msg); | ||
node = flows.get(this._wire); | ||
node = this._flow.getNode(this._wire); | ||
/* istanbul ignore else */ | ||
@@ -171,3 +189,3 @@ if (node) { | ||
for (var j = 0; j < wires.length; j++) { | ||
node = flows.get(wires[j]); // node at end of wire j | ||
node = this._flow.getNode(wires[j]); // node at end of wire j | ||
if (node) { | ||
@@ -260,3 +278,3 @@ // for each msg to send eg. [[m1, m2, ...], ...] | ||
if (msg) { | ||
handled = flows.handleError(this,logMessage,msg); | ||
handled = this._flow.handleError(this,logMessage,msg); | ||
} | ||
@@ -296,4 +314,5 @@ if (!handled) { | ||
Node.prototype.status = function(status) { | ||
flows.handleStatus(this,status); | ||
this._flow.handleStatus(this,status); | ||
}; | ||
module.exports = Node; |
{ | ||
"name": "@node-red/runtime", | ||
"version": "0.20.0-beta.3", | ||
"version": "0.20.0-beta.4", | ||
"license": "Apache-2.0", | ||
@@ -19,4 +19,4 @@ "main": "./lib/index.js", | ||
"dependencies": { | ||
"@node-red/registry": "0.20.0-beta.3", | ||
"@node-red/util": "0.20.0-beta.3", | ||
"@node-red/registry": "0.20.0-beta.4", | ||
"@node-red/util": "0.20.0-beta.4", | ||
"clone": "2.1.2", | ||
@@ -23,0 +23,0 @@ "express": "4.16.4", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
416212
46
10160
44
+ Added@node-red/registry@0.20.0-beta.4(transitive)
+ Added@node-red/util@0.20.0-beta.4(transitive)
+ Addedjsonata@1.6.4(transitive)
- Removed@node-red/registry@0.20.0-beta.3(transitive)
- Removed@node-red/util@0.20.0-beta.3(transitive)
- Removedjsonata@1.6.3(transitive)
Updated@node-red/util@0.20.0-beta.4