node-red-contrib-connectionmanager
Advanced tools
Comparing version 0.0.2 to 0.0.3
const logger = new (require("node-red-contrib-logger"))("cm-statement"); | ||
logger.sendInfo("Copyright 2020 Jaroslav Peter Prib"); | ||
const ts=(new Date().toString()).split(' '); | ||
console.log([parseInt(ts[2],10),ts[1],ts[4]].join(' ')+" - [info] cm-statement Copyright 2019 Jaroslav Peter Prib"); | ||
function Mapping(msg) { | ||
@@ -14,12 +11,9 @@ let params=[]; | ||
function processArray(node,msg,source,index) { | ||
if(logger.active) logger.send({label:"processArray",msg:msg._msgid}); | ||
if(index>=source.length) { | ||
node.send([msg]); | ||
if(msg.error) { | ||
node.status({ fill: 'red', shape: 'ring', text: "Error" }); | ||
node.lastError=true | ||
node[node.onErrorAction||"terminate"].apply(node,[msg]); | ||
} else { | ||
if(node.lastError) { | ||
node.status({ fill: 'green', shape: 'ring', text: "OK" }); | ||
} | ||
error(node,msg); | ||
} else if(!node.sqlok) { | ||
node.status({ fill: 'green', shape: 'ring', text: "OK" }); | ||
} | ||
@@ -32,2 +26,3 @@ return; | ||
(result)=>{ | ||
if(logger.active) logger.send({label:"processArray query result",msg:msg._msgid}); | ||
msg.cm.requestTS.after=new Date(); | ||
@@ -37,68 +32,80 @@ msg.cm.requestTS.elapse=msg.requestTS.after-msg.requestTS.before; | ||
processArray.apply(this,[node,msg,source,++index]); | ||
if(!node.sqlok) { | ||
node.status({ fill: 'green', shape: 'ring', text: "OK" }); | ||
node.sqlok=true; | ||
} | ||
}, | ||
(result,err)=>{ | ||
if(logger.active) logger.send({label:"processArray query error",msg:msg._msgid,error:err}); | ||
msg.cm.requestTS.after=new Date(); | ||
msg.cm.requestTS.elapse=msg.requestTS.after-msg.requestTS.before; msg.result.push(result); | ||
msg.cm.requestTS.elapse=msg.cm.requestTS.after-msg.cm.requestTS.before; msg.result.push(result); | ||
msg.error=msg.error||[]; | ||
msg.error[index]=err; | ||
if(node.isLogError) node.error(JSON.stringify(err)); | ||
processArray.apply(this,[node,msg,source,++index]); | ||
} | ||
]); | ||
} catch(e) { | ||
} catch(ex) { | ||
msg.result.push(null); | ||
msg.error=msg.error||[]; | ||
msg.error[index]=e.message; | ||
if(node.isLogError) node.error(JSON.stringify(e.message)); | ||
if(node.sqlok) { | ||
node.status({ fill: 'red', shape: 'ring', text: "Error" }); | ||
node.sqlok=false; | ||
} | ||
msg.error[index]=ex.message; | ||
error(node,msg); | ||
} | ||
} | ||
function error(node,msg,result,err) { | ||
if(logger.active) logger.send({label:"error",msg:msg._msgid,result:result,error:err}) | ||
if(!node.sqlok) { | ||
node.sqlok=false; | ||
node.status({ fill: 'red', shape: 'ring', text: "Error" }); | ||
} | ||
try{ | ||
if(result) msg.result=result; | ||
if(err) msg.error=err; | ||
if(node.isLogError) node.error(JSON.stringify(err)); | ||
node.onErrorActionFunction(msg); | ||
} catch(ex){ | ||
logger.sendError("query error handling failure "+ex.message); | ||
} | ||
} | ||
module.exports = function(RED) { | ||
function cmStatementNode(n) { | ||
RED.nodes.createNode(this,n); | ||
var node=Object.assign(this,n); | ||
node.prepareSQL=(node.prepare=="yes"); | ||
node.sqlok=false; | ||
node.isLogError=(node.logError=="yes"); | ||
node.terminate=function(msg) { | ||
node.error("Message terminated due to an error", msg); | ||
if(msg.cm) { | ||
node.error("releasing connections as message terminated"); | ||
msg.cm.release.apply(node,[msg, | ||
function() {return;}, // ok | ||
function(e) {node.error(e);} //error | ||
]); | ||
} | ||
}; | ||
node.both=function(msg) { | ||
node.send([msg,msg]); | ||
}; | ||
node.onlyWithRelease=function(msg) { | ||
node.error("releasing connections as error with release"); | ||
msg.cm.release.apply(node,[msg, | ||
function() { // ok | ||
node.send([null,msg]); | ||
}, | ||
function(e) { //error | ||
node.error("rollback error "+e); | ||
node.send([null,msg]); | ||
} | ||
]); | ||
}; | ||
node.only=function(msg) { | ||
node.send([null,msg]); | ||
}; | ||
node.ignore=function(msg) { | ||
node.send([msg]); | ||
}; | ||
node.log("set onErrorAction "+n.onErrorAction); | ||
node.onErrorAction=n.onErrorAction; | ||
let setParam; | ||
function cmStatementNode(n) { | ||
RED.nodes.createNode(this,n); | ||
var node=Object.assign(this,n); | ||
node.prepareSQL=(node.prepare=="yes"); | ||
node.sqlok=false; | ||
node.isLogError=(node.logError=="yes"); | ||
node.terminate=function(msg) { | ||
node.error("Message terminated due to an error", msg); | ||
if(msg.cm) { | ||
node.error("releasing connections as message terminated"); | ||
msg.cm.release.apply(node,[msg, | ||
()=>{return;}, // ok | ||
(err)=>{node.error(err);} //error | ||
]); | ||
} | ||
}; | ||
node.both=function(msg) { | ||
if(logger.active) logger.send({label:"on error both",msg:msg._msgid}); | ||
node.send([msg,msg]); | ||
}; | ||
node.onlyWithRelease=function(msg) { | ||
node.error("releasing connections as error with release"); | ||
msg.cm.release.apply(node,[msg, | ||
function() { // ok | ||
node.send([null,msg]); | ||
}, | ||
function(e) { //error | ||
node.error("rollback error "+e); | ||
node.send([null,msg]); | ||
} | ||
]); | ||
}; | ||
node.only=function(msg) { | ||
if(logger.active) logger.send({label:"on error only",msg:msg._msgid}); | ||
node.send([null,msg]); | ||
}; | ||
node.ignore=function(msg) { | ||
if(logger.active) logger.send({label:"on error ingore",msg:msg._msgid}); | ||
node.send([msg]); | ||
}; | ||
const callError=node.onErrorAction||"terminate"; | ||
node.log("set onErrorAction "+callError); | ||
node.onErrorActionFunction=node[callError].bind(node); | ||
let setParam; | ||
switch (node.param) { | ||
@@ -150,38 +157,38 @@ case 'msg.payload': | ||
break; | ||
} | ||
if(node.prepareSQL) { | ||
} | ||
if(node.prepareSQL) { | ||
node.status({ fill: 'yellow', shape: 'ring', text: "Prepare not initialized" }); | ||
} | ||
node.flow={ | ||
get:(()=>node.context().flow.get.apply(node,arguments)) | ||
}; | ||
node.global={ | ||
get:()=>(node.context().global.get.apply(node,arguments)) | ||
}; | ||
node.env={ | ||
get:((envVar)=>node._flow.getSetting(envVar)) | ||
}; | ||
node.on('input', function (msg) { | ||
if(!msg.cm) { | ||
msg.error="no connections established by previous nodes"; | ||
node.send([null,msg]); | ||
return; | ||
} | ||
if(node.getArraySource) { // implies array mapping | ||
msg.result=[]; | ||
delete msg.error; | ||
try{ | ||
processArray.apply(this,[node,msg,node.getArraySource(node,msg,node.flow,node.global,node.env),0]); | ||
} catch(e) { | ||
msg.error=e.toString(); | ||
if(node.isLogError) node.error(e.toString()); | ||
node[node.onErrorAction||"terminate"].apply(node,[msg]); | ||
node.status({ fill: 'red', shape: 'ring', text: "Error" }); | ||
} | ||
return; | ||
} | ||
} | ||
node.flow={ | ||
get:(()=>node.context().flow.get.apply(node,arguments)) | ||
}; | ||
node.global={ | ||
get:()=>(node.context().global.get.apply(node,arguments)) | ||
}; | ||
node.env={ | ||
get:((envVar)=>node._flow.getSetting(envVar)) | ||
}; | ||
node.on('input', function (msg) { | ||
if(!msg.cm) { | ||
msg.error="no connections established by previous nodes"; | ||
node.send([null,msg]); | ||
return; | ||
} | ||
if(node.getArraySource) { // implies array mapping | ||
if(logger.active) logger.send({label:"query array",msg:msg._msgid}); | ||
msg.result=[]; | ||
delete msg.error; | ||
try{ | ||
processArray.apply(this,[node,msg,node.getArraySource(node,msg,node.flow,node.global,node.env),0]); | ||
} catch(ex) { | ||
error(node,msg,null,ex.message) | ||
} | ||
return; | ||
} | ||
if(logger.active) logger.send({label:"query",msg:msg._msgid}); | ||
msg.cm.query.apply(node,[msg,node.connection,node.statement,setParam.apply(node,[node,msg]), | ||
function (result) { | ||
function (result) { | ||
if(logger.active) logger.send({label:"query OK",msg:msg._msgid}); | ||
msg.result=result; | ||
@@ -194,14 +201,10 @@ node.send([msg]); | ||
}, | ||
function(result,err) { | ||
msg.result=result; | ||
msg.error=err; | ||
if(node.isLogError) node.error(JSON.stringify(err)); | ||
node[node.onErrorAction||"terminate"].apply(node,[msg]); | ||
node.status({ fill: 'red', shape: 'ring', text: "Error" }); | ||
node.sqlok=false; | ||
function(result,err) { | ||
if(logger.active) logger.send({label:"query error",msg:msg._msgid,error:err}); | ||
error(node,msg,result,err); | ||
} | ||
]); | ||
}); | ||
} | ||
RED.nodes.registerType(logger.label,cmStatementNode); | ||
]); | ||
}); | ||
} | ||
RED.nodes.registerType(logger.label,cmStatementNode); | ||
}; |
const logger = new (require("node-red-contrib-logger"))("Connection Manager"); | ||
logger.sendInfo("Copyright 2020 Jaroslav Peter Prib"); | ||
var debug=false; | ||
function toggleDebug() { | ||
debug=!debug; | ||
logger.send("connection-manager toggleDebug state: " +debug); | ||
logger.setOn(); | ||
} | ||
var Pools={}; | ||
let Pools={}; | ||
function getPool(id) { | ||
@@ -17,6 +14,6 @@ return Pools[id]; | ||
function connectionProcessorComplete(msg,done,error,results,errors) { | ||
if(debug) logger.send("connectionProcessorComplete "+msg.cm.running); | ||
if(logger.active) logger.send("connectionProcessorComplete "+msg.cm.running); | ||
if(--msg.cm.running==0) { | ||
if(errors) { | ||
if(debug) logger.send("connectionProcessorComplete errors "+JSON.stringify(errors)); | ||
if(logger.active) logger.send("connectionProcessorComplete errors "+JSON.stringify(errors)); | ||
error.apply(this,[results,errors]); | ||
@@ -29,7 +26,7 @@ } else { | ||
function connectionProcessor(msg,action,done,error,a1,a2) { | ||
if(debug) logger.send("connectionProcessor action: "+action+" arguments "+JSON.stringify({a1:a1,a2:a2})); | ||
if(logger.active) logger.send("connectionProcessor action: "+action+" arguments "+JSON.stringify({a1:a1,a2:a2})); | ||
msg.cm.running++; | ||
var c,results={},errors,pool; | ||
for(var connection in msg.cm.connection) { | ||
if(debug) logger.send("connectionProcessor connection: "+connection); | ||
let c,results={},errors,pool; | ||
for(let connection in msg.cm.connection) { | ||
if(logger.active) logger.send("connectionProcessor connection: "+connection); | ||
msg.cm.running++; | ||
@@ -44,3 +41,3 @@ c=msg.cm.connection[connection]; | ||
(err)=>{ | ||
if(debug) logger.send("connectionProcessor connection: "+connection+" error: "+err); | ||
if(logger.active) logger.send("connectionProcessor connection: "+connection+" error: "+err); | ||
errors=errors||{}; | ||
@@ -50,3 +47,3 @@ try{ | ||
} catch(e) { | ||
console.error("connectionProcessor catch error: "+e); | ||
logger.sendError("connectionProcessor catch error: "+e); | ||
} | ||
@@ -62,5 +59,5 @@ connectionProcessorComplete(msg,done,error,results,errors); | ||
function cmProcessor(msg,action,done,error,a1,a2) { | ||
if(debug) logger.send("cmProcessor action: "+action); | ||
if(logger.active) logger.send("cmProcessor action: "+action); | ||
if(!msg.cm) {done(); return;} | ||
var thisObject=this; | ||
const thisObject=this; | ||
stackProcessor.apply(this,[msg,action, | ||
@@ -72,3 +69,3 @@ ()=>connectionProcessor.apply(thisObject,[msg,action,done,error,a1,a2]), | ||
function query(msg,connection,statement,params,done,error) { | ||
if(debug) logger.send("query connection: "+connection+" prepare: "+this.prepareSQL+" statement: "+statement); | ||
if(logger.active) logger.send("query connection: "+connection+" prepare: "+this.prepareSQL+" statement: "+statement); | ||
if(!statement || statement.trim()=="") { | ||
@@ -85,11 +82,11 @@ error({},"empty query"); | ||
} | ||
var pool=getPool(connector.pool); | ||
const pool=getPool(connector.pool); | ||
if(debug) logger.send("query connection: "+connection+" prepare: "+this.prepareSQL+" preparable: "+pool.preparable); | ||
if(logger.active) logger.send("query connection: "+connection+" prepare: "+this.prepareSQL+" preparable: "+pool.preparable); | ||
if(this.prepareSQL && pool.preparable) { | ||
var node=this; | ||
const node=this; | ||
pool.prepare(connector, | ||
(prepared)=>pool.exec(prepared,done,error,params), | ||
(err)=>{ | ||
if(debug) logger.send("query prepare error "+err); | ||
if(logger.active) logger.send("query prepare error "+err); | ||
error({},err); | ||
@@ -105,3 +102,3 @@ }, | ||
(err)=>{ | ||
if(debug) logger.send("query error "+JSON.stringify(err)); | ||
if(logger.active) logger.send("query error "+JSON.stringify(err)); | ||
error({},err); | ||
@@ -114,12 +111,12 @@ }, | ||
} | ||
if(debug) logger.send("query connection all connections"); | ||
if(logger.active) logger.send("query connection all connections"); | ||
if(this.prepareSQL) { | ||
var node=this; | ||
const node=this; | ||
connectionProcessor.apply(node,[msg,"prepare", | ||
(prepared)=>{ | ||
if(debug) logger.send("query prepare exec node: "+node.id+" params: "+JSON.stringify(params)); | ||
if(logger.active) logger.send("query prepare exec node: "+node.id+" params: "+JSON.stringify(params)); | ||
connectionProcessor.apply(node,[msg,"exec",done,error,node.id,params]); | ||
}, | ||
(result,err)=>{ | ||
if(debug) logger.send("query prepare error(s) "+JSON.stringify(err)); | ||
if(logger.active) logger.send("query prepare error(s) "+JSON.stringify(err)); | ||
error(result,err); | ||
@@ -142,3 +139,3 @@ }, | ||
function release(msg,done,error) { | ||
if(debug) logger.send("release"); | ||
if(logger.active) logger.send("release"); | ||
if(msg.cm.autoCommit) { | ||
@@ -148,11 +145,11 @@ cmProcessor.apply(this,[msg,"release",done,error]); | ||
} | ||
var thisObject=this, | ||
const thisObject=this, | ||
action=this.rollbackTransaction?"rollback":"commit"; | ||
cmProcessor.apply(this,[msg,action, | ||
()=>{ | ||
if(debug) logger.send("release "+action+" all now releasing"); | ||
if(logger.active) logger.send("release "+action+" all now releasing"); | ||
cmProcessor.apply(thisObject,[msg,"release",done,error]); | ||
}, | ||
(err)=> { | ||
if(debug) logger.send("release "+action+" all with error now releasing "+JSON.stringify(err)); | ||
if(logger.active) logger.send("release "+action+" all with error now releasing "+JSON.stringify(err)); | ||
cmProcessor.apply(thisObject,[msg,"release", | ||
@@ -166,3 +163,3 @@ (result)=>error(result,err), | ||
function stackProcessor(msg,action,done,error) { | ||
if(debug) logger.send("stackProcessor action: "+action); | ||
if(logger.active) logger.send("stackProcessor action: "+action); | ||
if(msg.cm.stack.length==0) { | ||
@@ -172,3 +169,3 @@ if(msg.cm.running==0) done(); | ||
} | ||
var r=msg.cm.stack.pop(); | ||
let r=msg.cm.stack.pop(); | ||
if(!r.hasOwnProperty('action')) { | ||
@@ -199,13 +196,23 @@ stackProcessor.apply(this,[msg,action,done,error]); | ||
this.prepared=[]; // {id:id} | ||
try{ | ||
this.driver=DriverType[this.driverType]; | ||
if(this.driver==null) throw Error("Driver not supported"); | ||
this.autoCommit=this.driver.autoCommit||true; | ||
this.preparable=!(this.driver.prepareIsQuery||false); | ||
} catch(ex) { | ||
const err="Driver load failed, may need install by 'npm install "+this.driverType+"', drivers aren't install by default to minimise foot print"; | ||
this.node.error(err); | ||
logger.sendError("ConnectionPool "+ex.message); | ||
} | ||
} | ||
ConnectionPool.prototype.beginTransaction=function(c,done,error) { | ||
if(debug) logger.send("ConnectionPool beginTransaction"); | ||
if(logger.active) logger.send("ConnectionPool beginTransaction"); | ||
this.driver.beginTransaction(this.pool[c.id],done,error); | ||
} | ||
ConnectionPool.prototype.checkDeadConnection=function(c,errorMessage) { | ||
if(debug) logger.send("ConnectionPool.checkDeadConnection"); | ||
var msg=getMessageString(errorMessage); | ||
if(logger.active) logger.send({label:"ConnectionPool.checkDeadConnection"}); | ||
const msg=getMessageString(errorMessage); | ||
if(this.driver.errorDeadConnection) { | ||
if(this.driver.errorDeadConnection.includes(msg)) { | ||
if(debug) logger.send("ConnectionPool.isDeadConnection set connection null and releasing"); | ||
if(logger.active) logger.send("ConnectionPool.isDeadConnection set connection null and releasing"); | ||
this.pool[c.id]=null; // on release this deletes connection so not used again | ||
@@ -219,3 +226,3 @@ this.release(c); | ||
this.node.error("rolling back active connection as close issued"); | ||
var thisObject=this; | ||
const thisObject=this; | ||
this.rollback.apply(this,[c, | ||
@@ -231,4 +238,4 @@ function() {thisObject.close(c,done,error);} | ||
this.node.log("closing all connections"); | ||
var running=1; | ||
for(var c in this.pool) { | ||
let running=1; | ||
for(let c in this.pool) { | ||
running++; | ||
@@ -242,5 +249,5 @@ this.pool[n].close.apply(this.pool,[c, | ||
ConnectionPool.prototype.commit=function(c,done,error) { | ||
var pool=this; | ||
const pool=this; | ||
this.driver.commit(this.pool[c.id],done,(err)=>{ | ||
if(debug) logger.send("ConnectionPool commit "+err); | ||
if(logger.active) logger.send("ConnectionPool commit "+err); | ||
pool.checkDeadConnection(c,err); | ||
@@ -258,3 +265,3 @@ error(err) | ||
ConnectionPool.prototype.exec=function(c,done,error,id,params) { | ||
if(debug) logger.send("ConnectionPool exec connection id: "+c.id+" prepared id: "+id+" params: "+JSON.stringify(params)); | ||
if(logger.active) logger.send("ConnectionPool exec connection id: "+c.id+" prepared id: "+id+" params: "+JSON.stringify(params)); | ||
if(!this.preparable) { | ||
@@ -265,5 +272,5 @@ this.query(c,done,error,this.prepared[c.id][id],params); | ||
this.lastUsed[c.id]=new Date(); | ||
var pool=this; | ||
const pool=this; | ||
this.driver.exec(this.prepared[c.id][id],params,done,(err)=>{ | ||
if(debug) logger.send("ConnectionPool exec "+err); | ||
if(logger.active) logger.send("ConnectionPool exec "+err); | ||
pool.checkDeadConnection(c,err); | ||
@@ -274,21 +281,7 @@ error(err) | ||
ConnectionPool.prototype.getConnection=function(done,error) { | ||
if(debug) logger.send("ConnectionPool getConnection"); | ||
if(this.drive==undefined) { | ||
if(debug) logger.send("ConnectionPool getConnection set driver "+this.driverType); | ||
try{ | ||
this.driver=DriverType[this.driverType]; | ||
if(this.driver==null) throw Error("Driver returned null"); | ||
this.autoCommit=this.driver.autoCommit||true; | ||
this.preparable=!(this.driver.prepareIsQuery||false); | ||
} catch(e) { | ||
var err="Driver load failed, may need install by 'npm install "+this.driverType+"', drivers aren't install by default to minimise foot print"; | ||
this.node.error(err); | ||
error(err); | ||
return; | ||
} | ||
} | ||
var connectionPool=this; | ||
if(logger.active) logger.send("ConnectionPool getConnection"); | ||
let connectionPool=this; | ||
if(this.free.length) { | ||
if(debug) logger.send("ConnectionPool getConnection free"); | ||
var c=connectionPool.free.pop(); | ||
if(logger.active) logger.send("ConnectionPool getConnection free"); | ||
const c=connectionPool.free.pop(); | ||
connectionPool.active.push(c); | ||
@@ -298,3 +291,3 @@ done( {id:c, pool:connectionPool.node.name} ); | ||
} | ||
if(debug) logger.send("ConnectionPool getConnection create new connection"); | ||
if(logger.active) logger.send("ConnectionPool getConnection create new connection"); | ||
if(++this.newConnections>this.size) { // this.pool.length updated too late | ||
@@ -307,3 +300,3 @@ connectionPool.error("maximum pool size "+this.size,error); | ||
connectionPool.node.log("new connection "+connectionPool.node.name); | ||
var c = connectionPool.pool.find((e)=>e==null); | ||
let c=connectionPool.pool.find((e)=>e==null); | ||
if(c) { | ||
@@ -326,5 +319,5 @@ connectionPool.pool[c]=connection; | ||
ConnectionPool.prototype.prepare=function(c,done,error,sql,id) { | ||
if(debug) logger.send("ConnectionPool prepare connection id: "+c.id+" prepare id: "+id+" sql: "+sql); | ||
if(logger.active) logger.send("ConnectionPool prepare connection id: "+c.id+" prepare id: "+id+" sql: "+sql); | ||
if(this.prepared[c.id]) { | ||
if(debug) logger.send("ConnectionPool prepare already done"); | ||
if(logger.active) logger.send("ConnectionPool prepare already done"); | ||
if(this.prepared[c.id][id]) { | ||
@@ -339,16 +332,22 @@ done(this.prepared[c.id][id]); | ||
if(!this.preparable) { | ||
if(debug) logger.send("ConnectionPool prepare not available, simulating prepare"); | ||
this.prepared[c.id][id]=(this.driver.translateSQL?this.driver.translateSQL(sql):sql); | ||
if(logger.active) logger.send("ConnectionPool prepare not available, simulating prepare"); | ||
try{ | ||
this.prepared[c.id][id]=(this.driver.translateSQL?this.driver.translateSQL(sql):sql); | ||
} catch(ex) { | ||
if(logger.active) logger.send("ConnectionPool translateSQL "+ex.message); | ||
error(ex.message); | ||
return; | ||
} | ||
done(sql); | ||
return; | ||
} | ||
var pool=this; | ||
const pool=this; | ||
this.driver.prepare(this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql), | ||
(prepared)=>{ | ||
pool.prepared[c.id][id]=prepared; | ||
if(debug) logger.send("ConnectionPool prepared calling done"); | ||
if(logger.active) logger.send("ConnectionPool prepared calling done"); | ||
done(prepared); | ||
}, | ||
(err)=>{ | ||
if(debug) logger.send("ConnectionPool prepare "+err); | ||
if(logger.active) logger.send("ConnectionPool prepare "+err); | ||
error(err) | ||
@@ -358,7 +357,7 @@ }); | ||
ConnectionPool.prototype.query=function(c,done,error,sql,params) { | ||
if(debug) logger.send("ConnectionPool query connection id: "+c.id+" sql: "+sql+" parms: "+JSON.stringify(params)); | ||
if(logger.active) logger.send("ConnectionPool query connection id: "+c.id+" sql: "+sql+" parms: "+JSON.stringify(params)); | ||
this.lastUsed[c.id]=new Date(); | ||
var pool=this; | ||
const pool=this; | ||
this.driver.query(this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql),params,done, (err)=>{ | ||
if(debug) logger.send("ConnectionPool query "+err); | ||
if(logger.active) logger.send({label:"ConnectionPool query error",error:err}); | ||
pool.checkDeadConnection(c,err); | ||
@@ -369,3 +368,3 @@ error(err); | ||
ConnectionPool.prototype.release=function(c,done) { | ||
if(debug) logger.send("ConnectionPool.release "+c.id); | ||
if(logger.active) logger.send("ConnectionPool.release "+c.id); | ||
this.returnConnection(c.id); | ||
@@ -375,6 +374,6 @@ if(done) done(); | ||
ConnectionPool.prototype.returnConnection=function(c) { | ||
if(debug) logger.send("ConnectionPool.returnConnection "+c); | ||
if(logger.active) logger.send("ConnectionPool.returnConnection "+c); | ||
this.active.splice(this.active.indexOf(c),1); | ||
if(this.pool[c]==null) { | ||
if(debug) logger.send("ConnectionPool.returnConnection "+c+" is bad, not placed on free chain"); | ||
if(logger.active) logger.send("ConnectionPool.returnConnection "+c+" is bad, not placed on free chain"); | ||
return; | ||
@@ -385,6 +384,6 @@ } | ||
ConnectionPool.prototype.rollback=function(c,done,error) { | ||
if(debug) logger.send("ConnectionPool.rollback "); | ||
var pool=this; | ||
if(logger.active) logger.send("ConnectionPool.rollback "); | ||
const pool=this; | ||
this.driver.rollback(this.pool[c.id],done,(err)=>{ | ||
if(debug) logger.send("ConnectionPool rollback "+err); | ||
if(logger.active) logger.send("ConnectionPool rollback "+err); | ||
pool.checkDeadConnection(c,err); | ||
@@ -396,5 +395,5 @@ error(err) | ||
try{ | ||
var thisObject=this, | ||
const thisObject=this, | ||
staleTimestamp= new Date(Date.now() - (1 * 60 * 1000)); | ||
for(var connectionID in this.active) { | ||
for(let connectionID in this.active) { | ||
if(this.lastUsed[connectionID] < staleTimestamp) { | ||
@@ -406,7 +405,7 @@ this.node.error("Releasing long running connection with rollback "+connectionID); | ||
(err)=>thisObject.release.apply(thisObject,[{id:connectionID},()=>{thisObject.node.warn("Releasing connection "+connectionID+" rollback failed: "+err);}]) | ||
]); | ||
]); | ||
} | ||
} | ||
} catch(e) { | ||
console.error("releaseStaleConnections failed: "+e) | ||
logger.sendError("releaseStaleConnections failed: "+e) | ||
} | ||
@@ -434,3 +433,3 @@ } | ||
RED.nodes.createNode(this,n); | ||
var node=Object.assign(this,n,{port:Number(n.port)}); | ||
const node=Object.assign(this,n,{port:Number(n.port)}); | ||
node.connectionPool=new ConnectionPool(node); | ||
@@ -442,3 +441,3 @@ node.toggleDebug=toggleDebug; | ||
if(!msg.cm) { | ||
var cm={id:node.id,running:0,connection:{},stack:[] | ||
const cm={id:node.id,running:0,connection:{},stack:[] | ||
,commit:commit,rollback:rollback,release:release | ||
@@ -509,11 +508,11 @@ ,query:query | ||
Driver.prototype.beginTransactionNoAction=function(conn,done,error) { | ||
if(debug) logger.send("Driver.beginTransactionNoAction"); | ||
if(logger.active) logger.send("Driver.beginTransactionNoAction"); | ||
done(); | ||
} | ||
Driver.prototype.beginTransactionSql=function(conn,done,error) { | ||
if(debug) logger.send("Driver.beginTransactionSql"); | ||
if(logger.active) logger.send("Driver.beginTransactionSql"); | ||
this.query(conn,"Start Transaction",null,done,error); | ||
}; | ||
Driver.prototype.close=function(conn,done,error) { | ||
if(debug) logger.send("close"); | ||
if(logger.active) logger.send("close"); | ||
conn.close().then(done,(err,result)=>{ | ||
@@ -528,16 +527,16 @@ if(error) { | ||
Driver.prototype.commitNoAction=function(conn,done,error) { | ||
if(debug) logger.send("Driver.commitNoAction"); | ||
if(logger.active) logger.send("Driver.commitNoAction"); | ||
done(); | ||
}; | ||
Driver.prototype.commitSql=function(conn,done,error) { | ||
if(debug) logger.send("Driver.commit"); | ||
if(logger.active) logger.send("Driver.commit"); | ||
this.query(conn,"commit",null,done,error); | ||
}; | ||
Driver.prototype.getOptions=function(node) { | ||
if(debug) logger.send("Driver.getOptions "+JSON.stringify(this.optionsMapping)); | ||
if(logger.active) logger.send("Driver.getOptions "+JSON.stringify(this.optionsMapping)); | ||
if(!this.options) { | ||
this.options=Object.assign({},this.optionsMapping); | ||
for(var i in this.optionsMapping ) { | ||
for(let i in this.optionsMapping ) { | ||
try{ | ||
if(debug) logger.send("Driver.getOptions propery "+i+" set to configuration property "+this.optionsMapping[i]); | ||
if(logger.active) logger.send("Driver.getOptions propery "+i+" set to configuration property "+this.optionsMapping[i]); | ||
if(node.credentials && node.credentials.hasOwnProperty(this.optionsMapping[i])) { | ||
@@ -560,9 +559,9 @@ this.options[i]=node.credentials[this.optionsMapping[i]]; | ||
try{ | ||
var options=this.getOptions(node); | ||
if(debug) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
var thisObject=this; | ||
var c = new (this.Driver())(options); | ||
const options=this.getOptions(node); | ||
if(logger.active) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
const thisObject=this; | ||
const c=new (this.Driver())(options); | ||
c.connect((err)=>{ | ||
if(err) { | ||
if(debug) logger.send("getConnection error "+err); | ||
if(logger.active) logger.send("getConnection error "+err); | ||
error(err); | ||
@@ -578,3 +577,3 @@ return; | ||
} catch(e) { | ||
console.error("Driver.getConnectionC error: "+e); | ||
logger.sendError("Driver.getConnectionC error: "+e); | ||
error(e); | ||
@@ -585,4 +584,4 @@ } | ||
try{ | ||
var options=this.getOptions(node); | ||
if(debug) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
const options=this.getOptions(node); | ||
if(logger.active) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
if(!this.driverInstance) this.driverInstance= new (this.Driver()); | ||
@@ -593,3 +592,3 @@ let thisObject=this, | ||
if(err) { | ||
if(debug) logger.send("getConnection error "+err); | ||
if(logger.active) logger.send("getConnection error "+err); | ||
error(err); | ||
@@ -605,3 +604,3 @@ return; | ||
} catch(e) { | ||
console.error("Driver.getConnectionC error: "+e); | ||
logger.sendError("Driver.getConnectionC error: "+e); | ||
error(e); | ||
@@ -612,4 +611,4 @@ } | ||
try{ | ||
let options=this.getOptions(node); | ||
if(debug) logger.send("getConnectionNeo4j "+JSON.stringify(Object.assign({},options))); | ||
const options=this.getOptions(node); | ||
if(logger.active) logger.send("getConnectionNeo4j "+JSON.stringify(Object.assign({},options))); | ||
let neo4j=new this.Driver(), | ||
@@ -625,3 +624,3 @@ driver=neo4j.driver("bolt://"+options.host+":"+options.host, neo4j.auth.basic(options.user,options.password)); | ||
} catch(e) { | ||
console.error("Driver.getConnectionNeo4j error: "+e); | ||
logger.sendError("Driver.getConnectionNeo4j error: "+e); | ||
error(e); | ||
@@ -632,4 +631,4 @@ } | ||
try{ | ||
let options=this.getOptions(node); | ||
if(debug) logger.send("getConnectionQ "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
const options=this.getOptions(node); | ||
if(logger.active) logger.send("getConnectionQ "+JSON.stringify(Object.assign({},options,{password:"***masked"}))); | ||
let c = new this.Driver(options), | ||
@@ -646,3 +645,3 @@ thisObject=this; | ||
(err)=>{ | ||
if(debug) logger.send("query error "+err); | ||
if(logger.active) logger.send("query error "+err); | ||
error(err); | ||
@@ -652,3 +651,3 @@ } | ||
} catch(e) { | ||
console.error("Driver.getConnectionQ error: "+e); | ||
logger.sendError("Driver.getConnectionQ error: "+e); | ||
error(e); | ||
@@ -658,16 +657,16 @@ } | ||
Driver.prototype.execQ=function(preparedSql,params,done,error) { | ||
if(debug) logger.send("Driver.execQ "+JSON.stringify({params:params})); | ||
var thisObject=this; | ||
if(logger.active) logger.send("Driver.execQ "+JSON.stringify({params:params})); | ||
const thisObject=this; | ||
try{ | ||
preparedSql.exec(params||this.paramNull).then( | ||
(result)=>{ | ||
if(debug) logger.send("Driver.execQ first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
if(logger.active) logger.send("Driver.execQ first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
done(result); | ||
}, | ||
(err)=>{ | ||
if(debug) logger.send("Driver.execQ fail: "+err); | ||
if(logger.active) logger.send("Driver.execQ fail: "+err); | ||
try{ | ||
error(err); | ||
} catch(e) { | ||
console.error("Driver.execQ fail error: "+e+" stack:\n"+e.stack); | ||
logger.sendError("Driver.execQ fail error: "+e+" stack:\n"+e.stack); | ||
} | ||
@@ -677,3 +676,3 @@ } | ||
} catch(e) { | ||
console.error("Driver.execQ error: "+e); | ||
logger.sendError("Driver.execQ error: "+e); | ||
error(e); | ||
@@ -683,12 +682,12 @@ } | ||
Driver.prototype.prepareQ=function(conn,sql,done,error) { | ||
if(debug) logger.send("Driver.prepareQ "+JSON.stringify({sql:sql})); | ||
var thisObject=this; | ||
if(logger.active) logger.send("Driver.prepareQ "+JSON.stringify({sql:sql})); | ||
const thisObject=this; | ||
try{ | ||
conn.prepare(sql).then( | ||
(prepResult)=>{ | ||
if(debug) logger.send("Driver.prepareQ prepared completed"); | ||
if(logger.active) logger.send("Driver.prepareQ prepared completed"); | ||
done(prepResult); | ||
}, | ||
(err)=>{ | ||
if(debug) logger.send("Driver.prepareQ fail: "+err); | ||
if(logger.active) logger.send("Driver.prepareQ fail: "+err); | ||
error(err); | ||
@@ -698,3 +697,3 @@ } | ||
} catch(e) { | ||
console.error("Driver.prepareQ error: "+e); | ||
logger.sendError("Driver.prepareQ error: "+e); | ||
error(e); | ||
@@ -704,11 +703,11 @@ } | ||
Driver.prototype.queryC=function(conn,sql,params,done,error) { | ||
if(debug) logger.send("Driver.queryC "+JSON.stringify({sql:sql,params:params})); | ||
var thisObject=this; | ||
if(logger.active) logger.send("Driver.queryC "+JSON.stringify({sql:sql,params:params})); | ||
const thisObject=this; | ||
try{ | ||
conn.query(sql,(params||this.paramNull),(err, result) => { | ||
if(err) { | ||
if(debug) logger.send("Driver.queryC error: "+err); | ||
if(logger.active) logger.send("Driver.queryC error: "+err); | ||
error(err); | ||
} else { | ||
if(debug) logger.send("Driver.queryC first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
if(logger.active) logger.send("Driver.queryC first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
done(result); | ||
@@ -718,3 +717,3 @@ } | ||
} catch(e) { | ||
console.error("Driver.queryC error: "+e); | ||
logger.sendError("Driver.queryC error: "+e); | ||
error(e); | ||
@@ -724,7 +723,7 @@ } | ||
Driver.prototype.queryNeo4j=function(session,cmd,params,done,error) { | ||
if(debug) logger.send("Driver.queryNeo4j "+JSON.stringify({cmd:cmd,params:params})); | ||
if(logger.active) logger.send("Driver.queryNeo4j "+JSON.stringify({cmd:cmd,params:params})); | ||
try{ | ||
session.run(cmd,(params||this.paramNull)).then(done).catch(error); | ||
} catch(e) { | ||
console.error("Driver.queryNeo4j error: "+e); | ||
logger.sendError("Driver.queryNeo4j error: "+e); | ||
error(e); | ||
@@ -734,16 +733,16 @@ } | ||
Driver.prototype.queryQ=function(conn,sql,params,done,error) { | ||
if(debug) logger.send("Driver.queryQ "+JSON.stringify({sql:sql,params:params})); | ||
var thisObject=this; | ||
if(logger.active) logger.send("Driver.queryQ "+JSON.stringify({sql:sql,params:params})); | ||
const thisObject=this; | ||
try{ | ||
conn.query(sql,(params||this.paramNull)).then( | ||
(result)=>{ | ||
if(debug) logger.send("Driver.queryQ first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
if(logger.active) logger.send("Driver.queryQ first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100)); | ||
done(result); | ||
}, | ||
(err)=>{ | ||
if(debug) logger.send("Driver.queryQ fail: "+err); | ||
if(logger.active) logger.send("Driver.queryQ fail: "+err); | ||
try{ | ||
error(err); | ||
} catch(e) { | ||
console.error("Driver.queryQ fail error: "+e+" stack:\n"+e.stack); | ||
logger.sendError("Driver.queryQ fail error: "+e+" stack:\n"+e.stack); | ||
} | ||
@@ -753,3 +752,3 @@ } | ||
} catch(e) { | ||
console.error("Driver.queryQ error: "+e); | ||
logger.sendError("Driver.queryQ error: "+e); | ||
error(e); | ||
@@ -759,3 +758,3 @@ } | ||
Driver.prototype.rollback=function(conn,done,error) { | ||
if(debug) logger.send("Driver.rollback"); | ||
if(logger.active) logger.send("Driver.rollback"); | ||
this.query(conn,"rollback",null,done,error); | ||
@@ -766,11 +765,13 @@ }; | ||
}; | ||
var DriverType = { | ||
let DriverType = { | ||
'db2': new Driver({ | ||
Driver: function() { | ||
return require('ibm_db'); | ||
return require(this.requireName); | ||
}, | ||
requireName:'ibm_db', | ||
getConnection: Driver.prototype.getConnectionO | ||
}), | ||
'monetdb': new Driver({ | ||
Driver:(()=>require('monetdb')({maxReconnects:0,debug:false})), | ||
Driver:(()=>require(this.requireName)({maxReconnects:0,debug:false})), | ||
requireName:'monetdb', | ||
autoCommit:true, | ||
@@ -797,4 +798,5 @@ getConnection: Driver.prototype.getConnectionQ, | ||
Driver: function() { | ||
return require('neo4j-driver').v1; | ||
return require(this.requireName).v1; | ||
}, | ||
requireName:'neo4j-driver', | ||
autoCommit:false, | ||
@@ -815,3 +817,3 @@ beginTransaction:Driver.prototype.beginTransactionNoAction, | ||
translateSQL:function(sql) { | ||
var r=""; | ||
let r=""; | ||
sql.split('?').forEach((e,i)=>r+=e+"$"+(i+1)); | ||
@@ -818,0 +820,0 @@ return r.slice(0, -2); |
{ | ||
"name": "node-red-contrib-connectionmanager", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Node-RED implements generalised connections manager.", | ||
"dependencies": { | ||
"logger": "git+https://github.com/peterprib/node-red-contrib-logger.git", | ||
"node-red-contrib-logger": "git+https://github.com/peterprib/node-red-contrib-logger.git" | ||
"node-red-contrib-logger": "git+https://github.com/peterprib/node-red-contrib-logger.git", | ||
"pg": "*" | ||
}, | ||
@@ -24,3 +25,4 @@ "devDependencies": { | ||
"lint": "eslint --ext .js ./", | ||
"lint-fix": "eslint --fix --ext .js ./" | ||
"lint-fix": "eslint --fix --ext .js ./", | ||
"fix": "npm audit fix" | ||
}, | ||
@@ -27,0 +29,0 @@ "repository": { |
@@ -130,2 +130,4 @@ # [node-red-contrib-connectionmanager][2] | ||
0.0.3 fix bug with error handling and arrays. Add pg in to package dependencies. More debug details. | ||
0.0.2 get rid of monetdb warning. Add in access to flow.get env.get, global.get | ||
@@ -132,0 +134,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Wildcard dependency
QualityPackage has a dependency with a floating version range. This can cause issues if the dependency publishes a new major version.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
413564
2053
146
3
2
3
+ Addedpg@*
+ Addedpg@8.13.1(transitive)
+ Addedpg-cloudflare@1.1.1(transitive)
+ Addedpg-connection-string@2.7.0(transitive)
+ Addedpg-int8@1.0.1(transitive)
+ Addedpg-pool@3.7.0(transitive)
+ Addedpg-protocol@1.7.0(transitive)
+ Addedpg-types@2.2.0(transitive)
+ Addedpgpass@1.0.5(transitive)
+ Addedpostgres-array@2.0.0(transitive)
+ Addedpostgres-bytea@1.0.0(transitive)
+ Addedpostgres-date@1.0.7(transitive)
+ Addedpostgres-interval@1.2.0(transitive)
+ Addedsplit2@4.2.0(transitive)
+ Addedxtend@4.0.2(transitive)