Socket
Socket
Sign inDemoInstall

@node-red/runtime

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-red/runtime - npm Package Compare versions

Comparing version 0.20.0-beta.3 to 0.20.0-beta.4

lib/nodes/flows/Subflow.js

730

lib/nodes/flows/Flow.js

@@ -17,34 +17,106 @@ /**

var when = require("when");
var clone = require("clone");
var typeRegistry = require("@node-red/registry");
var Log;
var redUtil = require("@node-red/util").util;
var flowUtil = require("./util");
var Node;
var events = require("../../events");
var Subflow;
var Log;
var nodeCloseTimeout = 15000;
function Flow(global,flow) {
if (typeof flow === 'undefined') {
flow = global;
/**
* This class represents a flow within the runtime. It is responsible for
* creating, starting and stopping all nodes within the flow.
*/
class Flow {
/**
* Create a Flow object.
* @param {[type]} parent The parent flow
* @param {[type]} globalFlow The global flow definition
* @param {[type]} flow This flow's definition
*/
constructor(parent,globalFlow,flow) {
this.TYPE = 'flow';
this.parent = parent;
this.global = globalFlow;
if (typeof flow === 'undefined') {
this.flow = globalFlow;
this.isGlobalFlow = true;
} else {
this.flow = flow;
this.isGlobalFlow = false;
}
this.id = this.flow.id || "global";
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.catchNodes = [];
this.statusNodes = [];
}
var activeNodes = {};
var subflowInstanceNodes = {};
var catchNodeMap = {};
var statusNodeMap = {};
this.start = function(diff) {
/**
* Log a debug-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
debug(msg) {
Log.log({
id: this.id||"global",
level: Log.DEBUG,
type:this.TYPE,
msg:msg
})
}
/**
* Log a info-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
log(msg) {
Log.log({
id: this.id||"global",
level: Log.INFO,
type:this.TYPE,
msg:msg
})
}
/**
* Log a trace-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
trace(msg) {
Log.log({
id: this.id||"global",
level: Log.TRACE,
type:this.TYPE,
msg:msg
})
}
/**
* Start this flow.
* The `diff` argument helps define what needs to be started in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
start(diff) {
this.trace("start "+this.TYPE);
var node;
var newNode;
var id;
catchNodeMap = {};
statusNodeMap = {};
this.catchNodes = [];
this.statusNodes = [];
var configNodes = Object.keys(flow.configs);
var configNodes = Object.keys(this.flow.configs);
var configNodeAttempts = {};
while (configNodes.length > 0) {
id = configNodes.shift();
node = flow.configs[id];
if (!activeNodes[id]) {
node = this.flow.configs[id];
if (!this.activeNodes[id]) {
var readyToCreate = true;

@@ -54,4 +126,4 @@ // This node doesn't exist.

for (var prop in node) {
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && flow.configs[node[prop]]) {
if (!activeNodes[node[prop]]) {
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && this.flow.configs[node[prop]]) {
if (!this.activeNodes[node[prop]]) {
// References a non-existent config node

@@ -70,5 +142,5 @@ // Add it to the back of the list to try again later

if (readyToCreate) {
newNode = createNode(node.type,node);
newNode = flowUtil.createNode(this,node);
if (newNode) {
activeNodes[id] = newNode;
this.activeNodes[id] = newNode;
}

@@ -81,5 +153,5 @@ }

for (var j=0;j<diff.rewired.length;j++) {
var rewireNode = activeNodes[diff.rewired[j]];
var rewireNode = this.activeNodes[diff.rewired[j]];
if (rewireNode) {
rewireNode.updateWires(flow.nodes[rewireNode.id].wires);
rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires);
}

@@ -89,22 +161,34 @@ }

for (id in flow.nodes) {
if (flow.nodes.hasOwnProperty(id)) {
node = flow.nodes[id];
for (id in this.flow.nodes) {
if (this.flow.nodes.hasOwnProperty(id)) {
node = this.flow.nodes[id];
if (!node.subflow) {
if (!activeNodes[id]) {
newNode = createNode(node.type,node);
if (!this.activeNodes[id]) {
newNode = flowUtil.createNode(this,node);
if (newNode) {
activeNodes[id] = newNode;
this.activeNodes[id] = newNode;
}
}
} else {
if (!subflowInstanceNodes[id]) {
if (!this.subflowInstanceNodes[id]) {
try {
var nodes = createSubflow(flow.subflows[node.subflow]||global.subflows[node.subflow],node,flow.subflows,global.subflows,activeNodes);
subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
for (var i=0;i<nodes.length;i++) {
if (nodes[i]) {
activeNodes[nodes[i].id] = nodes[i];
}
}
var subflowDefinition = this.flow.subflows[node.subflow]||this.global.subflows[node.subflow]
// console.log("NEED TO CREATE A SUBFLOW",id,node.subflow);
this.subflowInstanceNodes[id] = true;
var subflow = Subflow.create(
this,
this.global,
subflowDefinition,
node
);
subflow.start();
this.activeNodes[id] = subflow.node;
this.subflowInstanceNodes[id] = subflow;
// this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
// for (var i=0;i<nodes.length;i++) {
// if (nodes[i]) {
// this.activeNodes[nodes[i].id] = nodes[i];
// }
// }
} catch(err) {

@@ -118,69 +202,64 @@ console.log(err.stack)

for (id in activeNodes) {
if (activeNodes.hasOwnProperty(id)) {
node = activeNodes[id];
var activeCount = Object.keys(this.activeNodes).length;
if (activeCount > 0) {
this.trace("------------------|--------------|-----------------");
this.trace(" id | type | alias");
this.trace("------------------|--------------|-----------------");
}
// Build the map of catch/status nodes.
for (id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
node = this.activeNodes[id];
this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||""));
if (node.type === "catch") {
catchNodeMap[node.z] = catchNodeMap[node.z] || [];
catchNodeMap[node.z].push(node);
this.catchNodes.push(node);
} else if (node.type === "status") {
statusNodeMap[node.z] = statusNodeMap[node.z] || [];
statusNodeMap[node.z].push(node);
this.statusNodes.push(node);
}
}
}
if (activeCount > 0) {
this.trace("------------------|--------------|-----------------");
}
// this.dump();
}
this.stop = function(stopList, removedList) {
return when.promise(function(resolve) {
var i;
if (stopList) {
for (i=0;i<stopList.length;i++) {
if (subflowInstanceNodes[stopList[i]]) {
// The first in the list is the instance node we already
// know about
stopList = stopList.concat(subflowInstanceNodes[stopList[i]].slice(1))
}
}
} else {
stopList = Object.keys(activeNodes);
}
// Convert the list to a map to avoid multiple scans of the list
var removedMap = {};
removedList = removedList || [];
removedList.forEach(function(id) {
removedMap[id] = true;
});
/**
* Stop this flow.
* The `stopList` argument helps define what needs to be stopped in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} stopList [description]
* @param {[type]} removedList [description]
* @return {[type]} [description]
*/
stop(stopList, removedList) {
this.trace("stop "+this.TYPE);
var i;
if (!stopList) {
stopList = Object.keys(this.activeNodes);
}
// Convert the list to a map to avoid multiple scans of the list
var removedMap = {};
removedList = removedList || [];
removedList.forEach(function(id) {
removedMap[id] = true;
});
var promises = [];
for (i=0;i<stopList.length;i++) {
var node = activeNodes[stopList[i]];
if (node) {
delete activeNodes[stopList[i]];
if (subflowInstanceNodes[stopList[i]]) {
delete subflowInstanceNodes[stopList[i]];
var promises = [];
for (i=0;i<stopList.length;i++) {
var node = this.activeNodes[stopList[i]];
if (node) {
delete this.activeNodes[stopList[i]];
if (this.subflowInstanceNodes[stopList[i]]) {
try {
var subflow = this.subflowInstanceNodes[stopList[i]];
promises.push(stopNode(node,false).then(() => { subflow.stop() }));
} catch(err) {
node.error(err);
}
delete this.subflowInstanceNodes[stopList[i]];
} else {
try {
var removed = removedMap[stopList[i]];
promises.push(
when.promise(function(resolve, reject) {
var start;
var nt = node.type;
var nid = node.id;
var n = node;
when.promise(function(resolve) {
Log.trace("Stopping node "+nt+":"+nid+(removed?" removed":""));
start = Date.now();
resolve(n.close(removed));
}).timeout(nodeCloseTimeout).then(function(){
var delta = Date.now() - start;
Log.trace("Stopped node "+nt+":"+nid+" ("+delta+"ms)" );
resolve(delta);
},function(err) {
var delta = Date.now() - start;
n.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
reject(err);
});
})
);
promises.push(stopNode(node,removed).catch(()=>{}));
} catch(err) {

@@ -191,56 +270,132 @@ node.error(err);

}
when.settle(promises).then(function(results) {
resolve();
});
});
}
return Promise.all(promises);
}
this.update = function(_global,_flow) {
global = _global;
flow = _flow;
/**
* Update the flow definition. This doesn't change anything that is running.
* This should be called after `stop` and before `start`.
* @param {[type]} _global [description]
* @param {[type]} _flow [description]
* @return {[type]} [description]
*/
update(_global,_flow) {
this.global = _global;
this.flow = _flow;
}
this.getNode = function(id) {
return activeNodes[id];
/**
* Get a node instance from this flow. If the node is not known to this
* flow, pass the request up to the parent.
* @param {[type]} id [description]
* @return {[type]} [description]
*/
getNode(id) {
// console.log('getNode',id,!!this.activeNodes[id])
if (!id) {
return undefined;
}
// console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n"))
if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id])) {
// This is a node owned by this flow, so return whatever we have got
// During a stop/restart, activeNodes could be null for this id
return this.activeNodes[id];
} else if (this.activeNodes[id]) {
// TEMP: this is a subflow internal node within this flow
return this.activeNodes[id];
}
return this.parent.getNode(id);
}
this.getActiveNodes = function() {
return activeNodes;
/**
* Get all of the nodes instantiated within this flow
* @return {[type]} [description]
*/
getActiveNodes() {
return this.activeNodes;
}
this.handleStatus = function(node,statusMessage) {
var targetStatusNodes = null;
var reportingNode = node;
var handled = false;
while (reportingNode && !handled) {
targetStatusNodes = statusNodeMap[reportingNode.z];
if (targetStatusNodes) {
targetStatusNodes.forEach(function(targetStatusNode) {
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) {
return;
}
var message = {
status: {
text: "",
source: {
id: node.id,
type: node.type,
name: node.name
}
/**
* Get a flow setting value. This currently automatically defers to the parent
* flow which, as defined in ./index.js returns `process.env[key]`.
* This lays the groundwork for Subflow to have instance-specific settings
* @param {[type]} key [description]
* @return {[type]} [description]
*/
getSetting(key) {
return this.parent.getSetting(key);
}
/**
* Handle a status event from a node within this flow.
* @param {Node} node The original node that triggered the event
* @param {Object} statusMessage The status object
* @param {Node} reportingNode The node emitting the status event.
* This could be a subflow instance node when the status
* is being delegated up.
* @param {boolean} muteStatusEvent Whether to emit the status event
* @return {[type]} [description]
*/
handleStatus(node,statusMessage,reportingNode,muteStatusEvent) {
if (!reportingNode) {
reportingNode = node;
}
if (!muteStatusEvent) {
events.emit("node-status",{
id: node.id,
status:statusMessage
});
}
let handled = false;
if (this.id === 'global' && node.users) {
// This is a global config node
// Delegate status to any nodes using this config node
for (let userNode in node.users) {
if (node.users.hasOwnProperty(userNode)) {
node.users[userNode]._flow.handleStatus(node,statusMessage,node.users[userNode],true);
}
}
handled = true;
} else {
this.statusNodes.forEach(function(targetStatusNode) {
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(reportingNode.id) === -1) {
return;
}
var message = {
status: {
text: "",
source: {
id: node.id,
type: node.type,
name: node.name
}
};
if (statusMessage.hasOwnProperty("text")) {
message.status.text = statusMessage.text.toString();
}
targetStatusNode.receive(message);
handled = true;
});
}
if (!handled) {
reportingNode = activeNodes[reportingNode.z];
}
};
if (statusMessage.hasOwnProperty("text")) {
message.status.text = statusMessage.text.toString();
}
targetStatusNode.receive(message);
handled = true;
});
}
return handled;
}
this.handleError = function(node,logMessage,msg) {
/**
* Handle an error event from a node within this flow. If there are no Catch
* nodes within this flow, pass the event to the parent flow.
* @param {[type]} node [description]
* @param {[type]} logMessage [description]
* @param {[type]} msg [description]
* @param {[type]} reportingNode [description]
* @return {[type]} [description]
*/
handleError(node,logMessage,msg,reportingNode) {
if (!reportingNode) {
reportingNode = node;
}
// console.log("HE",logMessage);
var count = 1;

@@ -258,250 +413,85 @@ if (msg && msg.hasOwnProperty("error") && msg.error !== null) {

}
var targetCatchNodes = null;
var throwingNode = node;
var handled = false;
while (throwingNode && !handled) {
targetCatchNodes = catchNodeMap[throwingNode.z];
if (targetCatchNodes) {
targetCatchNodes.forEach(function(targetCatchNode) {
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) {
return;
}
var errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
if (errorMessage.hasOwnProperty("error")) {
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
name: node.name,
count: count
}
};
if (logMessage.hasOwnProperty('stack')) {
errorMessage.error.stack = logMessage.stack;
}
targetCatchNode.receive(errorMessage);
handled = true;
});
}
if (!handled) {
throwingNode = activeNodes[throwingNode.z];
}
}
return handled;
}
}
let handled = false;
function createNode(type,config) {
var nn = null;
try {
var nt = typeRegistry.get(type);
if (nt) {
var conf = clone(config);
delete conf.credentials;
for (var p in conf) {
if (conf.hasOwnProperty(p)) {
flowUtil.mapEnvVarProperties(conf,p);
}
}
try {
nn = new nt(conf);
}
catch (err) {
Log.log({
level: Log.ERROR,
id:conf.id,
type: type,
msg: err
});
}
} else {
Log.error(Log._("nodes.flow.unknown-type", {type:type}));
}
} catch(err) {
Log.error(err);
}
return nn;
}
function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
//console.log("CREATE SUBFLOW",sf.id,sfn.id);
var nodes = [];
var node_map = {};
var newNodes = [];
var node;
var wires;
var i,j,k;
var createNodeInSubflow = function(def) {
node = clone(def);
var nid = redUtil.generateId();
node_map[node.id] = node;
node._alias = node.id;
node.id = nid;
node.z = sfn.id;
newNodes.push(node);
}
// Clone all of the subflow node definitions and give them new IDs
for (i in sf.configs) {
if (sf.configs.hasOwnProperty(i)) {
createNodeInSubflow(sf.configs[i]);
}
}
// Clone all of the subflow node definitions and give them new IDs
for (i in sf.nodes) {
if (sf.nodes.hasOwnProperty(i)) {
createNodeInSubflow(sf.nodes[i]);
}
}
// Look for any catch/status nodes and update their scope ids
// Update all subflow interior wiring to reflect new node IDs
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
if (node.wires) {
var outputs = node.wires;
for (j=0;j<outputs.length;j++) {
wires = outputs[j];
for (k=0;k<wires.length;k++) {
outputs[j][k] = node_map[outputs[j][k]].id
if (this.id === 'global' && node.users) {
// This is a global config node
// Delegate status to any nodes using this config node
for (let userNode in node.users) {
if (node.users.hasOwnProperty(userNode)) {
node.users[userNode]._flow.handleError(node,logMessage,msg,node.users[userNode]);
}
}
if ((node.type === 'catch' || node.type === 'status') && node.scope) {
node.scope = node.scope.map(function(id) {
return node_map[id]?node_map[id].id:""
})
} else {
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop !== '_alias') {
if (node_map[node[prop]]) {
//console.log("Mapped",node.type,node.id,prop,node_map[node[prop]].id);
node[prop] = node_map[node[prop]].id;
}
}
handled = true;
} else {
this.catchNodes.forEach(function(targetCatchNode) {
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(reportingNode.id) === -1) {
return;
}
}
}
}
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("../Node");
var subflowInstance = {
id: sfn.id,
type: sfn.type,
z: sfn.z,
name: sfn.name,
wires: []
}
if (sf.in) {
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
subflowInstance._originalWires = clone(subflowInstance.wires);
}
var subflowNode = new Node(subflowInstance);
subflowNode.on("input", function(msg) { this.send(msg);});
subflowNode._updateWires = subflowNode.updateWires;
subflowNode.updateWires = function(newWires) {
// Wire the subflow outputs
if (sf.out) {
var node,wires,i,j;
// Restore the original wiring to the internal nodes
subflowInstance.wires = clone(subflowInstance._originalWires);
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id != sf.id) {
node = node_map[wires[j].id];
if (node._originalWires) {
node.wires = clone(node._originalWires);
}
}
var errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
}
var modifiedNodes = {};
var subflowInstanceModified = false;
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.id) {
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]);
subflowInstanceModified = true;
} else {
node = node_map[wires[j].id];
node.wires[wires[j].port] = node.wires[wires[j].port].concat(newWires[i]);
modifiedNodes[node.id] = node;
if (errorMessage.hasOwnProperty("error")) {
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
name: node.name,
count: count
}
};
if (logMessage.hasOwnProperty('stack')) {
errorMessage.error.stack = logMessage.stack;
}
}
Object.keys(modifiedNodes).forEach(function(id) {
var node = modifiedNodes[id];
subflowNode.instanceNodes[id].updateWires(node.wires);
targetCatchNode.receive(errorMessage);
handled = true;
});
if (subflowInstanceModified) {
subflowNode._updateWires(subflowInstance.wires);
}
}
return handled;
}
nodes.push(subflowNode);
// Wire the subflow outputs
if (sf.out) {
var modifiedNodes = {};
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.id) {
// A subflow input wired straight to a subflow output
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i])
subflowNode._updateWires(subflowInstance.wires);
} else {
node = node_map[wires[j].id];
modifiedNodes[node.id] = node;
if (!node._originalWires) {
node._originalWires = clone(node.wires);
}
node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(sfn.wires[i]);
dump() {
console.log("==================")
console.log(this.TYPE, this.id);
for (var id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
var node = this.activeNodes[id];
console.log(" ",id.padEnd(16),node.type)
if (node.wires) {
console.log(" -> ",node.wires)
}
}
}
console.log("==================")
}
}
// Instantiate the nodes
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var type = node.type;
var m = /^subflow:(.+)$/.exec(type);
if (!m) {
var newNode = createNode(type,node);
if (newNode) {
activeNodes[node.id] = newNode;
nodes.push(newNode);
}
} else {
var subflowId = m[1];
nodes = nodes.concat(createSubflow(subflows[subflowId]||globalSubflows[subflowId],node,subflows,globalSubflows,activeNodes));
}
}
subflowNode.instanceNodes = {};
nodes.forEach(function(node) {
subflowNode.instanceNodes[node.id] = node;
/**
* Stop an individual node within this flow.
*
* @param {[type]} node [description]
* @param {[type]} removed [description]
* @return {[type]} [description]
*/
function stopNode(node,removed) {
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
const start = Date.now();
const closePromise = node.close(removed);
const closeTimeout = new Promise((resolve,reject) => {
setTimeout(() => {
reject("Close timed out");
}, nodeCloseTimeout);
});
return nodes;
return Promise.race([closePromise,closeTimeout]).then(() => {
var delta = Date.now() - start;
Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
}).catch(err => {
node.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
})
}

@@ -514,7 +504,9 @@

Log = runtime.log;
Node = require("../Node");
Subflow = require("./Subflow");
Subflow.init(runtime);
},
create: function(global,conf) {
return new Flow(global,conf);
}
create: function(parent,global,conf) {
return new Flow(parent,global,conf);
},
Flow: Flow
}

@@ -45,3 +45,2 @@ /**

var activeNodesToFlow = {};
var subflowInstanceNodeMap = {};

@@ -145,4 +144,7 @@ var typeEventRegistered = false;

} else {
// Clone the provided config so it can be manipulated
config = clone(_config);
// Parse the configuration
newFlowConfig = flowUtil.parseConfig(clone(config));
// Generate a diff to identify what has changed
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);

@@ -158,4 +160,10 @@

// Allow the credential store to remove anything no longer needed
credentials.clean(config);
// Remember whether credentials need saving or not
var credsDirty = credentials.dirty();
// Get the latest credentials and ask storage to save them (if needed)
// as well as the new flow configuration.
configSavePromise = credentials.export().then(function(creds) {

@@ -182,11 +190,16 @@ var saveConfig = {

if (forceStart || started) {
return stop(type,diff,muteLog).then(function() {
return context.clean(activeFlowConfig).then(function() {
start(type,diff,muteLog).then(function() {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
});
return flowRevision;
// Flows are running (or should be)
// Stop the active flows (according to deploy type and the diff)
return stop(type,diff,muteLog).then(() => {
// Once stopped, allow context to remove anything no longer needed
return context.clean(activeFlowConfig)
}).then(() => {
// Start the active flows
start(type,diff,muteLog).then(() => {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
});
}).catch(function(err) {
})
// Return the new revision asynchronously to the actual start
return flowRevision;
}).catch(function(err) { })
} else {

@@ -226,60 +239,7 @@ events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});

function delegateError(node,logMessage,msg) {
var handled = false;
if (activeFlows[node.z]) {
handled = activeFlows[node.z].handleError(node,logMessage,msg);
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
handled = activeFlows[activeNodesToFlow[node.z]].handleError(node,logMessage,msg);
} else if (activeFlowConfig.subflows[node.z] && subflowInstanceNodeMap[node.id]) {
subflowInstanceNodeMap[node.id].forEach(function(n) {
handled = handled || delegateError(getNode(n),logMessage,msg);
});
}
return handled;
}
function handleError(node,logMessage,msg) {
var handled = false;
if (node.z) {
handled = delegateError(node,logMessage,msg);
} else {
if (activeFlowConfig.configs[node.id]) {
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
var userNode = activeFlowConfig.allNodes[id];
handled = handled || delegateError(userNode,logMessage,msg);
})
}
}
return handled;
}
function delegateStatus(node,statusMessage) {
if (activeFlows[node.z]) {
activeFlows[node.z].handleStatus(node,statusMessage);
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
activeFlows[activeNodesToFlow[node.z]].handleStatus(node,statusMessage);
}
}
function handleStatus(node,statusMessage) {
events.emit("node-status",{
id: node.id,
status:statusMessage
});
if (node.z) {
delegateStatus(node,statusMessage);
} else {
if (activeFlowConfig.configs[node.id]) {
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
var userNode = activeFlowConfig.allNodes[id];
delegateStatus(userNode,statusMessage);
})
}
}
}
function start(type,diff,muteLog) {
//dumpActiveNodes();
type = type||"full";
started = true;
var i;
// If there are missing types, report them, emit the necessary runtime event and return
if (activeFlowConfig.missingTypes.length > 0) {

@@ -305,4 +265,6 @@ log.info(log._("nodes.flows.missing-types"));

events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
return when.resolve();
return Promise.resolve();
}
// In safe mode, don't actually start anything, emit the necessary runtime event and return
if (settings.safeMode) {

@@ -315,2 +277,3 @@ log.info("*****************************************************************")

}
if (!muteLog) {

@@ -323,12 +286,19 @@ if (type !== "full") {

}
var id;
if (type === "full") {
// A full start means everything should
// Check the 'global' flow is running
if (!activeFlows['global']) {
log.debug("red/nodes/flows.start : starting flow : global");
activeFlows['global'] = Flow.create(activeFlowConfig);
activeFlows['global'] = Flow.create(flowAPI,activeFlowConfig);
}
// Check each flow in the active configuration
for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
// This flow is not disabled, nor is it currently active, so create it
activeFlows[id] = Flow.create(flowAPI,activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id);

@@ -341,2 +311,5 @@ } else {

} else {
// A modified-type deploy means restarting things that have changed
// Update the global flow
activeFlows['global'].update(activeFlowConfig,activeFlowConfig);

@@ -347,5 +320,7 @@ for (id in activeFlowConfig.flows) {

if (activeFlows[id]) {
// This flow exists and is not disabled, so update it
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]);
} else {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
// This flow didn't previously exist, so create it
activeFlows[id] = Flow.create(flowAPI,activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id);

@@ -359,15 +334,16 @@ }

}
// Having created or updated all flows, now start them.
for (id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) {
activeFlows[id].start(diff);
var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id;
if (activeNodes[nid]._alias) {
subflowInstanceNodeMap[activeNodes[nid]._alias] = subflowInstanceNodeMap[activeNodes[nid]._alias] || [];
subflowInstanceNodeMap[activeNodes[nid]._alias].push(nid);
}
});
try {
activeFlows[id].start(diff);
// Create a map of node id to flow id and also a subflowInstance lookup map
var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id;
});
} catch(err) {
console.log(err.stack);
}
}

@@ -390,3 +366,3 @@ }

}
return when.resolve();
return Promise.resolve();
}

@@ -396,3 +372,3 @@

if (!started) {
return when.resolve();
return Promise.resolve();
}

@@ -435,30 +411,23 @@ type = type||"full";

return when.promise(function(resolve,reject) {
when.settle(promises).then(function() {
for (id in activeNodesToFlow) {
if (activeNodesToFlow.hasOwnProperty(id)) {
if (!activeFlows[activeNodesToFlow[id]]) {
delete activeNodesToFlow[id];
}
return Promise.resolve(promises).then(function() {
for (id in activeNodesToFlow) {
if (activeNodesToFlow.hasOwnProperty(id)) {
if (!activeFlows[activeNodesToFlow[id]]) {
delete activeNodesToFlow[id];
}
}
if (stopList) {
stopList.forEach(function(id) {
delete activeNodesToFlow[id];
});
}
if (stopList) {
stopList.forEach(function(id) {
delete activeNodesToFlow[id];
});
}
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.stopped-modified-"+type));
} else {
log.info(log._("nodes.flows.stopped-flows"));
}
// Ideally we'd prune just what got stopped - but mapping stopList
// id to the list of subflow instance nodes is something only Flow
// can do... so cheat by wiping the map knowing it'll be rebuilt
// in start()
subflowInstanceNodeMap = {};
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.stopped-modified-"+type));
} else {
log.info(log._("nodes.flows.stopped-flows"));
}
}
resolve();
});
}
events.emit("nodes-stopped");
});

@@ -720,2 +689,10 @@ }

const flowAPI = {
getNode: getNode,
handleError: () => false,
handleStatus: () => false,
getSetting: k => process.env[k]
}
module.exports = {

@@ -759,4 +736,4 @@ init: init,

handleError: handleError,
handleStatus: handleStatus,
// handleError: handleError,
// handleStatus: handleStatus,

@@ -763,0 +740,0 @@ checkTypeInUse: checkTypeInUse,

@@ -18,2 +18,3 @@ /**

var redUtil = require("@node-red/util").util;
var Log = require("@node-red/util").log;
var subflowInstanceRE = /^subflow:(.+)$/;

@@ -44,18 +45,24 @@ var typeRegistry = require("@node-red/registry");

function mapEnvVarProperties(obj,prop) {
if (Buffer.isBuffer(obj[prop])) {
function mapEnvVarProperties(obj,prop,flow) {
var v = obj[prop];
if (Buffer.isBuffer(v)) {
return;
} else if (Array.isArray(obj[prop])) {
for (var i=0;i<obj[prop].length;i++) {
mapEnvVarProperties(obj[prop],i);
} else if (Array.isArray(v)) {
for (var i=0;i<v.length;i++) {
mapEnvVarProperties(v,i,flow);
}
} else if (typeof obj[prop] === 'string') {
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(obj[prop]) || EnvVarPropertyRE.test(obj[prop])) ) {
var envVar = obj[prop].substring(2,obj[prop].length-1);
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:obj[prop];
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(v) || EnvVarPropertyRE.test(v)) ) {
var envVar = v.substring(2,v.length-1);
if (!flow) {
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:v;
} else {
var r = flow.getSetting(envVar);
obj[prop] = r!==undefined?r:obj[prop];
}
}
} else {
for (var p in obj[prop]) {
if (obj[prop].hasOwnProperty(p)) {
mapEnvVarProperties(obj[prop],p);
for (var p in v) {
if (v.hasOwnProperty(p)) {
mapEnvVarProperties(v,p,flow);
}

@@ -432,5 +439,6 @@ }

// console.log(
// (added[id]?"+":(changed[id]?"!":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"~":" "),
// id,
// newConfig.allNodes[id].type,
// (added[id]?"a":(changed[id]?"c":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"l":" "),
// newConfig.allNodes[id].type.padEnd(10),
// id.padEnd(16),
// (newConfig.allNodes[id].z||"").padEnd(16),
// newConfig.allNodes[id].name||newConfig.allNodes[id].label||""

@@ -449,3 +457,42 @@ // );

return diff;
},
/**
* Create a new instance of a node
* @param {Flow} flow The containing flow
* @param {object} config The node configuration object
* @return {Node} The instance of the node
*/
createNode: function(flow,config) {
var newNode = null;
var type = config.type;
try {
var nodeTypeConstructor = typeRegistry.get(type);
if (nodeTypeConstructor) {
var conf = clone(config);
delete conf.credentials;
for (var p in conf) {
if (conf.hasOwnProperty(p)) {
mapEnvVarProperties(conf,p,flow);
}
}
try {
conf._flow = flow;
newNode = new nodeTypeConstructor(conf);
} catch (err) {
Log.log({
level: Log.ERROR,
id:conf.id,
type: type,
msg: err
});
}
} else {
Log.error(Log._("nodes.flow.unknown-type", {type:type}));
}
} catch(err) {
Log.error(err);
}
return newNode;
}
}

@@ -107,3 +107,3 @@ /**

if (creds.hasOwnProperty(p)) {
flowUtil.mapEnvVarProperties(creds,p);
flowUtil.mapEnvVarProperties(creds,p,node._flow);
}

@@ -110,0 +110,0 @@ }

@@ -19,3 +19,2 @@ /**

var EventEmitter = require("events").EventEmitter;
var when = require("when");

@@ -39,2 +38,8 @@ var redUtil = require("@node-red/util").util;

}
if (n._flow) {
// Make this a non-enumerable property as it may cause
// circular references. Any existing code that tries to JSON serialise
// the object (such as dashboard) will not like circular refs
Object.defineProperty(this,'_flow', {value: n._flow, })
}
this.updateWires(n.wires);

@@ -94,17 +99,30 @@ }

promises.push(
when.promise(function(resolve) {
var args = [];
if (callback.length === 2) {
args.push(!!removed);
new Promise((resolve) => {
try {
var args = [];
if (callback.length === 2) {
args.push(!!removed);
}
args.push(() => {
resolve();
});
callback.apply(node, args);
} catch(err) {
// TODO: error thrown in node async close callback
// We've never logged this properly.
resolve();
}
args.push(resolve);
callback.apply(node, args);
})
);
} else {
callback.call(node);
try {
callback.call(node);
} catch(err) {
// TODO: error thrown in node sync close callback
// We've never logged this properly.
}
}
}
if (promises.length > 0) {
return when.settle(promises).then(function() {
return Promise.all(promises).then(function() {
if (this._context) {

@@ -118,3 +136,3 @@ return context.delete(this._alias||this.id,this.z);

}
return;
return Promise.resolve();
}

@@ -138,3 +156,3 @@ };

this.metric("send",msg);
node = flows.get(this._wire);
node = this._flow.getNode(this._wire);
/* istanbul ignore else */

@@ -171,3 +189,3 @@ if (node) {

for (var j = 0; j < wires.length; j++) {
node = flows.get(wires[j]); // node at end of wire j
node = this._flow.getNode(wires[j]); // node at end of wire j
if (node) {

@@ -260,3 +278,3 @@ // for each msg to send eg. [[m1, m2, ...], ...]

if (msg) {
handled = flows.handleError(this,logMessage,msg);
handled = this._flow.handleError(this,logMessage,msg);
}

@@ -296,4 +314,5 @@ if (!handled) {

Node.prototype.status = function(status) {
flows.handleStatus(this,status);
this._flow.handleStatus(this,status);
};
module.exports = Node;
{
"name": "@node-red/runtime",
"version": "0.20.0-beta.3",
"version": "0.20.0-beta.4",
"license": "Apache-2.0",

@@ -19,4 +19,4 @@ "main": "./lib/index.js",

"dependencies": {
"@node-red/registry": "0.20.0-beta.3",
"@node-red/util": "0.20.0-beta.3",
"@node-red/registry": "0.20.0-beta.4",
"@node-red/util": "0.20.0-beta.4",
"clone": "2.1.2",

@@ -23,0 +23,0 @@ "express": "4.16.4",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc