pg-copy-streams-binary
Advanced tools
Comparing version 1.0.1 to 1.0.2
module.exports = { | ||
deparser: require('./lib/deparser'), | ||
parser: require('./lib/parser'), | ||
transform: require('./lib/transform'), | ||
} |
@@ -14,6 +14,8 @@ /** | ||
var BufferPut = require('bufferput'); | ||
var deparse = require('./pg_types').deparse; | ||
var BufferPut = require('bufferput') | ||
var deparse = require('./pg_types').deparse | ||
var CopyStream = function(options) { | ||
options = options || {} | ||
options.objectMode = true | ||
Transform.call(this, options) | ||
@@ -20,0 +22,0 @@ |
@@ -17,2 +17,3 @@ /** | ||
var CopyStream = function(options) { | ||
options.objectMode = true; | ||
Transform.call(this, options) | ||
@@ -19,0 +20,0 @@ |
{ | ||
"name": "pg-copy-streams-binary", | ||
"version": "1.0.1", | ||
"version": "1.0.2", | ||
"description": "Streams for parsing and deparsing the COPY binary format", | ||
@@ -22,3 +22,5 @@ "main": "index.js", | ||
"ieee754": "^1.1.6", | ||
"int64-buffer": "^0.1.9" | ||
"int64-buffer": "^0.1.9", | ||
"multi-fork": "0.0.2", | ||
"through2": "^2.0.1" | ||
}, | ||
@@ -30,5 +32,4 @@ "devDependencies": { | ||
"pg": "^6.0.3", | ||
"pg-copy-streams": "^1.1.1", | ||
"through2": "^2.0.1" | ||
"pg-copy-streams": "^1.1.1" | ||
} | ||
} |
152
README.md
@@ -28,7 +28,12 @@ ## pg-copy-streams-binary | ||
## Examples | ||
The main API is called `transform` and tries to hide many of those details. It can be used to easily do non-trivial things like: | ||
- transforming rows | ||
- expanding on the number of rows | ||
- forking rows into several databases at the same time, with the same or different structures | ||
## Example | ||
This library is mostly interesting for ETL operations (Extract, Transform, Load). When you just need Extract+Load, `pg-copy-streams` does the job and you don't need this library. | ||
So here is an example of Transformation where you want to Extract data from database A (dsnA) and move it to database B (dsnB). But there is a twist. | ||
So here is an example of Transformation where you want to Extract data from database A (dsnA) and move it into two databases, B (dsnB) and C (dsnC). But there is a twist. | ||
@@ -54,2 +59,12 @@ In database A, you have table of items | ||
Table C has the simple structure | ||
```sql | ||
CREATE TABLE generated (body text); | ||
``` | ||
And you want to fill it, for each source row, with `id` rows (thereby expanding the number of rows), each with a body of "BODY: " + description. | ||
After all this is done, you want to add a line in the `generated` table with a body of "COUNT: " + total number of rows inserted (not counting this one). | ||
Here is code that will do just this. | ||
@@ -59,6 +74,6 @@ | ||
var pg = require('pg'); | ||
var through2 = require('through2'); | ||
var copyOut = require('pg-copy-streams').to; | ||
var copyIn = require('pg-copy-streams').from; | ||
var parser = require('pg-copy-streams-binary').parser; | ||
var deparser = require('pg-copy-streams-binary').deparser; | ||
var pgCopyTransform = require('pg-copy-streams-binary').transform; | ||
@@ -73,50 +88,58 @@ var client = function(dsn) { | ||
var dsnB = null; // configure database B connection parameters | ||
var dsnC = null; // configure database C connection parameters | ||
var clientA = client(dsnA); | ||
var clientB = client(dsnB); | ||
var clientC = client(dsnC); | ||
var AStream = clientA.query(copyOut('COPY item TO STDOUT BINARY')) | ||
var Parser = new parser({ | ||
objectMode: true, | ||
mapping: [ | ||
{ key: 'id', type: 'int4' }, | ||
{ key: 'ref', type: 'text' }, | ||
{ key: 'description', type: 'text'}, | ||
]}) | ||
var Deparser = new deparser({ | ||
objectMode: true, | ||
mapping: [ | ||
function(row) { return { type: 'int4', value: parseInt(row.ref.split(':')[0])} }, | ||
function(row) { return { type: 'text', value: row.ref.split(':')[1] } }, | ||
function(row) { return { type: 'text', value: row.description.toLowerCase() } }, | ||
function(row) { | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
var numberOfDaysToAdd = parseInt(row.ref.split(':')[0]); | ||
d.setDate(d.getDate() + numberOfDaysToAdd); | ||
return { type: 'timestamptz', value: d } }, | ||
function(row) { | ||
var id = parseInt(row.ref.split(':')[0]); | ||
return { type: '_int2', value: [[id, id+1], [id+2, id+3]]} | ||
var AStream = clientA.query(copyOut('COPY item TO STDOUT BINARY')) | ||
var BStream = clientB.query(copyIn ('COPY product FROM STDIN BINARY')) | ||
var CStream = clientC.query(copyIn ('COPY generated FROM STDIN BINARY')) | ||
var transform = through2.obj( | ||
function(row, _, cb) { | ||
var id = parseInt(row.ref.split(':')[0]); | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
d.setDate(d.getDate() + id); | ||
count++ | ||
this.push([0, | ||
{ type: 'int4', value: id }, | ||
{ type: 'text', value: row.ref.split(':')[1] }, | ||
{ type: 'text', value: row.description.toLowerCase() }, | ||
{ type: 'timestamptz', value: d }, | ||
{ type: '_int2', value: [ [ id, id+1 ], [ id+2, id+3 ] ] } | ||
]) | ||
while (id > 0) { | ||
count++ | ||
this.push([1, | ||
{ type: 'text', value: 'BODY: ' + row.description } | ||
]); | ||
id--; | ||
} | ||
]}) | ||
var BStream = clientB.query(copyIn ('COPY product FROM STDIN BINARY')) | ||
cb() | ||
}, | ||
function(cb) { | ||
this.push([1, | ||
{ type: 'text', value: 'COUNT: ' + count} | ||
]) | ||
cb() | ||
} | ||
); | ||
var runStream = function(callback) { | ||
// listen for errors | ||
AStream.on('error', callback) | ||
Parser.on('error', callback) | ||
Deparser.on('error', callback) | ||
BStream.on('error', callback) | ||
var count = 0; | ||
var pct = pgCopyTransform({ | ||
mapping: [{key:'id',type:'int4'}, {key:'ref',type:'text'},{key:'description',type:'text'}], | ||
transform: transform, | ||
targets: [BStream, CStream], | ||
}); | ||
// listen for the end of ingestion | ||
BStream.on('finish', callback); | ||
AStream.pipe(Parser).pipe(Deparser).pipe(BStream); | ||
} | ||
runStream(function(err) { | ||
// done !! | ||
clientA.end() | ||
clientB.end() | ||
pct.on('close', function() { | ||
// Done ! | ||
clientA.end(); | ||
clientB.end(); | ||
clientC.end(); | ||
}) | ||
AStream.pipe(pct); | ||
``` | ||
@@ -126,2 +149,45 @@ | ||
## API for `transform(options)` | ||
This method returns a Writable Stream. You should pipe a `pg-copy-streams.to` Stream into it. Make sure the corresponding command is in BINARY format. | ||
There are 3 must-have options and 1 important Event. | ||
### option `mapping` | ||
This is an array of { key:, type: } elements. There MUST be as many elements defined as there are columns that are fetched by the COPY command. | ||
The keys are arbitrary and each database row in the COPY command will be translated into an object with those keys holding the values corresponding to their position in the `mapping` array and their respective position in the COPY command. | ||
### option `targets` | ||
This is an array of `pg-copy-streams.from` Streams. The transform operation can deliver transformed rows to several tables in several databases at the same time. Each target will be referenced by its index in the transform stream. | ||
### option `transform` | ||
This MUST be a classical PassThrough Stream. You can build it with `through2` for example. This stream is a Duplex Stream that takes a series of rows as input and that outputs another series of rows. | ||
It can remove rows, add rows, transform rows. Whatever a through stream can do. | ||
When rows are pushed, they should have the format | ||
```js | ||
this.push([index, | ||
{ type: .., value: .. }, | ||
{ type: .., value: .. }, | ||
{ type: .., value: .. }, | ||
]) | ||
``` | ||
where `index` is an integer that corresponds to the target COPY command in the `targets` option. | ||
The { type: .., value: ..} elements MUST correspond to the number of fields in the target COPY command and the types must correspond to the associated types in the database. The transform operation can change the types of the data in the incoming rows, but it must always adhere to the types of the target table in the COPY target because there will be no coercion in the database and the binary protocol must send the data exactly as it is expected in the table. | ||
### Event `close` | ||
The Writable Stream will emit a `close` event, following the Node.js documentation: | ||
> The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur. | ||
Not all Streams emit a `close` event but this one does because it is necessary to wait for the end of all the underlying COPY FROM STDIN BINARY commands on the targets. `close` is emitted when all the underlying COPY commands have emitted their respective `finish` event. | ||
## API for Deparser | ||
@@ -128,0 +194,0 @@ |
@@ -5,7 +5,8 @@ var assert = require('assert') | ||
var pg = require('pg'); | ||
var copyOut = require('pg-copy-streams').to; | ||
var copyIn = require('pg-copy-streams').from; | ||
var parser = require('../').parser; | ||
var deparser = require('../').deparser; | ||
var pgCopyOut = require('pg-copy-streams').to; | ||
var pgCopyIn = require('pg-copy-streams').from; | ||
var through2 = require('through2'); | ||
var pgCopyTransform = require('../').transform; | ||
var client = function(dsn) { | ||
@@ -17,8 +18,6 @@ var client = new pg.Client(dsn); | ||
var dsnA = null; // configure database A connection parameters | ||
var dsnB = null; // configure database B connection parameters | ||
var clientA = client(); | ||
var clientB = client(); | ||
var clientC = client(); | ||
var clientA = client(dsnA); | ||
var clientB = client(dsnB); | ||
var queriesA = [ | ||
@@ -36,64 +35,86 @@ "DROP TABLE IF EXISTS item", | ||
var queriesC = [ | ||
"DROP TABLE IF EXISTS generated", | ||
"CREATE TABLE generated (body text)" | ||
] | ||
// we simplify by observing here that A=B when tests are executed | ||
async.eachSeries(queriesA.concat(queriesB), clientA.query.bind(clientA), function(err) { | ||
async.eachSeries(queriesA.concat(queriesB, queriesC), clientA.query.bind(clientA), function(err) { | ||
assert.ifError(err) | ||
var AStream = clientA.query(copyOut('COPY item TO STDOUT BINARY')) | ||
var Parser = new parser({ | ||
objectMode: true, | ||
mapping: [ | ||
{ key: 'id', type: 'int4' }, | ||
{ key: 'ref', type: 'text' }, | ||
{ key: 'description', type: 'text'}, | ||
] | ||
}) | ||
var Deparser = new deparser({ | ||
objectMode: true, | ||
mapping: [ | ||
function(row) { return { type: 'int4', value: parseInt(row.ref.split(':')[0])} }, | ||
function(row) { return { type: 'text', value: row.ref.split(':')[1] } }, | ||
function(row) { return { type: 'text', value: row.description.toLowerCase() } }, | ||
function(row) { | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
var numberOfDaysToAdd = parseInt(row.ref.split(':')[0]); | ||
d.setDate(d.getDate() + numberOfDaysToAdd); | ||
return { type: 'timestamptz', value: d } }, | ||
function(row) { | ||
var id = parseInt(row.ref.split(':')[0]); | ||
return { type: '_int2', value: [[id, id+1], [id+2, id+3]]} | ||
}, | ||
] | ||
}) | ||
var BStream = clientB.query(copyIn ('COPY product FROM STDIN BINARY')) | ||
var runStream = function(callback) { | ||
BStream.on('finish', callback); | ||
AStream.pipe(Parser).pipe(Deparser).pipe(BStream); | ||
} | ||
var copyOut = clientA.query(pgCopyOut('COPY item TO STDOUT BINARY')) | ||
var copyIns = [ | ||
clientB.query(pgCopyIn ('COPY product FROM STDIN BINARY')), | ||
clientC.query(pgCopyIn ('COPY generated FROM STDIN BINARY')), | ||
] | ||
runStream(function(err) { | ||
assert.ifError(err) | ||
clientA.query('SELECT * FROM item', function(err, res) { | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on A, but got ' + res.rowCount); | ||
clientA.end(); | ||
var count = 0; | ||
var pct = pgCopyTransform({ | ||
mapping: [{key:'id',type:'int4'}, {key:'ref',type:'text'},{key:'description',type:'text'}], | ||
targets: copyIns, | ||
transform: through2.obj(function(row, _, cb) { | ||
var id = parseInt(row.ref.split(':')[0]); | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
d.setDate(d.getDate() + id); | ||
count++ | ||
this.push([0, | ||
{ type: 'int4', value: id }, | ||
{ type: 'text', value: row.ref.split(':')[1] }, | ||
{ type: 'text', value: row.description.toLowerCase() }, | ||
{ type: 'timestamptz', value: d }, | ||
{ type: '_int2', value: [ [ id, id+1 ], [ id+2, id+3 ] ] } | ||
]) | ||
while (id > 0) { | ||
count++ | ||
this.push([1, | ||
{ type: 'text', value: 'BODY: ' + row.description } | ||
]); | ||
id--; | ||
} | ||
cb() | ||
},function(cb) { | ||
this.push([1, { type: 'text', value: 'COUNT: ' + count}]) | ||
cb() | ||
}) | ||
clientB.query('SELECT * FROM product ORDER BY code ASC', function(err, res) { | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on B, but got ' + res.rowCount); | ||
}) | ||
pct.on('close', function(err) { | ||
assert.ifError(err) | ||
clientA.query('SELECT * FROM item', function(err, res) { | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on A, but got ' + res.rowCount); | ||
clientA.end(); | ||
}) | ||
clientB.query('SELECT * FROM product ORDER BY code ASC', function(err, res) { | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on B, but got ' + res.rowCount); | ||
// first row | ||
assert.equal(res.rows[0].code, 1) | ||
assert.equal(res.rows[0].label, 'CTX') | ||
assert.equal(res.rows[0].description, 'a little item') | ||
assert.equal(res.rows[0].ts_creation.getTime(), d.getTime() + 1*24*60*60*1000) | ||
assert.equal(JSON.stringify(res.rows[0].matrix), "[[1,2],[3,4]]") | ||
// first row | ||
assert.equal(res.rows[0].code, 1) | ||
assert.equal(res.rows[0].label, 'CTX') | ||
assert.equal(res.rows[0].description, 'a little item') | ||
assert.equal(res.rows[0].ts_creation.getTime(), d.getTime() + 1*24*60*60*1000) | ||
assert.equal(JSON.stringify(res.rows[0].matrix), "[[1,2],[3,4]]") | ||
// second row | ||
assert.equal(res.rows[1].code, 2) | ||
assert.equal(res.rows[1].label, 'CTX') | ||
assert.equal(res.rows[1].description, 'a big item') | ||
assert.equal(JSON.stringify(res.rows[1].matrix), "[[2,3],[4,5]]") | ||
// second row | ||
assert.equal(res.rows[1].code, 2) | ||
assert.equal(res.rows[1].label, 'CTX') | ||
assert.equal(res.rows[1].description, 'a big item') | ||
assert.equal(JSON.stringify(res.rows[1].matrix), "[[2,3],[4,5]]") | ||
clientB.end(); | ||
}) | ||
}) | ||
clientB.end(); | ||
}) | ||
clientC.query('SELECT * FROM generated ORDER BY body ASC', function(err, res) { | ||
assert.equal(res.rows[0].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[1].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[2].body, 'BODY: A little item') | ||
assert.equal(res.rows[3].body, 'COUNT: 5') | ||
clientC.end(); | ||
}) | ||
} | ||
) | ||
copyOut.pipe(pct); | ||
}) |
45117
5
18
799
290
6
+ Addedmulti-fork@0.0.2
+ Addedthrough2@^2.0.1
+ Addedcore-util-is@1.0.3(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedmulti-fork@0.0.2(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedthrough2@2.0.5(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedxtend@4.0.2(transitive)