postgres
Advanced tools
Comparing version 2.0.0-beta.6 to 2.0.0-beta.7
@@ -18,2 +18,4 @@ const { errors } = require('./errors.js') | ||
onready, | ||
oncopy, | ||
ondata, | ||
transform, | ||
@@ -96,3 +98,5 @@ onnotice, | ||
/* c8 ignore next 3 */ | ||
function CopyDone() { /* No handling needed */ } | ||
function CopyDone() { | ||
backend.query.readable.push(null) | ||
} | ||
@@ -105,3 +109,3 @@ function DataRow(x) { | ||
const row = {} | ||
const row = backend.query.raw ? new Array(backend.query.statement.columns.length) : {} | ||
for (let i = 0; i < backend.query.statement.columns.length; i++) { | ||
@@ -114,9 +118,13 @@ column = backend.query.statement.columns[i] | ||
? null | ||
: column.parser === undefined | ||
? x.toString('utf8', index, index += length) | ||
: column.parser.array === true | ||
? column.parser(x.toString('utf8', index + 1, index += length)) | ||
: column.parser(x.toString('utf8', index, index += length)) | ||
: backend.query.raw | ||
? x.slice(index, index += length) | ||
: column.parser === undefined | ||
? x.toString('utf8', index, index += length) | ||
: column.parser.array === true | ||
? column.parser(x.toString('utf8', index + 1, index += length)) | ||
: column.parser(x.toString('utf8', index, index += length)) | ||
row[column.name] = transform.value ? transform.value(value) : value | ||
backend.query.raw | ||
? (row[i] = value) | ||
: (row[column.name] = transform.value ? transform.value(value) : value) | ||
} | ||
@@ -130,3 +138,5 @@ | ||
/* c8 ignore next 3 */ | ||
function CopyData() { /* No handling needed until implemented */ } | ||
function CopyData(x) { | ||
ondata(x.slice(5)) | ||
} | ||
@@ -139,9 +149,7 @@ function ErrorResponse(x) { | ||
function CopyInResponse() { | ||
backend.error = errors.notSupported('CopyInResponse') | ||
oncopy() | ||
} | ||
/* c8 ignore next 3 */ | ||
function CopyOutResponse() { | ||
backend.error = errors.notSupported('CopyOutResponse') | ||
} | ||
function CopyOutResponse() { /* No handling needed */ } | ||
@@ -232,3 +240,3 @@ /* c8 ignore next 3 */ | ||
function CopyBothResponse() { | ||
backend.error = errors.notSupported('CopyBothResponse') | ||
oncopy() | ||
} | ||
@@ -235,0 +243,0 @@ |
const size = 256 | ||
let buffer = Buffer.allocUnsafe(size) | ||
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => { | ||
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S', 'd', 'c', 'f'].reduce((acc, x) => { | ||
const v = x.charCodeAt(0) | ||
@@ -48,2 +48,7 @@ acc[x] = () => { | ||
}, | ||
raw(x) { | ||
buffer = Buffer.concat([buffer.slice(0, b.i), x]) | ||
b.i = buffer.length | ||
return b | ||
}, | ||
end(at = 1) { | ||
@@ -50,0 +55,0 @@ buffer.writeUInt32BE(b.i - at, at) |
@@ -6,3 +6,3 @@ const net = require('net') | ||
const Queue = require('./queue.js') | ||
const { END } = require('./types.js') | ||
const { END, retryRoutines } = require('./types.js') | ||
const { errors } = require('./errors.js') | ||
@@ -41,3 +41,2 @@ | ||
, uid = Math.random().toString(36).slice(2) | ||
, connection = { send, end, destroy } | ||
@@ -52,2 +51,4 @@ const socket = postgresSocket(options, { | ||
const connection = { send, end, destroy, socket } | ||
const backend = Backend({ | ||
@@ -65,2 +66,4 @@ onparse, | ||
onauth, | ||
oncopy, | ||
ondata, | ||
error | ||
@@ -132,4 +135,13 @@ }) | ||
function retry(query) { | ||
query.retried = true | ||
delete statements[query.sig] | ||
ready = true | ||
backend.query = backend.error = null | ||
send(query, { sig: query.sig, str: query.str, args: query.args }) | ||
} | ||
function send(query, { sig, str, args = [] }) { | ||
try { | ||
query.sig = sig | ||
query.str = str | ||
@@ -209,2 +221,5 @@ query.args = args | ||
if (backend.query) { | ||
if (!backend.query.retried && retryRoutines[err.routine]) | ||
return retry(backend.query) | ||
err.stack += backend.query.origin.replace(/.*\n/, '\n') | ||
@@ -244,2 +259,17 @@ Object.defineProperty(err, 'query', { | ||
function oncopy() { | ||
backend.query.writable.push = ({ chunk, error, callback }) => { | ||
error | ||
? socket.write(frontend.CopyFail(error)) | ||
: chunk === null | ||
? socket.write(frontend.CopyDone()) | ||
: socket.write(frontend.CopyData(chunk), callback) | ||
} | ||
backend.query.writable.forEach(backend.query.writable.push) | ||
} | ||
function ondata(x) { | ||
!backend.query.readable.push(x) && socket.pause() | ||
} | ||
function multi() { | ||
@@ -388,7 +418,11 @@ if (next) | ||
}, | ||
write: x => { | ||
pause: () => socket.pause(), | ||
resume: () => socket.resume(), | ||
isPaused: () => socket.isPaused(), | ||
write: (x, callback) => { | ||
buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x) | ||
if (buffer.length >= 1024) | ||
return write() | ||
return write(callback) | ||
next === null && (next = setImmediate(write)) | ||
callback && callback() | ||
}, | ||
@@ -406,4 +440,4 @@ destroy: () => { | ||
function write() { | ||
socket.write(buffer) | ||
function write(callback) { | ||
socket.write(buffer, callback) | ||
next !== null && clearImmediate(next) | ||
@@ -410,0 +444,0 @@ buffer = next = null |
@@ -48,3 +48,6 @@ const crypto = require('crypto') | ||
Close, | ||
Execute | ||
Execute, | ||
CopyData, | ||
CopyDone, | ||
CopyFail | ||
} | ||
@@ -151,2 +154,22 @@ | ||
function CopyData(x) { | ||
return bytes | ||
.d() | ||
.raw(x) | ||
.end() | ||
} | ||
function CopyDone() { | ||
return bytes | ||
.c() | ||
.end() | ||
} | ||
function CopyFail(err) { | ||
return bytes | ||
.f() | ||
.str(String(err) + N) | ||
.end() | ||
} | ||
function Bind(name, args, rows = 0) { | ||
@@ -153,0 +176,0 @@ let prev |
107
lib/index.js
const fs = require('fs') | ||
const Url = require('url') | ||
const Stream = require('stream') | ||
const Connection = require('./connection.js') | ||
@@ -65,3 +66,3 @@ const Queue = require('./queue.js') | ||
, ended = null | ||
, arrayTypesPromise | ||
, arrayTypesPromise = options.fetch_types ? null : Promise.resolve([]) | ||
, slots = max | ||
@@ -71,3 +72,3 @@ , listener | ||
function postgres(xs) { | ||
return query({ prepare: options.prepare }, getConnection(), xs, Array.from(arguments).slice(1)) | ||
return query({ tagged: true, prepare: options.prepare }, getConnection(), xs, Array.from(arguments).slice(1)) | ||
} | ||
@@ -129,3 +130,3 @@ | ||
query({ raw: true }, connection, begin || savepoint) | ||
query({}, connection, begin || savepoint) | ||
.then(() => { | ||
@@ -143,3 +144,3 @@ const result = fn(scoped) | ||
.catch((err) => { | ||
query({ raw: true }, connection, | ||
query({}, connection, | ||
begin | ||
@@ -153,7 +154,7 @@ ? 'rollback' | ||
connections.push(connection) | ||
next() | ||
next(connection) | ||
})) | ||
function scoped(xs) { | ||
return query({}, connection, xs, Array.from(arguments).slice(1)) | ||
return query({ tagged: true }, connection, xs, Array.from(arguments).slice(1)) | ||
} | ||
@@ -166,6 +167,12 @@ } | ||
while (queries.length && (c = getConnection(queries.peek().fn)) && (x = queries.shift())) { | ||
while ( | ||
(x = queries.peek()) | ||
&& (c = x.query && x.query.connection || getConnection(queries.peek().fn)) | ||
&& queries.shift() | ||
) { | ||
x.fn | ||
? transaction(x, c) | ||
: send(c, x.query, x.xs, x.args) | ||
x.query && x.query.connection && x.query.writable && (c.blocked = true) | ||
} | ||
@@ -177,3 +184,3 @@ } | ||
query.prepare = 'prepare' in query ? query.prepare : options.prepare | ||
if (!query.raw && (!Array.isArray(xs) || !Array.isArray(xs.raw))) | ||
if (query.tagged && (!Array.isArray(xs) || !Array.isArray(xs.raw))) | ||
return nested(xs, args) | ||
@@ -215,5 +222,8 @@ | ||
function send(connection, query, xs, args) { | ||
connection | ||
? process.nextTick(connection.send, query, query.raw ? parseRaw(query, xs, args) : parse(query, xs, args)) | ||
: queries.push({ query, xs, args }) | ||
connection && (query.connection = connection) | ||
if (!connection || connection.blocked) | ||
return queries.push({ query, xs, args, connection }) | ||
connection.blocked = query.blocked | ||
process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args)) | ||
} | ||
@@ -252,3 +262,3 @@ | ||
new Promise((resolve, reject) => { | ||
send(connection, { resolve, reject, raw: true, prepare: false }, ` | ||
send(connection, { resolve, reject, tagged: false, prepare: false, origin: new Error().stack }, ` | ||
select b.oid, b.typarray | ||
@@ -298,3 +308,3 @@ from pg_catalog.pg_type a | ||
const prepare = queryOptions && queryOptions.prepare || false | ||
return query({ raw: true, simple: !args, prepare }, connection || getConnection(), xs, args || []) | ||
return query({ simple: !args, prepare }, connection || getConnection(), xs, args || []) | ||
} | ||
@@ -312,3 +322,3 @@ | ||
const file = files[path] | ||
const q = { raw: true, simple: !args } | ||
const q = { tagged: false, simple: !args } | ||
@@ -339,4 +349,11 @@ if (options.cache && typeof file === 'string') | ||
function addMethods(promise, query) { | ||
promise.readable = () => readable(promise, query) | ||
promise.writable = () => writable(promise, query) | ||
promise.raw = () => (query.raw = true, promise) | ||
promise.stream = (fn) => (query.stream = fn, promise) | ||
promise.cursor = (rows, fn) => { | ||
promise.cursor = cursor(promise, query) | ||
} | ||
function cursor(promise, query) { | ||
return (rows, fn) => { | ||
if (typeof rows === 'function') { | ||
@@ -353,2 +370,49 @@ fn = rows | ||
function readable(promise, query) { | ||
query.connection | ||
? query.connection.blocked = true | ||
: query.blocked = true | ||
const read = () => query.connection.socket.isPaused() && query.connection.socket.resume() | ||
promise.catch(err => query.readable.destroy(err)).then(() => { | ||
query.connection.blocked = false | ||
read() | ||
next() | ||
}) | ||
return query.readable = new Stream.Readable({ read }) | ||
} | ||
function writable(promise, query) { | ||
query.connection | ||
? query.connection.blocked = true | ||
: query.blocked = true | ||
let error | ||
query.prepare = false | ||
query.simple = true | ||
query.writable = [] | ||
promise.catch(err => error = err).then(() => { | ||
query.connection.blocked = false | ||
next() | ||
}) | ||
return query.readable = new Stream.Duplex({ | ||
read() { /* backpressure handling not possible */ }, | ||
write(chunk, encoding, callback) { | ||
error | ||
? callback(error) | ||
: query.writable.push({ chunk, callback }) | ||
}, | ||
destroy(error, callback) { | ||
query.writable.push({ error }) | ||
callback(error) | ||
}, | ||
final(callback) { | ||
if (error) | ||
return callback(error) | ||
query.writable.push({ chunk: null }) | ||
promise.then(() => callback(), callback) | ||
} | ||
}) | ||
} | ||
function listen(channel, fn) { | ||
@@ -366,3 +430,3 @@ const listener = getListener() | ||
return query({ raw: true }, listener.conn, 'listen ' + escape(channel)) | ||
return query({}, listener.conn, 'listen ' + escape(channel)) | ||
.then((result) => { | ||
@@ -385,3 +449,3 @@ Object.assign(listener.result, result) | ||
delete listeners[channel] | ||
return query({ raw: true }, getListener().conn, 'unlisten ' + escape(channel)).then(() => undefined) | ||
return query({}, getListener().conn, 'unlisten ' + escape(channel)).then(() => undefined) | ||
} | ||
@@ -427,3 +491,3 @@ } | ||
function parseRaw(query, str, args = []) { | ||
function parseUnsafe(query, str, args = []) { | ||
const types = [] | ||
@@ -441,3 +505,3 @@ , xargs = [] | ||
function parse(query, xs, args = []) { | ||
function parseTagged(query, xs, args = []) { | ||
const xargs = [] | ||
@@ -587,3 +651,4 @@ , types = [] | ||
target_session_attrs: o.target_session_attrs || url.query.target_session_attrs || env.PGTARGETSESSIONATTRS, | ||
debug : o.debug | ||
debug : o.debug, | ||
fetch_types : 'fetch_types' in o ? o.fetch_types : true | ||
}, | ||
@@ -595,3 +660,3 @@ mergeUserTypes(o.types) | ||
function parseSSL(x) { | ||
return x !== 'disabled' && x !== 'false' && x | ||
return x !== 'disable' && x !== 'false' && x | ||
} | ||
@@ -598,0 +663,0 @@ |
@@ -195,1 +195,7 @@ const char = module.exports.char = (acc, [k, v]) => (acc[k.charCodeAt(0)] = v, acc) | ||
}).reduce(char, {}) | ||
module.exports.retryRoutines = { | ||
FetchPreparedStatement: true, | ||
RevalidateCachedQuery: true, | ||
transformAssignedExpr: true | ||
} |
{ | ||
"name": "postgres", | ||
"version": "2.0.0-beta.6", | ||
"version": "2.0.0-beta.7", | ||
"description": "Fastest full featured PostgreSQL client for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -74,2 +74,4 @@ <img align="left" width="440" height="140" alt="Fastest full PostgreSQL nodejs client" src="https://raw.githubusercontent.com/porsager/postgres/master/postgresjs.svg?sanitize=true" /> | ||
// ensure only connecting to primary | ||
fetch_array_types : true // Disable automatically fetching array types | ||
// on initial connection. | ||
}) | ||
@@ -100,2 +102,10 @@ ``` | ||
### Auto fetching of array types | ||
When Postgres.js first connects to the database it automatically fetches array type information. | ||
If you have revoked access to `pg_catalog` this feature will no longer work and will need to be disabled. | ||
You can disable fetching array types by setting `fetch_array_types` to `false` when creating an instance. | ||
### Environment Variables for Options | ||
@@ -121,3 +131,3 @@ | ||
A query will always return a `Promise` which resolves to a results array `[...]{ rows, command }`. Destructuring is great to immediately access the first element. | ||
A query will always return a `Promise` which resolves to a results array `[...]{ count, command, columns }`. Destructuring is great to immediately access the first element. | ||
@@ -274,2 +284,8 @@ ```js | ||
## Raw ```sql``.raw()``` | ||
Using `.raw()` will return rows as an array with `Buffer` values for each column, instead of objects. | ||
This can be useful to receive identical named columns, or for specific performance / transformation reasons. The column definitions are still included on the result array with access to parsers for each column. | ||
## Listen and notify | ||
@@ -276,0 +292,0 @@ |
86820
2048
725