🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more →

leo-connector-postgres

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

leo-connector-postgres - npm Package Compare versions

Comparing version

to
5.0.0-awsv3

@@ -14,6 +14,2 @@ const {

// need to confirm there is no issue adding these dependencies
const leo = require('leo-sdk');
const ls = leo.streams;
require('pg').types.setTypeParser(1114, (val) => {

@@ -25,2 +21,4 @@ val += 'Z';

const ls = require('leo-sdk').streams;
let queryCount = 0;

@@ -578,172 +576,4 @@ module.exports = function (config) {

},
streamToTableFromS3: (table, config) => {
const ts = table.split('.');
let schema = 'public';
let shortTable = table;
if (ts.length > 1) {
schema = ts[0];
shortTable = ts[1];
}
let columns = [];
let stream;
let myClient = null;
let pending = null;
let ended = false;
let csvopts = { delimiter: '|' };
let keepS3Files = config.keepS3Files != null ? config.keepS3Files : false;
// Get prefix for S3 file path
let s3prefix =
config.s3prefix ||
process.env.AWS_LAMBDA_FUNCTION_NAME ||
'dw_redshift_ingest';
s3prefix = s3prefix.replace(/^\/*(.*?)\/*$/, '$1'); // Remove leading and trailing '/'
// clean audit date to use in S3 file path
let cleanAuditDate = client.auditdate
.replace(/'/g, '')
.replace(/:/g, '-');
let s3FileName = `files/${s3prefix}/${cleanAuditDate}/${table}.csv`;
client.connect().then(
c => {
client
.describeTable(shortTable, schema)
.then(result => {
if (ended) {
c.release(true);
return;
}
columns = result.map(f => f.column_name);
myClient = c;
stream = ls.toS3(leo.configuration.resources.LeoS3, s3FileName);
// copyFrom uses `end` but s3 `finish` so pipe finish to end
stream.on('finish', () => stream.emit('end'));
stream.on('error', function(err) {
logger.error(`COPY error: ${err.where}`, err);
process.exit();
});
if (pending) {
pending();
}
})
.catch(err => {
logger.error(err);
});
},
err => {
logger.error(err);
}
);
let count = 0;
function nonNull(v) {
if (v === '' || v === null || v === undefined) {
return '\\N';
} else if (typeof v === 'string' && (v.search(/\r/) !== -1 || v.search(/\n/) !== -1)) {
if (config.version !== 'redshift') {
return v.replace(/\r\n?/g, '\n');
} else {
return v.replace(/\r\n?/g, '\n').replace(/\n/g, `\\n`);
}
} else {
return v;
}
}
return ls.pipeline(
csv.createWriteStream({
...csvopts,
headers: false,
transform: (row, done) => {
if (!myClient) {
pending = () => {
done(
null,
columns.map(f => nonNull(row[f]))
);
};
} else {
done(
null,
columns.map(f => nonNull(row[f]))
);
}
},
}),
ls.write(
(r, done) => {
count++;
if (count % 10000 === 0) {
logger.info(table + ': ' + count);
}
if (!stream.write(r)) {
stream.once('drain', done);
} else {
done(null);
}
},
done => {
ended = true;
logger.debug(table + ': stream done');
if (stream) {
stream.on('end', err => {
logger.debug(table + ': stream ended', err || '');
// wrap done callback to release the connection
function innerDone(err) {
myClient.release(true);
logger.debug(table + ': stream client released', err || '');
done(err);
}
if (err) {
innerDone(err);
} else {
// Once the S3 file is complete run copy to load the staging table
let f = columns.map(f => `"${f}"`);
let file = `s3://${leo.configuration.s3}/${s3FileName}`;
let manifest = '';
let role = config.loaderRole;
myClient.query(
`copy ${table} (${f}) from '${file}' ${manifest} ${role ? `credentials 'aws_iam_role=${role}'` : ''
} NULL AS '\\\\N' format csv DELIMITER '|' ACCEPTINVCHARS TRUNCATECOLUMNS ACCEPTANYDATE TIMEFORMAT 'auto' COMPUPDATE OFF`,
copyErr => {
if (keepS3Files) {
innerDone(copyErr);
} else {
// Delete the S3 files when done
ls.s3.deleteObject(
{
Bucket: leo.configuration.s3,
Key: s3FileName,
},
deleteError => {
if (deleteError) {
logger.info(
'file failed to delete:',
s3FileName,
deleteError
);
}
innerDone(copyErr);
}
);
}
}
);
}
});
stream.end();
} else {
done();
}
}
)
);
streamToTableFromS3: () => {
// opts = Object.assign({}, opts || {});
},

@@ -750,0 +580,0 @@ };

@@ -7,3 +7,3 @@ 'use strict';

module.exports = function(config, columnConfig) {
module.exports = function (config, columnConfig) {
let client = postgres(config);

@@ -31,18 +31,6 @@ let dwClient = client;

// Control flow for both of these configurations set to true has not been added. An error will be thrown until that is supported.
if (
(config.hashedSurrogateKeys && !config.bypassSlowlyChangingDimensions) // hashed surrogate keys and slowly changing dimensions not supported
|| (config.hashedSurrogateKeys && config.bypassSlowlyChangingDimensions === undefined) // covering the case where slowly changing dimensions has been omitted
) {
logger.error(`Unsupported configuration, bypassSlowlyChangingDimensions:${config.bypassSlowlyChangingDimensions} hashedSurrogateKeys:${config.hashedSurrogateKeys}.`);
process.exit();
};
client.getDimensionColumn = columnConfig.dimColumnTransform;
client.columnConfig = columnConfig;
let deleteFlushCount = config.deleteFlushCount || 1000;
function deletesSetup(qualifiedTable, schema, field, value, where = '') {
function deletesSetup (qualifiedTable, schema, field, value, where = '') {
let colLookup = {};

@@ -58,4 +46,4 @@ schema.map(col => {

function tryFlushDelete(done, force = false) {
if (force || toDeleteCount >= deleteFlushCount) {
function tryFlushDelete (done, force = false) {
if (force || toDeleteCount >= 1000) {
let deleteTasks = Object.keys(toDelete).map(col => {

@@ -77,3 +65,3 @@ return deleteDone => client.query(`update ${qualifiedTable} set ${field} = ${value}, ${columnConfig._auditdate} = ${dwClient.auditdate} where ${col} in (${toDelete[col].join(',')}) ${where}`, deleteDone);

return {
add: function(obj, done) {
add: function (obj, done) {
let field = obj.__leo_delete__;

@@ -105,22 +93,16 @@ let id = obj.__leo_delete_id__;

try {
tempTables.forEach(table => {
tasks.push(done => client.query(`drop table if exists ${table}`, done));
});
tempTables.forEach(table => {
tasks.push(done => client.query(`drop table ${table}`, done));
});
await new Promise((resolve, reject) => {
async.series(tasks, err => {
if (err) {
reject(err);
} else {
tempTables = [];
resolve('Cleaned up temp tables');
}
});
return new Promise(resolve => {
async.series(tasks, err => {
if (err) {
throw err;
} else {
tempTables = [];
return resolve('Cleaned up temp tables');
}
});
} catch (dropError) {
// Temp tables are cleaned up before use in the next invocation
// so just log it and move on
logger.info('Error dropping temp tables', dropError);
}
});
}

@@ -131,3 +113,3 @@

client.importFact = function(stream, table, ids, callback) {
client.importFact = function (stream, table, ids, callback) {
const stagingTable = `${columnConfig.stageTablePrefix}_${table}`;

@@ -148,38 +130,8 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTable}`;

let sortKey;
let sortKeyType;
tempTables.push(qualifiedStagingTable);
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}`, done));
if (config.version !== 'redshift') {
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done));
tasks.push(done => client.query(`create index ${stagingTable}_id on ${qualifiedStagingTable} (${ids.join(', ')})`, done));
} else {
// get sortkey for joins
tasks.push(done => {
client.query(`SELECT sortkey,
sortkeytype
FROM public.v_dist_sort_key
WHERE table_name = '${table}';`, (err, results) => {
if (err) {
return done(err);
} else {
if (results[0].sortKey != null) {
sortKey = results[0].sortkey;
sortKeyType = results[0].sortkeytype;
};
done();
};
});
});
tasks.push(done =>
client.query(`CREATE TABLE ${qualifiedStagingTable}
DISTSTYLE ALL
SORTKEY (${sortKey != null ? sortKey : ids[0]})
AS SELECT *
FROM ${qualifiedTable}
LIMIT 0;`, done));
};
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}_changes`, done));
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done));
tasks.push(done => client.query(`create index ${stagingTable}_id on ${qualifiedStagingTable} (${ids.join(', ')})`, done));
// tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done));
tasks.push(done => {

@@ -195,3 +147,3 @@ ls.pipe(stream, ls.through((obj, done, push) => {

}
}), config.version !== 'redshift' ? client.streamToTable(qualifiedStagingTable) : client.streamToTableFromS3(qualifiedStagingTable, config), (err) => {
}), client.streamToTable(qualifiedStagingTable), (err) => {
if (err) {

@@ -205,5 +157,3 @@ return done(err);

if (config.version !== 'redshift') {
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done));
};
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done));

@@ -221,117 +171,32 @@ client.describeTable(table).then(result => {

let totalRecords = 0;
// The following code relies on the fact that now()/sysdate will return the same time during all transaction events
// The following code relies on the fact that now() will return the same time during all transaction events
tasks.push(done => connection.query(`Begin Transaction`, done));
if (!config.hashedSurrogateKeys) {
tasks.push(done => {
connection.query(`select 1 as total from ${qualifiedTable} limit 1`, (err, results) => {
if (err) {
return done(err);
}
totalRecords = results.length;
done();
});
tasks.push(done => {
connection.query(`select 1 as total from ${qualifiedTable} limit 1`, (err, results) => {
if (err) {
return done(err);
}
totalRecords = results.length;
done();
});
tasks.push(done => {
connection.query(`Update ${qualifiedTable} prev
SET ${columns.map(column => `${column} = coalesce(staging.${column}, prev.${column})`)}, ${columnConfig._deleted} = coalesce(prev.${columnConfig._deleted}, false), ${columnConfig._auditdate} = ${dwClient.auditdate}
FROM ${qualifiedStagingTable} staging
where ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')}`
, done);
});
});
tasks.push(done => {
connection.query(`Update ${qualifiedTable} prev
SET ${columns.map(column => `${column} = coalesce(staging.${column}, prev.${column})`)}, ${columnConfig._deleted} = coalesce(prev.${columnConfig._deleted}, false), ${columnConfig._auditdate} = ${dwClient.auditdate}
FROM ${qualifiedStagingTable} staging
where ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')}
`, done);
});
// Now insert any we were missing
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')},${columnConfig._deleted},${columnConfig._auditdate})
SELECT ${columns.map(column => `staging.${column}`)}, false AS ${columnConfig._deleted}, ${dwClient.auditdate} as ${columnConfig._auditdate}
FROM ${qualifiedStagingTable} staging
WHERE NOT EXISTS ( SELECT *
FROM ${qualifiedTable} as prev
WHERE ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')})`, done);
});
} else {
let naturalKeyLowerBound;
let naturalKeyFilter;
// Now insert any we were missing
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')},${columnConfig._deleted},${columnConfig._auditdate})
SELECT ${columns.map(column => `coalesce(staging.${column}, prev.${column})`)}, coalesce(prev.${columnConfig._deleted}, false), ${dwClient.auditdate} as ${columnConfig._auditdate}
FROM ${qualifiedStagingTable} staging
LEFT JOIN ${qualifiedTable} as prev on ${ids.map(id => `prev.${id} = staging.${id}`).join(' and ')}
WHERE prev.${ids[0]} is null
`, done);
});
// tasks.push(done => connection.query(`drop table ${stagingTbl}`, done));
// Get lower bound for natural key to avoid unnecessary scanning
tasks.push(done => {
connection.query(`SELECT MIN(${(sortKey != null) ? sortKey : ids[0]}) AS minid,
CAST(COUNT(*) AS INT) AS cnt
FROM ${qualifiedStagingTable};`, (err, results) => {
if (err) {
return done(err);
} else {
totalRecords = results[0].cnt;
naturalKeyLowerBound = results[0].minid;
if (naturalKeyLowerBound !== null) {
if (sortKeyType === 'int4' || sortKeyType === 'int8') {
naturalKeyFilter = `${results[0].minid}`;
} else if (sortKeyType === 'varchar') {
naturalKeyFilter = `'${results[0].minid}'`;
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) {
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`;
};
};
done();
};
});
});
const qualifiedStagingTablePrevious = `${qualifiedStagingTable}_previous`;
tempTables.push(`${qualifiedStagingTablePrevious}`);
tasks.push(done => connection.query(`DROP TABLE IF EXISTS ${qualifiedStagingTablePrevious};`, done));
tasks.push(done => {
connection.query(`CREATE TABLE ${qualifiedStagingTablePrevious}
DISTSTYLE ALL
AS SELECT *
FROM ${qualifiedTable}
LIMIT 0;`, done);
});
// Retreive copy of existing data
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedStagingTablePrevious} (${columns.map(column => `${column}`).join(`, `)}, ${columnConfig._deleted})
SELECT ${columns.map(column => `${column}`).join(`, `)},
${columnConfig._deleted}
FROM ${qualifiedTable} AS base
WHERE EXISTS ( SELECT *
FROM ${qualifiedStagingTable} AS staging
WHERE ${ids.map(id => `base.${id} = staging.${id}`).join(` AND `)}
${(naturalKeyFilter !== undefined) ? `AND staging.${(sortKey != null) ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``})
${(naturalKeyFilter !== undefined) ? `AND base.${(sortKey != null) ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``};`, done);
});
// Merge exiting data into staged copy
tasks.push(done => {
connection.query(`UPDATE ${qualifiedStagingTable} AS staging
SET ${columns.map(column => `${column} = COALESCE(staging.${column}, prev.${column})`).join(`,`)},
${columnConfig._deleted} = COALESCE(prev.${columnConfig._deleted}, false)
FROM ${qualifiedStagingTablePrevious} AS prev
WHERE ${ids.map(id => `prev.${id} = staging.${id}`).join(` AND `)}`, done);
});
// Set auditdate and _deleted for stage data
tasks.push(done => {
connection.query(`UPDATE ${qualifiedStagingTable}
SET ${columnConfig._auditdate} = ${dwClient.auditdate},
${columnConfig._deleted} = COALESCE(${columnConfig._deleted}, false); `, done);
});
// Delete and reinsert data - avoids costly updates on large tables
tasks.push(done => {
connection.query(`DELETE FROM ${qualifiedTable}
USING ${qualifiedStagingTable}
WHERE ${ids.map(id => `${qualifiedTable}.${id} = ${qualifiedStagingTable}.${id}`).join(` AND `)}
${(naturalKeyFilter !== undefined) ? `AND ${qualifiedTable}.${sortKey != null ? sortKey : ids[0]} >= ${naturalKeyFilter}` : ``}; `, done);
});
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedTable} (${columns.map(column => `${column}`).join(`, `)}, ${columnConfig._auditdate}, ${columnConfig._deleted})
SELECT ${columns.map(column => `${column}`).join(`, `)},
${columnConfig._auditdate},
${columnConfig._deleted}
FROM ${qualifiedStagingTable}; `, done);
});
};
async.series(tasks, err => {

@@ -348,3 +213,3 @@ if (!err) {

connection.release();
callback(e || err, d);
callback(e, d);
});

@@ -358,3 +223,3 @@ }

client.importDimension = function(stream, table, sk, nk, scds, callback, tableDef = {}) {
client.importDimension = function (stream, table, sk, nk, scds, callback, tableDef = {}) {
const stagingTbl = `${columnConfig.stageTablePrefix}_${table}`;

@@ -377,45 +242,9 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTbl}`;

let sortKey;
let sortKeyType;
// Prepare staging tables
tempTables.push(qualifiedStagingTable);
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}`, done));
tasks.push(done => client.query(`drop table if exists ${qualifiedStagingTable}_changes`, done));
if (config.version !== 'redshift') {
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done));
tasks.push(done => client.query(`create index ${stagingTbl}_id on ${qualifiedStagingTable} (${nk.join(', ')})`, done));
} else {
// get sortkey for joins
tasks.push(done => {
client.query(`SELECT sortkey,
sortkeytype
FROM public.v_dist_sort_key
WHERE table_name = '${table}';`, (err, results) => {
if (err) {
return done(err);
} else {
if (results[0].sortkey != null) {
sortKey = results[0].sortkey;
sortKeyType = results[0].sortkeytype;
};
done();
};
});
});
tasks.push(done =>
// Create staging table with DISTSTYLE ALL to prevent cross talk
client.query(`CREATE TABLE ${qualifiedStagingTable}
DISTSTYLE ALL
SORTKEY(${sortKey != null ? sortKey : nk[0]})
AS SELECT *
FROM ${qualifiedTable}
LIMIT 0;`, done));
};
// Retain surrogate key column when using hashed keys on redshift // column must be dropped for postgres
if (!config.hashedSurrogateKeys && config.version !== 'redshift') {
tasks.push(done => client.query(`alter table ${qualifiedStagingTable} drop column ${sk}`, done));
};
tasks.push(done => client.query(`create table ${qualifiedStagingTable} (like ${qualifiedTable})`, done));
tasks.push(done => client.query(`create index ${stagingTbl}_id on ${qualifiedStagingTable} (${nk.join(', ')})`, done));
tasks.push(done => client.query(`alter table ${qualifiedStagingTable} drop column ${sk}`, done));
// tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done));
tasks.push(done => {

@@ -431,3 +260,3 @@ ls.pipe(stream, ls.through((obj, done, push) => {

}
}), config.version !== 'redshift' ? client.streamToTable(qualifiedStagingTable) : client.streamToTableFromS3(qualifiedStagingTable, config), (err) => {
}), client.streamToTable(qualifiedStagingTable), (err) => {
if (err) {

@@ -441,5 +270,3 @@ return done(err);

if (config.version !== 'redshift') {
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done));
};
tasks.push(done => client.query(`analyze ${qualifiedStagingTable}`, done));

@@ -453,2 +280,3 @@ client.describeTable(table).then(result => {

// let scd0 = scds[0] || []; // Not Used
let scd2 = scds[2] || [];

@@ -469,191 +297,84 @@ let scd3 = scds[3] || [];

if (!config.bypassSlowlyChangingDimensions) {
if (scd1.length) {
scdSQL.push(`CASE WHEN md5(${scd1.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd1.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD1`);
} else {
scdSQL.push(`0 as runSCD1`);
}
if (scd2.length) {
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 WHEN md5(${scd2.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd2.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 ELSE 1 END as runSCD2`);
} else {
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 ELSE 0 END as runSCD2`);
}
if (scd3.length) {
scdSQL.push(`CASE WHEN md5(${scd3.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd3.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD3`);
} else {
scdSQL.push(`0 as runSCD3`);
}
if (scd6.length) {
scdSQL.push(`CASE WHEN md5(${scd6.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd6.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD6`);
} else {
scdSQL.push(`0 as runSCD6`);
}
};
let fields = [sk].concat(allColumns).concat([columnConfig._auditdate, columnConfig._startdate, columnConfig._enddate, columnConfig._current]);
let tasks = [];
let totalRecords = 0;
if (!config.hashedSurrogateKeys && !config.bypassSlowlyChangingDimensions) {
tempTables.push(`${qualifiedStagingTable}_changes`);
connection.query(`create table ${qualifiedStagingTable}_changes as
select ${nk.map(id => `s.${id}`).join(', ')}, d.${nk[0]} is null as isNew,
${scdSQL.join(',\n')}
FROM ${qualifiedStagingTable} s
LEFT JOIN ${qualifiedTable} d on ${nk.map(id => `d.${id} = s.${id}`).join(' and ')} and d.${columnConfig._current}`, (err) => {
if (err) {
logger.error(err);
process.exit();
}
let rowId = null;
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done));
tasks.push(done => {
connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => {
if (err) {
return done(err);
}
rowId = results[0].maxid || 10000;
totalRecords = (rowId - 10000);
done();
});
});
// The following code relies on the fact that now()/sysdate will return the same time during all transaction events
tasks.push(done => connection.query(`Begin Transaction`, done));
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')})
SELECT row_number() over() + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate},
case when changes.isNew then '1900-01-01 00:00:00' else ${config.version === 'redshift' ? 'sysdate' : 'now()'} END as ${columnConfig._startdate},
'9999-01-01 00:00:00' as ${columnConfig._enddate},
true as ${columnConfig._current}
FROM ${qualifiedStagingTable}_changes changes
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')}
LEFT JOIN ${qualifiedTable} as prev on ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._current}
WHERE (changes.runSCD2 =1 OR changes.runSCD6=1)
`, done);
});
// This needs to be done last
tasks.push(done => {
// RUN SCD1 / SCD6 columns (where we update the old records)
let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`));
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then ${config.version === 'redshift' ? 'sysdate' : 'now()'} else prev."${columnConfig._enddate}" END`);
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`);
columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`);
connection.query(`update ${qualifiedTable} as prev
set ${columns.join(', ')}
FROM ${qualifiedStagingTable}_changes changes
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')}
where ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != ${config.version === 'redshift' ? 'sysdate' : 'now()'} and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1)
`, done);
});
async.series(tasks, err => {
if (!err) {
connection.query(`commit`, e => {
connection.release();
callback(e || err, {
count: totalRecords,
});
});
} else {
connection.query(`rollback`, (e, d) => {
connection.release();
callback(e || err, d);
});
}
});
});
// if (!scd2.length && !scd3.length && !scd6.length) {
// scdSQL.push(`1 as runSCD1`);
// } else
if (scd1.length) {
scdSQL.push(`CASE WHEN md5(${scd1.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd1.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD1`);
} else {
let naturalKeyLowerBound;
let naturalKeyFilter;
scdSQL.push(`0 as runSCD1`);
}
if (scd2.length) {
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 WHEN md5(${scd2.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd2.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 ELSE 1 END as runSCD2`);
} else {
scdSQL.push(`CASE WHEN d.${nk[0]} is null then 1 ELSE 0 END as runSCD2`);
}
if (scd3.length) {
scdSQL.push(`CASE WHEN md5(${scd3.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd3.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD3`);
} else {
scdSQL.push(`0 as runSCD3`);
}
if (scd6.length) {
scdSQL.push(`CASE WHEN md5(${scd6.map(f => 'md5(coalesce(s.' + f + '::text,\'\'))').join(' || ')}) = md5(${scd6.map(f => 'md5(coalesce(d.' + f + '::text,\'\'))').join(' || ')}) THEN 0 WHEN d.${nk[0]} is null then 0 ELSE 1 END as runSCD6`);
} else {
scdSQL.push(`0 as runSCD6`);
}
// Get lower bound for natural key to avoid unnecessary scanning
// let's figure out which SCDs needs to happen
tempTables.push(`${qualifiedStagingTable}_changes`);
connection.query(`create table ${qualifiedStagingTable}_changes as
select ${nk.map(id => `s.${id}`).join(', ')}, d.${nk[0]} is null as isNew,
${scdSQL.join(',\n')}
FROM ${qualifiedStagingTable} s
LEFT JOIN ${qualifiedTable} d on ${nk.map(id => `d.${id} = s.${id}`).join(' and ')} and d.${columnConfig._current}`, (err) => {
if (err) {
logger.error(err);
process.exit();
}
let tasks = [];
let rowId = null;
let totalRecords = 0;
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done));
tasks.push(done => {
connection.query(`SELECT MIN(${(sortKey != null) ? sortKey : nk[0]}) AS minid,
CAST(COUNT(*) AS INT) AS cnt
FROM ${qualifiedStagingTable};`, (err, results) => {
connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => {
if (err) {
return done(err);
} else {
totalRecords = results[0].cnt;
naturalKeyLowerBound = results[0].minid;
if (naturalKeyLowerBound !== null) {
if (sortKeyType === 'int4' || sortKeyType === 'int8') {
naturalKeyFilter = `${results[0].minid}`;
} else if (sortKeyType === 'varchar') {
naturalKeyFilter = `'${results[0].minid}'`;
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) {
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`;
};
};
done();
};
}
rowId = results[0].maxid || 10000;
totalRecords = (rowId - 10000);
done();
});
});
const qualifiedStagingTablePrevious = `${qualifiedStagingTable}_previous`;
tempTables.push(`${qualifiedStagingTablePrevious}`);
tasks.push(done => connection.query(`DROP TABLE IF EXISTS ${qualifiedStagingTablePrevious};`, done));
tasks.push(done => {
connection.query(`CREATE TABLE ${qualifiedStagingTablePrevious}
DISTSTYLE ALL
AS SELECT *
FROM ${qualifiedTable}
LIMIT 0;`, done);
});
// The following code relies on the fact that now() will return the same time during all transaction events
tasks.push(done => connection.query(`Begin Transaction`, done));
// Set auditdate and surrogate key for stage data (not sure if the sk is actually necessary)
tasks.push(done => {
connection.query(`UPDATE ${qualifiedStagingTable}
SET ${columnConfig._auditdate} = ${dwClient.auditdate},
${sk} = farmFingerPrint64(${nk.map(id => `${id}`).join(`|| '-' ||`)}); `, done);
let fields = [sk].concat(allColumns).concat([columnConfig._auditdate, columnConfig._startdate, columnConfig._enddate, columnConfig._current]);
connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')})
SELECT row_number() over () + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00' else now() END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current}
FROM ${qualifiedStagingTable}_changes changes
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')}
LEFT JOIN ${qualifiedTable} as prev on ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._current}
WHERE (changes.runSCD2 =1 OR changes.runSCD6=1)
`, done);
});
tasks.push(done => connection.query(`BEGIN TRANSACTION;`, done));
// Retreive copy of existing data
// This needs to be done last
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedStagingTablePrevious}(${fields.map(column => `${column}`).join(`, `)})
SELECT ${fields.map(column => `${column}`).join(`, `)}
FROM ${qualifiedTable} AS base
WHERE EXISTS(SELECT *
FROM ${qualifiedStagingTable} AS staging
WHERE base.${sk} = staging.${sk}
${(naturalKeyFilter !== undefined) ? `AND staging.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``})
${(naturalKeyFilter !== undefined) ? `AND base.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}; `, done);
// RUN SCD1 / SCD6 columns (where we update the old records)
let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`));
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then now() else prev."${columnConfig._enddate}" END`);
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`);
columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`);
connection.query(`update ${qualifiedTable} as prev
set ${columns.join(', ')}
FROM ${qualifiedStagingTable}_changes changes
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')}
where ${nk.map(id => `prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != now() and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1)
`, done);
});
// Merge exiting data into staged copy
tasks.push(done => {
connection.query(`UPDATE ${qualifiedStagingTable} AS staging
SET ${fields.map(column => `${column} = COALESCE(staging.${column}, prev.${column})`).join(`, `)}
FROM ${qualifiedStagingTablePrevious} AS prev
WHERE staging.${sk} = prev.${sk}`, done);
});
// Set default SCD values
tasks.push(done => {
connection.query(`UPDATE ${qualifiedStagingTable}
SET ${columnConfig._startdate} = COALESCE(${columnConfig._startdate},'1900-01-01 00:00:00'),
${columnConfig._enddate} = COALESCE(${columnConfig._enddate},'9999-01-01 00:00:00'),
${columnConfig._current} = COALESCE(${columnConfig._current},true);`, done);
});
// Delete and reinsert data - avoids costly updates on large tables
tasks.push(done => {
connection.query(`DELETE FROM ${qualifiedTable}
USING ${qualifiedStagingTable}
WHERE ${qualifiedTable}.${sk} = ${qualifiedStagingTable}.${sk}
${(naturalKeyFilter !== undefined) ? `AND ${qualifiedTable}.${sortKey != null ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}; `, done);
});
tasks.push(done => {
connection.query(`INSERT INTO ${qualifiedTable} (${fields.map(column => `${column}`).join(`, `)})
SELECT ${fields.map(column => `${column}`).join(`, `)}
FROM ${qualifiedStagingTable}; `, done);
});
// tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}_changes`, done));
// tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}`, done));
async.series(tasks, err => {

@@ -670,7 +391,7 @@ if (!err) {

connection.release();
callback(e || err, d);
callback(e, d);
});
}
});
};
});
});

@@ -681,66 +402,62 @@ }).catch(callback);

client.insertMissingDimensions = function(usedTables, tableConfig, tableSks, tableNks, callback) {
if (config.hashedSurrogateKeys) {
callback(null);
} else {
let unions = {};
let isDate = {
d_date: true,
d_datetime: true,
d_time: true,
date: true,
datetime: true,
dim_date: true,
dim_datetime: true,
dim_time: true,
time: true,
};
Object.keys(usedTables).map(table => {
Object.keys(tableConfig[table].structure).map(column => {
let field = tableConfig[table].structure[column];
if (field.dimension && !isDate[field.dimension]) {
if (!(unions[field.dimension])) {
unions[field.dimension] = [];
}
if (typeof tableNks[field.dimension] === 'undefined') {
throw new Error(`${field.dimension} not found in tableNks`);
}
let dimTableNk = tableNks[field.dimension][0];
unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`);
client.insertMissingDimensions = function (usedTables, tableConfig, tableSks, tableNks, callback) {
let unions = {};
let isDate = {
d_date: true,
d_datetime: true,
d_time: true,
date: true,
datetime: true,
dim_date: true,
dim_datetime: true,
dim_time: true,
time: true,
};
Object.keys(usedTables).map(table => {
Object.keys(tableConfig[table].structure).map(column => {
let field = tableConfig[table].structure[column];
if (field.dimension && !isDate[field.dimension]) {
if (!(unions[field.dimension])) {
unions[field.dimension] = [];
}
});
if (typeof tableNks[field.dimension] === 'undefined') {
throw new Error(`${field.dimension} not found in tableNks`);
}
let dimTableNk = tableNks[field.dimension][0];
unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`);
}
});
let missingDimTasks = Object.keys(unions).map(table => {
let sk = tableSks[table];
let nk = tableNks[table][0];
return (callback) => {
let done = (err, data) => {
trx && trx.release();
callback(err, data);
};
let trx;
client.connect().then(transaction => {
trx = transaction;
transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => {
if (err) {
return done(err);
}
let rowId = results[0].maxid || 10000;
let _auditdate = dwClient.auditdate;
let unionQuery = unions[table].join('\nUNION\n');
transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over() + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from(${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => {
done(err);
});
});
let missingDimTasks = Object.keys(unions).map(table => {
let sk = tableSks[table];
let nk = tableNks[table][0];
return (callback) => {
let done = (err, data) => {
trx && trx.release();
callback(err, data);
};
let trx;
client.connect().then(transaction => {
trx = transaction;
transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => {
if (err) {
return done(err);
}
let rowId = results[0].maxid || 10000;
let _auditdate = dwClient.auditdate; // results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()";
let unionQuery = unions[table].join('\nUNION\n');
transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over () + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => {
done(err);
});
}).catch(done);
};
});
async.parallelLimit(missingDimTasks, 10, (missingDimError) => {
logger.info(`Missing Dimensions ${!missingDimError && 'Inserted'} ----------------------------`, missingDimError || '');
callback(missingDimError);
});
};
});
}).catch(done);
};
});
async.parallelLimit(missingDimTasks, 10, (missingDimError) => {
logger.info(`Missing Dimensions ${!missingDimError && 'Inserted'} ----------------------------`, missingDimError || '');
callback(missingDimError);
});
};
client.linkDimensions = function(table, links, nk, callback, tableStatus) {
client.linkDimensions = function (table, links, nk, callback, tableStatus) {
client.describeTable(table).then(() => {

@@ -750,55 +467,8 @@ let tasks = [];

const qualifiedTable = `public.${table}`;
const linkAuditdate = client.escapeValueNoToLower(new Date().toISOString().replace(/\.\d*Z/, 'Z'));
// Only run analyze on the table if this is the first load
if (tableStatus === 'First Load' && config.version !== 'redshift') {
tasks.push(done => client.query(`ANALYZE ${table}`, done));
if (tableStatus === 'First Load') {
tasks.push(done => client.query(`analyze ${table}`, done));
}
const qualifiedStagingTable = `${columnConfig.stageSchema}.${columnConfig.stageTablePrefix}_${table}`;
let naturalKeyLowerBound;
let naturalKeyFilter;
let sortKey;
let sortKeyType;
// Get lower bound for natural key to avoid unnecessary scanning
if (config.version === 'redshift') {
tasks.push(done => {
client.query(`SELECT sortkey,
sortkeytype
FROM public.v_dist_sort_key
WHERE table_name = '${table}';`, (err, results) => {
if (err) {
return done(err);
} else {
if (results[0].sortKey != null) {
sortKey = results[0].sortkey;
sortKeyType = results[0].sortkeytype;
};
done();
};
});
});
tasks.push(done => {
client.query(`SELECT MIN(${(sortKey != null) ? sortKey : nk[0]}) AS minid
FROM ${qualifiedStagingTable}; `, (err, results) => {
if (err) {
return done(err);
} else {
naturalKeyLowerBound = results[0].minid;
if (naturalKeyLowerBound !== null) {
if (sortKeyType === 'int4' || sortKeyType === 'int8') {
naturalKeyFilter = `${results[0].minid}`;
} else if (sortKeyType === 'varchar') {
naturalKeyFilter = `'${results[0].minid}'`;
} else if (sortKeyType === 'timestamp' && Date.parse(results[0].minid.split(' ')[1])) {
naturalKeyFilter = `'${results[0].minid.split(' ')[1]}'`;
};
};
done();
};
});
});
};
tasks.push(done => {

@@ -808,23 +478,17 @@ let joinTables = links.map(link => {

sets.push(`${link.destination}_date = coalesce(t.${link.source}::date - '1400-01-01'::date + 10000, 1)`);
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH FROM ${(config.version !== 'redshift') ? `` : `'1970-01-01'::date +`} t.${link.source}::time) + 10000, 1)`);
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH from t.${link.source}::time) + 10000, 1)`);
} else if (columnConfig.useSurrogateDateKeys && (link.table === 'd_date' || link.table === 'date' || link.table === 'dim_date')) {
sets.push(`${link.destination}_date = coalesce(t.${link.source}::date - '1400-01-01'::date + 10000, 1)`);
} else if (columnConfig.useSurrogateDateKeys && (link.table === 'd_time' || link.table === 'time' || link.table === 'dim_time')) {
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH FROM ${(config.version !== 'redshift') ? `` : `'1970-01-01'::date +`} t.${link.source}::time) + 10000, 1)`);
sets.push(`${link.destination}_time = coalesce(EXTRACT(EPOCH from t.${link.source}::time) + 10000, 1)`);
} else {
if (config.hashedSurrogateKeys) {
sets.push(`${link.destination} = coalesce(farmFingerPrint64(t.${link.source}), 1)`);
return ``;
} else {
sets.push(`${link.destination} = coalesce(${link.join_id}_join_table.${link.sk}, 1)`);
var joinOn = `${link.join_id}_join_table.${link.on} = t.${link.source} `;
if (Array.isArray(link.source)) {
joinOn = link.source.map((v, i) => `${link.join_id}_join_table.${link.on[i]} = t.${v}`).join(' AND ');
}
return `LEFT JOIN ${link.table} ${link.join_id}_join_table
ON ${joinOn}
AND t.${link.link_date} >= ${link.join_id}_join_table.${columnConfig._startdate}
AND (t.${link.link_date} <= ${link.join_id}_join_table.${columnConfig._enddate} or ${link.join_id}_join_table.${columnConfig._current})`;
};
sets.push(`${link.destination} = coalesce(${link.join_id}_join_table.${link.sk}, 1)`);
var joinOn = `${link.join_id}_join_table.${link.on} = t.${link.source}`;
if (Array.isArray(link.source)) {
joinOn = link.source.map((v, i) => `${link.join_id}_join_table.${link.on[i]} = t.${v}`).join(' AND ');
}
return `LEFT JOIN ${link.table} ${link.join_id}_join_table
on ${joinOn}
and t.${link.link_date} >= ${link.join_id}_join_table.${columnConfig._startdate}
and (t.${link.link_date} <= ${link.join_id}_join_table.${columnConfig._enddate} or ${link.join_id}_join_table.${columnConfig._current})`;
}

@@ -834,9 +498,9 @@ });

if (sets.length) {
client.query(`UPDATE ${table} dm
SET ${sets.join(', ')}, ${columnConfig._auditdate} = ${linkAuditdate}
FROM ${table} t
${config.hashedSurrogateKeys ? '' : joinTables.join('\n')}
WHERE ${nk.map(id => `dm.${id} = t.${id}`).join(' AND ')}
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate}
${(naturalKeyFilter !== undefined) ? `AND t.${(sortKey != null) ? sortKey : nk[0]} >= ${naturalKeyFilter}` : ``}`, done);
client.query(`Update ${table} dm
SET ${sets.join(', ')}, ${columnConfig._auditdate} = ${linkAuditdate}
FROM ${table} t
${joinTables.join('\n')}
where ${nk.map(id => `dm.${id} = t.${id}`).join(' and ')}
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate}
`, done);
} else {

@@ -854,3 +518,3 @@ done();

client.changeTableStructure = async function(structures) {
client.changeTableStructure = async function (structures) {
let tasks = [];

@@ -880,3 +544,3 @@ let tableResults = {};

missingFields[missingDim] = {
type: (config.hashedSurrogateKeys) ? 'bigint' : 'integer',
type: 'integer',
};

@@ -923,3 +587,3 @@ }

client.createTable = async function(table, definition) {
client.createTable = async function (table, definition) {
let fields = [];

@@ -940,3 +604,3 @@ let defaults = [];

sk: true,
type: `${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'} primary key`,
type: 'integer primary key',
};

@@ -990,3 +654,3 @@ } else if (typeof field === 'string') {

} else if (field.dimension) {
fields.push(`${columnConfig.dimColumnTransform(key, field)} ${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'}`);
fields.push(`${columnConfig.dimColumnTransform(key, field)} integer`);
defaults.push({

@@ -1005,3 +669,3 @@ column: columnConfig.dimColumnTransform(key, field),

let sql = `create table ${table} (
${fields.join(',\n')}
${fields.join(',\n')}
)`;

@@ -1030,3 +694,3 @@

column: columnConfig._auditdate,
value: config.version === 'redshift' ? 'sysdate' : 'now()',
value: 'now()',
}, {

@@ -1066,3 +730,3 @@ column: columnConfig._startdate,

};
client.updateTable = async function(table, definition) {
client.updateTable = async function (table, definition) {
let fields = [];

@@ -1074,3 +738,3 @@ let queries = [];

field = {
type: `${(config.hashedSurrogateKeys) ? 'bigint' : 'integer'} primary key`,
type: 'integer primary key',
};

@@ -1111,3 +775,3 @@ } else if (typeof field === 'string') {

add column ${fields.join(',\n add column ')}
`];
`];

@@ -1124,3 +788,3 @@ // redshift doesn't support multi 'add column' in one query

return new Promise(resolve => {
async.eachSeries(sqls, function(sql, done) {
async.eachSeries(sqls, function (sql, done) {
client.query(sql, err => done(err));

@@ -1137,3 +801,3 @@ }, err => {

client.findAuditDate = function(table, callback) {
client.findAuditDate = function (table, callback) {
client.query(`select to_char(max(${columnConfig._auditdate}), 'YYYY-MM-DD HH24:MI:SS') as max FROM ${client.escapeId(table)}`, (err, auditdate) => {

@@ -1155,3 +819,3 @@ if (err) {

client.exportChanges = function(table, fields, remoteAuditdate, opts, callback) {
client.exportChanges = function (table, fields, remoteAuditdate, opts, callback) {
let auditdateCompare = remoteAuditdate.auditdate != null ? `${columnConfig._auditdate} >= ${client.escapeValue(remoteAuditdate.auditdate)}` : `${columnConfig._auditdate} is null`;

@@ -1175,3 +839,3 @@ client.query(`select count(*) as count FROM ${client.escapeId(table)} WHERE ${auditdateCompare}`, (err, result) => {

${where}
`, (err, result) => {
`, (err, result) => {
if (err) {

@@ -1225,3 +889,3 @@ callback(err);

client.importChanges = function(file, table, fields, opts, callback) {
client.importChanges = function (file, table, fields, opts, callback) {
if (typeof opts === 'function') {

@@ -1287,6 +951,6 @@ callback = opts;

}
tasks.push(function(done) {
tasks.push(function (done) {
client.query(`insert into ${tableName} select * from ${qualifiedStagingTable}`, done);
});
tasks.push(function(done) {
tasks.push(function (done) {
client.query(`select count(*) from ${qualifiedStagingTable}`, (err, result) => {

@@ -1297,3 +961,3 @@ loadCount = result && parseInt(result[0].count);

});
tasks.push(function(done) {
tasks.push(function (done) {
client.query(`drop table if exists ${qualifiedStagingTable}`, done);

@@ -1300,0 +964,0 @@ });

{
"name": "leo-connector-postgres",
"version": "4.0.21-beta",
"version": "5.0.0-awsv3",
"description": "A Postgres database connector for use with Leo Platform",

@@ -22,9 +22,9 @@ "repository": {

"fast-csv": "2.4.1",
"leo-connector-common": "4.0.11-beta",
"pg": "7.8.2",
"leo-connector-common": "^5.0.0-awsv3",
"pg": "^8.11.3",
"pg-copy-streams": "2.2.2",
"pg-format": "1.0.4"
"pg-format": "1.0.4",
"pg-pool": "^3.6.1"
},
"devDependencies": {
"@dsco/layer-leo": "2.1.29",
"chai": "^4.2.0",

@@ -31,0 +31,0 @@ "eslint": "^5.13.0",