leo-connector-common
Advanced tools
Comparing version 1.4.5 to 1.5.0-beta1
146
botHelper.js
@@ -6,5 +6,4 @@ "use strict"; | ||
const MAX = 5000; | ||
const sqlstring = require('sqlstring'); | ||
const moment = require('moment'); | ||
module.exports = function(event, context, sdk) { | ||
@@ -40,3 +39,4 @@ if (!event || !context || !sdk) { | ||
joins = {}, | ||
sqlQuery; | ||
sqlQuery, | ||
databases = []; | ||
@@ -63,2 +63,17 @@ return { | ||
includeSchema: function(database) { | ||
if (Array.isArray(database)) { | ||
database.forEach(d => { | ||
if (databases.indexOf(d) === -1) { | ||
databases.push(d); | ||
} | ||
}); | ||
} else { | ||
if (database.indexOf(database) === -1) { | ||
database.push(table); | ||
} | ||
} | ||
return this; | ||
}, | ||
// do stuff to build the domain objects | ||
@@ -76,33 +91,83 @@ run: function (callback) { | ||
// @todo handle deletes separatley | ||
if (obj.update && obj.update[table]) { | ||
obj[table] = obj.update[table]; | ||
// @todo handle deletes separately | ||
if (obj.update) { | ||
obj = obj.update; | ||
} | ||
// only process if we have any data for this table | ||
if (obj[table] && obj[table].length) { | ||
if (Array.isArray(tables[table])) { // if we passed in an array of primary keys | ||
// turn the object into an array of items | ||
objArray.push(obj[table].map(row => { | ||
let array = []; | ||
for (let key of tables[table]) { | ||
array.push(row[key]); | ||
} | ||
if (Array.isArray(obj)) { | ||
// only process if we have any data for this table | ||
if (obj[table] && obj[table].length) { | ||
if (Array.isArray(tables[table])) { // if we passed in an array of primary keys | ||
// turn the object into an array of items | ||
objArray.push(obj[table].map(row => { | ||
let array = []; | ||
for (let key of tables[table]) { | ||
array.push(row[key]); | ||
} | ||
return array; | ||
})); | ||
return array; | ||
})); | ||
// if the value of any of the tables is a SELECT query, replace ? with the IDs in obj[table] | ||
} else if (tables[table].match(/^SELECT/)) { | ||
async.doWhilst((done) => { | ||
// if the value of any of the tables is a SELECT query, replace ? with the IDs in obj[table] | ||
} else if (tables[table].match(/^SELECT/)) { | ||
async.doWhilst((done) => { | ||
// split the ID's up into no more than 5k for each query | ||
let ids = obj[table].splice(0, params.limit || MAX); | ||
objArray.push(tables[table].replace(/\?/g, ids.filter(id => {return id != undefined}).join())); | ||
done(); | ||
}, () => obj[table].length); | ||
} else { | ||
// we just have id's. Push them into the object | ||
objArray.push(obj[table]); | ||
// split the ID's up into no more than 5k for each query | ||
let ids = obj[table].splice(0, params.limit || MAX); | ||
objArray.push(tables[table].replace(/\?/g, ids.filter(id => { | ||
return id != undefined | ||
}).join())); | ||
done(); | ||
}, () => obj[table].length); | ||
} else { | ||
// we just have id's. Push them into the object | ||
objArray.push(obj[table]); | ||
} | ||
} | ||
} else if (typeof obj === 'object') { | ||
// @todo Added for multiple databases | ||
if (!objArray[database]) { | ||
objArray[database] = { | ||
[table]: [] | ||
}; | ||
} else if (!objArray[database][table]) { | ||
objArray[database][table] = []; | ||
} | ||
// @todo end added for multiple databases | ||
// this contains obj[database][table][ids] | ||
// loop through the database and see if we have any matching tables | ||
Object.keys(obj).forEach(database => { | ||
// only process if we have any data for this table | ||
if (obj[database][table] && obj[database][table].length) { | ||
if (Array.isArray(tables[table])) { // if we passed in an array of primary keys | ||
// turn the object into an array of items | ||
objArray.push(obj[database][table].map(row => { | ||
let array = []; | ||
for (let key of tables[table]) { | ||
array.push(row[key]); | ||
} | ||
return array; | ||
})); | ||
// if the value of any of the tables is a SELECT query, replace ? with the IDs in obj[database][table] | ||
} else if (tables[table] && tables[table].match(/^SELECT/)) { | ||
async.doWhilst((done) => { | ||
// split the ID's up into no more than 5k for each query | ||
let ids = obj[database][table].splice(0, params.limit || MAX); | ||
objArray.push(tables[table].replace(/\?/g, ids.filter(id => { | ||
return id != undefined | ||
}).join())); | ||
done(); | ||
}, () => obj[database][table].length); | ||
} else { | ||
if (!databases.length || databases.indexOf(database) !== -1) { | ||
// we just have id's. Push them into the object | ||
objArray.push(obj[database][table]); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
@@ -116,3 +181,9 @@ }); | ||
if (Array.isArray(ids)) { | ||
idsList = ids.filter(id => {return id != undefined}).join(); | ||
idsList = ids.filter(id => {return id != undefined}).map(id => { | ||
if (isNaN(id)) { | ||
return sqlstring.escape(id); | ||
} | ||
return id; | ||
}).join(); | ||
} | ||
@@ -271,2 +342,3 @@ | ||
let trackedTables = {}; | ||
let omitTables = []; | ||
@@ -290,3 +362,17 @@ // get the starting point | ||
}, | ||
omitTable: function(table) { | ||
if (Array.isArray(table)) { | ||
table.forEach(t => { | ||
if (table.indexOf(t) === -1) { | ||
omitTables.push(t); | ||
} | ||
}); | ||
} else { | ||
if (table.indexOf(table) === -1) { | ||
omitTables.push(table); | ||
} | ||
} | ||
}, | ||
run: function(callback) { | ||
params.omitTables = omitTables; | ||
let stream = params.connector.streamChanges(params.connection, trackedTables, params); | ||
@@ -311,2 +397,4 @@ | ||
}; | ||
this.trackDatabaseChanges = this.trackTableChanges; | ||
}; |
{ | ||
"name": "leo-connector-common", | ||
"version": "1.4.5", | ||
"version": "1.5.0-beta1", | ||
"description": "Common package for all Leo Platform database connectors", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
"use strict"; | ||
const logger = require("leo-sdk/lib/logger")("leo.connector.sql"); | ||
const logger = require("leo-logger")("leo.connector.sql"); | ||
const async = require("async"); | ||
@@ -51,3 +51,3 @@ const leo = require("leo-sdk"); | ||
submit(push, done); | ||
} else if (typeof sql == "function") { | ||
} else if (typeof sql === "function") { | ||
lib.processIds(sqlClient, obj, sql, null, (err, newIds) => { | ||
@@ -86,3 +86,3 @@ if (err) { | ||
}); | ||
async.parallelLimit(tasks, 10, (err, results) => { | ||
async.parallelLimit(tasks, 10, (err) => { | ||
if (err) { | ||
@@ -115,3 +115,3 @@ done(err); | ||
let r = domainObj(ids, builder.createLoader); | ||
if (typeof r.get == "function") { | ||
if (typeof r.get === "function") { | ||
r = r.get(); | ||
@@ -131,12 +131,6 @@ } | ||
Object.keys(obj.joins).forEach(name => { | ||
let t = obj.joins[name]; | ||
if (t.type === "one_to_many") { | ||
domains[id][name] = []; | ||
} else { | ||
domains[id][name] = {}; | ||
} | ||
domains[id][name] = []; | ||
}); | ||
}); | ||
function mapResults(results, fields, each) { | ||
@@ -182,2 +176,3 @@ let mappings = []; | ||
// handle the main query for the domain object loader | ||
tasks.push(done => { | ||
@@ -197,7 +192,7 @@ if (!obj.id) { | ||
logger.error('ID: "' + obj.id + '" not found in object:'); | ||
} else if (!domains[row[obj.id]]) { | ||
logger.error('ID: "' + obj.id + '" with a value of: "' + row[obj.id] + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID'); | ||
} else if (!domains[id]) { | ||
logger.error('ID: "' + obj.id + '" with a value of: "' + id + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID'); | ||
} else { | ||
//We need to keep the domain relationships in tact | ||
domains[row[obj.id]] = Object.assign(domains[row[obj.id]], row); | ||
domains[id] = Object.assign(domains[id], row); | ||
} | ||
@@ -211,6 +206,8 @@ }); | ||
// handle join queries | ||
Object.keys(obj.joins).forEach(name => { | ||
let t = obj.joins[name]; | ||
if (t.type === "one_to_many") { | ||
if (t.type && t.type === "one_to_one") { | ||
logger.error('one-to-one joins are not supported as a separate function. Please include them in the main query.') | ||
} else { | ||
tasks.push(done => { | ||
@@ -235,26 +232,6 @@ sqlClient.query(t.sql, (err, results, fields) => { | ||
}); | ||
} else { | ||
tasks.push(done => { | ||
sqlClient.query(t.sql, (err, results, fields) => { | ||
if (err) { | ||
return done(err); | ||
} else if (!results.length) { | ||
return done(); | ||
} | ||
mapResults(results, fields, row => { | ||
if (row.length) { | ||
if (t.transform) { | ||
row = t.transform(row); | ||
} | ||
domains[row[t.on]][name] = row; | ||
} | ||
}); | ||
done(); | ||
}, { | ||
inRowMode: true | ||
}); | ||
}); | ||
} | ||
}); | ||
// process all the tasks | ||
async.parallelLimit(tasks, 5, (err) => { | ||
@@ -261,0 +238,0 @@ if (err) { |
@@ -10,5 +10,7 @@ const async = require("async"); | ||
idlist.forEach(idthing => { | ||
// array of ids | ||
if (Array.isArray(idthing)) { | ||
ids = ids.concat(idthing); | ||
} else if (idthing && typeof idthing == "object") { | ||
} else if (idthing && typeof idthing === "object") { | ||
// idthing is an object with ids | ||
if (idthing.ids && idthing.ids.length >= 1) { | ||
@@ -53,3 +55,4 @@ tasks.push((done) => { | ||
} | ||
} else if (typeof idthing == "string") { | ||
} else if (typeof idthing === "string") { | ||
// idthing is a query string | ||
tasks.push((done) => { | ||
@@ -56,0 +59,0 @@ sqlClient.query(idthing, (err, results, fields) => { |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
96591
3525
1