@wmfs/pg-delta-file
Advanced tools
Comparing version 1.4.0 to 1.5.0
@@ -0,1 +1,26 @@ | ||
# [1.5.0](https://github.com/wmfs/pg-delta-file/compare/v1.4.0...v1.5.0) (2018-09-05) | ||
### ✨ Features | ||
* pg-delta-file takes option filterFunction, which can be used to strip out unwanted rows ([9671498](https://github.com/wmfs/pg-delta-file/commit/9671498)) | ||
### 🛠 Builds | ||
* **deps-dev:** update [@semantic-release](https://github.com/semantic-release)/git requirement from 7.0.1 to 7.0.2 ([e3bc519](https://github.com/wmfs/pg-delta-file/commit/e3bc519)) | ||
* **deps-dev:** update [@semantic-release](https://github.com/semantic-release)/git requirement from 7.0.2 to 7.0.3 ([df5d0fe](https://github.com/wmfs/pg-delta-file/commit/df5d0fe)) | ||
* **deps-dev:** update codecov requirement from 3.0.4 to 3.1.0 ([19066d5](https://github.com/wmfs/pg-delta-file/commit/19066d5)) | ||
* **deps-dev:** update nyc requirement from 12.0.2 to 13.0.1 ([83fcdf7](https://github.com/wmfs/pg-delta-file/commit/83fcdf7)) | ||
* **deps-dev:** update semantic-release requirement from 15.9.11 to 15.9.12 ([887b2b7](https://github.com/wmfs/pg-delta-file/commit/887b2b7)) | ||
* **deps-dev:** update semantic-release requirement from 15.9.8 to 15.9.9 ([d54d837](https://github.com/wmfs/pg-delta-file/commit/d54d837)) | ||
* **deps-dev:** update semantic-release requirement from 15.9.9 to 15.9.11 ([3f56985](https://github.com/wmfs/pg-delta-file/commit/3f56985)) | ||
* **dev-deps:** Move to standard 12.0.1 ([0d92228](https://github.com/wmfs/pg-delta-file/commit/0d92228)) | ||
### 🚨 Tests | ||
* New (failing) test for filterFunction ([a3aa474](https://github.com/wmfs/pg-delta-file/commit/a3aa474)) | ||
* table driven tests ([15f9230](https://github.com/wmfs/pg-delta-file/commit/15f9230)) | ||
# [1.4.0](https://github.com/wmfs/pg-delta-file/compare/v1.3.0...v1.4.0) (2018-08-14) | ||
@@ -2,0 +27,0 @@ |
@@ -6,3 +6,3 @@ const fs = require('fs') | ||
module.exports = async function generateDelta (options) { | ||
const deltaFileWriteStream = fs.createWriteStream(options.outputFilepath, {defaultEncoding: 'utf8'}) | ||
const deltaFileWriteStream = fs.createWriteStream(options.outputFilepath, { defaultEncoding: 'utf8' }) | ||
const info = { | ||
@@ -30,3 +30,3 @@ totalCount: 0 | ||
const csvTransformer = new Transformer(info, model, options) | ||
csvTransformer.pipe(outStream, {end: false}) | ||
csvTransformer.pipe(outStream, { end: false }) | ||
csvTransformer.write('DUMMY') | ||
@@ -44,5 +44,5 @@ } // writeHeaderLine | ||
const csvTransformer = new Transformer(info, model, options) | ||
dbStream.pipe(csvTransformer).pipe(outStream, {end: false}) | ||
dbStream.pipe(csvTransformer).pipe(outStream, { end: false }) | ||
} | ||
return options.client.run([{sql: sql, params: [options.since], action: csvTransform}]) | ||
return options.client.run([{ sql: sql, params: [options.since], action: csvTransform }]) | ||
} // extractModel | ||
@@ -49,0 +49,0 @@ |
@@ -7,58 +7,47 @@ const Transform = require('stream').Transform | ||
constructor (info, model, options) { | ||
super({objectMode: true}) | ||
super({ objectMode: true }) | ||
this.info = info | ||
this.transformerFn = options.transformFunction ? options.transformFunction : identityTransform | ||
this.filterFn = options.filterFunction ? options.filterFunction : () => true | ||
const transformers = [] | ||
this.transformers = options.csvExtracts[model].map(csvColumnSource => { | ||
switch (csvColumnSource[0]) { | ||
case '$': | ||
const functionName = csvColumnSource.slice(1) | ||
switch (functionName) { | ||
case 'ROW_NUM': | ||
return (row, info) => info.totalCount | ||
case 'ACTION': | ||
return row => { | ||
// TODO: handle deleted action | ||
const createdCol = options.createdColumnName || '_created' | ||
const modifiedCol = options.modifiedColumnName || '_modified' | ||
const created = new Date(row[createdCol]) | ||
const modified = new Date(row[modifiedCol]) | ||
const since = new Date(options.since) | ||
options.csvExtracts[model].forEach( | ||
function (csvColumnSource) { | ||
switch (csvColumnSource[0]) { | ||
case '$': | ||
const functionName = csvColumnSource.slice(1) | ||
switch (functionName) { | ||
case 'ROW_NUM': | ||
transformers.push((row, info) => info.totalCount) | ||
break | ||
case 'ACTION': | ||
transformers.push(row => { | ||
// TODO: handle deleted action | ||
const createdCol = options.createdColumnName || '_created' | ||
const modifiedCol = options.modifiedColumnName || '_modified' | ||
const created = new Date(row[createdCol]) | ||
const modified = new Date(row[modifiedCol]) | ||
const since = new Date(options.since) | ||
if (modified >= since && created >= since) { | ||
return options.actionAliases.insert | ||
} | ||
if (modified >= since && created <= since) { | ||
return options.actionAliases.update | ||
} | ||
}) | ||
break | ||
case 'TIMESTAMP': | ||
transformers.push(() => DateTime.local().toLocaleString(DateTime.TIME_24_WITH_SECONDS)) | ||
break | ||
case 'DATESTAMP': | ||
transformers.push(() => DateTime.local().toISODate()) | ||
break | ||
case 'DATETIMESTAMP': | ||
transformers.push(() => DateTime.local().toISO()) | ||
break | ||
default: | ||
transformers.push(() => `Unknown fn $${functionName}`) | ||
} | ||
break | ||
case '@': | ||
const columnName = csvColumnSource.slice(1) | ||
transformers.push(row => row[columnName]) | ||
break | ||
default: | ||
transformers.push(() => csvColumnSource) | ||
} | ||
if (modified >= since && created >= since) { | ||
return options.actionAliases.insert | ||
} | ||
if (modified >= since && created <= since) { | ||
return options.actionAliases.update | ||
} | ||
} | ||
case 'TIMESTAMP': | ||
return () => DateTime.local().toLocaleString(DateTime.TIME_24_WITH_SECONDS) | ||
case 'DATESTAMP': | ||
return () => DateTime.local().toISODate() | ||
case 'DATETIMESTAMP': | ||
return () => DateTime.local().toISO() | ||
default: | ||
return () => `Unknown fn $${functionName}` | ||
} | ||
case '@': | ||
const columnName = csvColumnSource.slice(1) | ||
return row => row[columnName] | ||
default: | ||
return () => csvColumnSource | ||
} | ||
) | ||
this.transformers = transformers | ||
} | ||
}) | ||
} // constructor | ||
@@ -71,2 +60,7 @@ _transform (sourceRow, encoding, callback) { | ||
if (!this.filterFn(outputValues)) { | ||
this.info.totalCount-- // I'm not a super-fan of having to diddle this back, but there we are | ||
return callback(null, null) | ||
} | ||
this.transformerFn( | ||
@@ -76,3 +70,3 @@ outputValues, | ||
) | ||
} | ||
} // _transform | ||
} | ||
@@ -79,0 +73,0 @@ |
{ | ||
"name": "@wmfs/pg-delta-file", | ||
"version": "1.4.0", | ||
"version": "1.5.0", | ||
"description": "Outputs change-only-update CSV files (or “delta” files) that contain all the necessary actions required to re-synchronize rows in a cloned table.", | ||
@@ -32,11 +32,11 @@ "author": "West Midlands Fire Service", | ||
"chai": "4.1.2", | ||
"codecov": "3.0.4", | ||
"codecov": "3.1.0", | ||
"conventional-changelog-metahub": "2.0.2", | ||
"cz-conventional-changelog": "2.1.0", | ||
"mocha": "5.2.0", | ||
"nyc": "12.0.2", | ||
"semantic-release": "15.9.8", | ||
"standard": "11.0.1", | ||
"nyc": "13.0.1", | ||
"semantic-release": "15.9.12", | ||
"standard": "12.0.1", | ||
"@semantic-release/changelog": "3.0.0", | ||
"@semantic-release/git": "7.0.1", | ||
"@semantic-release/git": "7.0.3", | ||
"@wmfs/hl-pg-client": "1.1.3" | ||
@@ -43,0 +43,0 @@ }, |
@@ -33,2 +33,3 @@ # pg-delta-file | ||
transformFunction: (row, callback) => { ... } // optional data transformation | ||
filterFunction: (row) => { ... } // option filter predicate | ||
csvExtracts: { | ||
@@ -35,0 +36,0 @@ '[schema.]people': [ |
@@ -37,12 +37,10 @@ /* eslint-env mocha */ | ||
describe('pg-delta-file', () => { | ||
it('generate the delta file', async () => { | ||
const outputFile = path.resolve(__dirname, 'output', 'single-delta.csv') | ||
const expectedFile = path.resolve(__dirname, 'fixtures', 'expected', 'single-delta.csv') | ||
const info = await generateDelta( | ||
{ | ||
const tests = [ | ||
{ | ||
name: 'single delta', | ||
file: 'single-delta.csv', | ||
count: 5, | ||
delta: { | ||
namespace: 'springfield', | ||
client: client, | ||
since: '2016-06-03 15:02:38.000000 GMT', | ||
outputFilepath: outputFile, | ||
actionAliases: { | ||
@@ -65,22 +63,10 @@ insert: 'i', | ||
} | ||
) | ||
expect(info.totalCount).to.eql(5) | ||
const output = readRecords(outputFile) | ||
const expected = readRecords(expectedFile) | ||
expect(output).to.eql(expected) | ||
}) | ||
it('generate delta file with header', async () => { | ||
const outputFile = path.resolve(__dirname, 'output', 'with-header.csv') | ||
const expectedFile = path.resolve(__dirname, 'fixtures', 'expected', 'with-header.csv') | ||
const info = await generateDelta( | ||
{ | ||
}, | ||
{ | ||
name: 'delta file with header', | ||
file: 'with-header.csv', | ||
count: 7, | ||
delta: { | ||
namespace: 'springfield', | ||
client: client, | ||
since: '2016-06-03 15:02:38.000000 GMT', | ||
outputFilepath: outputFile, | ||
actionAliases: { | ||
@@ -115,22 +101,10 @@ insert: 'i', | ||
} | ||
) | ||
expect(info.totalCount).to.eql(7) | ||
const output = readRecords(outputFile) | ||
const expected = readRecords(expectedFile) | ||
expect(output).to.eql(expected) | ||
}) | ||
it('should generate delta file for both tables', async () => { | ||
const outputFile = path.resolve(__dirname, 'output', 'multiple-delta.csv') | ||
const expectedFile = path.resolve(__dirname, 'fixtures', 'expected', 'multiple-delta.csv') | ||
const info = await generateDelta( | ||
{ | ||
}, | ||
{ | ||
name: 'delta file for both tables', | ||
file: 'multiple-delta.csv', | ||
count: 6, | ||
delta: { | ||
namespace: 'springfield', // to be inferred | ||
client: client, | ||
since: '2017-06-02 15:02:38.000000 GMT', | ||
outputFilepath: path.resolve(__dirname, './output', './multiple-delta.csv'), | ||
actionAliases: { | ||
@@ -158,21 +132,10 @@ insert: 'i', | ||
} | ||
) | ||
expect(info.totalCount).to.eql(6) | ||
const output = readRecords(outputFile) | ||
const expected = readRecords(expectedFile) | ||
expect(output).to.eql(expected) | ||
}) | ||
it('generate the delta file with transformer', async () => { | ||
const outputFile = path.resolve(__dirname, 'output', 'upper-cased.csv') | ||
const expectedFile = path.resolve(__dirname, 'fixtures', 'expected', 'upper-cased.csv') | ||
const info = await generateDelta( | ||
{ | ||
}, | ||
{ | ||
name: 'delta file with transformer', | ||
file: 'upper-cased.csv', | ||
count: 5, | ||
delta: { | ||
namespace: 'springfield', | ||
client: client, | ||
since: '2016-06-03 15:02:38.000000 GMT', | ||
outputFilepath: outputFile, | ||
actionAliases: { | ||
@@ -200,11 +163,85 @@ insert: 'i', | ||
} | ||
) | ||
}, | ||
{ | ||
name: 'delta file with filter', | ||
file: 'filtered.csv', | ||
count: 3, | ||
delta: { | ||
namespace: 'springfield', | ||
since: '2016-06-03 15:02:38.000000 GMT', | ||
actionAliases: { | ||
insert: 'i', | ||
update: 'u', | ||
delete: 'd' | ||
}, | ||
filterFunction: function (row) { | ||
return +row[6] < 60 | ||
}, | ||
csvExtracts: { | ||
people: [ | ||
73, | ||
'$ACTION', | ||
'$ROW_NUM', | ||
'@social_security_id', | ||
'@first_name', | ||
'@last_name', | ||
'@age' | ||
] | ||
} | ||
} | ||
}, | ||
{ | ||
name: 'delta file with filter and transform', | ||
file: 'filtered-upper-cased.csv', | ||
count: 3, | ||
delta: { | ||
namespace: 'springfield', | ||
since: '2016-06-03 15:02:38.000000 GMT', | ||
actionAliases: { | ||
insert: 'i', | ||
update: 'u', | ||
delete: 'd' | ||
}, | ||
filterFunction: function (row) { | ||
return +row[6] < 60 | ||
}, | ||
transformFunction: function (row, callback) { | ||
row[4] = row[4].toUpperCase() | ||
row[5] = row[5].toUpperCase() | ||
callback(null, row) | ||
}, | ||
csvExtracts: { | ||
people: [ | ||
73, | ||
'$ACTION', | ||
'$ROW_NUM', | ||
'@social_security_id', | ||
'@first_name', | ||
'@last_name', | ||
'@age' | ||
] | ||
} | ||
} | ||
} | ||
] | ||
expect(info.totalCount).to.eql(5) | ||
for (const test of tests) { | ||
it(test.name, async () => { | ||
const outputFile = path.resolve(__dirname, 'output', test.file) | ||
const expectedFile = path.resolve(__dirname, 'fixtures', 'expected', test.file) | ||
test.delta.client = client | ||
test.delta.outputFilepath = outputFile | ||
const output = readRecords(outputFile) | ||
const expected = readRecords(expectedFile) | ||
const info = await generateDelta( | ||
test.delta | ||
) | ||
expect(output).to.eql(expected) | ||
}) | ||
expect(info.totalCount).to.eql(test.count) | ||
const output = readRecords(outputFile) | ||
const expected = readRecords(expectedFile) | ||
expect(output).to.eql(expected) | ||
}) | ||
} | ||
}) | ||
@@ -225,5 +262,5 @@ | ||
function readRecords (fileName) { | ||
const file = fs.readFileSync(fileName, {encoding: 'utf8'}) | ||
const file = fs.readFileSync(fileName, { encoding: 'utf8' }) | ||
const rows = file.split('\n') | ||
return rows | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
30379
26
412
61