Comparing version 3.0.0 to 3.0.1
@@ -51,3 +51,3 @@ const net = require('net') | ||
function Connection(options, { onopen = noop, onend = noop, ondrain = noop, onclose = noop } = {}) { | ||
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) { | ||
const { | ||
@@ -84,3 +84,2 @@ ssl, | ||
, statements = {} | ||
, state = 'closed' | ||
, statementId = Math.random().toString(36).slice(2) | ||
@@ -110,9 +109,4 @@ , statementCount = 1 | ||
const connection = { | ||
get state() { return state }, | ||
set state(x) { | ||
state = x | ||
state === 'open' | ||
? idleTimer.start() | ||
: idleTimer.cancel() | ||
}, | ||
queue: queues.closed, | ||
idleTimer, | ||
connect(query) { | ||
@@ -130,2 +124,4 @@ initial = query | ||
queues.closed && queues.closed.push(connection) | ||
return connection | ||
@@ -152,3 +148,3 @@ | ||
if (terminated) | ||
return q.reject(Errors.connection('CONNECTION_DESTROYED', options)) | ||
return queryError(q, Errors.connection('CONNECTION_DESTROYED', options)) | ||
@@ -299,3 +295,3 @@ if (q.cancelled) | ||
function drain() { | ||
ondrain(connection) | ||
onopen(connection) | ||
} | ||
@@ -371,3 +367,3 @@ | ||
function error(err) { | ||
if (connection.state === 'connecting' && options.host[retries + 1]) | ||
if (connection.queue === queues.connecting && options.host[retries + 1]) | ||
return | ||
@@ -539,3 +535,3 @@ | ||
while (sent.length && (query = sent.shift()) && (query.active = true) && query.cancelled) | ||
Connection(options, {}).cancel(query.state, query.cancelled.resolve, query.cancelled.reject) | ||
Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject) | ||
@@ -542,0 +538,0 @@ if (query) |
const os = require('os') | ||
const fs = require('fs') | ||
const Stream = require('stream') | ||
@@ -24,2 +23,3 @@ const { | ||
const Subscribe = require('./subscribe.js') | ||
const largeObject = require('./large.js') | ||
@@ -46,12 +46,13 @@ Object.assign(Postgres, { | ||
const queries = Queue() | ||
, connections = [...Array(options.max)].map(() => Connection(options, { onopen, onend, ondrain, onclose })) | ||
, closed = Queue(connections) | ||
, connecting = Queue() | ||
, reserved = Queue() | ||
, closed = Queue() | ||
, ended = Queue() | ||
, open = Queue() | ||
, busy = Queue() | ||
, full = Queue() | ||
, ended = Queue() | ||
, connecting = Queue() | ||
, queues = { closed, ended, connecting, reserved, open, busy, full } | ||
, queues = { connecting, reserved, closed, ended, open, busy, full } | ||
const connections = [...Array(options.max)].map(() => Connection(options, queues, { onopen, onend, onclose })) | ||
const sql = Sql(handler) | ||
@@ -61,3 +62,3 @@ | ||
get parameters() { return options.parameters }, | ||
largeObject, | ||
largeObject: largeObject.bind(null, sql), | ||
subscribe, | ||
@@ -204,10 +205,13 @@ CLOSE, | ||
sql.savepoint = savepoint | ||
let errored | ||
let uncaughtError | ||
name && await sql`savepoint ${ sql(name) }` | ||
try { | ||
const result = await new Promise((resolve, reject) => { | ||
errored = reject | ||
const x = fn(sql) | ||
Promise.resolve(Array.isArray(x) ? Promise.all(x) : x).then(resolve, reject) | ||
}) | ||
if (uncaughtError) | ||
throw uncaughtError | ||
!name && await sql`commit` | ||
@@ -220,3 +224,3 @@ return result | ||
) | ||
throw e | ||
throw e instanceof PostgresError && e.code === '25P02' && uncaughtError || e | ||
} | ||
@@ -233,6 +237,6 @@ | ||
function handler(q) { | ||
errored && q.catch(errored) | ||
c.state === 'full' | ||
q.catch(e => uncaughtError || (uncaughtError = e)) | ||
c.queue === full | ||
? queries.push(q) | ||
: c.execute(q) || (c.state = 'full', full.push(c)) | ||
: c.execute(q) || move(c, full) | ||
} | ||
@@ -242,79 +246,17 @@ } | ||
function onexecute(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'reserved' | ||
connection = c | ||
move(c, reserved) | ||
c.reserved = () => queries.length | ||
? c.execute(queries.shift()) | ||
: c.state = 'reserved' | ||
reserved.push(c) | ||
connection = c | ||
: move(c, reserved) | ||
} | ||
} | ||
function largeObject(oid, mode = 0x00020000 | 0x00040000) { | ||
return new Promise(async(resolve, reject) => { | ||
await sql.begin(async sql => { | ||
let finish | ||
!oid && ([{ oid }] = await sql`select lo_creat(-1) as oid`) | ||
const [{ fd }] = await sql`select lo_open(${ oid }, ${ mode }) as fd` | ||
const lo = { | ||
writable, | ||
readable, | ||
close : () => sql`select lo_close(${ fd })`.then(finish), | ||
tell : () => sql`select lo_tell64(${ fd })`, | ||
read : (x) => sql`select loread(${ fd }, ${ x }) as data`, | ||
write : (x) => sql`select lowrite(${ fd }, ${ x })`, | ||
truncate : (x) => sql`select lo_truncate64(${ fd }, ${ x })`, | ||
seek : (x, whence = 0) => sql`select lo_lseek64(${ fd }, ${ x }, ${ whence })`, | ||
size : () => sql` | ||
select | ||
lo_lseek64(${ fd }, location, 0) as position, | ||
seek.size | ||
from ( | ||
select | ||
lo_lseek64($1, 0, 2) as size, | ||
tell.location | ||
from (select lo_tell64($1) as location) tell | ||
) seek | ||
` | ||
} | ||
resolve(lo) | ||
return new Promise(async r => finish = r) | ||
async function readable({ | ||
highWaterMark = 2048 * 8, | ||
start = 0, | ||
end = Infinity | ||
} = {}) { | ||
let max = end - start | ||
start && await lo.seek(start) | ||
return new Stream.Readable({ | ||
highWaterMark, | ||
async read(size) { | ||
const l = size > max ? size - max : size | ||
max -= size | ||
const [{ data }] = await lo.read(l) | ||
this.push(data) | ||
if (data.length < size) | ||
this.push(null) | ||
} | ||
}) | ||
} | ||
async function writable({ | ||
highWaterMark = 2048 * 8, | ||
start = 0 | ||
} = {}) { | ||
start && await lo.seek(start) | ||
return new Stream.Writable({ | ||
highWaterMark, | ||
write(chunk, encoding, callback) { | ||
lo.write(chunk).then(() => callback(), callback) | ||
} | ||
}) | ||
} | ||
}).catch(reject) | ||
}) | ||
function move(c, queue) { | ||
c.queue.remove(c) | ||
queue.push(c) | ||
c.queue = queue | ||
queue === open | ||
? c.idleTimer.start() | ||
: c.idleTimer.cancel() | ||
} | ||
@@ -338,3 +280,3 @@ | ||
if (open.length) | ||
return go(open, query) | ||
return go(open.shift(), query) | ||
@@ -345,11 +287,10 @@ if (closed.length) | ||
busy.length | ||
? go(busy, query) | ||
? go(busy.shift(), query) | ||
: queries.push(query) | ||
} | ||
function go(xs, query) { | ||
const c = xs.shift() | ||
function go(c, query) { | ||
return c.execute(query) | ||
? (c.state = 'busy', busy.push(c)) | ||
: (c.state = 'full', full.push(c)) | ||
? move(c, busy) | ||
: move(c, full) | ||
} | ||
@@ -361,3 +302,3 @@ | ||
? query.active | ||
? Connection(options, {}).cancel(query.state, resolve, reject) | ||
? Connection(options).cancel(query.state, resolve, reject) | ||
: query.cancelled = { resolve, reject } | ||
@@ -396,4 +337,3 @@ : ( | ||
function connect(c, query) { | ||
c.state = 'connecting' | ||
connecting.push(c) | ||
move(c, connecting) | ||
c.connect(query) | ||
@@ -403,11 +343,8 @@ } | ||
function onend(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'ended' | ||
ended.push(c) | ||
move(c, ended) | ||
} | ||
function onopen(c) { | ||
queues[c.state].remove(c) | ||
if (queries.length === 0) | ||
return (c.state = 'open', open.push(c)) | ||
return move(c, open) | ||
@@ -421,19 +358,11 @@ let max = Math.ceil(queries.length / (connecting.length + 1)) | ||
ready | ||
? (c.state = 'busy', busy.push(c)) | ||
: (c.state = 'full', full.push(c)) | ||
? move(c, busy) | ||
: move(c, full) | ||
} | ||
function ondrain(c) { | ||
full.remove(c) | ||
onopen(c) | ||
} | ||
function onclose(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'closed' | ||
move(c, closed) | ||
c.reserved = null | ||
options.onclose && options.onclose(c.id) | ||
queries.length | ||
? connect(c, queries.shift()) | ||
: queues.closed.push(c) | ||
queries.length && connect(c, queries.shift()) | ||
} | ||
@@ -481,3 +410,4 @@ } | ||
parameters : {}, | ||
shared : { retries: 0, typeArrayMap: {} } | ||
shared : { retries: 0, typeArrayMap: {} }, | ||
publications : o.publications || query.get('publications') || 'alltables' | ||
}, | ||
@@ -484,0 +414,0 @@ mergeUserTypes(o.types) |
@@ -34,13 +34,15 @@ const originCache = new Map() | ||
this[originError] = handler.debug || !this.tagged | ||
this[originError] = this.handler.debug | ||
? new Error() | ||
: cachedError(this.strings) | ||
: this.tagged && cachedError(this.strings) | ||
} | ||
get origin() { | ||
return this.handler.debug || !this.tagged | ||
return this.handler.debug | ||
? this[originError].stack | ||
: originStackCache.has(this.strings) | ||
? originStackCache.get(this.strings) | ||
: originStackCache.set(this.strings, this[originError].stack).get(this.strings) | ||
: this.tagged | ||
? originStackCache.has(this.strings) | ||
? originStackCache.get(this.strings) | ||
: originStackCache.set(this.strings, this[originError].stack).get(this.strings) | ||
: '' | ||
} | ||
@@ -47,0 +49,0 @@ |
@@ -11,2 +11,3 @@ module.exports = Subscribe;function Subscribe(postgres, options) { | ||
options.onclose = onclose | ||
options.fetch_types = false | ||
options.connection = { | ||
@@ -49,3 +50,3 @@ ...options.connection, | ||
async function init(sql, slot, publications = 'alltables') { | ||
async function init(sql, slot, publications) { | ||
if (!publications) | ||
@@ -52,0 +53,0 @@ throw new Error('Missing publication names') |
{ | ||
"name": "postgres", | ||
"version": "3.0.0", | ||
"version": "3.0.1", | ||
"description": "Fastest full featured PostgreSQL client for Node.js", | ||
@@ -5,0 +5,0 @@ "type": "module", |
@@ -52,3 +52,3 @@ <img align="left" width="440" height="180" alt="Fastest full PostgreSQL nodejs client" src="https://raw.githubusercontent.com/porsager/postgres/master/postgresjs.svg?sanitize=true"> | ||
async function insertUser({ name, age }) { | ||
const users = sql` | ||
const users = await sql` | ||
insert into users | ||
@@ -625,3 +625,3 @@ (name, age) | ||
onparameter : fn, // (key, value) when server param change | ||
debug : fn, // Is called with (connection, query, params) | ||
debug : fn, // Is called with (connection, query, params, types) | ||
transform : { | ||
@@ -628,0 +628,0 @@ column : fn, // Transforms incoming column names |
@@ -51,3 +51,3 @@ import net from 'net' | ||
function Connection(options, { onopen = noop, onend = noop, ondrain = noop, onclose = noop } = {}) { | ||
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) { | ||
const { | ||
@@ -84,3 +84,2 @@ ssl, | ||
, statements = {} | ||
, state = 'closed' | ||
, statementId = Math.random().toString(36).slice(2) | ||
@@ -110,9 +109,4 @@ , statementCount = 1 | ||
const connection = { | ||
get state() { return state }, | ||
set state(x) { | ||
state = x | ||
state === 'open' | ||
? idleTimer.start() | ||
: idleTimer.cancel() | ||
}, | ||
queue: queues.closed, | ||
idleTimer, | ||
connect(query) { | ||
@@ -130,2 +124,4 @@ initial = query | ||
queues.closed && queues.closed.push(connection) | ||
return connection | ||
@@ -152,3 +148,3 @@ | ||
if (terminated) | ||
return q.reject(Errors.connection('CONNECTION_DESTROYED', options)) | ||
return queryError(q, Errors.connection('CONNECTION_DESTROYED', options)) | ||
@@ -299,3 +295,3 @@ if (q.cancelled) | ||
function drain() { | ||
ondrain(connection) | ||
onopen(connection) | ||
} | ||
@@ -371,3 +367,3 @@ | ||
function error(err) { | ||
if (connection.state === 'connecting' && options.host[retries + 1]) | ||
if (connection.queue === queues.connecting && options.host[retries + 1]) | ||
return | ||
@@ -539,3 +535,3 @@ | ||
while (sent.length && (query = sent.shift()) && (query.active = true) && query.cancelled) | ||
Connection(options, {}).cancel(query.state, query.cancelled.resolve, query.cancelled.reject) | ||
Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject) | ||
@@ -542,0 +538,0 @@ if (query) |
154
src/index.js
import os from 'os' | ||
import fs from 'fs' | ||
import Stream from 'stream' | ||
@@ -24,2 +23,3 @@ import { | ||
import Subscribe from './subscribe.js' | ||
import largeObject from './large.js' | ||
@@ -46,12 +46,13 @@ Object.assign(Postgres, { | ||
const queries = Queue() | ||
, connections = [...Array(options.max)].map(() => Connection(options, { onopen, onend, ondrain, onclose })) | ||
, closed = Queue(connections) | ||
, connecting = Queue() | ||
, reserved = Queue() | ||
, closed = Queue() | ||
, ended = Queue() | ||
, open = Queue() | ||
, busy = Queue() | ||
, full = Queue() | ||
, ended = Queue() | ||
, connecting = Queue() | ||
, queues = { closed, ended, connecting, reserved, open, busy, full } | ||
, queues = { connecting, reserved, closed, ended, open, busy, full } | ||
const connections = [...Array(options.max)].map(() => Connection(options, queues, { onopen, onend, onclose })) | ||
const sql = Sql(handler) | ||
@@ -61,3 +62,3 @@ | ||
get parameters() { return options.parameters }, | ||
largeObject, | ||
largeObject: largeObject.bind(null, sql), | ||
subscribe, | ||
@@ -204,10 +205,13 @@ CLOSE, | ||
sql.savepoint = savepoint | ||
let errored | ||
let uncaughtError | ||
name && await sql`savepoint ${ sql(name) }` | ||
try { | ||
const result = await new Promise((resolve, reject) => { | ||
errored = reject | ||
const x = fn(sql) | ||
Promise.resolve(Array.isArray(x) ? Promise.all(x) : x).then(resolve, reject) | ||
}) | ||
if (uncaughtError) | ||
throw uncaughtError | ||
!name && await sql`commit` | ||
@@ -220,3 +224,3 @@ return result | ||
) | ||
throw e | ||
throw e instanceof PostgresError && e.code === '25P02' && uncaughtError || e | ||
} | ||
@@ -233,6 +237,6 @@ | ||
function handler(q) { | ||
errored && q.catch(errored) | ||
c.state === 'full' | ||
q.catch(e => uncaughtError || (uncaughtError = e)) | ||
c.queue === full | ||
? queries.push(q) | ||
: c.execute(q) || (c.state = 'full', full.push(c)) | ||
: c.execute(q) || move(c, full) | ||
} | ||
@@ -242,79 +246,17 @@ } | ||
function onexecute(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'reserved' | ||
connection = c | ||
move(c, reserved) | ||
c.reserved = () => queries.length | ||
? c.execute(queries.shift()) | ||
: c.state = 'reserved' | ||
reserved.push(c) | ||
connection = c | ||
: move(c, reserved) | ||
} | ||
} | ||
function largeObject(oid, mode = 0x00020000 | 0x00040000) { | ||
return new Promise(async(resolve, reject) => { | ||
await sql.begin(async sql => { | ||
let finish | ||
!oid && ([{ oid }] = await sql`select lo_creat(-1) as oid`) | ||
const [{ fd }] = await sql`select lo_open(${ oid }, ${ mode }) as fd` | ||
const lo = { | ||
writable, | ||
readable, | ||
close : () => sql`select lo_close(${ fd })`.then(finish), | ||
tell : () => sql`select lo_tell64(${ fd })`, | ||
read : (x) => sql`select loread(${ fd }, ${ x }) as data`, | ||
write : (x) => sql`select lowrite(${ fd }, ${ x })`, | ||
truncate : (x) => sql`select lo_truncate64(${ fd }, ${ x })`, | ||
seek : (x, whence = 0) => sql`select lo_lseek64(${ fd }, ${ x }, ${ whence })`, | ||
size : () => sql` | ||
select | ||
lo_lseek64(${ fd }, location, 0) as position, | ||
seek.size | ||
from ( | ||
select | ||
lo_lseek64($1, 0, 2) as size, | ||
tell.location | ||
from (select lo_tell64($1) as location) tell | ||
) seek | ||
` | ||
} | ||
resolve(lo) | ||
return new Promise(async r => finish = r) | ||
async function readable({ | ||
highWaterMark = 2048 * 8, | ||
start = 0, | ||
end = Infinity | ||
} = {}) { | ||
let max = end - start | ||
start && await lo.seek(start) | ||
return new Stream.Readable({ | ||
highWaterMark, | ||
async read(size) { | ||
const l = size > max ? size - max : size | ||
max -= size | ||
const [{ data }] = await lo.read(l) | ||
this.push(data) | ||
if (data.length < size) | ||
this.push(null) | ||
} | ||
}) | ||
} | ||
async function writable({ | ||
highWaterMark = 2048 * 8, | ||
start = 0 | ||
} = {}) { | ||
start && await lo.seek(start) | ||
return new Stream.Writable({ | ||
highWaterMark, | ||
write(chunk, encoding, callback) { | ||
lo.write(chunk).then(() => callback(), callback) | ||
} | ||
}) | ||
} | ||
}).catch(reject) | ||
}) | ||
function move(c, queue) { | ||
c.queue.remove(c) | ||
queue.push(c) | ||
c.queue = queue | ||
queue === open | ||
? c.idleTimer.start() | ||
: c.idleTimer.cancel() | ||
} | ||
@@ -338,3 +280,3 @@ | ||
if (open.length) | ||
return go(open, query) | ||
return go(open.shift(), query) | ||
@@ -345,11 +287,10 @@ if (closed.length) | ||
busy.length | ||
? go(busy, query) | ||
? go(busy.shift(), query) | ||
: queries.push(query) | ||
} | ||
function go(xs, query) { | ||
const c = xs.shift() | ||
function go(c, query) { | ||
return c.execute(query) | ||
? (c.state = 'busy', busy.push(c)) | ||
: (c.state = 'full', full.push(c)) | ||
? move(c, busy) | ||
: move(c, full) | ||
} | ||
@@ -361,3 +302,3 @@ | ||
? query.active | ||
? Connection(options, {}).cancel(query.state, resolve, reject) | ||
? Connection(options).cancel(query.state, resolve, reject) | ||
: query.cancelled = { resolve, reject } | ||
@@ -396,4 +337,3 @@ : ( | ||
function connect(c, query) { | ||
c.state = 'connecting' | ||
connecting.push(c) | ||
move(c, connecting) | ||
c.connect(query) | ||
@@ -403,11 +343,8 @@ } | ||
function onend(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'ended' | ||
ended.push(c) | ||
move(c, ended) | ||
} | ||
function onopen(c) { | ||
queues[c.state].remove(c) | ||
if (queries.length === 0) | ||
return (c.state = 'open', open.push(c)) | ||
return move(c, open) | ||
@@ -421,19 +358,11 @@ let max = Math.ceil(queries.length / (connecting.length + 1)) | ||
ready | ||
? (c.state = 'busy', busy.push(c)) | ||
: (c.state = 'full', full.push(c)) | ||
? move(c, busy) | ||
: move(c, full) | ||
} | ||
function ondrain(c) { | ||
full.remove(c) | ||
onopen(c) | ||
} | ||
function onclose(c) { | ||
queues[c.state].remove(c) | ||
c.state = 'closed' | ||
move(c, closed) | ||
c.reserved = null | ||
options.onclose && options.onclose(c.id) | ||
queries.length | ||
? connect(c, queries.shift()) | ||
: queues.closed.push(c) | ||
queries.length && connect(c, queries.shift()) | ||
} | ||
@@ -481,3 +410,4 @@ } | ||
parameters : {}, | ||
shared : { retries: 0, typeArrayMap: {} } | ||
shared : { retries: 0, typeArrayMap: {} }, | ||
publications : o.publications || query.get('publications') || 'alltables' | ||
}, | ||
@@ -484,0 +414,0 @@ mergeUserTypes(o.types) |
@@ -34,13 +34,15 @@ const originCache = new Map() | ||
this[originError] = handler.debug || !this.tagged | ||
this[originError] = this.handler.debug | ||
? new Error() | ||
: cachedError(this.strings) | ||
: this.tagged && cachedError(this.strings) | ||
} | ||
get origin() { | ||
return this.handler.debug || !this.tagged | ||
return this.handler.debug | ||
? this[originError].stack | ||
: originStackCache.has(this.strings) | ||
? originStackCache.get(this.strings) | ||
: originStackCache.set(this.strings, this[originError].stack).get(this.strings) | ||
: this.tagged | ||
? originStackCache.has(this.strings) | ||
? originStackCache.get(this.strings) | ||
: originStackCache.set(this.strings, this[originError].stack).get(this.strings) | ||
: '' | ||
} | ||
@@ -47,0 +49,0 @@ |
@@ -11,2 +11,3 @@ export default function Subscribe(postgres, options) { | ||
options.onclose = onclose | ||
options.fetch_types = false | ||
options.connection = { | ||
@@ -49,3 +50,3 @@ ...options.connection, | ||
async function init(sql, slot, publications = 'alltables') { | ||
async function init(sql, slot, publications) { | ||
if (!publications) | ||
@@ -52,0 +53,0 @@ throw new Error('Missing publication names') |
@@ -70,3 +70,3 @@ /** | ||
/** Is called with (connection; query; parameters) */ | ||
debug: boolean | ((connection: number, query: string, parameters: any[]) => void); | ||
debug: boolean | ((connection: number, query: string, parameters: any[], paramTypes: any[]) => void); | ||
/** Transform hooks */ | ||
@@ -113,2 +113,6 @@ transform: { | ||
publications: string | ||
onclose: (connId: number) => void; | ||
backoff: boolean | ((attemptNum:number) => number); | ||
max_lifetime: number | null; | ||
keep_alive: number | null; | ||
} | ||
@@ -115,0 +119,0 @@ |
26
181163
4649