leo-connector-postgres
Advanced tools
Comparing version
@@ -575,2 +575,2 @@ const { | ||
return client; | ||
} | ||
} |
@@ -30,3 +30,22 @@ "use strict"; | ||
function deletesSetup(qualifiedTable, schema, field, value, where = "") { | ||
const toDeleteToken = (field) => { | ||
if (Array.isArray(field)) { | ||
return field.join('_'); | ||
} | ||
return field; | ||
}; | ||
const toEscapedId = (id) => { | ||
if (typeof id === 'object') { | ||
const escapedIds = Object.assign({},id); | ||
Object.keys(id).forEach(k => { | ||
escapedIds[k] = client.escapeValueNoToLower(id[k]); | ||
}); | ||
return escapedIds; | ||
} else { | ||
return client.escapeValueNoToLower(id); | ||
} | ||
}; | ||
function deletesSetup(qualifiedTable, schema, field, value, whereCurrent = "") { | ||
let colLookup = {}; | ||
@@ -38,5 +57,17 @@ schema.map(col => { | ||
let toDeleteCount = 0; | ||
if (where) { | ||
where = `and ${where}`; | ||
if (whereCurrent) { | ||
whereCurrent = `and ${whereCurrent}`; | ||
} | ||
const isDeletable = (field, id) => { | ||
let deletable = (id !== undefined); | ||
if (Array.isArray(field)) { | ||
field.forEach(f => { | ||
deletable = deletable && colLookup[f]; | ||
}); | ||
} else { | ||
deletable = deletable && colLookup[field]; | ||
} | ||
return deletable; | ||
}; | ||
@@ -46,3 +77,12 @@ function tryFlushDelete(done, force = false) { | ||
let deleteTasks = Object.keys(toDelete).map(col => { | ||
return deleteDone => client.query(`update ${qualifiedTable} set ${field} = ${value} where ${col} in (${toDelete[col].join(",")}) ${where}`, deleteDone); | ||
const idsToDelete = toDelete[col]; | ||
let whereClause = `where ${col} in (${toDelete[col].join(",")}) ${whereCurrent}`; | ||
if (typeof idsToDelete[0] === 'object') { | ||
whereClause = `where (${Object.keys(idsToDelete[0]).join(', ')}) | ||
in ((${idsToDelete.map(idObj => Object.values(idObj).join(',')).join('),(')}))`; | ||
} | ||
return deleteDone => client.query(` | ||
update ${qualifiedTable} set ${field} = ${value}, ${columnConfig._current} = false | ||
${whereClause} ${whereCurrent} | ||
`, deleteDone); | ||
}); | ||
@@ -62,10 +102,10 @@ async.parallelLimit(deleteTasks, 1, (err) => { | ||
return { | ||
add: function(obj, done) { | ||
let field = obj.__leo_delete__; | ||
let id = obj.__leo_delete_id__; | ||
if (id !== undefined && colLookup[field]) { | ||
if (!(field in toDelete)) { | ||
toDelete[field] = []; | ||
add: function (obj, done) { | ||
let token = toDeleteToken(obj.__leo_delete__); | ||
let id = toEscapedId(obj.__leo_delete_id__); | ||
if (isDeletable(obj.__leo_delete__, obj.__leo_delete_id__)) { | ||
if (!(token in toDelete)) { | ||
toDelete[token] = []; | ||
} | ||
toDelete[field].push(client.escapeValueNoToLower(id)); | ||
toDelete[token].push(id); | ||
toDeleteCount++; | ||
@@ -83,3 +123,3 @@ tryFlushDelete(done); | ||
client.importFact = function(stream, table, ids, callback, tableDef = {}) { | ||
client.importFact = function(stream, table, ids, callback) { | ||
const stagingTable = `staging_${table}`; | ||
@@ -106,18 +146,22 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTable}`; | ||
tasks.push(done => { | ||
ls.pipe(stream, ls.through((obj, done, push) => { | ||
if (obj.__leo_delete__) { | ||
if (obj.__leo_delete__ == "id") { | ||
push(obj); | ||
ls.pipe( | ||
stream, | ||
ls.through((obj, done, push) => { | ||
if (obj.__leo_delete__) { | ||
if (obj.__leo_delete__ == "id") { | ||
push(obj); | ||
} | ||
deleteHandler.add(obj, done); | ||
} else { | ||
done(null, obj); | ||
} | ||
deleteHandler.add(obj, done); | ||
} else { | ||
done(null, obj); | ||
} | ||
}), client.streamToTable(qualifiedStagingTable), (err) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
deleteHandler.flush(done); | ||
} | ||
}); | ||
}), | ||
client.streamToTable(qualifiedStagingTable), | ||
(err) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
deleteHandler.flush(done); | ||
} | ||
}); | ||
}); | ||
@@ -189,3 +233,3 @@ | ||
client.importDimension = function(stream, table, sk, nk, scds, callback, tableDef = {}) { | ||
client.importDimension = function(stream, table, sk, nk, scds, callback) { | ||
const stagingTbl = `staging_${table}`; | ||
@@ -214,19 +258,24 @@ const qualifiedStagingTable = `${columnConfig.stageSchema}.${stagingTbl}`; | ||
//tasks.push(done => ls.pipe(stream, client.streamToTable(qualifiedStagingTable), done)); | ||
tasks.push(done => { | ||
ls.pipe(stream, ls.through((obj, done, push) => { | ||
if (obj.__leo_delete__) { | ||
if (obj.__leo_delete__ == "id") { | ||
push(obj); | ||
ls.pipe( | ||
stream, | ||
ls.through((obj, done, push) => { | ||
if (obj.__leo_delete__) { | ||
if (obj.__leo_delete__ == "id") { | ||
push(obj); | ||
} | ||
deleteHandler.add(obj, done); | ||
} else { | ||
done(null, obj); | ||
} | ||
deleteHandler.add(obj, done); | ||
} else { | ||
done(null, obj); | ||
} | ||
}), client.streamToTable(qualifiedStagingTable), (err) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
deleteHandler.flush(done); | ||
} | ||
}); | ||
}), | ||
client.streamToTable(qualifiedStagingTable), | ||
(err) => { | ||
if (err) { | ||
return done(err); | ||
} else { | ||
deleteHandler.flush(done); | ||
} | ||
}); | ||
}); | ||
@@ -295,18 +344,5 @@ | ||
let tasks = []; | ||
let rowId = null; | ||
let totalRecords = 0; | ||
let totalRecords = 0; // TODO: This is not used to count records as it should | ||
tasks.push(done => connection.query(`analyze ${qualifiedStagingTable}_changes`, done)); | ||
tasks.push(done => { | ||
connection.query(`select max(${sk}) as maxid from ${qualifiedTable}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
rowId = results[0].maxid || 10000; | ||
totalRecords = (rowId - 10000); | ||
done(); | ||
}); | ||
}); | ||
//The following code relies on the fact that now() will return the same time during all transaction events | ||
tasks.push(done => connection.query(`Begin Transaction`, done)); | ||
@@ -317,3 +353,3 @@ | ||
connection.query(`INSERT INTO ${qualifiedTable} (${fields.join(',')}) | ||
SELECT row_number() over () + ${rowId}, ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00' else now() END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current} | ||
SELECT nextval('${table}_skey_seq'), ${allColumns.map(column => `coalesce(staging.${column}, prev.${column})`)}, ${dwClient.auditdate} as ${columnConfig._auditdate}, case when changes.isNew then '1900-01-01 00:00:00'::timestamp else ${dwClient.auditdate}::timestamp END as ${columnConfig._startdate}, '9999-01-01 00:00:00' as ${columnConfig._enddate}, true as ${columnConfig._current} | ||
FROM ${qualifiedStagingTable}_changes changes | ||
@@ -330,3 +366,3 @@ JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
let columns = scd1.map(column => `"${column}" = coalesce(staging."${column}", prev."${column}")`).concat(scd6.map(column => `"current_${column}" = coalesce(staging."${column}", prev."${column}")`)); | ||
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then now() else prev."${columnConfig._enddate}" END`); | ||
columns.push(`"${columnConfig._enddate}" = case when changes.runSCD2 =1 then (${dwClient.auditdate}::timestamp - '1 usec'::interval) else prev."${columnConfig._enddate}" END`); | ||
columns.push(`"${columnConfig._current}" = case when changes.runSCD2 =1 then false else prev."${columnConfig._current}" END`); | ||
@@ -338,9 +374,8 @@ columns.push(`"${columnConfig._auditdate}" = ${dwClient.auditdate}`); | ||
JOIN ${qualifiedStagingTable} staging on ${nk.map(id => `staging.${id} = changes.${id}`).join(' and ')} | ||
where ${nk.map(id=>`prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} != now() and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/ | ||
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1) | ||
where ${nk.map(id=>`prev.${id} = changes.${id}`).join(' and ')} and prev.${columnConfig._startdate} < ${dwClient.auditdate} and changes.isNew = false /*Need to make sure we are only updating the ones not just inserted through SCD2 otherwise we run into issues with multiple rows having .${columnConfig._current}*/ | ||
and prev.${columnConfig._current} | ||
and (changes.runSCD1=1 OR changes.runSCD6=1 OR changes.runSCD2=1) | ||
`, done); | ||
}); | ||
tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}_changes`, done)); | ||
tasks.push(done => connection.query(`drop table ${qualifiedStagingTable}`, done)); | ||
async.series(tasks, err => { | ||
@@ -384,2 +419,9 @@ if (!err) { | ||
if (field.dimension && !isDate[field.dimension]) { | ||
const factTable = table; | ||
if (!field.on) { | ||
const dimId = field.dim_column.replace(/_key$/, '_id'); | ||
field.on = { | ||
[dimId]: dimId | ||
}; | ||
} | ||
if (!(unions[field.dimension])) { | ||
@@ -391,4 +433,11 @@ unions[field.dimension] = []; | ||
} | ||
let dimTableNk = tableNks[field.dimension][0]; | ||
unions[field.dimension].push(`select ${table}.${column} as id from ${table} left join ${field.dimension} on ${field.dimension}.${dimTableNk} = ${table}.${column} where ${field.dimension}.${dimTableNk} is null and ${table}.${columnConfig._auditdate} = ${dwClient.auditdate}`); | ||
const factNks = Object.keys(field.on); | ||
unions[field.dimension].push(` | ||
select ${factNks.map((fnk, i)=> `${factTable}.${fnk} as id${i}`).join(', ')} | ||
from ${factTable} | ||
left join ${field.dimension} | ||
on ${factNks.map((fnk)=> `${factTable}.${fnk} = ${field.dimension}.${field.on[fnk]}`).join(' AND ')} | ||
where ${factNks.map((fnk)=> `${field.dimension}.${field.on[fnk]} is null`).join(' AND ')} | ||
and ${factTable}.${columnConfig._auditdate} = ${dwClient.auditdate} | ||
`); | ||
} | ||
@@ -399,3 +448,3 @@ }); | ||
let sk = tableSks[table]; | ||
let nk = tableNks[table][0]; | ||
let nks = tableNks[table]; | ||
return (callback) => { | ||
@@ -409,12 +458,11 @@ let done = (err, data) => { | ||
trx = transaction; | ||
transaction.query(`select max(${sk}) as maxid from ${table}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
let rowId = results[0].maxid || 10000; | ||
let _auditdate = dwClient.auditdate; //results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()"; | ||
let unionQuery = unions[table].join("\nUNION\n"); | ||
transaction.query(`insert into ${table} (${sk}, ${nk}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) select row_number() over () + ${rowId}, sub.id, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub where sub.id is not null group by sub.id`, (err) => { | ||
done(err); | ||
}); | ||
let _auditdate = dwClient.auditdate; //results[0]._auditdate ? `'${results[0]._auditdate.replace(/^\d* */,"")}'` : "now()"; | ||
let unionQuery = unions[table].join("\nUNION\n"); | ||
const insertQuery = ` | ||
insert into ${table} (${sk}, ${nks.join(', ')}, ${columnConfig._auditdate}, ${columnConfig._startdate}, ${columnConfig._enddate}, ${columnConfig._current}) | ||
select nextval('${table}_skey_seq'), ${nks.map((_, i)=> `sub.id${i}`).join(', ')}, max(${_auditdate})::timestamp, '1900-01-01 00:00:00', '9999-01-01 00:00:00', true from (${unionQuery}) as sub | ||
where ${nks.map((_, i)=> `sub.id${i} is not null`).join(' OR ')} group by ${nks.map((_, i)=> `sub.id${i}`).join(', ')}`; | ||
// console.log(insertQuery) | ||
transaction.query(insertQuery, (err) => { | ||
done(err); | ||
}); | ||
@@ -471,3 +519,3 @@ }).catch(done); | ||
where ${nk.map(id=>`dm.${id} = t.${id}`).join(' and ')} | ||
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} | ||
AND dm.${columnConfig._auditdate} = ${dwClient.auditdate} AND t.${columnConfig._auditdate} = ${dwClient.auditdate} | ||
`, done); | ||
@@ -490,2 +538,17 @@ } else { | ||
tableResults[table] = "Unmodified"; | ||
const isDimension = structures[table].isDimension; | ||
const sks = Object.keys(structures[table].structure).filter(f => structures[table].structure[f] === 'sk'); | ||
const hasSk = sks.length > 0; | ||
if (isDimension && hasSk) { | ||
const sk = sks[0]; | ||
tasks.push(done => { | ||
client.query(`select max(${sk}) as maxid from ${table}`, (err, results) => { | ||
if (err) { | ||
return done(err); | ||
} | ||
const rowId = results[0] ? results[0].maxid || 10000 : 10000; | ||
client.query(`create sequence if not exists ${table}_skey_seq start ${rowId + 1};`, done); | ||
}); | ||
}); | ||
} | ||
tasks.push(done => { | ||
@@ -492,0 +555,0 @@ client.describeTable(table, (err, fields) => { |
{ | ||
"name": "leo-connector-postgres", | ||
"version": "2.0.2", | ||
"version": "2.0.3-240-g6b01a4e", | ||
"description": "A Postgres database connector for use with Leo Platform", | ||
@@ -12,3 +12,5 @@ "repository": { | ||
"test": "mocha test/unit/**/*.test.js", | ||
"test:int": "mocha test/integration/**/*.test.js" | ||
"test:int": "npm run test:docker && sleep 5 && mocha test/integration/**/*.test.js", | ||
"test:docker": "docker container inspect --format='{{.Config.Hostname}}' leo-connector-postgres-integration-testor || docker run --name leo-connector-postgres-integration-testor -e POSTGRES_PASSWORD=mytestpassword -p 5432:5432 -d postgres:10", | ||
"test:docker:clean": "docker container inspect --format='{{.Config.Hostname}}' leo-connector-postgres-integration-testor; if [ $? -eq 0 ]; then docker stop leo-connector-postgres-integration-testor && docker rm leo-connector-postgres-integration-testor; fi" | ||
}, | ||
@@ -23,3 +25,3 @@ "author": "", | ||
"fast-csv": "^2.4.1", | ||
"leo-connector-common": "^2.0.0", | ||
"leo-connector-common": "entrata", | ||
"leo-sdk": "^2.2.8", | ||
@@ -35,3 +37,4 @@ "leo-streams": "^1.1.1", | ||
"mocha": "^5.2.0", | ||
"sinon": "^7.2.3" | ||
"sinon": "^7.2.3", | ||
"stream-array": "^1.1.2" | ||
}, | ||
@@ -38,0 +41,0 @@ "config": { |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
132059
1.79%4149
1.47%5
25%1
Infinity%+ Added
+ Added
+ Added
+ Added
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
Updated