New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

pg-copy-streams-binary

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-copy-streams-binary - npm Package Compare versions

Comparing version 1.2.1 to 1.3.0

lib/fieldReader.js

6

index.js
module.exports = {
deparser: require('./lib/deparser'),
parser: require('./lib/parser'),
fieldReader: require('./lib/fieldReader'),
rawReader: require('./lib/rawReader'),
rowReader: require('./lib/rowReader'),
rowWriter: require('./lib/rowWriter'),
transform: require('./lib/transform'),
}

@@ -54,2 +54,11 @@ const ieee754 = require('ieee754')

// jsonb
const jsonb_send = function (buf, value) {
const jbuf = Buffer.from('\u0001' + JSON.stringify(value), 'utf-8')
buf.put(jbuf)
}
const jsonb_recv = function (buf) {
return JSON.parse(buf.slice(1).toString('utf-8'))
}
// float4

@@ -123,3 +132,3 @@ const float4send = function (buf, value) {

for (let i = 0; i < len; i++) {
deparse(buf, atype, flat[i])
encode(buf, atype, flat[i])
}

@@ -165,3 +174,3 @@ }

offset += UInt32Len
flat.push(parse(buf.slice(offset, offset + fieldLen), type))
flat.push(decode(buf.slice(offset, offset + fieldLen), type))
offset += fieldLen

@@ -191,2 +200,3 @@ }

json: { oid: 114, send: json_send, recv: json_recv },
jsonb: { oid: 3802, send: jsonb_send, recv: jsonb_recv },
float4: { oid: 700, send: float4send, recv: float4recv },

@@ -201,2 +211,3 @@ float8: { oid: 701, send: float8send, recv: float8recv },

_json: { oid: 199, send: array_send.bind(null, 'json'), recv: array_recv },
_jsonb: { oid: 3807, send: array_send.bind(null, 'jsonb'), recv: array_recv },
_float4: { oid: 1021, send: array_send.bind(null, 'float4'), recv: array_recv },

@@ -207,3 +218,3 @@ _float8: { oid: 1022, send: array_send.bind(null, 'float8'), recv: array_recv },

function deparse(buf, type, value) {
function encode(buf, type, value) {
// Add a UInt32 placeholder for the field length

@@ -228,3 +239,3 @@ buf.word32be(0)

function parse(buf, type) {
function decode(buf, type) {
return types[type].recv(buf)

@@ -239,4 +250,4 @@ }

types: types,
deparse: deparse,
parse: parse,
encode: encode,
decode: decode,
}
const through2 = require('through2')
const MultiFork = require('multi-fork')
const parser = require('./parser')
const deparser = require('./deparser')
const rowReader = require('./rowReader')
const rowWriter = require('./rowWriter')

@@ -20,3 +20,3 @@ const shift = function () {

const first = parser({ mapping: mapping })
const first = rowReader({ mapping: mapping })
const n = copyIns.length

@@ -36,3 +36,3 @@ let f = n

copyIns[i].on('finish', finish)
M.streams[i].pipe(shift()).pipe(deparser()).pipe(copyIns[i])
M.streams[i].pipe(shift()).pipe(rowWriter()).pipe(copyIns[i])
}

@@ -39,0 +39,0 @@ first.pipe(transform).pipe(M)

{
"name": "pg-copy-streams-binary",
"version": "1.2.1",
"version": "1.3.0",
"description": "Streams for parsing and deparsing the COPY binary format",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -5,3 +5,3 @@ ## pg-copy-streams-binary

Streams for parsing and deparsing the PostgreSQL COPY binary format.
This module contains helper streams for decoding and encoding the PostgreSQL COPY binary format.
Ingest streaming data into PostgresSQL or Export data from PostgreSQL and transform it into a stream, using the COPY BINARY format.

@@ -11,6 +11,6 @@

Well first you have to know that PostgreSQL has not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
Well first you have to know that PostgreSQL has a not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
or exporting to a sink from PostgreSQL (_copy-out_)
You should first go and get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
Before using this module, you should make sure to get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
the heavy lifting of handling the COPY part of the protocol flow.

@@ -26,6 +26,14 @@

Do you want to go there ? If you take the blue pill, then this module might be for you.
Do you want to go there ? If you choose to go down the BINARY road, this module can help.
It can be used to parse and deparse the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
It can be used to decode and encode the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
There are currently 5 helper Stream provided :
- rowReader
- fieldReader
- rawReader
- rowWriter
- transform
The main API is called `transform` an tries to hide many of those details. It can be used to easily do non trivial things like :

@@ -37,4 +45,80 @@

## Example
## rowReader
A rowReader is a Transform stream that takes a copyOut stream as input and outputs a sequence of rows.
The fields in each row are decoded according to the `options.mapping` definition.
### options.mapping
default: false
This option can be used to describe the rows that are beeing exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
the Parser will push rows with the corresponding keys.
When `mapping` is not given, the Parser will push rows as arrays of Buffers.
Check test directory for examples.
## fieldReader
A fieldReader is a Transform stream that takes a copyOut stream as input and outputs an objectMode stream that is a sequence of fields.
The fields are decoded according to the `options.mapping` definition and each field has the following keys :
- _fieldIndex: zero-based index of the field within a row
- _fieldCount: total number of fields within a row
- _fieldLength: byte-length of the field in binary representation (for bytea = number of bytes)
- name: name of the field, equal to the key of the field in the mapping definition
- value: value of the field. When mode is 'sync', this is the decoded value. When mode is 'async', this is a stream of _fieldLength bytes
Note that in fieldReader, each field can define a `mode = sync / async` attribute. When `mode = async`, the field output will be a Readable Stream.
This can help in scenarios when you do not want to gather a big field in memory but you will need to make sure that you read the field stream because if you do not read it, backpressure will kick in and you will not receive more fields.
Check test directory for examples.
### options.mapping
default: false
This option can be used to describe the rows that are beeing exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type, mode: mode }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
The mode can be 'sync' or 'async'. Default is 'sync'
the Parser will push fields with the corresponding keys.
When `mapping` is not given, the Parser will push fields as arrays of Buffers.
## rawReader
A rawReader is a Transform stream that takes a copyOut stream as input and outputs raw field bytes.
Check test directory for examples.
## rowWriter
the deparser is usually used without arguments. It is a Transform Stream (always in object mode) that receives a stream of arrays, and outputs their PostgreSQL binary representation.
Each array is a sequence of { type:.. , value: ..} pairs, where `type` is a PostgreSQL type (cf section supported types) and `value` is the value that need to be deparsed.
Currently, make sure sure value is not the javascript `undefined` because this case is not handled in the deparser. The value can be `null` but don't forget that the target table field should be nullable or the database will complain.
Usually, you would want to use a through2 stream to prepare the arrays, and pipe this into the deparser.
Check test directory for examples.
### options.COPY_sendHeader
default: true
This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
You could use this if you want to pipe this stream to an already opened COPY session.
### options.COPY_sendTrailer
default: true
This option can be used to not send the header that PostgreSQL expects at the end of COPY session.
You could use this if you want to unpipe this stream pipe another one that will send more data and maybe finish the COPY session.
## Example of `transform`
This library is mostly interesting for ETL operations (Extract, Transformation, Load). When you just need Extract+Load, `pg-copy-streams` does the job and you don't need this library.

@@ -199,37 +283,2 @@

## API for deparser
the deparser is usually used without arguments. It is a Transform Stream (always in object mode) that receives a stream of arrays, and outputs their PostgreSQL binary representation.
Each array is a sequence of { type:.. , value: ..} pairs, where `type` is a PostgreSQL type (cf section supported types) and `value` is the value that need to be deparsed.
Currently, make sure sure value is not the javascript `undefined` because this case is not handled in the deparser. The value can be `null` but don't forget that the target table field should be nullable or the database will complain.
Usually, you would want to use a through2 stream to prepare the arrays, and pipe this into the deparser.
### options.COPY_sendHeader
default: true
This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
You could use this if you want to pipe this stream to an already opened COPY session.
### options.COPY_sendTrailer
default: true
This option can be used to not send the header that PostgreSQL expects at the end of COPY session.
You could use this if you want to unpipe this stream pipe another one that will send more data and maybe finish the COPY session.
## API for Parser
### options.mapping
default: false
This option can be used to describe the rows that are beeing exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
the Parser will push rows with the corresponding keys.
When `mapping` is not given, the Parser will push rows as arrays of Buffers.
## Currently supported types

@@ -245,2 +294,3 @@

- json
- jsonb
- timestamptz

@@ -254,2 +304,15 @@

### version 2.0.0 - published 2020-06-17
This is a breaking version because it was decided to rename some exported variables.
- Rename exported objects for improved lisibility
pg_types parse => decode
pg_types deparse => encode
parser => rowReader
deparser => rowWriter
- Implement fieldReader with async support
- Implement rawReader
- Add jsonb type support
### version 1.2.1 - published 2020-05-29

@@ -279,3 +342,3 @@

- [send/recv implementations for types in PostgreSQL](https://github.com/postgres/postgres/tree/master/src/backend/utils/adt)
- [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.h)
- [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat)

@@ -282,0 +345,0 @@ ## Acknowledgments

@@ -33,2 +33,8 @@ const pgtypes = require('../lib/pg_types')

{ t: 'json', v: { a: true, b: [1, 7] }, r: new BP().word32be(20).string('{"a":true,"b":[1,7]}', 'utf-8').buffer() },
{ t: 'jsonb', v: null, r: new BP().word32be(-1).buffer() },
{
t: 'jsonb',
v: { a: true, b: [1, 7] },
r: new BP().word32be(21).string('\u0001{"a":true,"b":[1,7]}', 'utf-8').buffer(),
},
// online float4+float8 hex converter, http://gregstoll.dyndns.org/~gregstoll/floattohex/

@@ -199,2 +205,19 @@ { t: 'float4', v: null, r: new BP().word32be(-1).buffer() },

},
{ t: '_jsonb', v: null, r: new BP().word32be(-1).buffer() },
{
t: '_jsonb',
v: [{ a: 1 }, { c: 3 }],
r: new BP()
.word32be(44)
.word32be(1)
.word32be(0)
.word32be(types['jsonb'].oid)
.word32be(2)
.word32be(1)
.word32be(8)
.string('\u0001{"a":1}', 'utf-8')
.word32be(8)
.string('\u0001{"c":3}', 'utf-8')
.buffer(),
},
{ t: '_float4', v: null, r: new BP().word32be(-1).buffer() },

@@ -201,0 +224,0 @@ {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc