leo-connector-postgres - npm Package Compare versions

Comparing version 2.0.0 to 2.0.1-218-g57d761c

2 files changed

lib/connect.js

@@ -575,2 +575,2 @@ const {

return client;
}
}

@@ -30,3 +30,22 @@ "use strict";

-function deletesSetup(qualifiedTable, schema, field, value, where = "") {
+const toDeleteToken = (field) => {
+	if (Array.isArray(field)) {
+		return field.join('_');
+	}
+	return field;
+};
+const toEscapedId = (id) => {
+	if (typeof id === 'object') {
+		const escapedIds = Object.assign({}, id);
+		Object.keys(id).forEach(k => {
+			escapedIds[k] = client.escapeValueNoToLower(id[k]);
+		});
+		return escapedIds;
+	} else {
+		return client.escapeValueNoToLower(id);
+	}
+};
+function deletesSetup(qualifiedTable, schema, field, value, whereCurrent = "") {
let colLookup = {};
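
The two helpers added above let the delete handler accept composite keys: toDeleteToken flattens an array of key columns into a single lookup token, and toEscapedId escapes either a scalar id or every field of a composite id object before it is queued. A minimal sketch of the expected behavior, where escapeValueNoToLower is a hypothetical stand-in for the client method:

	// Sketch only; escapeValueNoToLower here is a stand-in for the client method.
	const escapeValueNoToLower = (v) => (typeof v === 'string' ? `'${v}'` : v);
	const toDeleteToken = (field) => (Array.isArray(field) ? field.join('_') : field);

	console.log(toDeleteToken('id'));                    // 'id'
	console.log(toDeleteToken(['order_id', 'line_id'])); // 'order_id_line_id'
	console.log(escapeValueNoToLower('abc'));            // "'abc'"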

@@ -38,5 +57,17 @@ schema.map(col => {

let toDeleteCount = 0;
-	if (where) {
-		where = `and ${where}`;
+	if (whereCurrent) {
+		whereCurrent = `and ${whereCurrent}`;
	}
+	const isDeletable = (field, id) => {
+		let deletable = (id !== undefined);
+		if (Array.isArray(field)) {
+			field.forEach(f => {
+				deletable = deletable && colLookup[f];
+			});
+		} else {
+			deletable = deletable && colLookup[field];
+		}
+		return deletable;
+	};

@@ -46,3 +77,12 @@ function tryFlushDelete(done, force = false) {

let deleteTasks = Object.keys(toDelete).map(col => {
-	return deleteDone => client.query(`update ${qualifiedTable} set ${field} = ${value} where ${col} in (${toDelete[col].join(",")}) ${where}`, deleteDone);
+	const idsToDelete = toDelete[col];
+	let whereClause = `where ${col} in (${toDelete[col].join(",")})`;
+	if (typeof idsToDelete[0] === 'object') {
+		whereClause = `where (${Object.keys(idsToDelete[0]).join(', ')})
+			in ((${idsToDelete.map(idObj => Object.values(idObj).join(',')).join('),(')}))`;
+	}
+	return deleteDone => client.query(`
+		update ${qualifiedTable} set ${field} = ${value}, ${columnConfig._current} = false
+		${whereClause} ${whereCurrent}
+	`, deleteDone);
});
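
For composite ids the rewritten delete task switches from a single-column IN list to a row-value IN list, and every matched row is also flagged not-current. Assuming two queued composite ids, the clause built above comes out roughly like this:

	// Hypothetical ids; illustrates the whereClause string produced above.
	const idsToDelete = [{ order_id: 1, line_id: 2 }, { order_id: 3, line_id: 4 }];
	const whereClause = `where (${Object.keys(idsToDelete[0]).join(', ')})
		in ((${idsToDelete.map(idObj => Object.values(idObj).join(',')).join('),(')}))`;
	console.log(whereClause); // where (order_id, line_id) in ((1,2),(3,4))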

@@ -62,10 +102,10 @@ async.parallelLimit(deleteTasks, 1, (err) => {

return {
-	add: function(obj, done) {
-		let field = obj.__leo_delete__;
-		let id = obj.__leo_delete_id__;
-		if (id !== undefined && colLookup[field]) {
-			if (!(field in toDelete)) {
-				toDelete[field] = [];
+	add: function (obj, done) {
+		let token = toDeleteToken(obj.__leo_delete__);
+		let id = toEscapedId(obj.__leo_delete_id__);
+		if (isDeletable(obj.__leo_delete__, obj.__leo_delete_id__)) {
+			if (!(token in toDelete)) {
+				toDelete[token] = [];
			}
-			toDelete[field].push(client.escapeValueNoToLower(id));
+			toDelete[token].push(id);
toDeleteCount++;
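
Callers still mark records for soft deletion with the __leo_delete__ / __leo_delete_id__ convention, but with this change both fields may be composite. A hedged usage sketch, assuming a deleteHandler created by deletesSetup and hypothetical column names:

	const done = (err) => { if (err) console.error(err); };
	// Scalar key, as before:
	deleteHandler.add({ __leo_delete__: 'id', __leo_delete_id__: 42 }, done);
	// Composite key, newly supported:
	deleteHandler.add({
		__leo_delete__: ['order_id', 'line_id'],
		__leo_delete_id__: { order_id: 1, line_id: 2 }
	}, done);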

@@ -83,3 +123,3 @@ tryFlushDelete(done);

-client.importFact = function(stream, table, ids, callback, tableDef = {}) {
+client.importFact = function(stream, table, ids, callback) {
const stagingTable = `staging_${table}`;

@@ -106,18 +146,22 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTable}`;

tasks.push(done => {
-	ls.pipe(stream, ls.through((obj, done, push) => {
-		if (obj.__leo_delete__) {
-			if (obj.__leo_delete__ == "id") {
-				push(obj);
+	ls.pipe(
+		stream,
+		ls.through((obj, done, push) => {
+			if (obj.__leo_delete__) {
+				if (obj.__leo_delete__ == "id") {
+					push(obj);
+				}
+				deleteHandler.add(obj, done);
+			} else {
+				done(null, obj);
+			}
-			deleteHandler.add(obj, done);
-		} else {
-			done(null, obj);
-		}
-	}), client.streamToTable(qualifiedStagingTable), (err) => {
-		if (err) {
-			return done(err);
-		} else {
-			deleteHandler.flush(done);
-		}
-	});
+		}),
+		client.streamToTable(qualifiedStagingTable),
+		(err) => {
+			if (err) {
+				return done(err);
+			} else {
+				deleteHandler.flush(done);
+			}
+		});
});

@@ -189,3 +233,3 @@

-client.importDimension = function(stream, table, sk, nk, scds, callback, tableDef = {}) {
+client.importDimension = function(stream, table, sk, nk, scds, callback) {
const stagingTbl = `staging_${table}`;

@@ -214,19 +258,24 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTbl}`;

//tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done));
tasks.push(done => {
-	ls.pipe(stream, ls.through((obj, done, push) => {
-		if (obj.__leo_delete__) {
-			if (obj.__leo_delete__ == "id") {
-				push(obj);
+	ls.pipe(
+		stream,
+		ls.through((obj, done, push) => {
+			if (obj.__leo_delete__) {
+				if (obj.__leo_delete__ == "id") {
+					push(obj);
+				}
+				deleteHandler.add(obj, done);
+			} else {
+				done(null, obj);
+			}
-			deleteHandler.add(obj, done);
-		} else {
-			done(null, obj);
-		}
-	}), client.streamToTable(qualifiedStagingTable), (err) => {
-		if (err) {
-			return done(err);
-		} else {
-			deleteHandler.flush(done);
-		}
-	});
+		}),
+		client.streamToTable(qualifiedStagingTable),
+		(err) => {
+			if (err) {
+				return done(err);
+			} else {
+				deleteHandler.flush(done);
+			}
+		});
});

@@ -295,18 +344,5 @@

let tasks = [];
-let rowId = null;
-let totalRecords = 0;
+let totalRecords = 0; // TODO: This is not used to count records as it should
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done));
-tasks.push(done => {
-	connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => {
-		if (err) {
-			return done(err);
-		}
-		rowId = results[0].maxid || 10000;
-		totalRecords = (rowId - 10000);
-		done();
-	});
-});
//The following code relies on the fact that now() will return the same time during all transaction events
tasks.push(done => connection.query(`Begin Transaction`, done));

@@ -317,3 +353,3 @@

connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')})
-SELECT row_number() over () + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00' else now() END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current}
+SELECT nextval('${table}_skey_seq'), ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00'::timestamp else ${dwClient.auditdate}::timestamp END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current}
FROM ${qualifiedStagingTable}_changes changes
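
Surrogate keys for new SCD rows now come from a per-table Postgres sequence named <table>_skey_seq via nextval(), instead of row_number() offset by a separately queried max(sk); this drops the extra round trip and avoids the race inherent in computing keys from a previously read maximum. A sketch of the difference, with a hypothetical dimension name:

	// Old: select max(<sk>) first, then row_number() over () + maxid  (race-prone)
	// New: let Postgres allocate keys atomically from the per-table sequence:
	const table = 'd_customer'; // hypothetical dimension name
	const keyExpr = `nextval('${table}_skey_seq')`;
	console.log(keyExpr); // nextval('d_customer_skey_seq')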

@@ -330,3 +366,3 @@ JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')}

let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`));
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then now() else prev."${columnConfig._enddate}" END`);
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then ${dwClient.auditdate}::timestamp else prev."${columnConfig._enddate}" END`);
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`);

@@ -339,8 +375,7 @@ columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`);

where ${nk.map(id=>`prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != now() and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/
-and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1)
+and prev.${columnConfig._current}
+and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1)
`, done);
});
tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}_changes`, done));
tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}`, done));
async.series(tasks, err => {
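
Two things change in the SCD2 update path above: expired rows are end-dated with the transaction's shared auditdate rather than now(), and the update only touches rows where prev._current is true, so already-expired history can no longer be re-expired. Under that scheme, fetching the live row for a key reduces to a flag check, roughly (table and column names hypothetical):

	// Hypothetical lookup; relies on exactly one _current row per natural key.
	const sql = `select * from dw.d_customer where customer_id = $1 and _current`;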

@@ -384,2 +419,9 @@ if (!err) {

if (field.dimension && !isDate[field.dimension]) {
const factTable = table;
if (!field.on) {
const dimId = field.dim_column.replace(/_key$/, '_id');
field.on = {
[dimId]: dimId
};
}
if (!(unions[field.dimension])) {

@@ -391,4 +433,11 @@ unions[field.dimension] = [];

}
-	let dimTableNk = tableNks[field.dimension][0];
-	unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`);
+	const factNks = Object.keys(field.on);
+	unions[field.dimension].push(`
+		select ${factNks.map((fnk, i) => `${factTable}.${fnk} as id${i}`).join(', ')}
+		from ${factTable}
+		left join ${field.dimension}
+			on ${factNks.map((fnk) => `${factTable}.${fnk} = ${field.dimension}.${field.on[fnk]}`).join(' AND ')}
+		where ${factNks.map((fnk) => `${field.dimension}.${field.on[fnk]} is null`).join(' AND ')}
+			and ${factTable}.${columnConfig._auditdate} = ${dwClient.auditdate}
+	`);
}
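
Orphaned-dimension detection now joins on every column declared in field.on instead of a single natural key, emitting one id<i> column per key so composite natural keys are matched as a unit. With hypothetical fact and dimension names, the union member pushed above evaluates to:

	// Hypothetical names; mirrors the template literal above.
	const factTable = 'f_order';
	const dimension = 'd_customer';
	const on = { customer_id: 'customer_id', region_id: 'region_id' };
	const factNks = Object.keys(on);
	const member = `
		select ${factNks.map((fnk, i) => `${factTable}.${fnk} as id${i}`).join(', ')}
		from ${factTable}
		left join ${dimension}
			on ${factNks.map(fnk => `${factTable}.${fnk} = ${dimension}.${on[fnk]}`).join(' AND ')}
		where ${factNks.map(fnk => `${dimension}.${on[fnk]} is null`).join(' AND ')}`;
	console.log(member);
	// -> select f_order.customer_id as id0, f_order.region_id as id1 from f_order
	//    left join d_customer on f_order.customer_id = d_customer.customer_id AND ...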

@@ -399,3 +448,3 @@ });

let sk = tableSks[table];
-let nk = tableNks[table][0];
+let nks = tableNks[table];
return (callback) => {

@@ -409,12 +458,11 @@ let done = (err, data) => {

trx = transaction;
-			transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => {
-				if (err) {
-					return done(err);
-				}
-				let rowId = results[0].maxid || 10000;
-				let _auditdate = dwClient.auditdate; //results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()";
-				let unionQuery = unions[table].join("\nUNION\n");
-				transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over () + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => {
-					done(err);
-				});
+			let _auditdate = dwClient.auditdate; //results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()";
+			let unionQuery = unions[table].join("\nUNION\n");
+			const insertQuery = `
+				insert into ${table} (${sk}, ${nks.join(', ')}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current})
+				select nextval('${table}_skey_seq'), ${nks.map((_, i) => `sub.id${i}`).join(', ')}, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub
+				where ${nks.map((_, i) => `sub.id${i} is not null`).join(' OR ')} group by ${nks.map((_, i) => `sub.id${i}`).join(', ')}`;
+			// console.log(insertQuery)
+			transaction.query(insertQuery, (err) => {
+				done(err);
+			});

@@ -471,3 +519,3 @@ }).catch(done);

where ${nk.map(id=>`dm.${id} = t.${id}`).join(' and ')}
-AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate}
+AND dm.${columnConfig._auditdate} = ${dwClient.auditdate}
`, done);

@@ -490,2 +538,17 @@ } else {

tableResults[table] = "Unmodified";
+const isDimension = structures[table].isDimension;
+const sks = Object.keys(structures[table].structure).filter(f => structures[table].structure[f] === 'sk');
+const hasSk = sks.length > 0;
+if (isDimension && hasSk) {
+	const sk = sks[0];
+	tasks.push(done => {
+		client.query(`select max(${sk}) as maxid from ${table}`, (err, results) => {
+			if (err) {
+				return done(err);
+			}
+			const rowId = results[0] ? results[0].maxid || 10000 : 10000;
+			client.query(`create sequence if not exists ${table}_skey_seq start ${rowId + 1};`, done);
+		});
+	});
+}
tasks.push(done => {
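
For existing dimension tables, the sequence is seeded just past the table's current max surrogate key (or past the 10000 floor for an empty table), so sequence-generated keys cannot collide with rows inserted under the old row_number() scheme. A condensed sketch of that bootstrap, with a hypothetical table name and maximum:

	// Hypothetical values; mirrors the one-time seeding performed above.
	const table = 'd_customer';
	const maxid = 12345; // from select max(sk), or 10000 if the table is empty
	const seedSql = `create sequence if not exists ${table}_skey_seq start ${maxid + 1};`;
	console.log(seedSql); // create sequence if not exists d_customer_skey_seq start 12346;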

@@ -492,0 +555,0 @@ client.describeTable(table, (err, fields) => {

package.json

{
"name": "leo-connector-postgres",
"version": "2.0.0",
"version": "2.0.1-218-g57d761c",
"description": "A Postgres database connector for use with Leo Platform",

@@ -12,3 +12,5 @@ "repository": {

"test": "mocha test/unit/**/*.test.js",
"test:int": "mocha test/integration/**/*.test.js"
"test:int": "npm run test:docker && sleep 5 && mocha test/integration/**/*.test.js",
"test:docker": "docker container inspect --format='{{.Config.Hostname}}' leo-connector-postgres-integration-testor || docker run --name leo-connector-postgres-integration-testor -e POSTGRES_PASSWORD=mytestpassword -p 5432:5432 -d postgres:10",
"test:docker:clean": "docker container inspect --format='{{.Config.Hostname}}' leo-connector-postgres-integration-testor; if [ $? -eq 0 ]; then docker stop leo-connector-postgres-integration-testor && docker rm leo-connector-postgres-integration-testor; fi"
},

@@ -23,9 +25,8 @@ "author": "",

"fast-csv": "^2.4.1",
"leo-connector-common": "^2.0.0",
"leo-sdk": "^2.2.4",
"leo-connector-common": "entrata-beta",
"leo-sdk": "^2.2.8",
"leo-streams": "^1.1.1",
"pg": "^7.8.0",
"pg-copy-streams": "^1.2.0",
"pg-format": "^1.0.4",
"pg-native": "^3.0.0"
"pg-format": "^1.0.4"
},

@@ -36,3 +37,4 @@ "devDependencies": {

"mocha": "^5.2.0",
"sinon": "^7.2.3"
"sinon": "^7.2.3",
"stream-array": "^1.1.2"
},

@@ -39,0 +41,0 @@ "config": {