leo-connector-postgres
Comparing version 4.0.21-beta to 5.0.0-awsv3
@@ -14,6 +14,2 @@ const { | ||
// need to confirm there is no issue adding these dependencies | ||
const leo = require('leo-sdk'); | ||
const ls = leo.streams; | ||
require('pg').types.setTypeParser(1114, (val) => { | ||
@@ -25,2 +21,4 @@ val += 'Z'; | ||
const ls = require('leo-sdk').streams; | ||
let queryCount = 0; | ||
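The `setTypeParser(1114, ...)` call above targets Postgres type OID 1114, `timestamp without time zone`; appending `'Z'` before parsing keeps those values in UTC rather than letting the driver shift them into the local timezone. A minimal standalone sketch of the same idea, assuming the `pg` package (the hunk does not show whether the connector returns a Date or the raw string):

const types = require('pg').types;

// 1114 = `timestamp without time zone`. Append 'Z' so the value is parsed
// as UTC instead of being shifted into the process's local timezone.
types.setTypeParser(1114, (val) => {
  if (val === null) return null;
  return new Date(val.replace(' ', 'T') + 'Z');
});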
@@ -578,172 +576,4 @@ module.exports = function (config) { | ||
}, | ||
streamToTableFromS3: (table, config) => { | ||
const ts = table.split('.'); | ||
let schema = 'public'; | ||
let shortTable = table; | ||
if (ts.length > 1) { | ||
schema = ts[0]; | ||
shortTable = ts[1]; | ||
} | ||
let columns = []; | ||
let stream; | ||
let myClient = null; | ||
let pending = null; | ||
let ended = false; | ||
let csvopts = { delimiter: '|' }; | ||
let keepS3Files = config.keepS3Files != null ? config.keepS3Files : false; | ||
// Get prefix for S3 file path | ||
let s3prefix = | ||
config.s3prefix || | ||
process.env.AWS_LAMBDA_FUNCTION_NAME || | ||
'dw_redshift_ingest'; | ||
s3prefix = s3prefix.replace(/^\/*(.*?)\/*$/, '$1'); // Remove leading and trailing '/' | ||
// clean audit date to use in S3 file path | ||
let cleanAuditDate = client.auditdate | ||
.replace(/'/g, '') | ||
.replace(/:/g, '-'); | ||
let s3FileName = `files/${s3prefix}/${cleanAuditDate}/${table}.csv`; | ||
client.connect().then( | ||
c => { | ||
client | ||
.describeTable(shortTable, schema) | ||
.then(result => { | ||
if (ended) { | ||
c.release(true); | ||
return; | ||
} | ||
columns = result.map(f => f.column_name); | ||
myClient = c; | ||
stream = ls.toS3(leo.configuration.resources.LeoS3, s3FileName); | ||
// copyFrom emits `end` but the S3 stream emits `finish`, so forward `finish` as `end` | ||
stream.on('finish', () => stream.emit('end')); | ||
stream.on('error', function(err) { | ||
logger.error(`COPY error: ${err.where}`, err); | ||
process.exit(); | ||
}); | ||
if (pending) { | ||
pending(); | ||
} | ||
}) | ||
.catch(err => { | ||
logger.error(err); | ||
}); | ||
}, | ||
err => { | ||
logger.error(err); | ||
} | ||
); | ||
let count = 0; | ||
function nonNull(v) { | ||
if (v === '' || v === null || v === undefined) { | ||
return '\\N'; | ||
} else if (typeof v === 'string' && (v.search(/\r/) !== -1 || v.search(/\n/) !== -1)) { | ||
if (config.version !== 'redshift') { | ||
return v.replace(/\r\n?/g, '\n'); | ||
} else { | ||
return v.replace(/\r\n?/g, '\n').replace(/\n/g, `\\n`); | ||
} | ||
} else { | ||
return v; | ||
} | ||
} | ||
return ls.pipeline( | ||
csv.createWriteStream({ | ||
...csvopts, | ||
headers: false, | ||
transform: (row, done) => { | ||
if (!myClient) { | ||
pending = () => { | ||
done( | ||
null, | ||
columns.map(f => nonNull(row[f])) | ||
); | ||
}; | ||
} else { | ||
done( | ||
null, | ||
columns.map(f => nonNull(row[f])) | ||
); | ||
} | ||
}, | ||
}), | ||
ls.write( | ||
(r, done) => { | ||
count++; | ||
if (count % 10000 === 0) { | ||
logger.info(table + ': ' + count); | ||
} | ||
if (!stream.write(r)) { | ||
stream.once('drain', done); | ||
} else { | ||
done(null); | ||
} | ||
}, | ||
done => { | ||
ended = true; | ||
logger.debug(table + ': stream done'); | ||
if (stream) { | ||
stream.on('end', err => { | ||
logger.debug(table + ': stream ended', err || ''); | ||
// wrap done callback to release the connection | ||
function innerDone(err) { | ||
myClient.release(true); | ||
logger.debug(table + ': stream client released', err || ''); | ||
done(err); | ||
} | ||
if (err) { | ||
innerDone(err); | ||
} else { | ||
// Once the S3 file is complete run copy to load the staging table | ||
let f = columns.map(f => `"${f}"`); | ||
let file = `s3://${leo.configuration.s3}/${s3FileName}`; | ||
let manifest = ''; | ||
let role = config.loaderRole; | ||
myClient.query( | ||
`copy ${table} (${f}) from '${file}' ${manifest} ${role ? `credentials 'aws_iam_role=${role}'` : '' | ||
} NULL AS '\\\\N' format csv DELIMITER '|' ACCEPTINVCHARS TRUNCATECOLUMNS ACCEPTANYDATE TIMEFORMAT 'auto' COMPUPDATE OFF`, | ||
copyErr => { | ||
if (keepS3Files) { | ||
innerDone(copyErr); | ||
} else { | ||
// Delete the S3 files when done | ||
ls.s3.deleteObject( | ||
{ | ||
Bucket: leo.configuration.s3, | ||
Key: s3FileName, | ||
}, | ||
deleteError => { | ||
if (deleteError) { | ||
logger.info( | ||
'file failed to delete:', | ||
s3FileName, | ||
deleteError | ||
); | ||
} | ||
innerDone(copyErr); | ||
} | ||
); | ||
} | ||
} | ||
); | ||
} | ||
}); | ||
stream.end(); | ||
} else { | ||
done(); | ||
} | ||
} | ||
) | ||
); | ||
streamToTableFromS3: () => { | ||
// opts = Object.assign({}, opts || {}); | ||
}, | ||
@@ -750,0 +580,0 @@ }; |
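For context, the removed `streamToTableFromS3` helper staged incoming rows in S3 as a pipe-delimited CSV (nulls written as `\N`) and then loaded them with a single Redshift COPY. A minimal sketch of just the COPY step, assuming a connected `pg` client whose `query` returns a promise; the bucket, key, and IAM role are placeholders:

// Load a pipe-delimited CSV previously written to S3 into a staging table.
// The options mirror the removed code: nulls were serialized as \N and the
// load should tolerate invalid characters and over-long values.
async function copyFromS3(client, table, bucket, key, iamRole) {
  const file = `s3://${bucket}/${key}`;
  await client.query(`copy ${table} from '${file}'
    ${iamRole ? `credentials 'aws_iam_role=${iamRole}'` : ''}
    NULL AS '\\\\N' format csv DELIMITER '|'
    ACCEPTINVCHARS TRUNCATECOLUMNS ACCEPTANYDATE TIMEFORMAT 'auto' COMPUPDATE OFF`);
}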
@@ -7,3 +7,3 @@ 'use strict'; | ||
module.exports = function(config, columnConfig) { | ||
module.exports = function (config, columnConfig) { | ||
let client = postgres(config); | ||
@@ -31,18 +31,6 @@ let dwClient = client; | ||
// Control flow for the case where both of these configurations are set to true has not been added. An error will be thrown until that is supported. | ||
if ( | ||
(config.hashedSurrogateKeys && !config.bypassSlowlyChangingDimensions) // hashed surrogate keys and slowly changing dimensions not supported | ||
|| (config.hashedSurrogateKeys && config.bypassSlowlyChangingDimensions === undefined) // covering the case where slowly changing dimensions has been omitted | ||
) { | ||
logger.error(`Unsupported configuration, bypassSlowlyChangingDimensions:${config.bypassSlowlyChangingDimensions} hashedSurrogateKeys:${config.hashedSurrogateKeys}.`); | ||
process.exit(); | ||
}; | ||
client.getDimensionColumn = columnConfig.dimColumnTransform; | ||
client.columnConfig = columnConfig; | ||
let deleteFlushCount = config.deleteFlushCount || 1000; | ||
function deletesSetup(qualifiedTable, schema, field, value, where = '') { | ||
function deletesSetup (qualifiedTable, schema, field, value, where = '') { | ||
let colLookup = {}; | ||
@@ -58,4 +46,4 @@ schema.map(col => { | ||
function tryFlushDelete(done, force = false) { | ||
if (force || toDeleteCount >= deleteFlushCount) { | ||
function tryFlushDelete (done, force = false) { | ||
if (force || toDeleteCount >= 1000) { | ||
let deleteTasks = Object.keys(toDelete).map(col => { | ||
@@ -77,3 +65,3 @@ return deleteDone => client.query(`update ${qualifiedTable} set ${field} = ${value}, ${columnConfig._auditdate} = ${dwClient.auditdate} where ${col} in (${toDelete[col].join(',')}) ${where}`, deleteDone); | ||
return { | ||
add: function(obj, done) { | ||
add: function (obj, done) { | ||
let field = obj.__leo_delete__; | ||
@@ -105,22 +93,16 @@ let id = obj.__leo_delete_id__; | ||
try { | ||
tempTables.forEach(table => { | ||
tasks.push(done => client.query(`drop table if exists ${table}`, done)); | ||
}); | ||
tempTables.forEach(table => { | ||
tasks.push(done => client.query(`drop table ${table}`, done)); | ||
}); | ||
await new Promise((resolve, reject) => { | ||
async.series(tasks, err => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
tempTables = []; | ||
resolve('Cleaned up temp tables'); | ||
} | ||
}); | ||
return new Promise(resolve => { | ||
async.series(tasks, err => { | ||
if (err) { | ||
throw err; | ||
} else { | ||
tempTables = []; | ||
return resolve('Cleaned up temp tables'); | ||
} | ||
}); | ||
} catch (dropError) { | ||
// Temp tables are cleaned up before use in the next invocation | ||
// so just log it and move on | ||
logger.info('Error dropping temp tables', dropError); | ||
} | ||
}); | ||
} | ||
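A compact standalone sketch of the temp-table cleanup shown above, assuming the `async` package and a callback-style `query`: each staging table is dropped in series, and a failure is logged rather than rethrown because the tables are dropped again before use on the next invocation.

const async = require('async');

// Drop every temp/staging table; treat failures as non-fatal.
async function cleanupTempTables(client, tempTables, logger) {
  const tasks = tempTables.map(table =>
    done => client.query(`drop table if exists ${table}`, done)
  );
  try {
    await new Promise((resolve, reject) => {
      async.series(tasks, err => (err ? reject(err) : resolve()));
    });
    tempTables.length = 0; // everything dropped, forget the list
  } catch (dropError) {
    logger.info('Error dropping temp tables', dropError);
  }
}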
@@ -131,3 +113,3 @@ | ||
client.importFact = function(stream, table, ids, callback) { | ||
client.importFact = function (stream, table, ids, callback) { | ||
const stagingTable = `${columnConfig.stageTablePrefix}_${table}`; | ||
@@ -148,38 +130,8 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTable}`; | ||
let sortKey; | ||
let sortKeyType; | ||
tempTables.push(qualifiedStagingTable); | ||
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}`, done)); | ||
if (config.version !== 'redshift') { | ||
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done)); | ||
tasks.push(done => client.query(`create index ${stagingTable}_id on ${qualifiedStagingTable} (${ids.join(', ')})`, done)); | ||
} else { | ||
// get sortkey for joins | ||
tasks.push(done => { | ||
client.query(`SELECT sortkey, | ||
sortkeytype | ||
FROM public.v_dist_sort_key | ||
WHERE table_name = '${table}';`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
if (results[0].sortKey != null) { | ||
sortKey = results[0].sortkey; | ||
sortKeyType = results[0].sortkeytype; | ||
}; | ||
done(); | ||
}; | ||
}); | ||
}); | ||
tasks.push(done => | ||
client.query(`CREATE TABLE ${qualifiedStagingTable} | ||
DISTSTYLE ALL | ||
SORTKEY (${sortKey != null ? sortKey : ids[0]}) | ||
AS SELECT * | ||
FROM ${qualifiedTable} | ||
LIMIT 0;`, done)); | ||
}; | ||
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}_changes`, done)); | ||
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done)); | ||
tasks.push(done => client.query(`create index ${stagingTable}_id on ${qualifiedStagingTable} (${ids.join(', ')})`, done)); | ||
// tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done)); | ||
tasks.push(done => { | ||
@@ -195,3 +147,3 @@ ls.pipe(stream, ls.through((obj, done, push) => { | ||
} | ||
}), config.version !== 'redshift' ? client.streamToTable(qualifiedStagingTable) : client.streamToTableFromS3(qualifiedStagingTable, config), (err) => { | ||
}), client.streamToTable(qualifiedStagingTable), (err) => { | ||
if (err) { | ||
@@ -205,5 +157,3 @@ return done(err); | ||
if (config.version !== 'redshift') { | ||
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done)); | ||
}; | ||
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done)); | ||
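For reference, the removed Redshift branch above first looked up the target table's sort key and then created the staging table with DISTSTYLE ALL so later joins stay local to each node. A condensed sketch of that sequence, assuming a promisified version of the connector's `query` that resolves to the row array and that the `public.v_dist_sort_key` view exists in the cluster:

// Look up the sort key, then create an empty staging table with the same
// structure, replicated to every node (DISTSTYLE ALL).
async function createRedshiftStagingTable(query, table, qualifiedTable, qualifiedStagingTable, ids) {
  const rows = await query(`SELECT sortkey, sortkeytype FROM public.v_dist_sort_key WHERE table_name = '${table}'`);
  const sortKey = rows[0] && rows[0].sortkey != null ? rows[0].sortkey : ids[0];
  await query(`CREATE TABLE ${qualifiedStagingTable}
    DISTSTYLE ALL
    SORTKEY (${sortKey})
    AS SELECT * FROM ${qualifiedTable} LIMIT 0`);
  return sortKey;
}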
@@ -221,117 +171,32 @@ client.describeTable(table).then(result => { | ||
let totalRecords = 0; | ||
// The following code relies on the fact that now()/sysdate will return the same time during all transaction events | ||
// The following code relies on the fact that now() will return the same time during all transaction events | ||
tasks.push(done => connection.query(`Begin Transaction`, done)); | ||
if (!config.hashedSurrogateKeys) { | ||
tasks.push(done => { | ||
connection.query(`select 1 as total from ${qualifiedTable} limit 1`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
totalRecords = results.length; | ||
done(); | ||
}); | ||
tasks.push(done => { | ||
connection.query(`select 1 as total from ${qualifiedTable} limit 1`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
totalRecords = results.length; | ||
done(); | ||
}); | ||
tasks.push(done => { | ||
connection.query(`Update ${qualifiedTable} prev | ||
SET ${columns.map(column => `${column} = coalesce(staging.${column}, prev.${column})`)}, ${columnConfig._deleted} = coalesce(prev.${columnConfig._deleted}, false), ${columnConfig._auditdate} = ${dwClient.auditdate} | ||
FROM ${qualifiedStagingTable} staging | ||
where ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')}` | ||
, done); | ||
}); | ||
}); | ||
tasks.push(done => { | ||
connection.query(`Update ${qualifiedTable} prev | ||
SET ${columns.map(column => `${column} = coalesce(staging.${column}, prev.${column})`)}, ${columnConfig._deleted} = coalesce(prev.${columnConfig._deleted}, false), ${columnConfig._auditdate} = ${dwClient.auditdate} | ||
FROM ${qualifiedStagingTable} staging | ||
where ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')} | ||
`, done); | ||
}); | ||
// Now insert any we were missing | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')},${columnConfig._deleted},${columnConfig._auditdate}) | ||
SELECT ${columns.map(column => `staging.${column}`)}, false AS ${columnConfig._deleted}, ${dwClient.auditdate} as ${columnConfig._auditdate} | ||
FROM ${qualifiedStagingTable} staging | ||
WHERE NOT EXISTS ( SELECT * | ||
FROM ${qualifiedTable} as prev | ||
WHERE ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')})`, done); | ||
}); | ||
} else { | ||
let naturalKeyLowerBound; | ||
let naturalKeyFilter; | ||
// Now insert any we were missing | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')},${columnConfig._deleted},${columnConfig._auditdate}) | ||
SELECT ${columns.map(column => `coalesce(staging.${column}, prev.${column})`)}, coalesce(prev.${columnConfig._deleted}, false), ${dwClient.auditdate} as ${columnConfig._auditdate} | ||
FROM ${qualifiedStagingTable} staging | ||
LEFT JOIN ${qualifiedTable} as prev on ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')} | ||
WHERE prev.${ids[0]} is null | ||
`, done); | ||
}); | ||
// tasks.push(done => connection.query(`drop table ${stagingTbl}`, done)); | ||
// Get lower bound for natural key to avoid unnecessary scanning | ||
tasks.push(done => { | ||
connection.query(`SELECT MIN(${(sortKey != null) ? sortKey : ids[0]}) AS minid, | ||
CAST(COUNT(*) AS INT) AS cnt | ||
FROM ${qualifiedStagingTable};`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
totalRecords = results[0].cnt; | ||
naturalKeyLowerBound = results[0].minid; | ||
if (naturalKeyLowerBound !== null) { | ||
if (sortKeyType === 'int4' || sortKeyType === 'int8') { | ||
naturalKeyFilter = `${results[0].minid}`; | ||
} else if (sortKeyType === 'varchar') { | ||
naturalKeyFilter = `'${results[0].minid}'`; | ||
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) { | ||
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`; | ||
}; | ||
}; | ||
done(); | ||
}; | ||
}); | ||
}); | ||
const qualifiedStagingTablePrevious = `${qualifiedStagingTable}_previous`; | ||
tempTables.push(`${qualifiedStagingTablePrevious}`); | ||
tasks.push(done => connection.query(`DROP TABLE IF EXISTS ${qualifiedStagingTablePrevious};`, done)); | ||
tasks.push(done => { | ||
connection.query(`CREATE TABLE ${qualifiedStagingTablePrevious} | ||
DISTSTYLE ALL | ||
AS SELECT * | ||
FROM ${qualifiedTable} | ||
LIMIT 0;`, done); | ||
}); | ||
// Retrieve a copy of existing data | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedStagingTablePrevious} (${columns.map(column => `${column}`).join(`, `)}, ${columnConfig._deleted}) | ||
SELECT ${columns.map(column => `${column}`).join(`, `)}, | ||
${columnConfig._deleted} | ||
FROM ${qualifiedTable} AS base | ||
WHERE EXISTS ( SELECT * | ||
FROM ${qualifiedStagingTable} AS staging | ||
WHERE ${ids.map(id => `base.${id} = staging.${id}`).join(` AND `)} | ||
${(naturalKeyFilter !== undefined) ? `AND staging.${(sortKey != null) ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``}) | ||
${(naturalKeyFilter !== undefined) ? `AND base.${(sortKey != null) ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``};`, done); | ||
}); | ||
// Merge existing data into staged copy | ||
tasks.push(done => { | ||
connection.query(`UPDATE ${qualifiedStagingTable} AS staging | ||
SET ${columns.map(column => `${column} = COALESCE(staging.${column}, prev.${column})`).join(`,`)}, | ||
${columnConfig._deleted} = COALESCE(prev.${columnConfig._deleted}, false) | ||
FROM ${qualifiedStagingTablePrevious} AS prev | ||
WHERE ${ids.map(id => `prev.${id} = staging.${id}`).join(` AND `)}`, done); | ||
}); | ||
// Set auditdate and _deleted for stage data | ||
tasks.push(done => { | ||
connection.query(`UPDATE ${qualifiedStagingTable} | ||
SET ${columnConfig._auditdate} = ${dwClient.auditdate}, | ||
${columnConfig._deleted} = COALESCE(${columnConfig._deleted}, false); `, done); | ||
}); | ||
// Delete and reinsert data - avoids costly updates on large tables | ||
tasks.push(done => { | ||
connection.query(`DELETE FROM ${qualifiedTable} | ||
USING ${qualifiedStagingTable} | ||
WHERE ${ids.map(id => `${qualifiedTable}.${id} = ${qualifiedStagingTable}.${id}`).join(` AND `)} | ||
${(naturalKeyFilter !== undefined) ? `AND ${qualifiedTable}.${sortKey != null ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``}; `, done); | ||
}); | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedTable} (${columns.map(column => `${column}`).join(`, `)}, ${columnConfig._auditdate}, ${columnConfig._deleted}) | ||
SELECT ${columns.map(column => `${column}`).join(`, `)}, | ||
${columnConfig._auditdate}, | ||
${columnConfig._deleted} | ||
FROM ${qualifiedStagingTable}; `, done); | ||
}); | ||
}; | ||
async.series(tasks, err => { | ||
@@ -348,3 +213,3 @@ if (!err) { | ||
connection.release(); | ||
callback(e || err, d); | ||
callback(e, d); | ||
}); | ||
@@ -358,3 +223,3 @@ } | ||
client.importDimension = function(stream, table, sk, nk, scds, callback, tableDef = {}) { | ||
client.importDimension = function (stream, table, sk, nk, scds, callback, tableDef = {}) { | ||
const stagingTbl = `${columnConfig.stageTablePrefix}_${table}`; | ||
@@ -377,45 +242,9 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTbl}`; | ||
let sortKey; | ||
let sortKeyType; | ||
// Prepare staging tables | ||
tempTables.push(qualifiedStagingTable); | ||
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}`, done)); | ||
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}_changes`, done)); | ||
if (config.version !== 'redshift') { | ||
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done)); | ||
tasks.push(done => client.query(`create index ${stagingTbl}_id on ${qualifiedStagingTable} (${nk.join(', ')})`, done)); | ||
} else { | ||
// get sortkey for joins | ||
tasks.push(done => { | ||
client.query(`SELECT sortkey, | ||
sortkeytype | ||
FROM public.v_dist_sort_key | ||
WHERE table_name = '${table}';`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
if (results[0].sortkey != null) { | ||
sortKey = results[0].sortkey; | ||
sortKeyType = results[0].sortkeytype; | ||
}; | ||
done(); | ||
}; | ||
}); | ||
}); | ||
tasks.push(done => | ||
// Create staging table with DISTSTYLE ALL to prevent cross talk | ||
client.query(`CREATE TABLE ${qualifiedStagingTable} | ||
DISTSTYLE ALL | ||
SORTKEY(${sortKey != null ? sortKey : nk[0]}) | ||
AS SELECT * | ||
FROM ${qualifiedTable} | ||
LIMIT 0;`, done)); | ||
}; | ||
// Retain surrogate key column when using hashed keys on redshift // column must be dropped for postgres | ||
if (!config.hashedSurrogateKeys && config.version !== 'redshift') { | ||
tasks.push(done => client.query(`alter table ${qualifiedStagingTable} drop column ${sk}`, done)); | ||
}; | ||
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done)); | ||
tasks.push(done => client.query(`create index ${stagingTbl}_id on ${qualifiedStagingTable} (${nk.join(', ')})`, done)); | ||
tasks.push(done => client.query(`alter table ${qualifiedStagingTable} drop column ${sk}`, done)); | ||
// tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done)); | ||
tasks.push(done => { | ||
@@ -431,3 +260,3 @@ ls.pipe(stream, ls.through((obj, done, push) => { | ||
} | ||
}), config.version !== 'redshift' ? client.streamToTable(qualifiedStagingTable) : client.streamToTableFromS3(qualifiedStagingTable, config), (err) => { | ||
}), client.streamToTable(qualifiedStagingTable), (err) => { | ||
if (err) { | ||
@@ -441,5 +270,3 @@ return done(err); | ||
if (config.version !== 'redshift') { | ||
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done)); | ||
}; | ||
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done)); | ||
@@ -453,2 +280,3 @@ client.describeTable(table).then(result => { | ||
// let scd0 = scds[0] || []; // Not Used | ||
let scd2 = scds[2] || []; | ||
@@ -469,191 +297,84 @@ let scd3 = scds[3] || []; | ||
if (!config.bypassSlowlyChangingDimensions) { | ||
if (scd1.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd1.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd1.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD1`); | ||
} else { | ||
scdSQL.push(`0 as runSCD1`); | ||
} | ||
if (scd2.length) { | ||
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 WHEN md5(${scd2.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd2.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 ELSE 1 END as runSCD2`); | ||
} else { | ||
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 ELSE 0 END as runSCD2`); | ||
} | ||
if (scd3.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd3.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd3.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD3`); | ||
} else { | ||
scdSQL.push(`0 as runSCD3`); | ||
} | ||
if (scd6.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd6.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd6.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD6`); | ||
} else { | ||
scdSQL.push(`0 as runSCD6`); | ||
} | ||
}; | ||
let fields = [sk].concat(allColumns).concat([columnConfig._auditdate, columnConfig._startdate, columnConfig._enddate, columnConfig._current]); | ||
let tasks = []; | ||
let totalRecords = 0; | ||
if (!config.hashedSurrogateKeys && !config.bypassSlowlyChangingDimensions) { | ||
tempTables.push(`${qualifiedStagingTable}_changes`); | ||
connection.query(`create table ${qualifiedStagingTable}_changes as | ||
select ${nk.map(id => `s.${id}`).join(', ')}, d.${nk[0]} is null as isNew, | ||
${scdSQL.join(',\n')} | ||
FROM ${qualifiedStagingTable} s | ||
LEFT JOIN ${qualifiedTable} d on ${nk.map(id => `d.${id} = s.${id}`).join(' and ')} and d.${columnConfig._current}`, (err) => { | ||
if (err) { | ||
logger.error(err); | ||
process.exit(); | ||
} | ||
let rowId = null; | ||
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done)); | ||
tasks.push(done => { | ||
connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
rowId = results[0].maxid || 10000; | ||
totalRecords = (rowId - 10000); | ||
done(); | ||
}); | ||
}); | ||
// The following code relies on the fact that now()/sysdate will return the same time during all transaction events | ||
tasks.push(done => connection.query(`Begin Transaction`, done)); | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')}) | ||
SELECT row_number() over() + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, | ||
case when changes.isNew then '1900-01-01 00:00:00' else ${config.version === 'redshift' ? 'sysdate' : 'now()'} END as ${columnConfig._startdate}, | ||
'9999-01-01 00:00:00' as ${columnConfig._enddate}, | ||
true as ${columnConfig._current} | ||
FROM ${qualifiedStagingTable}_changes changes | ||
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
LEFT JOIN ${qualifiedTable} as prev on ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._current} | ||
WHERE (changes.runSCD2 =1 OR changes.runSCD6=1) | ||
`, done); | ||
}); | ||
// This needs to be done last | ||
tasks.push(done => { | ||
// RUN SCD1 / SCD6 columns (where we update the old records) | ||
let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`)); | ||
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then ${config.version === 'redshift' ? 'sysdate' : 'now()'} else prev."${columnConfig._enddate}" END`); | ||
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`); | ||
columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`); | ||
connection.query(`update ${qualifiedTable} as prev | ||
set ${columns.join(', ')} | ||
FROM ${qualifiedStagingTable}_changes changes | ||
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
where ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != ${config.version === 'redshift' ? 'sysdate' : 'now()'} and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/ | ||
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1) | ||
`, done); | ||
}); | ||
async.series(tasks, err => { | ||
if (!err) { | ||
connection.query(`commit`, e => { | ||
connection.release(); | ||
callback(e || err, { | ||
count: totalRecords, | ||
}); | ||
}); | ||
} else { | ||
connection.query(`rollback`, (e, d) => { | ||
connection.release(); | ||
callback(e || err, d); | ||
}); | ||
} | ||
}); | ||
}); | ||
// if (!scd2.length && !scd3.length && !scd6.length) { | ||
// scdSQL.push(`1 as runSCD1`); | ||
// } else | ||
if (scd1.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd1.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd1.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD1`); | ||
} else { | ||
let naturalKeyLowerBound; | ||
let naturalKeyFilter; | ||
scdSQL.push(`0 as runSCD1`); | ||
} | ||
if (scd2.length) { | ||
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 WHEN md5(${scd2.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd2.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 ELSE 1 END as runSCD2`); | ||
} else { | ||
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 ELSE 0 END as runSCD2`); | ||
} | ||
if (scd3.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd3.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd3.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD3`); | ||
} else { | ||
scdSQL.push(`0 as runSCD3`); | ||
} | ||
if (scd6.length) { | ||
scdSQL.push(`CASE WHEN md5(${scd6.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd6.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD6`); | ||
} else { | ||
scdSQL.push(`0 as runSCD6`); | ||
} | ||
// Get lower bound for natural key to avoid unnecessary scanning | ||
// let's figure out which SCDs need to happen | ||
tempTables.push(`${qualifiedStagingTable}_changes`); | ||
connection.query(`create table ${qualifiedStagingTable}_changes as | ||
select ${nk.map(id => `s.${id}`).join(', ')}, d.${nk[0]} is null as isNew, | ||
${scdSQL.join(',\n')} | ||
FROM ${qualifiedStagingTable} s | ||
LEFT JOIN ${qualifiedTable} d on ${nk.map(id => `d.${id} = s.${id}`).join(' and ')} and d.${columnConfig._current}`, (err) => { | ||
if (err) { | ||
logger.error(err); | ||
process.exit(); | ||
} | ||
let tasks = []; | ||
let rowId = null; | ||
let totalRecords = 0; | ||
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done)); | ||
tasks.push(done => { | ||
connection.query(`SELECT MIN(${(sortKey != null) ? sortKey : nk[0]}) AS minid, | ||
CAST(COUNT(*) AS INT) AS cnt | ||
FROM ${qualifiedStagingTable};`, (err, results) => { | ||
connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
totalRecords = results[0].cnt; | ||
naturalKeyLowerBound = results[0].minid; | ||
if (naturalKeyLowerBound !== null) { | ||
if (sortKeyType === 'int4' || sortKeyType === 'int8') { | ||
naturalKeyFilter = `${results[0].minid}`; | ||
} else if (sortKeyType === 'varchar') { | ||
naturalKeyFilter = `'${results[0].minid}'`; | ||
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) { | ||
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`; | ||
}; | ||
}; | ||
done(); | ||
}; | ||
} | ||
rowId = results[0].maxid || 10000; | ||
totalRecords = (rowId - 10000); | ||
done(); | ||
}); | ||
}); | ||
const qualifiedStagingTablePrevious = `${qualifiedStagingTable}_previous`; | ||
tempTables.push(`${qualifiedStagingTablePrevious}`); | ||
tasks.push(done => connection.query(`DROP TABLE IF EXISTS ${qualifiedStagingTablePrevious};`, done)); | ||
tasks.push(done => { | ||
connection.query(`CREATE TABLE ${qualifiedStagingTablePrevious} | ||
DISTSTYLE ALL | ||
AS SELECT * | ||
FROM ${qualifiedTable} | ||
LIMIT 0;`, done); | ||
}); | ||
// The following code relies on the fact that now() will return the same time during all transaction events | ||
tasks.push(done => connection.query(`Begin Transaction`, done)); | ||
// Set auditdate and surrogate key for stage data (not sure if the sk is actually necessary) | ||
tasks.push(done => { | ||
connection.query(`UPDATE ${qualifiedStagingTable} | ||
SET ${columnConfig._auditdate} = ${dwClient.auditdate}, | ||
${sk} = farmFingerPrint64(${nk.map(id => `${id}`).join(`|| '-' ||`)}); `, done); | ||
let fields = [sk].concat(allColumns).concat([columnConfig._auditdate, columnConfig._startdate, columnConfig._enddate, columnConfig._current]); | ||
connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')}) | ||
SELECT row_number() over () + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00' else now() END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current} | ||
FROM ${qualifiedStagingTable}_changes changes | ||
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
LEFT JOIN ${qualifiedTable} as prev on ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._current} | ||
WHERE (changes.runSCD2 =1 OR changes.runSCD6=1) | ||
`, done); | ||
}); | ||
tasks.push(done => connection.query(`BEGIN TRANSACTION;`, done)); | ||
// Retrieve a copy of existing data | ||
// This needs to be done last | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedStagingTablePrevious}(${fields.map(column => `${column}`).join(`, `)}) | ||
SELECT ${fields.map(column => `${column}`).join(`, `)} | ||
FROM ${qualifiedTable} AS base | ||
WHERE EXISTS(SELECT * | ||
FROM ${qualifiedStagingTable} AS staging | ||
WHERE base.${sk} = staging.${sk} | ||
${(naturalKeyFilter !== undefined) ? `AND staging.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}) | ||
${(naturalKeyFilter !== undefined) ? `AND base.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}; `, done); | ||
// RUN SCD1 / SCD6 columns (where we update the old records) | ||
let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`)); | ||
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then now() else prev."${columnConfig._enddate}" END`); | ||
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`); | ||
columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`); | ||
connection.query(`update ${qualifiedTable} as prev | ||
set ${columns.join(', ')} | ||
FROM ${qualifiedStagingTable}_changes changes | ||
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
where ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != now() and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/ | ||
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1) | ||
`, done); | ||
}); | ||
// Merge existing data into staged copy | ||
tasks.push(done => { | ||
connection.query(`UPDATE ${qualifiedStagingTable} AS staging | ||
SET ${fields.map(column => `${column} = COALESCE(staging.${column}, prev.${column})`).join(`, `)} | ||
FROM ${qualifiedStagingTablePrevious} AS prev | ||
WHERE staging.${sk} = prev.${sk}`, done); | ||
}); | ||
// Set default SCD values | ||
tasks.push(done => { | ||
connection.query(`UPDATE ${qualifiedStagingTable} | ||
SET ${columnConfig._startdate} = COALESCE(${columnConfig._startdate},'1900-01-01 00:00:00'), | ||
${columnConfig._enddate} = COALESCE(${columnConfig._enddate},'9999-01-01 00:00:00'), | ||
${columnConfig._current} = COALESCE(${columnConfig._current},true);`, done); | ||
}); | ||
// Delete and reinsert data - avoids costly updates on large tables | ||
tasks.push(done => { | ||
connection.query(`DELETE FROM ${qualifiedTable} | ||
USING ${qualifiedStagingTable} | ||
WHERE ${qualifiedTable}.${sk} = ${qualifiedStagingTable}.${sk} | ||
${(naturalKeyFilter !== undefined) ? `AND ${qualifiedTable}.${sortKey != null ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}; `, done); | ||
}); | ||
tasks.push(done => { | ||
connection.query(`INSERT INTO ${qualifiedTable} (${fields.map(column => `${column}`).join(`, `)}) | ||
SELECT ${fields.map(column => `${column}`).join(`, `)} | ||
FROM ${qualifiedStagingTable}; `, done); | ||
}); | ||
// tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}_changes`, done)); | ||
// tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}`, done)); | ||
async.series(tasks, err => { | ||
@@ -670,7 +391,7 @@ if (!err) { | ||
connection.release(); | ||
callback(e || err, d); | ||
callback(e, d); | ||
}); | ||
} | ||
}); | ||
}; | ||
}); | ||
}); | ||
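The hashed-surrogate-key path above avoids wide UPDATEs on large tables: it merges previous values into the staging table and then deletes and reinserts the affected rows. A simplified sketch of that final delete-and-reinsert step, assuming the staging table already holds fully merged rows, `ids` are the join columns, and `connection.query` is callback-style:

// Replace the affected rows in one pass: delete anything that has a staged
// counterpart, then insert the merged rows. A transaction hides the gap.
async function deleteAndReinsert(connection, qualifiedTable, qualifiedStagingTable, ids, columns) {
  const run = sql => new Promise((resolve, reject) =>
    connection.query(sql, err => (err ? reject(err) : resolve())));
  await run(`BEGIN TRANSACTION`);
  await run(`DELETE FROM ${qualifiedTable}
    USING ${qualifiedStagingTable}
    WHERE ${ids.map(id => `${qualifiedTable}.${id} = ${qualifiedStagingTable}.${id}`).join(' AND ')}`);
  await run(`INSERT INTO ${qualifiedTable} (${columns.join(', ')})
    SELECT ${columns.join(', ')} FROM ${qualifiedStagingTable}`);
  await run(`COMMIT`);
}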
@@ -681,66 +402,62 @@ }).catch(callback); | ||
client.insertMissingDimensions = function(usedTables, tableConfig, tableSks, tableNks, callback) { | ||
if (config.hashedSurrogateKeys) { | ||
callback(null); | ||
} else { | ||
let unions = {}; | ||
let isDate = { | ||
d_date: true, | ||
d_datetime: true, | ||
d_time: true, | ||
date: true, | ||
datetime: true, | ||
dim_date: true, | ||
dim_datetime: true, | ||
dim_time: true, | ||
time: true, | ||
}; | ||
Object.keys(usedTables).map(table => { | ||
Object.keys(tableConfig[table].structure).map(column => { | ||
let field = tableConfig[table].structure[column]; | ||
if (field.dimension && !isDate[field.dimension]) { | ||
if (!(unions[field.dimension])) { | ||
unions[field.dimension] = []; | ||
} | ||
if (typeof tableNks[field.dimension] === 'undefined') { | ||
throw new Error(`${field.dimension} not found in tableNks`); | ||
} | ||
let dimTableNk = tableNks[field.dimension][0]; | ||
unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`); | ||
client.insertMissingDimensions = function (usedTables, tableConfig, tableSks, tableNks, callback) { | ||
let unions = {}; | ||
let isDate = { | ||
d_date: true, | ||
d_datetime: true, | ||
d_time: true, | ||
date: true, | ||
datetime: true, | ||
dim_date: true, | ||
dim_datetime: true, | ||
dim_time: true, | ||
time: true, | ||
}; | ||
Object.keys(usedTables).map(table => { | ||
Object.keys(tableConfig[table].structure).map(column => { | ||
let field = tableConfig[table].structure[column]; | ||
if (field.dimension && !isDate[field.dimension]) { | ||
if (!(unions[field.dimension])) { | ||
unions[field.dimension] = []; | ||
} | ||
}); | ||
if (typeof tableNks[field.dimension] === 'undefined') { | ||
throw new Error(`${field.dimension} not found in tableNks`); | ||
} | ||
let dimTableNk = tableNks[field.dimension][0]; | ||
unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`); | ||
} | ||
}); | ||
let missingDimTasks = Object.keys(unions).map(table => { | ||
let sk = tableSks[table]; | ||
let nk = tableNks[table][0]; | ||
return (callback) => { | ||
let done = (err, data) => { | ||
trx && trx.release(); | ||
callback(err, data); | ||
}; | ||
let trx; | ||
client.connect().then(transaction => { | ||
trx = transaction; | ||
transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
let rowId = results[0].maxid || 10000; | ||
let _auditdate = dwClient.auditdate; | ||
let unionQuery = unions[table].join('\nUNION\n'); | ||
transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over() + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from(${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => { | ||
done(err); | ||
}); | ||
}); | ||
let missingDimTasks = Object.keys(unions).map(table => { | ||
let sk = tableSks[table]; | ||
let nk = tableNks[table][0]; | ||
return (callback) => { | ||
let done = (err, data) => { | ||
trx && trx.release(); | ||
callback(err, data); | ||
}; | ||
let trx; | ||
client.connect().then(transaction => { | ||
trx = transaction; | ||
transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
let rowId = results[0].maxid || 10000; | ||
let _auditdate = dwClient.auditdate; // results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()"; | ||
let unionQuery = unions[table].join('\nUNION\n'); | ||
transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over () + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => { | ||
done(err); | ||
}); | ||
}).catch(done); | ||
}; | ||
}); | ||
async.parallelLimit(missingDimTasks, 10, (missingDimError) => { | ||
logger.info(`Missing Dimensions ${!missingDimError && 'Inserted'} ----------------------------`, missingDimError || ''); | ||
callback(missingDimError); | ||
}); | ||
}; | ||
}); | ||
}).catch(done); | ||
}; | ||
}); | ||
async.parallelLimit(missingDimTasks, 10, (missingDimError) => { | ||
logger.info(`Missing Dimensions ${!missingDimError && 'Inserted'} ----------------------------`, missingDimError || ''); | ||
callback(missingDimError); | ||
}); | ||
}; | ||
client.linkDimensions = function(table, links, nk, callback, tableStatus) { | ||
client.linkDimensions = function (table, links, nk, callback, tableStatus) { | ||
client.describeTable(table).then(() => { | ||
@@ -750,55 +467,8 @@ let tasks = []; | ||
const qualifiedTable = `public.${table}`; | ||
const linkAuditdate = client.escapeValueNoToLower(new Date().toISOString().replace(/\.\d*Z/, 'Z')); | ||
// Only run analyze on the table if this is the first load | ||
if (tableStatus === 'First Load' && config.version !== 'redshift') { | ||
tasks.push(done => client.query(`ANALYZE ${table}`, done)); | ||
if (tableStatus === 'First Load') { | ||
tasks.push(done => client.query(`analyze ${table}`, done)); | ||
} | ||
const qualifiedStagingTable = `${columnConfig.stageSchema}.${columnConfig.stageTablePrefix}_${table}`; | ||
let naturalKeyLowerBound; | ||
let naturalKeyFilter; | ||
let sortKey; | ||
let sortKeyType; | ||
// Get lower bound for natural key to avoid unnecessary scanning | ||
if (config.version === 'redshift') { | ||
tasks.push(done => { | ||
client.query(`SELECT sortkey, | ||
sortkeytype | ||
FROM public.v_dist_sort_key | ||
WHERE table_name = '${table}';`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
if (results[0].sortKey != null) { | ||
sortKey = results[0].sortkey; | ||
sortKeyType = results[0].sortkeytype; | ||
}; | ||
done(); | ||
}; | ||
}); | ||
}); | ||
tasks.push(done => { | ||
client.query(`SELECT MIN(${(sortKey != null) ? sortKey : nk[0]}) AS minid | ||
FROM ${qualifiedStagingTable}; `, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
naturalKeyLowerBound = results[0].minid; | ||
if (naturalKeyLowerBound !== null) { | ||
if (sortKeyType === 'int4' || sortKeyType === 'int8') { | ||
naturalKeyFilter = `${results[0].minid}`; | ||
} else if (sortKeyType === 'varchar') { | ||
naturalKeyFilter = `'${results[0].minid}'`; | ||
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) { | ||
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`; | ||
}; | ||
}; | ||
done(); | ||
}; | ||
}); | ||
}); | ||
}; | ||
tasks.push(done => { | ||
@@ -808,23 +478,17 @@ let joinTables = links.map(link => { | ||
sets.push(`${link.destination}_date = coalesce(t.${link.source}::date - '1400-01-01'::date + 10000, 1)`); | ||
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH FROM ${(config.version !== 'redshift') ? `` : `'1970-01-01'::date +`} t.${link.source}::time) + 10000, 1)`); | ||
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH from t.${link.source}::time) + 10000, 1)`); | ||
} else if (columnConfig.useSurrogateDateKeys && (link.table === 'd_date' || link.table === 'date' || link.table === 'dim_date')) { | ||
sets.push(`${link.destination}_date = coalesce(t.${link.source}::date - '1400-01-01'::date + 10000, 1)`); | ||
} else if (columnConfig.useSurrogateDateKeys && (link.table === 'd_time' || link.table === 'time' || link.table === 'dim_time')) { | ||
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH FROM ${(config.version !== 'redshift') ? `` : `'1970-01-01'::date +`} t.${link.source}::time) + 10000, 1)`); | ||
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH from t.${link.source}::time) + 10000, 1)`); | ||
} else { | ||
if (config.hashedSurrogateKeys) { | ||
sets.push(`${link.destination} = coalesce(farmFingerPrint64(t.${link.source}), 1)`); | ||
return ``; | ||
} else { | ||
sets.push(`${link.destination} = coalesce(${link.join_id}_join_table.${link.sk}, 1)`); | ||
var joinOn = `${link.join_id}_join_table.${link.on} = t.${link.source} `; | ||
if (Array.isArray(link.source)) { | ||
joinOn = link.source.map((v, i) => `${link.join_id}_join_table.${link.on[i]} = t.${v}`).join(' AND '); | ||
} | ||
return `LEFT JOIN ${link.table} ${link.join_id}_join_table | ||
ON ${joinOn} | ||
AND t.${link.link_date} >= ${link.join_id}_join_table.${columnConfig._startdate} | ||
AND (t.${link.link_date} <= ${link.join_id}_join_table.${columnConfig._enddate} or ${link.join_id}_join_table.${columnConfig._current})`; | ||
}; | ||
sets.push(`${link.destination} = coalesce(${link.join_id}_join_table.${link.sk}, 1)`); | ||
var joinOn = `${link.join_id}_join_table.${link.on} = t.${link.source}`; | ||
if (Array.isArray(link.source)) { | ||
joinOn = link.source.map((v, i) => `${link.join_id}_join_table.${link.on[i]} = t.${v}`).join(' AND '); | ||
} | ||
return `LEFT JOIN ${link.table} ${link.join_id}_join_table | ||
on ${joinOn} | ||
and t.${link.link_date} >= ${link.join_id}_join_table.${columnConfig._startdate} | ||
and (t.${link.link_date} <= ${link.join_id}_join_table.${columnConfig._enddate} or ${link.join_id}_join_table.${columnConfig._current})`; | ||
} | ||
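In the date and time branches above, no dimension join happens at all; the surrogate key is computed arithmetically as days since 1400-01-01 plus 10000 for dates, and seconds since midnight plus 10000 for times, with a fallback of 1 for nulls. A small plain-JavaScript sketch of the same arithmetic, only to make the offsets explicit:

// Mirrors the SQL: date key = days since 1400-01-01 + 10000,
// time key = seconds since midnight + 10000, null/invalid => 1.
const MS_PER_DAY = 24 * 60 * 60 * 1000;
const BASE = Date.UTC(1400, 0, 1);

function surrogateDateKey(value) {
  const d = new Date(value);
  if (isNaN(d)) return 1;
  return Math.floor((Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate()) - BASE) / MS_PER_DAY) + 10000;
}

function surrogateTimeKey(value) {
  const d = new Date(value);
  if (isNaN(d)) return 1;
  return d.getUTCHours() * 3600 + d.getUTCMinutes() * 60 + d.getUTCSeconds() + 10000;
}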
@@ -834,9 +498,9 @@ }); | ||
if (sets.length) { | ||
client.query(`UPDATE ${table} dm | ||
SET ${sets.join(', ')}, ${columnConfig._auditdate} = ${linkAuditdate} | ||
FROM ${table} t | ||
${config.hashedSurrogateKeys ? '' : joinTables.join('\n')} | ||
WHERE ${nk.map(id => `dm.${id} = t.${id}`).join(' AND ')} | ||
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate} | ||
${(naturalKeyFilter !== undefined) ? `AND t.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}`, done); | ||
client.query(`Update ${table} dm | ||
SET ${sets.join(', ')}, ${columnConfig._auditdate} = ${linkAuditdate} | ||
FROM ${table} t | ||
${joinTables.join('\n')} | ||
where ${nk.map(id => `dm.${id} = t.${id}`).join(' and ')} | ||
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate} | ||
`, done); | ||
} else { | ||
@@ -854,3 +518,3 @@ done(); | ||
client.changeTableStructure = async function(structures) { | ||
client.changeTableStructure = async function (structures) { | ||
let tasks = []; | ||
@@ -880,3 +544,3 @@ let tableResults = {}; | ||
missingFields[missingDim] = { | ||
type: (config.hashedSurrogateKeys) ? 'bigint' : 'integer', | ||
type: 'integer', | ||
}; | ||
@@ -923,3 +587,3 @@ } | ||
client.createTable = async function(table, definition) { | ||
client.createTable = async function (table, definition) { | ||
let fields = []; | ||
@@ -940,3 +604,3 @@ let defaults = []; | ||
sk: true, | ||
type: `${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'} primary key`, | ||
type: 'integer primary key', | ||
}; | ||
@@ -990,3 +654,3 @@ } else if (typeof field === 'string') { | ||
} else if (field.dimension) { | ||
fields.push(`${columnConfig.dimColumnTransform(key, field)} ${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'}`); | ||
fields.push(`${columnConfig.dimColumnTransform(key, field)} integer`); | ||
defaults.push({ | ||
@@ -1005,3 +669,3 @@ column: columnConfig.dimColumnTransform(key, field), | ||
let sql = `create table ${table} ( | ||
${fields.join(',\n')} | ||
${fields.join(',\n')} | ||
)`; | ||
@@ -1030,3 +694,3 @@ | ||
column: columnConfig._auditdate, | ||
value: config.version === 'redshift' ? 'sysdate' : 'now()', | ||
value: 'now()', | ||
}, { | ||
@@ -1066,3 +730,3 @@ column: columnConfig._startdate, | ||
}; | ||
client.updateTable = async function(table, definition) { | ||
client.updateTable = async function (table, definition) { | ||
let fields = []; | ||
@@ -1074,3 +738,3 @@ let queries = []; | ||
field = { | ||
type: `${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'} primary key`, | ||
type: 'integer primary key', | ||
}; | ||
@@ -1111,3 +775,3 @@ } else if (typeof field === 'string') { | ||
add column ${fields.join(',\n add column ')} | ||
`]; | ||
`]; | ||
@@ -1124,3 +788,3 @@ // redshift doesn't support multi 'add column' in one query | ||
return new Promise(resolve => { | ||
async.eachSeries(sqls, function(sql, done) { | ||
async.eachSeries(sqls, function (sql, done) { | ||
client.query(sql, err => done(err)); | ||
@@ -1137,3 +801,3 @@ }, err => { | ||
client.findAuditDate = function(table, callback) { | ||
client.findAuditDate = function (table, callback) { | ||
client.query(`select to_char(max(${columnConfig._auditdate}), 'YYYY-MM-DD HH24:MI:SS') as max FROM ${client.escapeId(table)}`, (err, auditdate) => { | ||
@@ -1155,3 +819,3 @@ if (err) { | ||
client.exportChanges = function(table, fields, remoteAuditdate, opts, callback) { | ||
client.exportChanges = function (table, fields, remoteAuditdate, opts, callback) { | ||
let auditdateCompare = remoteAuditdate.auditdate != null ? `${columnConfig._auditdate} >= ${client.escapeValue(remoteAuditdate.auditdate)}` : `${columnConfig._auditdate} is null`; | ||
@@ -1175,3 +839,3 @@ client.query(`select count(*) as count FROM ${client.escapeId(table)} WHERE ${auditdateCompare}`, (err, result) => { | ||
${where} | ||
`, (err, result) => { | ||
`, (err, result) => { | ||
if (err) { | ||
@@ -1225,3 +889,3 @@ callback(err); | ||
client.importChanges = function(file, table, fields, opts, callback) { | ||
client.importChanges = function (file, table, fields, opts, callback) { | ||
if (typeof opts === 'function') { | ||
@@ -1287,6 +951,6 @@ callback = opts; | ||
} | ||
tasks.push(function(done) { | ||
tasks.push(function (done) { | ||
client.query(`insert into ${tableName} select * from ${qualifiedStagingTable}`, done); | ||
}); | ||
tasks.push(function(done) { | ||
tasks.push(function (done) { | ||
client.query(`select count(*) from ${qualifiedStagingTable}`, (err, result) => { | ||
@@ -1297,3 +961,3 @@ loadCount = result && parseInt(result[0].count); | ||
}); | ||
tasks.push(function(done) { | ||
tasks.push(function (done) { | ||
client.query(`drop table if exists ${qualifiedStagingTable}`, done); | ||
@@ -1300,0 +964,0 @@ }); |
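The tail of `importChanges` above follows the usual staging pattern: append the staged rows into the destination table, capture the load count, then drop the staging table. A compact sketch of those three steps, assuming the connector's callback-style `query` and the `async` package:

const async = require('async');

// Append staged rows, record how many were loaded, then discard the stage.
function finishImport(client, tableName, qualifiedStagingTable, callback) {
  let loadCount = 0;
  async.series([
    done => client.query(`insert into ${tableName} select * from ${qualifiedStagingTable}`, done),
    done => client.query(`select count(*) from ${qualifiedStagingTable}`, (err, result) => {
      if (!err) loadCount = result && parseInt(result[0].count);
      done(err);
    }),
    done => client.query(`drop table if exists ${qualifiedStagingTable}`, done),
  ], err => callback(err, loadCount));
}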
{ | ||
"name": "leo-connector-postgres", | ||
"version": "4.0.21-beta", | ||
"version": "5.0.0-awsv3", | ||
"description": "A Postgres database connector for use with Leo Platform", | ||
@@ -22,9 +22,9 @@ "repository": { | ||
"fast-csv": "2.4.1", | ||
"leo-connector-common": "4.0.11-beta", | ||
"pg": "7.8.2", | ||
"leo-connector-common": "^5.0.0-awsv3", | ||
"pg": "^8.11.3", | ||
"pg-copy-streams": "2.2.2", | ||
"pg-format": "1.0.4" | ||
"pg-format": "1.0.4", | ||
"pg-pool": "^3.6.1" | ||
}, | ||
"devDependencies": { | ||
"@dsco/layer-leo": "2.1.29", | ||
"chai": "^4.2.0", | ||
@@ -31,0 +31,0 @@ "eslint": "^5.13.0", |
Environment variable access
Supply chain risk: the package accesses environment variables, which may be a sign of credential stuffing or data theft. Found 1 instance in 1 package.