leo-connector-postgres
Advanced tools
Comparing version
@@ -0,0 +0,0 @@ "use strict"; |
@@ -0,0 +0,0 @@ const pg = require("pg"); |
@@ -0,0 +0,0 @@ "use strict"; |
@@ -0,0 +0,0 @@ const { |
@@ -0,0 +0,0 @@ 'use strict'; |
@@ -7,3 +7,3 @@ 'use strict'; | ||
module.exports = function (config, columnConfig) { | ||
module.exports = function(config, columnConfig) { | ||
let client = postgres(config); | ||
@@ -73,3 +73,3 @@ let dwClient = client; | ||
return { | ||
add: function (obj, done) { | ||
add: function(obj, done) { | ||
let field = obj.__leo_delete__; | ||
@@ -101,16 +101,22 @@ let id = obj.__leo_delete_id__; | ||
tempTables.forEach(table => { | ||
tasks.push(done => client.query(`drop table ${table}`, done)); | ||
}); | ||
try { | ||
tempTables.forEach(table => { | ||
tasks.push(done => client.query(`drop table if exists ${table}`, done)); | ||
}); | ||
return new Promise(resolve => { | ||
async.series(tasks, err => { | ||
if (err) { | ||
throw err; | ||
} else { | ||
tempTables = []; | ||
return resolve('Cleaned up temp tables'); | ||
} | ||
await new Promise((resolve, reject) => { | ||
async.series(tasks, err => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
tempTables = []; | ||
resolve('Cleaned up temp tables'); | ||
} | ||
}); | ||
}); | ||
}); | ||
} catch (dropError) { | ||
// Temp tables are cleaned up before use in the next invocation | ||
// so just log it and move on | ||
logger.info('Error dropping temp tables', dropError); | ||
} | ||
} | ||
@@ -121,3 +127,3 @@ | ||
client.importFact = function (stream, table, ids, callback) { | ||
client.importFact = function(stream, table, ids, callback) { | ||
const stagingTable = `${columnConfig.stageTablePrefix}_${table}`; | ||
@@ -334,3 +340,3 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTable}`; | ||
connection.release(); | ||
callback(e, d); | ||
callback(e || err, d); | ||
}); | ||
@@ -344,3 +350,3 @@ } | ||
client.importDimension = function (stream, table, sk, nk, scds, callback, tableDef = {}) { | ||
client.importDimension = function(stream, table, sk, nk, scds, callback, tableDef = {}) { | ||
const stagingTbl = `${columnConfig.stageTablePrefix}_${table}`; | ||
@@ -546,3 +552,3 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTbl}`; | ||
connection.release(); | ||
callback(e, d); | ||
callback(e || err, d); | ||
}); | ||
@@ -652,3 +658,3 @@ } | ||
connection.release(); | ||
callback(e, d); | ||
callback(e || err, d); | ||
}); | ||
@@ -663,3 +669,3 @@ } | ||
client.insertMissingDimensions = function (usedTables, tableConfig, tableSks, tableNks, callback) { | ||
client.insertMissingDimensions = function(usedTables, tableConfig, tableSks, tableNks, callback) { | ||
if (config.hashedSurrogateKeys) { | ||
@@ -727,3 +733,3 @@ callback(null); | ||
client.linkDimensions = function (table, links, nk, callback, tableStatus) { | ||
client.linkDimensions = function(table, links, nk, callback, tableStatus) { | ||
client.describeTable(table).then(() => { | ||
@@ -834,3 +840,3 @@ let tasks = []; | ||
client.changeTableStructure = async function (structures) { | ||
client.changeTableStructure = async function(structures) { | ||
let tasks = []; | ||
@@ -902,3 +908,3 @@ let tableResults = {}; | ||
client.createTable = async function (table, definition) { | ||
client.createTable = async function(table, definition) { | ||
let fields = []; | ||
@@ -1041,3 +1047,3 @@ let defaults = []; | ||
}; | ||
client.updateTable = async function (table, definition) { | ||
client.updateTable = async function(table, definition) { | ||
let fields = []; | ||
@@ -1097,3 +1103,3 @@ let queries = []; | ||
return new Promise(resolve => { | ||
async.eachSeries(sqls, function (sql, done) { | ||
async.eachSeries(sqls, function(sql, done) { | ||
client.query(sql, err => done(err)); | ||
@@ -1110,3 +1116,3 @@ }, err => { | ||
client.findAuditDate = function (table, callback) { | ||
client.findAuditDate = function(table, callback) { | ||
client.query(`select to_char(max(${columnConfig._auditdate}), 'YYYY-MM-DD HH24:MI:SS') as max FROM ${client.escapeId(table)}`, (err, auditdate) => { | ||
@@ -1128,3 +1134,3 @@ if (err) { | ||
client.exportChanges = function (table, fields, remoteAuditdate, opts, callback) { | ||
client.exportChanges = function(table, fields, remoteAuditdate, opts, callback) { | ||
let auditdateCompare = remoteAuditdate.auditdate != null ? `${columnConfig._auditdate} >= ${client.escapeValue(remoteAuditdate.auditdate)}` : `${columnConfig._auditdate} is null`; | ||
@@ -1197,3 +1203,3 @@ client.query(`select count(*) as count FROM ${client.escapeId(table)} WHERE ${auditdateCompare}`, (err, result) => { | ||
client.importChanges = function (file, table, fields, opts, callback) { | ||
client.importChanges = function(file, table, fields, opts, callback) { | ||
if (typeof opts === 'function') { | ||
@@ -1259,6 +1265,6 @@ callback = opts; | ||
} | ||
tasks.push(function (done) { | ||
tasks.push(function(done) { | ||
client.query(`insert into ${tableName} select * from ${qualifiedStagingTable}`, done); | ||
}); | ||
tasks.push(function (done) { | ||
tasks.push(function(done) { | ||
client.query(`select count(*) from ${qualifiedStagingTable}`, (err, result) => { | ||
@@ -1269,3 +1275,3 @@ loadCount = result && parseInt(result[0].count); | ||
}); | ||
tasks.push(function (done) { | ||
tasks.push(function(done) { | ||
client.query(`drop table if exists ${qualifiedStagingTable}`, done); | ||
@@ -1279,2 +1285,2 @@ }); | ||
return client; | ||
}; | ||
}; |
@@ -0,0 +0,0 @@ // upper and lower is bigger |
@@ -0,0 +0,0 @@ /* |
{ | ||
"name": "leo-connector-postgres", | ||
"version": "4.0.19-beta", | ||
"version": "4.0.20-beta", | ||
"description": "A Postgres database connector for use with Leo Platform", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -0,0 +0,0 @@ # Documentation |
@@ -0,0 +0,0 @@ "use strict"; |
@@ -0,0 +0,0 @@ { |
160077
3.47%4664
0.15%