pg-copy-streams-binary
Comparing version 1.2.0 to 1.2.1
lib/deparser.js

@@ -7,82 +7,77 @@ /**

```js
module.exports = function (txt, options) {
  return new CopyStream(txt, options)
}

const { Transform } = require('stream')

const BufferPut = require('bufferput')
const { deparse } = require('./pg_types')

class CopyStream extends Transform {
  constructor(options = {}) {
    options.objectMode = true
    super(options)

    this._headerSent = options.COPY_sendHeader === false
    this._trailerSent = options.COPY_sendTrailer === false

    // PGCOPY\n\377\r\n\0
    this.COPYSignature = Buffer.from([0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00])
  }

  sendHeader(buf) {
    buf.put(this.COPYSignature)
    buf.word32be(0) // flags field (OIDs are not included in the data)
    buf.word32be(0) // header extension area is empty
  }

  _transform(chunk, enc, cb) {
    const buf = new BufferPut()
    const fieldCount = chunk.length

    // See [1] - File Header Section
    if (!this._headerSent) {
      this._headerSent = true
      this.sendHeader(buf)
    }

    // See [1] - Tuples Section
    // Each tuple begins with a 16-bit integer count of the number of fields in the tuple.
    // (Presently, all tuples in a table will have the same count, but that might not always be true.)
    buf.word16be(fieldCount)

    // See [1] - Tuples Section
    // Then, repeated for each field in the tuple, there is a 32-bit length word followed by that many bytes of field data.
    // (The length word does not include itself, and can be zero.)
    let i
    let vec
    for (i = 0; i < fieldCount; i++) {
      vec = chunk[i]
      deparse(buf, vec.type, vec.value)
    }

    this.push(buf.buffer())
    cb()
  }

  _flush(cb) {
    const buf = new BufferPut()

    // See [1] - File Header Section
    if (!this._headerSent) {
      this._headerSent = true
      this.sendHeader(buf)
    }

    // See [1] - File Trailer section
    if (!this._trailerSent) {
      this._trailerSent = true
      buf.put(Buffer.from([0xff, 0xff]))
    }

    this.push(buf.buffer())
    cb()
  }
}
```
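As a quick sanity check of the stream above, here is a minimal usage sketch (our own, not from the package; it assumes only the `deparser` export and `concat-stream`, which appears in the devDependencies below). A single one-field row yields the 19-byte signature/header, a field count of 1, one length-prefixed value, and the 0xffff trailer appended by `_flush`:

```js
const { deparser } = require('pg-copy-streams-binary')
const concat = require('concat-stream')

const encoder = deparser({ objectMode: true })
encoder.pipe(
  concat((payload) => {
    // signature+header, then 0001 (field count), 00000005 (length), 'hello', ffff (trailer)
    console.log(payload.toString('hex'))
    // 5047434f50590aff0d0a00 00000000 00000000 0001 00000005 68656c6c6f ffff
  })
)
encoder.end([{ type: 'text', value: 'hello' }])
```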
lib/parser.js

@@ -7,95 +7,122 @@ /**

```js
module.exports = function (txt, options) {
  return new CopyStream(txt, options)
}

const { Transform } = require('stream')
const BP = require('bufferput')
const { parse } = require('./pg_types')
const BufferList = require('bl/BufferList')

const PG_HEADER = 0
const PG_ROW_START = 1
const PG_FIELD_START = 2
const PG_FIELD_DATA = 3
const PG_FIELD_END = 4
const PG_TRAILER = 5

class CopyStream extends Transform {
  constructor(options) {
    options.objectMode = true
    super(options)

    // PGCOPY\n\377\r\n\0 (signature + flags field + Header extension area length)
    this.COPYHeaderFull = new BP()
      .put(Buffer.from([0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00]))
      .word32be(0)
      .word32be(0)
      .buffer()

    this._buffer = new BufferList()
    this._state = PG_HEADER
    this._row = null
    this._fieldCount = null
    this._fieldIndex = null
    this._fieldLength = null
    this._fieldBuffer = null
    this.mapping = options.mapping || false
  }

  _transform(chunk, enc, cb) {
    this._buffer.append(chunk)
    while (this._buffer.length > 0) {
      if (PG_HEADER === this._state) {
        if (this._buffer.length < this.COPYHeaderFull.length) break
        if (!this.COPYHeaderFull.equals(this._buffer.slice(0, this.COPYHeaderFull.length))) {
          return cb(new Error('COPY BINARY Header mismatch'))
        }
        this._buffer.consume(this.COPYHeaderFull.length)
        this._state = PG_ROW_START
      }

      if (PG_ROW_START === this._state) {
        if (this._buffer.length < 2) break
        this._fieldCount = this._buffer.readUInt16BE(0)
        this._buffer.consume(2)
        const UInt16_0xffff = 65535
        if (this._fieldCount === UInt16_0xffff) {
          this._state = PG_TRAILER
        } else {
          this._row = this.mapping ? {} : []
          this._state = PG_FIELD_START
          this._fieldIndex = -1
        }
      }

      if (PG_TRAILER === this._state) {
        this.push(null)
        this._row = null
        this._fieldBuffer = null
        return cb()
      }

      if (PG_FIELD_START === this._state) {
        if (this._buffer.length < 4) break
        this._fieldIndex++
        this._fieldLength = this._buffer.readUInt32BE(0)
        this._buffer.consume(4)
        const UInt32_0xffffffff = 4294967295 /* Magic value for NULL */
        if (this._fieldLength === UInt32_0xffffffff) {
          this._fieldBuffer = null
          this._fieldLength = 0
          this._state = PG_FIELD_END
        } else {
          this._fieldBuffer = new BufferList()
          this._state = PG_FIELD_DATA
        }
      }

      if (PG_FIELD_DATA === this._state) {
        if (this._buffer.length === 0) break
        const bl = this._buffer.shallowSlice(0, this._fieldLength)
        this._fieldBuffer.append(bl)
        this._fieldLength -= bl.length
        this._buffer.consume(bl.length)
        if (this._fieldLength === 0) {
          this._state = PG_FIELD_END
        }
      }

      if (PG_FIELD_END === this._state) {
        if (this._fieldBuffer && this.mapping) {
          this._fieldBuffer = parse(this._fieldBuffer.slice(), this.mapping[this._fieldIndex].type)
        }
        if (this.mapping) {
          this._row[this.mapping[this._fieldIndex].key] = this._fieldBuffer
        } else {
          this._row.push(this._fieldBuffer)
        }
        this._state = PG_FIELD_START
        if (this._fieldIndex === this._fieldCount - 1) {
          this.push(this._row)
          this._state = PG_ROW_START
        }
      }
    }
    cb()
  }
}
```
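The rewrite above is what the 1.2.1 changelog entry in the README below refers to: because `_transform` now appends to a `BufferList` and every state bails out until enough bytes are buffered, a row may arrive split across arbitrarily small chunks. A minimal sketch of our own (not from the test suite) that replays a serialized payload one byte at a time:

```js
const { parser, deparser } = require('pg-copy-streams-binary')
const concat = require('concat-stream')

// build a valid COPY BINARY payload with the deparser, then re-parse it
const encoder = deparser({ objectMode: true })
encoder.pipe(
  concat((payload) => {
    const p = parser({ mapping: [{ key: 'n', type: 'int4' }] })
    p.on('data', (row) => console.log(row)) // => { n: 42 }, although no single chunk holds a full row
    for (const byte of payload) p.write(Buffer.from([byte]))
    p.end()
  })
)
encoder.end([{ type: 'int4', value: 42 }])
```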
lib/pg_types.js

@@ -1,52 +1,51 @@

```js
const ieee754 = require('ieee754')
const int64 = require('int64-buffer')

// bool
const boolsend = function (buf, value) {
  buf.word8be(value ? 1 : 0)
}
const boolrecv = function (buf) {
  return buf.readUInt8(0) ? true : false
}

// bytea
const byteasend = function (buf, value) {
  buf.put(value)
}
const bytearecv = function (buf) {
  return buf
}

// int2
const int2send = function (buf, value) {
  buf.word16be(value)
}
const int2recv = function (buf) {
  return buf.readInt16BE(0)
}

// int4
const int4send = function (buf, value) {
  buf.word32be(value)
}
const int4recv = function (buf) {
  return buf.readInt32BE(0)
}

// text
const textsend = function (buf, value) {
  const tbuf = Buffer.from(value, 'utf-8')
  buf.put(tbuf)
}
const textrecv = function (buf) {
  return buf.toString('utf-8')
}

// json
const json_send = function (buf, value) {
  const jbuf = Buffer.from(JSON.stringify(value), 'utf-8')
  buf.put(jbuf)
}
const json_recv = function (buf) {
  return JSON.parse(buf.toString('utf-8'))
```

@@ -56,8 +55,8 @@ }

```js
// float4
const float4send = function (buf, value) {
  const fbuf = Buffer.alloc(4)
  ieee754.write(fbuf, value, 0, false, 23, 4)
  buf.put(fbuf)
}
const float4recv = function (buf) {
  return ieee754.read(buf, 0, false, 23, 4)
```

@@ -67,8 +66,8 @@ }

```js
// float8
const float8send = function (buf, value) {
  const fbuf = Buffer.alloc(8)
  ieee754.write(fbuf, value, 0, false, 52, 8)
  buf.put(fbuf)
}
const float8recv = function (buf) {
  return ieee754.read(buf, 0, false, 52, 8)
```

@@ -81,31 +80,31 @@ }

```js
// to interact with other architectures

const timestamptz_send = function (buf, value) {
  // postgres origin of time is 01/01/2000
  let ts = value.getTime() - 946684800000
  ts = 1000 * ts // add unknown usecs
  const tbuf = Buffer.alloc(8)
  int64.Int64BE(tbuf, 0, ts)
  buf.put(tbuf)
}
const timestamptz_recv = function (buf) {
  let ts = int64.Int64BE(buf)
  ts = ts / 1000
  ts = ts + 946684800000
  return new Date(ts)
}

// array
const array_send = function (atype, buf, value) {
  let tmp
  let ndim = 0
  // count # of dimensions
  tmp = value
  while (Array.isArray(tmp)) {
    ndim++
    tmp = tmp[0]
  }
  buf.word32be(ndim) // ndim
  buf.word32be(0) // hasnull
  buf.word32be(types[atype].oid) // elem oid
```

@@ -115,14 +114,13 @@ // for each dimension, declare

```js
  // - index of first item in dimension (1)
  tmp = value
  for (let i = 0; i < ndim; i++) {
    buf.word32be(tmp.length)
    buf.word32be(1)
    tmp = tmp[0]
  }
  // elems are flattened on 1-dim
  const flat = flatten(value)
  const len = flat.length
  for (let i = 0; i < len; i++) {
    deparse(buf, atype, flat[i])
```

@@ -133,82 +131,81 @@ }

```js
// array
const array_recv = function (buf) {
  let offset = 0
  const UInt32Len = 4
  const ndim = buf.readUInt32BE(offset)
  offset += UInt32Len
  // eslint-disable-next-line no-unused-vars
  const hasnull = buf.readUInt32BE(offset)
  offset += UInt32Len
  const typoid = buf.readUInt32BE(offset)
  offset += UInt32Len
  let type
  // eslint-disable-next-line no-unused-vars
  let found = false
  for (type in types) {
    if (types[type].oid === typoid) {
      found = true
      break
    }
  }
  // description of dimensions
  const dims = []
  const lowers = []
  let len = 1
  for (let i = 0; i < ndim; i++) {
    const n = buf.readUInt32BE(offset)
    len = len * n
    dims.push(n)
    offset += UInt32Len
    lowers.push(buf.readUInt32BE(offset))
    offset += UInt32Len
  }
  // fetch flattened data
  let flat = []
  for (let i = 0; i < len; i++) {
    const fieldLen = buf.readUInt32BE(offset)
    offset += UInt32Len
    flat.push(parse(buf.slice(offset, offset + fieldLen), type))
    offset += fieldLen
  }
  let size
  dims.shift()
  while ((size = dims.pop())) {
    flat = chunk(flat, size)
  }
  return flat
}

function chunk(array, size) {
  const result = []
  for (let i = 0; i < array.length; i += size) result.push(array.slice(i, i + size))
  return result
}

// Note that send function names are kept identical to their names in the PostgreSQL source code.
const types = {
  bool: { oid: 16, send: boolsend, recv: boolrecv },
  bytea: { oid: 17, send: byteasend, recv: bytearecv },
  int2: { oid: 21, send: int2send, recv: int2recv },
  int4: { oid: 23, send: int4send, recv: int4recv },
  text: { oid: 25, send: textsend, recv: textrecv },
  json: { oid: 114, send: json_send, recv: json_recv },
  float4: { oid: 700, send: float4send, recv: float4recv },
  float8: { oid: 701, send: float8send, recv: float8recv },
  timestamptz: { oid: 1184, send: timestamptz_send, recv: timestamptz_recv },
  _bool: { oid: 1000, send: array_send.bind(null, 'bool'), recv: array_recv },
  _bytea: { oid: 1001, send: array_send.bind(null, 'bytea'), recv: array_recv },
  _int2: { oid: 1005, send: array_send.bind(null, 'int2'), recv: array_recv },
  _int4: { oid: 1007, send: array_send.bind(null, 'int4'), recv: array_recv },
  _text: { oid: 1009, send: array_send.bind(null, 'text'), recv: array_recv },
  _json: { oid: 199, send: array_send.bind(null, 'json'), recv: array_recv },
  _float4: { oid: 1021, send: array_send.bind(null, 'float4'), recv: array_recv },
  _float8: { oid: 1022, send: array_send.bind(null, 'float8'), recv: array_recv },
  _timestamptz: { oid: 1185, send: array_send.bind(null, 'timestamptz'), recv: array_recv },
}

function deparse(buf, type, value) {
  // Add a UInt32 placeholder for the field length
  buf.word32be(0)
  const lenField = buf.words[buf.words.length - 1]
```

@@ -218,26 +215,26 @@ // See [1] - Tuples Section

```js
  if (value === null) {
    lenField.value = -1

    // Then, repeated for each field in the tuple, there is a 32-bit length word followed by
    // that many bytes of field data.
  } else if (types[type]) {
    const offset = buf.len
    types[type].send(buf, value)
    lenField.value = buf.len - offset
  }
  return buf
}

function parse(buf, type) {
  return types[type].recv(buf)
}

function flatten(arr) {
  return arr.reduce((acc, val) => (Array.isArray(val) ? acc.concat(flatten(val)) : acc.concat(val)), [])
}

module.exports = {
  types: types,
  deparse: deparse,
  parse: parse,
}
```
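To make the length-word convention concrete, a small round-trip sketch (ours; the require path mirrors the test files below): `deparse` prefixes every field with a 32-bit length, or -1 for NULL, so `parse` expects that length word to be stripped first.

```js
const BP = require('bufferput')
const { deparse, parse } = require('../lib/pg_types')

const buf = deparse(new BP(), 'int4', 128).buffer()
// => <Buffer 00 00 00 04 00 00 00 80> : length word (4) followed by the big-endian value
const value = parse(buf.slice(4), 'int4') // strip the length word before parsing
// value === 128

const nul = deparse(new BP(), 'int4', null).buffer()
// => <Buffer ff ff ff ff> : a -1 length marks a NULL field; no data bytes follow
```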
lib/transform.js

@@ -1,9 +1,9 @@

```js
const through2 = require('through2')
const MultiFork = require('multi-fork')

const parser = require('./parser')
const deparser = require('./deparser')

const shift = function () {
  return through2.obj(function (row, _, cb) {
    row.shift()
```

@@ -15,26 +15,26 @@ this.push(row)

```js
module.exports = function (opt) {
  const { mapping } = opt
  const { transform } = opt
  const copyIns = opt.targets

  const first = parser({ mapping: mapping })
  const n = copyIns.length
  let f = n
  const finish = function () {
    f--
    if (f === 0) {
      first.emit('close')
    }
  }
  const classifier = function (row, cb) {
    cb(null, row[0])
  }
  const M = new MultiFork(n, { classifier: classifier })
  for (let i = 0; i < n; i++) {
    copyIns[i].on('finish', finish)
    M.streams[i].pipe(shift()).pipe(deparser()).pipe(copyIns[i])
  }
  first.pipe(transform).pipe(M)
  return first
}
```
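One convention worth spelling out: the first element of every row pushed through the user-supplied `transform` is a routing tag, not data. The `classifier` hands `row[0]` to `MultiFork` to select the target COPY stream, and `shift()` removes it so that only the typed fields reach the deparser. In the README example further below, a row pushed as

```js
this.push([1, { type: 'text', value: 'BODY: ' + row.description }])
```

is routed to `targets[1]` (the `generated` table) and encoded as a single text field.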
package.json

```json
{
  "name": "pg-copy-streams-binary",
  "version": "1.2.1",
  "description": "Streams for parsing and deparsing the COPY binary format",
  "main": "index.js",
  "scripts": {
    "test": "npm run lint && mocha",
    "lint": "[ \"$(node -v | cut -d. -f1)\" = \"v8\" ] || eslint '**/*.{js,ts}'"
  },
```

@@ -19,2 +20,3 @@ "repository": {

```json
  "dependencies": {
    "bl": "^4.0.2",
    "bufferput": "^0.1.3",
```

@@ -28,7 +30,20 @@ "ieee754": "^1.1.13",

```json
    "async": "^2.6.2",
    "concat-stream": "^2.0.0",
    "deeper": "^2.1.0",
    "eslint": "^6.8.0",
    "eslint-config-prettier": "^6.11.0",
    "eslint-plugin-prettier": "^3.1.3",
    "gonna": "0.0.0",
    "mocha": "^7.2.0",
    "pg": "^7.9.0",
    "pg-copy-streams": "^5.0.0",
    "prettier": "^2.0.5"
  },
  "prettier": {
    "semi": false,
    "printWidth": 120,
    "arrowParens": "always",
    "trailingComma": "es5",
    "singleQuote": true
  }
}
```
README.md
@@ -10,4 +10,4 @@ ## pg-copy-streams-binary

Well first you have to know that PostgreSQL has a not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
or exporting to a sink from PostgreSQL (_copy-out_)

@@ -19,5 +19,5 @@ You should first go and get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does

When dealing with the COPY mechanism, you can use different formats for _copy-out_ or _copy-in_: text, csv or binary.

The text and csv formats are interesting but they have some limitations due to the fact that they are text based, need field separators, escaping, etc. Have you ever been in CSV hell?

@@ -31,6 +31,7 @@ The PostgreSQL documentation states : Many programs produce strange and occasionally perverse CSV files, so the file format is more a convention than a standard. Thus you might encounter some files that cannot be imported using this mechanism, and COPY might produce files that other programs cannot process.

The main API is called `transform` and tries to hide many of those details. It can be used to easily do non-trivial things like:

- transforming rows
- expanding on the number of rows
- forking rows into several databases at the same time, with the same or different structures

## Example

@@ -67,3 +68,3 @@

And you want to fill it, for each source row, with a number `id` of rows (expanding the number of rows), with a body of "BODY: " + description.

@@ -75,33 +76,34 @@ After all this is done, you want to add a line in the `generated` table with a body of "COUNT: " + total number of rows inserted (not counting this one)

```js
var pg = require('pg')
var through2 = require('through2')
var copyOut = require('pg-copy-streams').to
var copyIn = require('pg-copy-streams').from
var pgCopyTransform = require('pg-copy-streams-binary').transform

var client = function (dsn) {
  var client = new pg.Client(dsn)
  client.connect()
  return client
}

var dsnA = null // configure database A connection parameters
var dsnB = null // configure database B connection parameters
var dsnC = null // configure database C connection parameters

var clientA = client(dsnA)
var clientB = client(dsnB)
var clientC = client(dsnC)

var AStream = clientA.query(copyOut('COPY item TO STDOUT BINARY'))
var BStream = clientB.query(copyIn('COPY product FROM STDIN BINARY'))
var CStream = clientC.query(copyIn('COPY generated FROM STDIN BINARY'))

var transform = through2.obj(
  function (row, _, cb) {
    var id = parseInt(row.ref.split(':')[0])
    var d = new Date('1999-01-01T00:00:00Z')
    d.setDate(d.getDate() + id)
    count++
    this.push([
      0,
      { type: 'int4', value: id },
      { type: 'text', value: row.ref.split(':')[1] },
      { type: 'timestamptz', value: d },
      {
        type: '_int2',
        value: [
          [id, id + 1],
          [id + 2, id + 3],
        ],
      },
    ])
    while (id > 0) {
      count++
      this.push([1, { type: 'text', value: 'BODY: ' + row.description }])
      id--
    }
    cb()
  },
  function (cb) {
    this.push([1, { type: 'text', value: 'COUNT: ' + count }])
    cb()
  }
)

var count = 0

var pct = pgCopyTransform({
  mapping: [
    { key: 'id', type: 'int4' },
    { key: 'ref', type: 'text' },
    { key: 'description', type: 'text' },
  ],
  transform: transform,
  targets: [BStream, CStream],
})

pct.on('close', function () {
  // Done !
  clientA.end()
  clientB.end()
  clientC.end()
})

AStream.pipe(pct)
```

@@ -216,3 +223,2 @@

## API for Parser

@@ -235,23 +241,29 @@

- bool
- bytea
- int2, int4
- float4, float8
- text
- json
- timestamptz

Note that when types are mentioned in the `mapping` option, they should be strictly equal to one of these types. pgadmin might sometimes mention aliases (like integer instead of int4) and you should not use these aliases.

The types for arrays (one or more dimensions) correspond to the type prefixed with an underscore. So an array of int4, int4[], needs to be referenced as \_int4 without any mention of the dimensions. This is because the dimension information is embedded in the binary format.
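For instance, a table `t (col1 int4, col2 int4[])` would use the following mapping (a hypothetical example, with the exact type names required here):

```js
const mapping = [
  { key: 'col1', type: 'int4' },
  { key: 'col2', type: '_int4' }, // int4[] : underscore prefix, no dimensions
]
```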
## changelog

### version 1.2.1 - published 2020-05-29

- Fix a compatibility bug introduced via `pg-copy-streams` 3.0. The parser can now handle rows that span several stream chunks
- Migration of tests to mocha

## Warnings & Disclaimer

There are many details in the binary protocol, and as usual, the devil is in the details.

- Currently, operations are considered to happen on tables WITHOUT OIDS. Usage on tables WITH OIDS has not been tested.
- In arrays, null placeholders are not implemented (no spot in the array can be empty).
- In arrays, the first element of a dimension is always at index 1.
- Error handling has not yet been tuned so do not expect explicit error messages

@@ -266,5 +278,5 @@ The PostgreSQL documentation states it clearly : "a binary-format file is less portable across machine architectures and PostgreSQL versions".

- [COPY documentation, including binary format](https://www.postgresql.org/docs/current/static/sql-copy.html)
- [send/recv implementations for types in PostgreSQL](https://github.com/postgres/postgres/tree/master/src/backend/utils/adt)
- [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.h)

@@ -298,3 +310,1 @@ ## Acknowledgments

THE SOFTWARE.
test (copyIn integration)

@@ -1,9 +1,9 @@

```js
const assert = require('assert')
const util = require('util')
const pg = require('pg')
const { deparser } = require('../')
const { from: copyFrom } = require('pg-copy-streams')

const getClient = function () {
  const client = new pg.Client()
  client.connect()
```

@@ -13,70 +13,93 @@ return client

```js
describe('integration test - copyIn', () => {
  it('ingesting an empty flow should not trigger an error', (done) => {
    const client = getClient()
    client.query('CREATE TEMP TABLE plug (col1 text)')
    const sql = 'COPY plug FROM STDIN BINARY'
    const copyIn = client.query(copyFrom(sql))
    const encoder = deparser({ objectMode: true })
    encoder.pipe(copyIn)
    const cleanup = (err) => {
      done(err)
      client.end()
    }
    copyIn.on('finish', cleanup)
    copyIn.on('error', cleanup)
    encoder.end()
  })

  const samples = [
    ['bool', 0, null, null],
    ['bool', 0, true, 'true'],
    ['bool', 0, false, 'false'],
    ['bool', 1, [true, false], '{t,f}'],
    ['int2', 0, 0, '0'],
    ['int2', 0, 7, '7'],
    ['int2', 1, [2, 9], '{2,9}'],
    [
      'int2',
      2,
      [
        [1, 2],
        [3, 4],
      ],
      '{{1,2},{3,4}}',
    ],
    ['int4', 0, null, null],
    ['int4', 0, 7, '7'],
    ['int4', 1, [2, 9], '{2,9}'],
    [
      'int4',
      2,
      [
        [1, 2],
        [3, 4],
      ],
      '{{1,2},{3,4}}',
    ],
    ['float4', 0, 0.2736, '0.2736'],
    ['float4', 0, 2.928e27, '2.928e+27'],
    ['float8', 0, 7.23e50, '7.23e+50'],
    ['json', 0, { a: 1, b: 2 }, '{"a":1,"b":2}'],
    ['json', 1, [{ a: 1 }, {}], '{"{\\"a\\":1}","{}"}'],
    ['timestamptz', 0, new Date('2017-04-25T18:22:00Z'), '2017-04-25 18:22:00+00'],
  ]

  samples.forEach(function (s) {
    const [type, ndim, value, expectedText] = s
    it(`test type ${type}: ${util.inspect(value)}`, (done) => {
      const client = getClient()
      const atype = (ndim > 0 ? '_' : '') + type
      const coltype = type + '[]'.repeat(ndim)
      client.query('CREATE TEMP TABLE plug (col1 ' + coltype + ')')
      const sql = 'COPY plug FROM STDIN BINARY'
      const copyIn = client.query(copyFrom(sql))
      const encoder = deparser({ objectMode: true })
      encoder.pipe(copyIn)
      copyIn.on('finish', () => {
        const sql = 'SELECT col1::text FROM plug'
        client.query(sql, function (err, res) {
          client.end()
          if (err) return done(err)
          try {
            assert.equal(
              res.rows[0].col1,
              expectedText,
              'expected ' +
                expectedText +
                ' for ' +
                coltype +
                ' row but got ' +
                (res.rows.length ? res.rows[0].col1 : '0 rows')
            )
          } catch (err) {
            return done(err)
          }
          done()
        })
      })
      encoder.end([{ type: atype, value: value }])
    })
  })
})
```
test (copyOut integration)

@@ -1,13 +1,10 @@

```js
const assert = require('assert')
const pg = require('pg')
const { parser } = require('../')
const { to: copyTo } = require('pg-copy-streams')
const through2 = require('through2')
const concat = require('concat-stream')

const getClient = function () {
  const client = new pg.Client()
  client.connect()
```

@@ -17,73 +14,112 @@ return client

```js
const samples = {
  bool: [null, true, false],
  bytea: [Buffer.from([0x61]), null, Buffer.from([0x62])],
  int2: [23, -59, null],
  int4: [2938, null, -99283],
  text: ['aaa', 'ééé', null],
  json: [JSON.stringify({}), JSON.stringify([1, 2]), null],
  float4: [0.26350000500679016, null, -3.2929999872755022e-12],
  float8: [9000.12, 9.23e29, null],
  timestamptz: [new Date('2000-01-01T00:00:00Z'), null, new Date('1972-04-25T18:22:00Z')],
}

describe('integration test - copyOut', () => {
  it('test INSERT / COPY TO round trip', (done) => {
    const client = getClient()
    let idx = 1
    const fields = []
    const placeholders = []
    const mapping = []
    const rows = []
    for (const t in samples) {
      fields.push('c' + idx + ' ' + t)
      placeholders.push('$' + idx)
      mapping.push({ key: 'c' + idx, type: t })
      for (let c = 0; c < samples[t].length; c++) {
        rows[c] = rows[c] || []
        rows[c].push(samples[t][c])
      }
      idx++
    }
    client.query('CREATE TEMP TABLE plug (' + fields.join(',') + ')')
    for (let i = 0; i < rows.length; i++) {
      client.query('INSERT INTO plug VALUES (' + placeholders.join(',') + ')', rows[i])
    }
    const sql = 'COPY plug TO STDOUT BINARY'
    const copyOut = client.query(copyTo(sql))
    const p = parser({ objectMode: true, mapping: mapping })
    idx = 0
    const pipeline = copyOut.pipe(p).pipe(
      through2.obj(
        function (obj, _, cb) {
          for (let i = 0; i < mapping.length; i++) {
            let expected = samples[mapping[i].type][idx]
            let result = obj[mapping[i].key]
            if (expected !== null && result !== null) {
              switch (mapping[i].type) {
                case 'bytea':
                  expected = expected.toString()
                  result = result.toString()
                  break
                case 'json':
                  result = JSON.stringify(result)
                  break
                case 'timestamptz':
                  expected = expected.getTime()
                  result = result.getTime()
                  break
              }
            }
            try {
              assert.equal(
                expected,
                result,
                'Mismatch for ' + mapping[i].type + ' expected ' + expected + ' got ' + result
              )
            } catch (err) {
              return cb(err)
            }
          }
          idx++
          cb()
        },
        function (cb) {
          client.end()
          try {
            assert.equal(rows.length, idx, `Received a total of ${idx} rows when we were expecting ${rows.length}`)
          } catch (err) {
            return cb(err)
          }
          done()
          cb()
        }
      )
    )
    pipeline.on('error', (err) => {
      client.end()
      done(err)
    })
  })

  it('extract large bytea field', (done) => {
    const power = 16
    const sql = "COPY (select (repeat('-', CAST(2^" + power + ' AS int)))::bytea) TO STDOUT BINARY'
    const client = getClient()
    const copyOutStream = client.query(copyTo(sql))
    const assertResult = (arr) => {
      client.end()
      assert.deepEqual(arr[0].c1, Buffer.alloc(Math.pow(2, power), '-'))
      done()
    }
    const p = parser({ objectMode: true, mapping: [{ key: 'c1', type: 'bytea' }] })
    p.on('error', (err) => {
      client.end()
      done(err)
    })
    copyOutStream.pipe(p).pipe(concat({ encoding: 'object' }, assertResult))
  })
})
```
test (encode)

@@ -1,20 +1,28 @@

```js
const assert = require('assert')
const util = require('util')

const pgtypes = require('../lib/pg_types')
const { deparse } = pgtypes

const BP = require('bufferput')
const samples = require('./samples')

describe('encode', () => {
  samples.forEach(function (s) {
    it(`encode type ${s.t}: ${util.inspect(s.v)}`, async () => {
      const buf = deparse(new BP(), s.t, s.v).buffer()
      const eq = buf.equals(s.r)
      assert(
        eq,
        'encode ' +
          s.t +
          ' not matching: ' +
          (s.v !== null ? s.v.toString() : 'null') +
          ' => ' +
          buf.toString('hex') +
          ' / ' +
          s.r.toString('hex')
      )
    })
  })
})
```
test (decode)

@@ -1,74 +1,75 @@

```js
'use strict'

const assert = require('assert')
const util = require('util')

const pgtypes = require('../lib/pg_types')
const { parse } = pgtypes

const samples = require('./samples')

function size(ar) {
  const row_count = ar.length
  const row_sizes = []
  for (let i = 0; i < row_count; i++) {
    row_sizes.push(ar[i].length)
  }
  return [row_count, Math.max.apply(null, row_sizes)]
}

function flatten(arr) {
  return arr.reduce((acc, val) => (Array.isArray(val) ? acc.concat(flatten(val)) : acc.concat(val)), [])
}

describe('decode', () => {
  samples.forEach(function (s) {
    it(`parse type ${s.t}: ${util.inspect(s.v)}`, async () => {
      const buf = s.r
      const isNull = buf.readInt32BE(0)
      const UInt32Len = 4
      let type = s.t
      if (isNull === -1) {
        assert.equal(buf.length, UInt32Len, 'A "null" binary buffer should be 0xffffffff')
      } else {
        let result = parse(buf.slice(UInt32Len), s.t)
        let expected = s.v

        let results = [result]
        let expecteds = [expected]
        if (s.t[0] === '_') {
          assert.equal(size(result).join(','), size(expected).join(','), 'array dimensions should match')
          results = flatten(result)
          expecteds = flatten(expected)
          type = s.t.substr(1)
        }

        assert.equal(results.length, expecteds.length, s.t + ': arrays should have the same global number of members')
        for (let i = 0; i < results.length; i++) {
          result = results[i]
          expected = expecteds[i]
          switch (type) {
            case 'bytea':
              result = result.toString('hex')
              expected = expected.toString('hex')
              break
            case 'json':
              result = JSON.stringify(result)
              expected = JSON.stringify(expected)
              break
            case 'timestamptz':
              result = result.getTime()
              expected = expected.getTime()
              break
          }
          assert.equal(
            result,
            expected,
            s.t + ': parsed value is incorrect for ' + s.t + ' expected ' + expected + ', got ' + result
          )
        }
      }
    })
  })
})
```
test (sample coverage)

@@ -1,22 +1,21 @@

```js
const assert = require('assert')

const pgtypes = require('../lib/pg_types')
const { types } = pgtypes

const samples = require('./samples')

describe('sample coverage', () => {
  it('all implemented types should be tested both for NULL and not NULL values', async () => {
    for (const k in types) {
      let has_null = 0
      let has_not_null = 0
      samples.forEach(function (s) {
        if (k === s.t && s.v === null) has_null++
        if (k === s.t && s.v !== null) has_not_null++
      })
      assert(has_null >= 1, 'samples for type ' + k + ' should have a sample testing the NULL value')
      assert(has_not_null >= 1, 'samples for type ' + k + ' should have at least one sample testing a NOT NULL value')
    }
  })
})
```
@@ -1,76 +0,249 @@ | ||
var pgtypes = require('../lib/pg_types'); | ||
var types = pgtypes.types; | ||
var BP = require('bufferput'); | ||
const pgtypes = require('../lib/pg_types') | ||
const { types } = pgtypes | ||
const BP = require('bufferput') | ||
BP.prototype.string = function(s, enc) { | ||
var buf = new Buffer(s, enc); | ||
return this.put(buf); | ||
BP.prototype.string = function (s, enc) { | ||
const buf = Buffer.from(s, enc) | ||
return this.put(buf) | ||
} | ||
module.exports = [ | ||
// simple types | ||
{ t:'bool', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'bool', v:true, r: (new BP()).word32be(1).word8(1).buffer()}, | ||
{ t:'bool', v:false, r: (new BP()).word32be(1).word8(0).buffer()}, | ||
{ t:'bytea', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'bytea', v:new Buffer([0x33,0x22,0x11,0x00]), r: (new BP()).word32be(4).put(new Buffer([0x33,0x22,0x11,0x00])).buffer()}, | ||
{ t:'int2', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'int2', v:128, r: (new BP()).word32be(2).word16be(128).buffer()}, | ||
{ t:'int4', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'int4', v:128, r: (new BP()).word32be(4).word32be(128).buffer()}, | ||
{ t:'text', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'text', v:'hello', r: (new BP()).word32be(5).put(new Buffer('hello')).buffer()}, | ||
{ t:'text', v:'utf8 éà', r: (new BP()).word32be(9).put(new Buffer('utf8 éà', 'utf-8')).buffer()}, | ||
{ t:'json', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'json', v:{a:true,b:[1,7]}, r: (new BP()).word32be(20).string("{\"a\":true,\"b\":[1,7]}", 'utf-8').buffer()}, | ||
{ t: 'bool', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ t: 'bool', v: true, r: new BP().word32be(1).word8(1).buffer() }, | ||
{ t: 'bool', v: false, r: new BP().word32be(1).word8(0).buffer() }, | ||
{ t: 'bytea', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: 'bytea', | ||
v: Buffer.from([0x33, 0x22, 0x11, 0x00]), | ||
r: new BP() | ||
.word32be(4) | ||
.put(Buffer.from([0x33, 0x22, 0x11, 0x00])) | ||
.buffer(), | ||
}, | ||
{ t: 'int2', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ t: 'int2', v: 128, r: new BP().word32be(2).word16be(128).buffer() }, | ||
{ t: 'int4', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ t: 'int4', v: 128, r: new BP().word32be(4).word32be(128).buffer() }, | ||
{ t: 'text', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ t: 'text', v: 'hello', r: new BP().word32be(5).put(Buffer.from('hello')).buffer() }, | ||
{ t: 'text', v: 'utf8 éà', r: new BP().word32be(9).put(Buffer.from('utf8 éà', 'utf-8')).buffer() }, | ||
{ t: 'json', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ t: 'json', v: { a: true, b: [1, 7] }, r: new BP().word32be(20).string('{"a":true,"b":[1,7]}', 'utf-8').buffer() }, | ||
// online float4+float8 hex converter, http://gregstoll.dyndns.org/~gregstoll/floattohex/ | ||
{ t:'float4', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'float4', v:0.12300000339746475, r: (new BP()).word32be(4).put(new Buffer([0x3d,0xfb,0xe7,0x6d])).buffer()}, | ||
{ t:'float8', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'float8', v:42.4242, r: (new BP()).word32be(8).put(new Buffer([0x40,0x45,0x36,0x4c,0x2f,0x83,0x7b,0x4a])).buffer()}, | ||
{ t:'timestamptz', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'timestamptz', v:new Date('2000-01-01T00:00:00Z'), r: (new BP()).word32be(8).put(new Buffer([0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])).buffer()}, // 0 | ||
{ t:'timestamptz', v:new Date('2000-01-01T00:00:01Z'), r: (new BP()).word32be(8).put(new Buffer([0x00,0x00,0x00,0x00,0x00,0x0f,0x42,0x40])).buffer()}, // 1.000.000 | ||
{ t:'timestamptz', v:new Date('1999-12-31T00:00:00Z'), r: (new BP()).word32be(8).put(new Buffer([0xff,0xff,0xff,0xeb,0xe2,0x28,0xa0,0x00])).buffer()}, // -86400x10e6 | ||
{ t: 'float4', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: 'float4', | ||
v: 0.12300000339746475, | ||
r: new BP() | ||
.word32be(4) | ||
.put(Buffer.from([0x3d, 0xfb, 0xe7, 0x6d])) | ||
.buffer(), | ||
}, | ||
{ t: 'float8', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: 'float8', | ||
v: 42.4242, | ||
r: new BP() | ||
.word32be(8) | ||
.put(Buffer.from([0x40, 0x45, 0x36, 0x4c, 0x2f, 0x83, 0x7b, 0x4a])) | ||
.buffer(), | ||
}, | ||
{ t: 'timestamptz', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: 'timestamptz', | ||
v: new Date('2000-01-01T00:00:00Z'), | ||
r: new BP() | ||
.word32be(8) | ||
.put(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])) | ||
.buffer(), | ||
}, // 0 | ||
{ | ||
t: 'timestamptz', | ||
v: new Date('2000-01-01T00:00:01Z'), | ||
r: new BP() | ||
.word32be(8) | ||
.put(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40])) | ||
.buffer(), | ||
}, // 1.000.000 | ||
{ | ||
t: 'timestamptz', | ||
v: new Date('1999-12-31T00:00:00Z'), | ||
r: new BP() | ||
.word32be(8) | ||
.put(Buffer.from([0xff, 0xff, 0xff, 0xeb, 0xe2, 0x28, 0xa0, 0x00])) | ||
.buffer(), | ||
}, // -86400 * 10^6 | ||
// arrays | ||
{ t:'_bool', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_bool', v:[true, false], r: (new BP()).word32be(30).word32be(1).word32be(0).word32be(types['bool'].oid).word32be(2).word32be(1) | ||
.word32be(1).word8(1).word32be(1).word8(0).buffer()}, | ||
{ t:'_int2', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_int2', v:[5, 7], r: (new BP()).word32be(32).word32be(1).word32be(0).word32be(types['int2'].oid).word32be(2).word32be(1) | ||
.word32be(2).word16be(5).word32be(2).word16be(7).buffer()}, | ||
{ t:'_int4', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_int4', v:[[1,2],[3,4],[5,6]], r: (new BP()).word32be(76) | ||
.word32be(2).word32be(0).word32be(types['int4'].oid) | ||
.word32be(3).word32be(1) | ||
.word32be(2).word32be(1) | ||
.word32be(4).word32be(1).word32be(4).word32be(2) | ||
.word32be(4).word32be(3).word32be(4).word32be(4) | ||
.word32be(4).word32be(5).word32be(4).word32be(6).buffer()}, | ||
{ t:'_bytea', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_bytea', v:[new Buffer([61,62]), new Buffer([62,61])], r: (new BP()).word32be(32).word32be(1).word32be(0).word32be(types['bytea'].oid) | ||
.word32be(2).word32be(1) | ||
.word32be(2).word8(61).word8(62).word32be(2).word8(62).word8(61).buffer()}, | ||
{ t:'_text', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_text', v:["ab", "cd"], r: (new BP()).word32be(32).word32be(1).word32be(0).word32be(types['text'].oid).word32be(2).word32be(1) | ||
.word32be(2).word8(97).word8(98).word32be(2).word8(99).word8(100).buffer()}, | ||
{ t:'_json', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_json', v:[{a:1},{c:3}], r: (new BP()).word32be(42).word32be(1).word32be(0).word32be(types['json'].oid).word32be(2).word32be(1) | ||
.word32be(7).string("{\"a\":1}", 'utf-8').word32be(7).string("{\"c\":3}", 'utf-8').buffer()}, | ||
{ t:'_float4', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_float4', v:[0.12300000339746475,0.12300000339746475], r: (new BP()).word32be(36).word32be(1).word32be(0).word32be(types['float4'].oid).word32be(2).word32be(1) | ||
.word32be(4).put(new Buffer([0x3d,0xfb,0xe7,0x6d])) | ||
.word32be(4).put(new Buffer([0x3d,0xfb,0xe7,0x6d])).buffer()}, | ||
{ t:'_float8', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_float8', v:[42.4242,42.4242], r: (new BP()).word32be(44).word32be(1).word32be(0).word32be(types['float8'].oid).word32be(2).word32be(1) | ||
.word32be(8).put(new Buffer([0x40,0x45,0x36,0x4c,0x2f,0x83,0x7b,0x4a])) | ||
.word32be(8).put(new Buffer([0x40,0x45,0x36,0x4c,0x2f,0x83,0x7b,0x4a])).buffer()}, | ||
{ t:'_timestamptz', v:null, r: (new BP()).word32be(-1).buffer()}, | ||
{ t:'_timestamptz', v:[new Date('2000-01-01T00:00:00Z'),new Date('2000-01-01T00:00:01Z')], | ||
r: (new BP()).word32be(44).word32be(1).word32be(0).word32be(types['timestamptz'].oid).word32be(2).word32be(1) | ||
.word32be(8).put(new Buffer([0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])) | ||
.word32be(8).put(new Buffer([0x00,0x00,0x00,0x00,0x00,0x0f,0x42,0x40])).buffer()}, | ||
{ t: '_bool', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_bool', | ||
v: [true, false], | ||
r: new BP() | ||
.word32be(30) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['bool'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(1) | ||
.word8(1) | ||
.word32be(1) | ||
.word8(0) | ||
.buffer(), | ||
}, | ||
{ t: '_int2', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_int2', | ||
v: [5, 7], | ||
r: new BP() | ||
.word32be(32) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['int2'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(2) | ||
.word16be(5) | ||
.word32be(2) | ||
.word16be(7) | ||
.buffer(), | ||
}, | ||
{ t: '_int4', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_int4', | ||
v: [ | ||
[1, 2], | ||
[3, 4], | ||
[5, 6], | ||
], | ||
r: new BP() | ||
.word32be(76) | ||
.word32be(2) | ||
.word32be(0) | ||
.word32be(types['int4'].oid) | ||
.word32be(3) | ||
.word32be(1) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(4) | ||
.word32be(1) | ||
.word32be(4) | ||
.word32be(2) | ||
.word32be(4) | ||
.word32be(3) | ||
.word32be(4) | ||
.word32be(4) | ||
.word32be(4) | ||
.word32be(5) | ||
.word32be(4) | ||
.word32be(6) | ||
.buffer(), | ||
}, | ||
{ t: '_bytea', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_bytea', | ||
v: [Buffer.from([61, 62]), Buffer.from([62, 61])], | ||
r: new BP() | ||
.word32be(32) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['bytea'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(2) | ||
.word8(61) | ||
.word8(62) | ||
.word32be(2) | ||
.word8(62) | ||
.word8(61) | ||
.buffer(), | ||
}, | ||
{ t: '_text', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_text', | ||
v: ['ab', 'cd'], | ||
r: new BP() | ||
.word32be(32) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['text'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(2) | ||
.word8(97) | ||
.word8(98) | ||
.word32be(2) | ||
.word8(99) | ||
.word8(100) | ||
.buffer(), | ||
}, | ||
{ t: '_json', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_json', | ||
v: [{ a: 1 }, { c: 3 }], | ||
r: new BP() | ||
.word32be(42) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['json'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(7) | ||
.string('{"a":1}', 'utf-8') | ||
.word32be(7) | ||
.string('{"c":3}', 'utf-8') | ||
.buffer(), | ||
}, | ||
{ t: '_float4', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_float4', | ||
v: [0.12300000339746475, 0.12300000339746475], | ||
r: new BP() | ||
.word32be(36) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['float4'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(4) | ||
.put(Buffer.from([0x3d, 0xfb, 0xe7, 0x6d])) | ||
.word32be(4) | ||
.put(Buffer.from([0x3d, 0xfb, 0xe7, 0x6d])) | ||
.buffer(), | ||
}, | ||
{ t: '_float8', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_float8', | ||
v: [42.4242, 42.4242], | ||
r: new BP() | ||
.word32be(44) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['float8'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(8) | ||
.put(Buffer.from([0x40, 0x45, 0x36, 0x4c, 0x2f, 0x83, 0x7b, 0x4a])) | ||
.word32be(8) | ||
.put(Buffer.from([0x40, 0x45, 0x36, 0x4c, 0x2f, 0x83, 0x7b, 0x4a])) | ||
.buffer(), | ||
}, | ||
{ t: '_timestamptz', v: null, r: new BP().word32be(-1).buffer() }, | ||
{ | ||
t: '_timestamptz', | ||
v: [new Date('2000-01-01T00:00:00Z'), new Date('2000-01-01T00:00:01Z')], | ||
r: new BP() | ||
.word32be(44) | ||
.word32be(1) | ||
.word32be(0) | ||
.word32be(types['timestamptz'].oid) | ||
.word32be(2) | ||
.word32be(1) | ||
.word32be(8) | ||
.put(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])) | ||
.word32be(8) | ||
.put(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40])) | ||
.buffer(), | ||
}, | ||
] | ||
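The fixtures above all follow the same binary COPY field layout: a signed 32-bit big-endian length word (-1 for NULL, with no payload) followed by that many bytes of data. A timestamptz payload is a 64-bit count of microseconds since the PostgreSQL epoch (2000-01-01T00:00:00Z), which is why the three dates encode to 0, 1,000,000 and -86400 * 10^6. Array payloads carry a header before the elements — number of dimensions, a has-nulls flag, the element type OID, then a (size, lower bound) pair per dimension — so the `_bool` case is 30 bytes: a 20-byte one-dimensional header plus two 5-byte length-prefixed elements. A minimal sketch of both encodings with plain Node Buffers instead of bufferput (the helper names are illustrative, not part of the package):

```js
// Sketch of the encodings exercised by the fixtures above.
const PG_EPOCH_MS = Date.UTC(2000, 0, 1) // 2000-01-01T00:00:00Z
const BOOL_OID = 16 // pg_type OID for bool

// timestamptz: int32 length word, then int64 microseconds since the epoch
function encodeTimestamptz(date) {
  const buf = Buffer.alloc(12)
  buf.writeInt32BE(8, 0)
  buf.writeBigInt64BE(BigInt(date.getTime() - PG_EPOCH_MS) * 1000n, 4)
  return buf
}

// one-dimensional bool[]: length word, 20-byte array header, then one
// length-prefixed 1-byte payload per element
function encodeBoolArray(values) {
  const buf = Buffer.alloc(4 + 20 + values.length * 5)
  let o = buf.writeInt32BE(20 + values.length * 5, 0) // field length word
  o = buf.writeInt32BE(1, o) // ndim: 1 dimension
  o = buf.writeInt32BE(0, o) // has-nulls flag
  o = buf.writeInt32BE(BOOL_OID, o) // element type OID
  o = buf.writeInt32BE(values.length, o) // size of dimension 1
  o = buf.writeInt32BE(1, o) // lower bound (SQL arrays are 1-based)
  for (const v of values) {
    o = buf.writeInt32BE(1, o) // element length: 1 byte
    o = buf.writeUInt8(v ? 1 : 0, o) // element payload
  }
  return buf
}

// '2000-01-01T00:00:01Z' -> 00000008 00000000000f4240 (1,000,000 µs)
console.log(encodeTimestamptz(new Date('2000-01-01T00:00:01Z')).toString('hex'))
// [true, false] -> starts with 0000001e (30), matching the '_bool' fixture
console.log(encodeBoolArray([true, false]).toString('hex'))
```

The same header arithmetic explains the 2-D `_int4` fixture: 12 bytes of fixed header, 8 per dimension, and 8 per element gives 12 + 16 + 6 * 8 = 76.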
@@ -1,117 +1,146 @@ | ||
var assert = require('assert') | ||
var async = require('async'); | ||
const assert = require('assert') | ||
const async = require('async') | ||
var pg = require('pg'); | ||
var pgCopyOut = require('pg-copy-streams').to; | ||
var pgCopyIn = require('pg-copy-streams').from; | ||
var through2 = require('through2'); | ||
const pg = require('pg') | ||
const { to: pgCopyTo, from: pgCopyFrom } = require('pg-copy-streams') | ||
const through2 = require('through2') | ||
var pgCopyTransform = require('../').transform; | ||
const { transform } = require('../') | ||
var client = function(dsn) { | ||
var client = new pg.Client(dsn); | ||
client.connect(); | ||
return client; | ||
const getClient = function (dsn) { | ||
const client = new pg.Client(dsn) | ||
client.connect() | ||
return client | ||
} | ||
var clientA = client(); | ||
var clientB = client(); | ||
var clientC = client(); | ||
describe('integration test - transform', () => { | ||
it('should correctly extract, transform and load data', (done) => { | ||
const clientA = getClient() | ||
const clientB = getClient() | ||
const clientC = getClient() | ||
var queriesA = [ | ||
"DROP TABLE IF EXISTS item", | ||
"CREATE TABLE item (id serial PRIMARY KEY, ref text, description text)", | ||
"INSERT INTO item (ref, description) VALUES ('1:CTX', 'A little item')", | ||
"INSERT INTO item (ref, description) VALUES ('2:CTX', 'A BIG item')" | ||
] | ||
const queriesA = [ | ||
'DROP TABLE IF EXISTS item', | ||
'CREATE TABLE item (id serial PRIMARY KEY, ref text, description text)', | ||
"INSERT INTO item (ref, description) VALUES ('1:CTX', 'A little item')", | ||
"INSERT INTO item (ref, description) VALUES ('2:CTX', 'A BIG item')", | ||
] | ||
var queriesB = [ | ||
"DROP TABLE IF EXISTS product", | ||
"CREATE TABLE product (code int4 PRIMARY KEY, label text, description text, ts_creation timestamptz, matrix int2[][])" | ||
] | ||
const queriesB = [ | ||
'DROP TABLE IF EXISTS product', | ||
'CREATE TABLE product (code int4 PRIMARY KEY, label text, description text, ts_creation timestamptz, matrix int2[][])', | ||
] | ||
var queriesC = [ | ||
"DROP TABLE IF EXISTS generated", | ||
"CREATE TABLE generated (body text)" | ||
] | ||
const queriesC = ['DROP TABLE IF EXISTS generated', 'CREATE TABLE generated (body text)'] | ||
// we simplify by observing here that A=B when tests are executed | ||
async.eachSeries(queriesA.concat(queriesB, queriesC), clientA.query.bind(clientA), function (err) { | ||
if (err) return done(err) | ||
// we simplify by observing here that A=B when tests are executed | ||
async.eachSeries(queriesA.concat(queriesB, queriesC), clientA.query.bind(clientA), function(err) { | ||
assert.ifError(err) | ||
const copyOut = clientA.query(pgCopyTo('COPY item TO STDOUT BINARY')) | ||
const copyIns = [ | ||
clientB.query(pgCopyFrom('COPY product FROM STDIN BINARY')), | ||
clientC.query(pgCopyFrom('COPY generated FROM STDIN BINARY')), | ||
] | ||
var copyOut = clientA.query(pgCopyOut('COPY item TO STDOUT BINARY')) | ||
var copyIns = [ | ||
clientB.query(pgCopyIn ('COPY product FROM STDIN BINARY')), | ||
clientC.query(pgCopyIn ('COPY generated FROM STDIN BINARY')), | ||
] | ||
var count = 0; | ||
var pct = pgCopyTransform({ | ||
mapping: [{key:'id',type:'int4'}, {key:'ref',type:'text'},{key:'description',type:'text'}], | ||
targets: copyIns, | ||
transform: through2.obj(function(row, _, cb) { | ||
var id = parseInt(row.ref.split(':')[0]); | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
d.setDate(d.getDate() + id); | ||
count++ | ||
this.push([0, | ||
{ type: 'int4', value: id }, | ||
{ type: 'text', value: row.ref.split(':')[1] }, | ||
{ type: 'text', value: row.description.toLowerCase() }, | ||
{ type: 'timestamptz', value: d }, | ||
{ type: '_int2', value: [ [ id, id+1 ], [ id+2, id+3 ] ] } | ||
]) | ||
while (id > 0) { | ||
count++ | ||
this.push([1, | ||
{ type: 'text', value: 'BODY: ' + row.description } | ||
]); | ||
id--; | ||
} | ||
cb() | ||
},function(cb) { | ||
this.push([1, { type: 'text', value: 'COUNT: ' + count}]) | ||
cb() | ||
let count = 0 | ||
const pct = transform({ | ||
mapping: [ | ||
{ key: 'id', type: 'int4' }, | ||
{ key: 'ref', type: 'text' }, | ||
{ key: 'description', type: 'text' }, | ||
], | ||
targets: copyIns, | ||
transform: through2.obj( | ||
function (row, _, cb) { | ||
let id = parseInt(row.ref.split(':')[0]) | ||
const d = new Date('1999-01-01T00:00:00Z') | ||
d.setDate(d.getDate() + id) | ||
count++ | ||
this.push([ | ||
0, | ||
{ type: 'int4', value: id }, | ||
{ type: 'text', value: row.ref.split(':')[1] }, | ||
{ type: 'text', value: row.description.toLowerCase() }, | ||
{ type: 'timestamptz', value: d }, | ||
{ | ||
type: '_int2', | ||
value: [ | ||
[id, id + 1], | ||
[id + 2, id + 3], | ||
], | ||
}, | ||
]) | ||
while (id > 0) { | ||
count++ | ||
this.push([1, { type: 'text', value: 'BODY: ' + row.description }]) | ||
id-- | ||
} | ||
cb() | ||
}, | ||
function (cb) { | ||
this.push([1, { type: 'text', value: 'COUNT: ' + count }]) | ||
cb() | ||
} | ||
), | ||
}) | ||
}) | ||
pct.on('close', function(err) { | ||
assert.ifError(err) | ||
clientA.query('SELECT * FROM item', function(err, res) { | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on A, but got ' + res.rowCount); | ||
clientA.end(); | ||
copyOut.pipe(pct) | ||
pct.on('close', function (err) { | ||
if (err) return done(err) | ||
let running = 3 | ||
clientA.query('SELECT * FROM item', function (err, res) { | ||
clientA.end() | ||
if (err) return done(err) | ||
try { | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on A, but got ' + res.rowCount) | ||
} catch (err) { | ||
return done(err) | ||
} | ||
running-- | ||
if (!running) done() | ||
}) | ||
clientB.query('SELECT * FROM product ORDER BY code ASC', function(err, res) { | ||
var d = new Date('1999-01-01T00:00:00Z'); | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on B, but got ' + res.rowCount); | ||
// first row | ||
assert.equal(res.rows[0].code, 1) | ||
assert.equal(res.rows[0].label, 'CTX') | ||
assert.equal(res.rows[0].description, 'a little item') | ||
assert.equal(res.rows[0].ts_creation.getTime(), d.getTime() + 1*24*60*60*1000) | ||
assert.equal(JSON.stringify(res.rows[0].matrix), "[[1,2],[3,4]]") | ||
clientB.query('SELECT * FROM product ORDER BY code ASC', function (err, res) { | ||
clientB.end() | ||
if (err) return done(err) | ||
try { | ||
const d = new Date('1999-01-01T00:00:00Z') | ||
assert.equal(res.rowCount, 2, 'expected 2 tuples on B, but got ' + res.rowCount) | ||
// second row | ||
assert.equal(res.rows[1].code, 2) | ||
assert.equal(res.rows[1].label, 'CTX') | ||
assert.equal(res.rows[1].description, 'a big item') | ||
assert.equal(JSON.stringify(res.rows[1].matrix), "[[2,3],[4,5]]") | ||
// first row | ||
assert.equal(res.rows[0].code, 1) | ||
assert.equal(res.rows[0].label, 'CTX') | ||
assert.equal(res.rows[0].description, 'a little item') | ||
assert.equal(res.rows[0].ts_creation.getTime(), d.getTime() + 1 * 24 * 60 * 60 * 1000) | ||
assert.equal(JSON.stringify(res.rows[0].matrix), '[[1,2],[3,4]]') | ||
clientB.end(); | ||
// second row | ||
assert.equal(res.rows[1].code, 2) | ||
assert.equal(res.rows[1].label, 'CTX') | ||
assert.equal(res.rows[1].description, 'a big item') | ||
assert.equal(JSON.stringify(res.rows[1].matrix), '[[2,3],[4,5]]') | ||
} catch (err) { | ||
return done(err) | ||
} | ||
running-- | ||
if (!running) done() | ||
}) | ||
clientC.query('SELECT * FROM generated ORDER BY body ASC', function(err, res) { | ||
assert.equal(res.rows[0].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[1].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[2].body, 'BODY: A little item') | ||
assert.equal(res.rows[3].body, 'COUNT: 5') | ||
clientC.end(); | ||
clientC.query('SELECT * FROM generated ORDER BY body ASC', function (err, res) { | ||
clientC.end() | ||
if (err) return done(err) | ||
try { | ||
assert.equal(res.rows[0].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[1].body, 'BODY: A BIG item') | ||
assert.equal(res.rows[2].body, 'BODY: A little item') | ||
assert.equal(res.rows[3].body, 'COUNT: 5') | ||
} catch (err) { | ||
return done(err) | ||
} | ||
running-- | ||
if (!running) done() | ||
}) | ||
} | ||
) | ||
copyOut.pipe(pct); | ||
}) | ||
}) | ||
}) | ||
}) |
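The pattern the rewritten test exercises — one `COPY ... TO STDOUT BINARY` stream fanned through the package's `transform` stream into several `COPY ... FROM STDIN BINARY` targets, with each pushed row prefixed by the index of the target it should land in — reduces to roughly the wiring below. This is a minimal sketch of what the test does; the table (`src`, `dst`) and column (`label`) names are invented for illustration, and connection settings are assumed to come from the environment:

```js
// Minimal sketch of the ETL wiring seen in the integration test above.
const pg = require('pg')
const through2 = require('through2')
const { to: copyTo, from: copyFrom } = require('pg-copy-streams')
const { transform } = require('pg-copy-streams-binary')

const source = new pg.Client()
const target = new pg.Client()
source.connect()
target.connect()

const copyOut = source.query(copyTo('COPY src TO STDOUT BINARY'))
const copyIn = target.query(copyFrom('COPY dst FROM STDIN BINARY'))

const pct = transform({
  // how to decode each incoming binary field, in column order
  mapping: [{ key: 'label', type: 'text' }],
  // one or more COPY FROM STDIN BINARY streams to feed
  targets: [copyIn],
  // rows are pushed as [targetIndex, field, field, ...]
  transform: through2.obj(function (row, _, cb) {
    this.push([0, { type: 'text', value: row.label.toUpperCase() }])
    cb()
  }),
})

copyOut.pipe(pct)
pct.on('close', () => {
  source.end()
  target.end()
})
```

Pushing `[1, ...]` instead would route the row to a second entry in `targets`, which is how the test splits one source table across two destination tables, emitting a variable number of rows per input tuple.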
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
+ Added bl@^4.0.2
+ Added base64-js@1.5.1 (transitive)
+ Added bl@4.1.0 (transitive)
+ Added buffer@5.7.1 (transitive)