pg-copy-streams-binary
Comparing version 1.2.1 to 1.3.0
index.js

module.exports = {
  deparser: require('./lib/deparser'),
  parser: require('./lib/parser'),
  fieldReader: require('./lib/fieldReader'),
  rawReader: require('./lib/rawReader'),
  rowReader: require('./lib/rowReader'),
  rowWriter: require('./lib/rowWriter'),
  transform: require('./lib/transform'),
}
lib/pg_types.js

@@ -54,2 +54,11 @@ const ieee754 = require('ieee754')
// jsonb
const jsonb_send = function (buf, value) {
  const jbuf = Buffer.from('\u0001' + JSON.stringify(value), 'utf-8')
  buf.put(jbuf)
}
const jsonb_recv = function (buf) {
  return JSON.parse(buf.slice(1).toString('utf-8'))
}
// float4
@@ -123,3 +132,3 @@ const float4send = function (buf, value) {
for (let i = 0; i < len; i++) {
  deparse(buf, atype, flat[i])
  encode(buf, atype, flat[i])
}
@@ -165,3 +174,3 @@ }
offset += UInt32Len
flat.push(parse(buf.slice(offset, offset + fieldLen), type))
flat.push(decode(buf.slice(offset, offset + fieldLen), type))
offset += fieldLen
@@ -191,2 +200,3 @@ }
json: { oid: 114, send: json_send, recv: json_recv },
jsonb: { oid: 3802, send: jsonb_send, recv: jsonb_recv },
float4: { oid: 700, send: float4send, recv: float4recv },
@@ -201,2 +211,3 @@ float8: { oid: 701, send: float8send, recv: float8recv },
_json: { oid: 199, send: array_send.bind(null, 'json'), recv: array_recv },
_jsonb: { oid: 3807, send: array_send.bind(null, 'jsonb'), recv: array_recv },
_float4: { oid: 1021, send: array_send.bind(null, 'float4'), recv: array_recv },
@@ -207,3 +218,3 @@ _float8: { oid: 1022, send: array_send.bind(null, 'float8'), recv: array_recv },
function deparse(buf, type, value) {
function encode(buf, type, value) {
  // Add a UInt32 placeholder for the field length
@@ -228,3 +239,3 @@ buf.word32be(0)
function parse(buf, type) {
function decode(buf, type) {
  return types[type].recv(buf)
@@ -239,4 +250,4 @@ }
types: types,
deparse: deparse,
parse: parse,
encode: encode,
decode: decode,
}
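The `jsonb_send` / `jsonb_recv` pair above shows that the binary representation of a `jsonb` value is simply a one-byte version tag (`\u0001`) followed by the JSON text in UTF-8. A minimal round-trip sketch with plain Node.js Buffers (illustration only, independent of the internal buffer helper the library uses):

```js
// jsonb binary format: 0x01 version byte + UTF-8 JSON text
const value = { a: true, b: [1, 7] }

// encode, mirroring jsonb_send
const encoded = Buffer.concat([Buffer.from([0x01]), Buffer.from(JSON.stringify(value), 'utf-8')])

// decode, mirroring jsonb_recv: skip the version byte, parse the rest
const decoded = JSON.parse(encoded.slice(1).toString('utf-8'))

console.log(encoded.length) // 21: 20 bytes of '{"a":true,"b":[1,7]}' plus the version byte
console.log(decoded)        // { a: true, b: [ 1, 7 ] }
```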
lib/transform.js

const through2 = require('through2')
const MultiFork = require('multi-fork')
const parser = require('./parser')
const deparser = require('./deparser')
const rowReader = require('./rowReader')
const rowWriter = require('./rowWriter')
@@ -20,3 +20,3 @@ const shift = function () {
const first = parser({ mapping: mapping })
const first = rowReader({ mapping: mapping })
const n = copyIns.length
@@ -36,3 +36,3 @@ let f = n
copyIns[i].on('finish', finish)
M.streams[i].pipe(shift()).pipe(deparser()).pipe(copyIns[i])
M.streams[i].pipe(shift()).pipe(rowWriter()).pipe(copyIns[i])
}
@@ -39,0 +39,0 @@ first.pipe(transform).pipe(M)
package.json

{
  "name": "pg-copy-streams-binary",
  "version": "1.2.1",
  "version": "1.3.0",
  "description": "Streams for parsing and deparsing the COPY binary format",
@@ -5,0 +5,0 @@ "main": "index.js",
README.md
@@ -5,3 +5,3 @@ ## pg-copy-streams-binary
Streams for parsing and deparsing the PostgreSQL COPY binary format.
This module contains helper streams for decoding and encoding the PostgreSQL COPY binary format.
Ingest streaming data into PostgreSQL, or export data from PostgreSQL and transform it into a stream, using the COPY BINARY format.
@@ -11,6 +11,6 @@
Well first you have to know that PostgreSQL has not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
Well first you have to know that PostgreSQL has a not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
or exporting to a sink from PostgreSQL (_copy-out_)
You should first go and get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
Before using this module, you should make sure to get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
the heavy lifting of handling the COPY part of the protocol flow.
@@ -26,6 +26,14 @@
Do you want to go there ? If you take the blue pill, then this module might be for you.
Do you want to go there ? If you choose to go down the BINARY road, this module can help.
It can be used to parse and deparse the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
It can be used to decode and encode the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
There are currently 5 helper Streams provided:
- rowReader
- fieldReader
- rawReader
- rowWriter
- transform
The main API is called `transform` and tries to hide many of those details. It can be used to easily do non-trivial things like:
@@ -37,4 +45,80 @@
## Example
## rowReader
A rowReader is a Transform stream that takes a copyOut stream as input and outputs a sequence of rows.
The fields in each row are decoded according to the `options.mapping` definition.
### options.mapping
default: false
This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type }` where `name` will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
The rowReader will push rows with the corresponding keys.
When `mapping` is not given, the rowReader will push rows as arrays of Buffers.
Check the test directory for examples.
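A minimal usage sketch (the `items` table, its columns, and the assumption that `int4` and `text` are among the supported types are all illustrative, not part of the library):

```js
const { Client } = require('pg')
const { to: copyTo } = require('pg-copy-streams')
const { rowReader } = require('pg-copy-streams-binary')

;(async () => {
  const client = new Client()
  await client.connect()

  // COPY ... TO STDOUT (FORMAT binary) produces the stream the rowReader expects
  const copyOut = client.query(copyTo('COPY items (id, label) TO STDOUT (FORMAT binary)'))

  const rows = copyOut.pipe(
    rowReader({
      mapping: [
        { key: 'id', type: 'int4' }, // types must match the column types in the table
        { key: 'label', type: 'text' },
      ],
    })
  )

  rows.on('data', (row) => console.log(row.id, row.label))
  rows.on('end', () => client.end())
})()
```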
## fieldReader
A fieldReader is a Transform stream that takes a copyOut stream as input and outputs an object-mode stream that is a sequence of fields.
The fields are decoded according to the `options.mapping` definition and each field has the following keys:
- _fieldIndex: zero-based index of the field within a row
- _fieldCount: total number of fields within a row
- _fieldLength: byte length of the field in binary representation (for bytea = number of bytes)
- name: name of the field, equal to the key of the field in the mapping definition
- value: value of the field. When mode is 'sync', this is the decoded value. When mode is 'async', this is a stream of _fieldLength bytes
Note that in fieldReader, each field can define a `mode = sync / async` attribute. When `mode = async`, the field output will be a Readable stream.
This helps in scenarios where you do not want to gather a big field in memory, but you must make sure to read the field stream: if you do not read it, backpressure will kick in and you will not receive any more fields.
Check the test directory for examples.
### options.mapping
default: false
This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type, mode: mode }` where `name` will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
The mode can be 'sync' or 'async'. The default is 'sync'.
The fieldReader will push fields with the corresponding keys.
When `mapping` is not given, the fieldReader will push fields as arrays of Buffers.
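A minimal sketch of the async mode (the `attachments` table, its columns, and the assumption that `text` and `bytea` are supported types are illustrative):

```js
const fs = require('fs')
const { Client } = require('pg')
const { to: copyTo } = require('pg-copy-streams')
const { fieldReader } = require('pg-copy-streams-binary')

;(async () => {
  const client = new Client()
  await client.connect()

  const copyOut = client.query(copyTo('COPY attachments (name, payload) TO STDOUT (FORMAT binary)'))

  const fields = copyOut.pipe(
    fieldReader({
      mapping: [
        { key: 'name', type: 'text', mode: 'sync' },      // small field, decoded in memory
        { key: 'payload', type: 'bytea', mode: 'async' }, // big field, delivered as a Readable stream
      ],
    })
  )

  let currentName = 'unknown'
  fields.on('data', (field) => {
    if (field.name === 'name') {
      currentName = field.value
    } else {
      // async field: field.value is a stream of _fieldLength bytes and MUST be consumed,
      // otherwise backpressure will stop the flow of further fields
      field.value.pipe(fs.createWriteStream(`/tmp/${currentName}.bin`))
    }
  })
  fields.on('end', () => client.end())
})()
```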
## rawReader
A rawReader is a Transform stream that takes a copyOut stream as input and outputs raw field bytes.
Check the test directory for examples.
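A minimal sketch, assuming a single `bytea` column is exported and that the rawReader can be constructed without options (the table and file names are illustrative):

```js
const fs = require('fs')
const { Client } = require('pg')
const { to: copyTo } = require('pg-copy-streams')
const { rawReader } = require('pg-copy-streams-binary')

;(async () => {
  const client = new Client()
  await client.connect()

  // Dump the raw bytes of a single bytea column straight to a file
  client
    .query(copyTo('COPY images (data) TO STDOUT (FORMAT binary)'))
    .pipe(rawReader())
    .pipe(fs.createWriteStream('/tmp/images.raw'))
    .on('finish', () => client.end())
})()
```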
## rowWriter
The rowWriter (formerly called the deparser) is usually used without arguments. It is a Transform stream (always in object mode) that receives a stream of arrays and outputs their PostgreSQL binary representation.
Each array is a sequence of `{ type: .., value: .. }` pairs, where `type` is a PostgreSQL type (cf. the supported types section) and `value` is the value that needs to be encoded.
Currently, make sure the value is not the JavaScript `undefined`, because this case is not handled by the rowWriter. The value can be `null`, but don't forget that the target table field must be nullable or the database will complain.
Usually, you would want to use a through2 stream to prepare the arrays and pipe it into the rowWriter.
Check the test directory for examples; a minimal sketch is also given after the options below.
### options.COPY_sendHeader
default: true
This option can be used to skip sending the header that PostgreSQL expects at the beginning of a COPY session.
You could use this if you want to pipe this stream into an already opened COPY session.
### options.COPY_sendTrailer
default: true
This option can be used to skip sending the trailer that PostgreSQL expects at the end of a COPY session.
You could use this if you want to unpipe this stream and pipe in another one that will send more data and eventually finish the COPY session.
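A minimal sketch of feeding a copy-in stream through the rowWriter (the table and column names are illustrative, and `int4` and `text` are assumed to be supported types):

```js
const { Client } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')
const { rowWriter } = require('pg-copy-streams-binary')

;(async () => {
  const client = new Client()
  await client.connect()

  const copyIn = client.query(copyFrom('COPY items (id, label) FROM STDIN BINARY'))

  // rowWriter is an object-mode Transform: write one array of { type, value } pairs per row
  const rows = rowWriter()
  rows.pipe(copyIn)

  rows.write([
    { type: 'int4', value: 1 },
    { type: 'text', value: 'hello' },
  ])
  rows.write([
    { type: 'int4', value: 2 },
    { type: 'text', value: null }, // null is allowed if the column is nullable; undefined is not handled
  ])
  rows.end()

  copyIn.on('finish', () => client.end())
})()
```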
## Example of `transform`
This library is mostly interesting for ETL operations (Extract, Transform, Load). When you just need Extract + Load, `pg-copy-streams` alone does the job and you don't need this library.
@@ -199,37 +283,2 @@
## API for deparser
the deparser is usually used without arguments. It is a Transform Stream (always in object mode) that receives a stream of arrays, and outputs their PostgreSQL binary representation.
Each array is a sequence of { type: .., value: .. } pairs, where `type` is a PostgreSQL type (cf section supported types) and `value` is the value that needs to be deparsed.
Currently, make sure value is not the javascript `undefined` because this case is not handled in the deparser. The value can be `null` but don't forget that the target table field should be nullable or the database will complain.
Usually, you would want to use a through2 stream to prepare the arrays, and pipe this into the deparser.
### options.COPY_sendHeader
default: true
This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
You could use this if you want to pipe this stream to an already opened COPY session.
### options.COPY_sendTrailer
default: true
This option can be used to not send the trailer that PostgreSQL expects at the end of a COPY session.
You could use this if you want to unpipe this stream and pipe another one that will send more data and maybe finish the COPY session.
## API for Parser
### options.mapping
default: false
This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
the Parser will push rows with the corresponding keys.
When `mapping` is not given, the Parser will push rows as arrays of Buffers.
## Currently supported types
@@ -245,2 +294,3 @@
- json
- jsonb
- timestamptz
@@ -254,2 +304,15 @@
### version 2.0.0 - published 2020-06-17
This is a breaking version because it was decided to rename some exported variables.
- Rename exported objects for improved readability (see the import sketch below)
  - pg_types parse => decode
  - pg_types deparse => encode
  - parser => rowReader
  - deparser => rowWriter
- Implement fieldReader with async support
- Implement rawReader
- Add jsonb type support
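For consumers, the renames surface in the exported names; a short sketch based on the `index.js` and `lib/pg_types` exports shown in this diff (the old stream names are still exported alongside the new ones, and requiring the `lib/pg_types` sub-path is assumed to work for the published package):

```js
// New stream helpers exposed by index.js
const { rowReader, rowWriter, fieldReader, rawReader, transform } = require('pg-copy-streams-binary')

// pg_types now exposes encode/decode instead of the former deparse/parse helpers
const { types, encode, decode } = require('pg-copy-streams-binary/lib/pg_types')
```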
### version 1.2.1 - published 2020-05-29
@@ -279,3 +342,3 @@
- [send/recv implementations for types in PostgreSQL](https://github.com/postgres/postgres/tree/master/src/backend/utils/adt)
- [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.h)
- [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat)
@@ -282,0 +345,0 @@ ## Acknowledgments
@@ -33,2 +33,8 @@ const pgtypes = require('../lib/pg_types')
{ t: 'json', v: { a: true, b: [1, 7] }, r: new BP().word32be(20).string('{"a":true,"b":[1,7]}', 'utf-8').buffer() },
{ t: 'jsonb', v: null, r: new BP().word32be(-1).buffer() },
{
  t: 'jsonb',
  v: { a: true, b: [1, 7] },
  r: new BP().word32be(21).string('\u0001{"a":true,"b":[1,7]}', 'utf-8').buffer(),
},
// online float4+float8 hex converter, http://gregstoll.dyndns.org/~gregstoll/floattohex/
@@ -199,2 +205,19 @@ { t: 'float4', v: null, r: new BP().word32be(-1).buffer() },
},
{ t: '_jsonb', v: null, r: new BP().word32be(-1).buffer() },
{
  t: '_jsonb',
  v: [{ a: 1 }, { c: 3 }],
  r: new BP()
    .word32be(44)
    .word32be(1)
    .word32be(0)
    .word32be(types['jsonb'].oid)
    .word32be(2)
    .word32be(1)
    .word32be(8)
    .string('\u0001{"a":1}', 'utf-8')
    .word32be(8)
    .string('\u0001{"c":3}', 'utf-8')
    .buffer(),
},
{ t: '_float4', v: null, r: new BP().word32be(-1).buffer() },
@@ -201,0 +224,0 @@ {
Deprecated
Maintenance: The maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package.
Major refactor
Supply chain risk: The package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package.