Socket
Socket
Sign inDemoInstall

pg-pool

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-pool - npm Package Compare versions

Comparing version 1.8.0 to 2.0.0

test/bring-your-own-promise.js

352

index.js

@@ -1,174 +0,248 @@

var genericPool = require('generic-pool')
var util = require('util')
var EventEmitter = require('events').EventEmitter
var objectAssign = require('object-assign')
'use strict'
const EventEmitter = require('events').EventEmitter
// there is a bug in the generic pool where it will not recreate
// destroyed workers (even if there is waiting work to do) unless
// there is a min specified. Make sure we keep some connections
// SEE: https://github.com/coopernurse/node-pool/pull/186
// SEE: https://github.com/brianc/node-pg-pool/issues/48
// SEE: https://github.com/strongloop/loopback-connector-postgresql/issues/231
function _ensureMinimum () {
var i, diff, waiting
if (this._draining) return
waiting = this._waitingClients.size()
if (this._factory.min > 0) { // we have positive specified minimum
diff = this._factory.min - this._count
} else if (waiting > 0) { // we have no minimum, but we do have work to do
diff = Math.min(waiting, this._factory.max - this._count)
const NOOP = function () { }
class IdleItem {
constructor (client, timeoutId) {
this.client = client
this.timeoutId = timeoutId
}
for (i = 0; i < diff; i++) {
this._createResource()
}
function throwOnRelease () {
throw new Error('Release called on client which has already been released to the pool.')
}
function release (client, err) {
client.release = throwOnRelease
if (err) {
this._remove(client)
this._pulseQueue()
return
}
};
var Pool = module.exports = function (options, Client) {
if (!(this instanceof Pool)) {
return new Pool(options, Client)
// idle timeout
let tid
if (this.options.idleTimeoutMillis) {
tid = setTimeout(() => {
this.log('remove idle client')
this._remove(client)
}, this.idleTimeoutMillis)
}
EventEmitter.call(this)
this.options = objectAssign({}, options)
this.log = this.options.log || function () { }
this.Client = this.options.Client || Client || require('pg').Client
this.Promise = this.options.Promise || global.Promise
this.options.max = this.options.max || this.options.poolSize || 10
this.options.create = this.options.create || this._create.bind(this)
this.options.destroy = this.options.destroy || this._destroy.bind(this)
this.pool = new genericPool.Pool(this.options)
// Monkey patch to ensure we always finish our work
// - There is a bug where callbacks go uncalled if min is not set
// - We might still not want a connection to *always* exist
// - but we do want to create up to max connections if we have work
// - still waiting
// This should be safe till the version of pg-pool is upgraded
// SEE: https://github.com/coopernurse/node-pool/pull/186
this.pool._ensureMinimum = _ensureMinimum
this.onCreate = this.options.onCreate
if (this.ending) {
this._remove(client)
} else {
this._idle.push(new IdleItem(client, tid))
}
this._pulseQueue()
}
util.inherits(Pool, EventEmitter)
function promisify (Promise, callback) {
if (callback) {
return { callback: callback, result: undefined }
}
let rej
let res
const cb = function (err, client) {
err ? rej(err) : res(client)
}
const result = new Promise(function (resolve, reject) {
res = resolve
rej = reject
})
return { callback: cb, result: result }
}
Pool.prototype._promise = function (cb, executor) {
if (!cb) {
return new this.Promise(executor)
class Pool extends EventEmitter {
constructor (options, Client) {
super()
this.options = Object.assign({}, options)
this.options.max = this.options.max || this.options.poolSize || 10
this.log = this.options.log || function () { }
this.Client = this.options.Client || Client || require('pg').Client
this.Promise = this.options.Promise || global.Promise
this._clients = []
this._idle = []
this._pendingQueue = []
this._endCallback = undefined
this.ending = false
}
function resolved (value) {
process.nextTick(function () {
cb(null, value)
})
_isFull () {
return this._clients.length >= this.options.max
}
function rejected (error) {
process.nextTick(function () {
cb(error)
})
_pulseQueue () {
this.log('pulse queue')
if (this.ending) {
this.log('pulse queue on ending')
if (this._idle.length) {
this._idle.map(item => {
this._remove(item.client)
})
}
if (!this._clients.length) {
this._endCallback()
}
return
}
// if we don't have any waiting, do nothing
if (!this._pendingQueue.length) {
this.log('no queued requests')
return
}
// if we don't have any idle clients and we have no more room do nothing
if (!this._idle.length && this._isFull()) {
return
}
const waiter = this._pendingQueue.shift()
if (this._idle.length) {
const idleItem = this._idle.pop()
clearTimeout(idleItem.timeoutId)
const client = idleItem.client
client.release = release.bind(this, client)
this.emit('acquire', client)
return waiter(undefined, client, client.release)
}
if (!this._isFull()) {
return this.connect(waiter)
}
throw new Error('unexpected condition')
}
executor(resolved, rejected)
}
_remove (client) {
this._idle = this._idle.filter(item => item.client !== client)
this._clients = this._clients.filter(c => c !== client)
client.end()
this.emit('remove', client)
}
Pool.prototype._promiseNoCallback = function (callback, executor) {
return callback
? executor()
: new this.Promise(executor)
}
connect (cb) {
if (this.ending) {
const err = new Error('Cannot use a pool after calling end on the pool')
return cb ? cb(err) : this.Promise.reject(err)
}
if (this._clients.length >= this.options.max || this._idle.length) {
const response = promisify(this.Promise, cb)
const result = response.result
this._pendingQueue.push(response.callback)
// if we have idle clients schedule a pulse immediately
if (this._idle.length) {
process.nextTick(() => this._pulseQueue())
}
return result
}
Pool.prototype._destroy = function (client) {
if (client._destroying) return
client._destroying = true
client.end()
}
const client = new this.Client(this.options)
this._clients.push(client)
const idleListener = (err) => {
err.client = client
client.removeListener('error', idleListener)
client.on('error', () => {
this.log('additional client error after disconnection due to error', err)
})
this._remove(client)
// TODO - document that once the pool emits an error
// the client has already been closed & purged and is unusable
this.emit('error', err, client)
}
Pool.prototype._create = function (cb) {
this.log('connecting new client')
var client = new this.Client(this.options)
this.log('connecting new client')
client.on('error', function (e) {
this.log('connected client error:', e)
this.pool.destroy(client)
e.client = client
this.emit('error', e, client)
}.bind(this))
client.connect(function (err) {
if (err) {
this.log('client connection error:', err)
cb(err, null)
} else {
this.log('client connected')
this.emit('connect', client)
cb(null, client)
// connection timeout logic
let tid
let timeoutHit = false
if (this.options.connectionTimeoutMillis) {
tid = setTimeout(() => {
this.log('ending client due to timeout')
timeoutHit = true
// force kill the node driver, and let libpq do its teardown
client.connection ? client.connection.stream.destroy() : client.end()
}, this.options.connectionTimeoutMillis)
}
}.bind(this))
}
Pool.prototype.connect = function (cb) {
return this._promiseNoCallback(cb, function (resolve, reject) {
this.log('acquire client begin')
this.pool.acquire(function (err, client) {
const response = promisify(this.Promise, cb)
cb = response.callback
this.log('connecting new client')
client.connect((err) => {
this.log('new client connected')
if (tid) {
clearTimeout(tid)
}
client.on('error', idleListener)
if (err) {
this.log('acquire client. error:', err)
if (cb) {
cb(err, null, function () {})
// remove the dead client from our list of clients
this._clients = this._clients.filter(c => c !== client)
if (timeoutHit) {
err.message = 'Connection terminiated due to connection timeout'
}
cb(err, undefined, NOOP)
} else {
client.release = release.bind(this, client)
this.emit('connect', client)
this.emit('acquire', client)
if (this.options.verify) {
this.options.verify(client, cb)
} else {
reject(err)
cb(undefined, client, client.release)
}
return
}
})
return response.result
}
this.log('acquire client')
this.emit('acquire', client)
client.release = function (err) {
delete client.release
query (text, values, cb) {
if (typeof values === 'function') {
cb = values
values = undefined
}
const response = promisify(this.Promise, cb)
cb = response.callback
this.connect((err, client) => {
if (err) {
return cb(err)
}
this.log('dispatching query')
client.query(text, values, (err, res) => {
this.log('query dispatched')
client.release(err)
if (err) {
this.log('destroy client. error:', err)
this.pool.destroy(client)
return cb(err)
} else {
this.log('release client')
this.pool.release(client)
return cb(undefined, res)
}
}.bind(this)
})
})
return response.result
}
if (cb) {
cb(null, client, client.release)
} else {
resolve(client)
}
}.bind(this))
}.bind(this))
}
end (cb) {
this.log('ending')
if (this.ending) {
const err = new Error('Called end on pool more than once')
return cb ? cb(err) : this.Promise.reject(err)
}
this.ending = true
const promised = promisify(this.Promise, cb)
this._endCallback = promised.callback
this._pulseQueue()
return promised.result
}
Pool.prototype.take = Pool.prototype.connect
get waitingCount () {
return this._pendingQueue.length
}
Pool.prototype.query = function (text, values, cb) {
if (typeof values === 'function') {
cb = values
values = undefined
get idleCount () {
return this._idle.length
}
return this._promise(cb, function (resolve, reject) {
this.connect(function (err, client, done) {
if (err) {
return reject(err)
}
client.query(text, values, function (err, res) {
done(err)
err ? reject(err) : resolve(res)
})
})
}.bind(this))
get totalCount () {
return this._clients.length
}
}
Pool.prototype.end = function (cb) {
this.log('draining pool')
return this._promise(cb, function (resolve, reject) {
this.pool.drain(function () {
this.log('pool drained, calling destroy all now')
this.pool.destroyAllNow(resolve)
}.bind(this))
}.bind(this))
}
module.exports = Pool
{
"name": "pg-pool",
"version": "1.8.0",
"version": "2.0.0",
"description": "Connection pool for node-postgres",

@@ -30,13 +30,14 @@ "main": "index.js",

"bluebird": "3.4.1",
"co": "4.6.0",
"expect.js": "0.3.1",
"lodash": "4.13.1",
"mocha": "^2.3.3",
"pg": "5.1.0",
"pg": "*",
"standard": "7.1.2",
"standard-format": "2.2.1"
},
"dependencies": {
"generic-pool": "2.4.3",
"object-assign": "4.1.0"
"dependencies": {},
"peerDependencies": {
"pg": ">5.0"
}
}

@@ -1,11 +0,11 @@

var expect = require('expect.js')
var describe = require('mocha').describe
var it = require('mocha').it
var Pool = require('../')
const expect = require('expect.js')
const describe = require('mocha').describe
const it = require('mocha').it
const Pool = require('../')
describe('Connection strings', function () {
it('pool delegates connectionString property to client', function (done) {
var connectionString = 'postgres://foo:bar@baz:1234/xur'
const connectionString = 'postgres://foo:bar@baz:1234/xur'
var pool = new Pool({
const pool = new Pool({
// use a fake client so we can check we're passed the connectionString

@@ -12,0 +12,0 @@ Client: function (args) {

@@ -1,12 +0,13 @@

var expect = require('expect.js')
var EventEmitter = require('events').EventEmitter
var describe = require('mocha').describe
var it = require('mocha').it
var objectAssign = require('object-assign')
var Pool = require('../')
'use strict'
const expect = require('expect.js')
const EventEmitter = require('events').EventEmitter
const describe = require('mocha').describe
const it = require('mocha').it
const Pool = require('../')
describe('events', function () {
it('emits connect before callback', function (done) {
var pool = new Pool()
var emittedClient = false
const pool = new Pool()
let emittedClient = false
pool.on('connect', function (client) {

@@ -26,7 +27,10 @@ emittedClient = client

it('emits "connect" only with a successful connection', function (done) {
var pool = new Pool({
const pool = new Pool({
// This client will always fail to connect
Client: mockClient({
connect: function (cb) {
process.nextTick(function () { cb(new Error('bad news')) })
process.nextTick(() => {
cb(new Error('bad news'))
setImmediate(done)
})
}

@@ -38,11 +42,8 @@ })

})
pool._create(function (err) {
if (err) done()
else done(new Error('expected failure'))
})
return pool.connect().catch(e => expect(e.message).to.equal('bad news'))
})
it('emits acquire every time a client is acquired', function (done) {
var pool = new Pool()
var acquireCount = 0
const pool = new Pool()
let acquireCount = 0
pool.on('acquire', function (client) {

@@ -52,7 +53,6 @@ expect(client).to.be.ok()

})
for (var i = 0; i < 10; i++) {
for (let i = 0; i < 10; i++) {
pool.connect(function (err, client, release) {
err ? done(err) : release()
if (err) return done(err)
release()
if (err) return done(err)
})

@@ -64,9 +64,9 @@ pool.query('SELECT now()')

pool.end(done)
}, 40)
}, 100)
})
it('emits error and client if an idle client in the pool hits an error', function (done) {
var pool = new Pool()
const pool = new Pool()
pool.connect(function (err, client) {
expect(err).to.equal(null)
expect(err).to.equal(undefined)
client.release()

@@ -87,6 +87,6 @@ setImmediate(function () {

return function () {
var client = new EventEmitter()
objectAssign(client, methods)
const client = new EventEmitter()
Object.assign(client, methods)
return client
}
}

@@ -1,24 +0,14 @@

var expect = require('expect.js')
var _ = require('lodash')
'use strict'
const expect = require('expect.js')
const _ = require('lodash')
var describe = require('mocha').describe
var it = require('mocha').it
var Promise = require('bluebird')
const describe = require('mocha').describe
const it = require('mocha').it
var Pool = require('../')
const Pool = require('../')
if (typeof global.Promise === 'undefined') {
global.Promise = Promise
}
describe('pool', function () {
it('can be used as a factory function', function () {
var pool = Pool()
expect(pool instanceof Pool).to.be.ok()
expect(typeof pool.connect).to.be('function')
})
describe('with callbacks', function () {
it('works totally unconfigured', function (done) {
var pool = new Pool()
const pool = new Pool()
pool.connect(function (err, client, release) {

@@ -36,3 +26,3 @@ if (err) return done(err)

it('passes props to clients', function (done) {
var pool = new Pool({ binary: true })
const pool = new Pool({ binary: true })
pool.connect(function (err, client, release) {

@@ -47,3 +37,3 @@ release()

it('can run a query with a callback without parameters', function (done) {
var pool = new Pool()
const pool = new Pool()
pool.query('SELECT 1 as num', function (err, res) {

@@ -58,3 +48,3 @@ expect(res.rows[0]).to.eql({ num: 1 })

it('can run a query with a callback', function (done) {
var pool = new Pool()
const pool = new Pool()
pool.query('SELECT $1::text as name', ['brianc'], function (err, res) {

@@ -69,6 +59,8 @@ expect(res.rows[0]).to.eql({ name: 'brianc' })

it('passes connection errors to callback', function (done) {
var pool = new Pool({host: 'no-postgres-server-here.com'})
const pool = new Pool({ port: 53922 })
pool.query('SELECT $1::text as name', ['brianc'], function (err, res) {
expect(res).to.be(undefined)
expect(err).to.be.an(Error)
// a connection error should not polute the pool with a dead client
expect(pool.totalCount).to.equal(0)
pool.end(function (err) {

@@ -80,4 +72,14 @@ done(err)

it('does not pass client to error callback', function (done) {
const pool = new Pool({ port: 58242 })
pool.connect(function (err, client, release) {
expect(err).to.be.an(Error)
expect(client).to.be(undefined)
expect(release).to.be.a(Function)
pool.end(done)
})
})
it('removes client if it errors in background', function (done) {
var pool = new Pool()
const pool = new Pool()
pool.connect(function (err, client, release) {

@@ -102,4 +104,4 @@ release()

it('should not change given options', function (done) {
var options = { max: 10 }
var pool = new Pool(options)
const options = { max: 10 }
const pool = new Pool(options)
pool.connect(function (err, client, release) {

@@ -114,4 +116,4 @@ release()

it('does not create promises when connecting', function (done) {
var pool = new Pool()
var returnValue = pool.connect(function (err, client, release) {
const pool = new Pool()
const returnValue = pool.connect(function (err, client, release) {
release()

@@ -125,4 +127,4 @@ if (err) return done(err)

it('does not create promises when querying', function (done) {
var pool = new Pool()
var returnValue = pool.query('SELECT 1 as num', function (err) {
const pool = new Pool()
const returnValue = pool.query('SELECT 1 as num', function (err) {
pool.end(function () {

@@ -136,18 +138,35 @@ done(err)

it('does not create promises when ending', function (done) {
var pool = new Pool()
var returnValue = pool.end(done)
const pool = new Pool()
const returnValue = pool.end(done)
expect(returnValue).to.be(undefined)
})
it('never calls callback syncronously', function (done) {
const pool = new Pool()
pool.connect((err, client) => {
if (err) throw err
client.release()
setImmediate(() => {
let called = false
pool.connect((err, client) => {
if (err) throw err
called = true
client.release()
setImmediate(() => {
pool.end(done)
})
})
expect(called).to.equal(false)
})
})
})
})
describe('with promises', function () {
it('connects and disconnects', function () {
var pool = new Pool()
it('connects, queries, and disconnects', function () {
const pool = new Pool()
return pool.connect().then(function (client) {
expect(pool.pool.availableObjectsCount()).to.be(0)
return client.query('select $1::text as name', ['hi']).then(function (res) {
expect(res.rows).to.eql([{ name: 'hi' }])
client.release()
expect(pool.pool.getPoolSize()).to.be(1)
expect(pool.pool.availableObjectsCount()).to.be(1)
return pool.end()

@@ -158,5 +177,16 @@ })

it('executes a query directly', () => {
const pool = new Pool()
return pool
.query('SELECT $1::text as name', ['hi'])
.then(res => {
expect(res.rows).to.have.length(1)
expect(res.rows[0].name).to.equal('hi')
return pool.end()
})
})
it('properly pools clients', function () {
var pool = new Pool({ poolSize: 9 })
return Promise.map(_.times(30), function () {
const pool = new Pool({ poolSize: 9 })
const promises = _.times(30, function () {
return pool.connect().then(function (client) {

@@ -168,5 +198,6 @@ return client.query('select $1::text as name', ['hi']).then(function (res) {

})
}).then(function (res) {
})
return Promise.all(promises).then(function (res) {
expect(res).to.have.length(30)
expect(pool.pool.getPoolSize()).to.be(9)
expect(pool.totalCount).to.be(9)
return pool.end()

@@ -177,9 +208,9 @@ })

it('supports just running queries', function () {
var pool = new Pool({ poolSize: 9 })
return Promise.map(_.times(30), function () {
return pool.query('SELECT $1::text as name', ['hi'])
}).then(function (queries) {
const pool = new Pool({ poolSize: 9 })
const text = 'select $1::text as name'
const values = ['hi']
const query = { text: text, values: values }
const promises = _.times(30, () => pool.query(query))
return Promise.all(promises).then(function (queries) {
expect(queries).to.have.length(30)
expect(pool.pool.getPoolSize()).to.be(9)
expect(pool.pool.availableObjectsCount()).to.be(9)
return pool.end()

@@ -189,7 +220,7 @@ })

it('recovers from all errors', function () {
var pool = new Pool()
it('recovers from query errors', function () {
const pool = new Pool()
var errors = []
return Promise.mapSeries(_.times(30), function () {
const errors = []
const promises = _.times(30, () => {
return pool.query('SELECT asldkfjasldkf')

@@ -199,5 +230,8 @@ .catch(function (e) {

})
}).then(function () {
})
return Promise.all(promises).then(() => {
expect(errors).to.have.length(30)
expect(pool.totalCount).to.equal(0)
expect(pool.idleCount).to.equal(0)
return pool.query('SELECT $1::text as name', ['hi']).then(function (res) {
expect(errors).to.have.length(30)
expect(res.rows).to.eql([{ name: 'hi' }])

@@ -210,38 +244,1 @@ return pool.end()

})
describe('pool error handling', function () {
it('Should complete these queries without dying', function (done) {
var pgPool = new Pool()
var pool = pgPool.pool
pool._factory.max = 1
pool._factory.min = null
var errors = 0
var shouldGet = 0
function runErrorQuery () {
shouldGet++
return new Promise(function (resolve, reject) {
pgPool.query("SELECT 'asd'+1 ").then(function (res) {
reject(res) // this should always error
}).catch(function (err) {
errors++
resolve(err)
})
})
}
var ps = []
for (var i = 0; i < 5; i++) {
ps.push(runErrorQuery())
}
Promise.all(ps).then(function () {
expect(shouldGet).to.eql(errors)
done()
})
})
})
process.on('unhandledRejection', function (e) {
console.error(e.message, e.stack)
setImmediate(function () {
throw e
})
})

@@ -1,15 +0,15 @@

var expect = require('expect.js')
const expect = require('expect.js')
var describe = require('mocha').describe
var it = require('mocha').it
const describe = require('mocha').describe
const it = require('mocha').it
var Pool = require('../')
const Pool = require('../')
describe('logging', function () {
it('logs to supplied log function if given', function () {
var messages = []
var log = function (msg) {
const messages = []
const log = function (msg) {
messages.push(msg)
}
var pool = new Pool({ log: log })
const pool = new Pool({ log: log })
return pool.query('SELECT NOW()').then(function () {

@@ -16,0 +16,0 @@ expect(messages.length).to.be.greaterThan(0)

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc