any-db-pool
Advanced tools
Comparing version 2.0.1 to 2.1.0
149
index.js
var inherits = require('util').inherits | ||
var EventEmitter = require('events').EventEmitter | ||
var Pool = require('generic-pool').Pool | ||
var once = require('once') | ||
var GenericPool = require('generic-pool').Pool | ||
var chain = require('./lib/chain') | ||
@@ -19,3 +18,2 @@ | ||
connParams = connParams || {} | ||
if (options.create) { | ||
@@ -25,3 +23,3 @@ console.warn("PoolConfig.create ignored, use PoolConfig.onConnect instead") | ||
if (options.destroy) { | ||
console.warn("PoolConfig.destroy ignored, use PoolConfig.onConnect instead") | ||
console.warn("PoolConfig.destroy ignored") | ||
} | ||
@@ -42,2 +40,4 @@ | ||
var onConnect = options.onConnect || function (c, done) { done(null, c) } | ||
var poolOpts = { | ||
@@ -49,15 +49,30 @@ min: options.min || 0, | ||
if (err) return ready(err); | ||
else if (options.onConnect) options.onConnect(conn, ready) | ||
else ready(null, conn) | ||
onConnect(conn, function (err, connection) { | ||
if (err) return ready(err); | ||
conn.on('error', self._handleIdleError) | ||
ready(null, connection) | ||
}) | ||
}) | ||
}, | ||
destroy: function (conn) { | ||
conn.end() | ||
conn._events = {} | ||
destroy: function (connection) { | ||
connection.removeAllListeners() | ||
connection.on('error', function () {}) | ||
connection.end() | ||
}, | ||
log: options.log | ||
log: options.log, | ||
idleTimeoutMillis: options.idleTimeout, | ||
reapIntervalMillis: options.reapInterval, | ||
} | ||
var resetSteps = []; | ||
if (options.hasOwnProperty('refreshIdle')) { | ||
poolOpts.refreshIdle = options.refreshIdle | ||
} | ||
this._pool = new GenericPool(poolOpts) | ||
var resetSteps = [] | ||
if (adapter.reset) resetSteps.unshift(adapter.reset) | ||
@@ -67,3 +82,11 @@ if (options.reset) resetSteps.unshift(options.reset) | ||
this._reset = chain(resetSteps) | ||
this._pool = new Pool(poolOpts) | ||
this._shouldDestroyConnection = options.shouldDestroyConnection || function (err) { return true } | ||
var self = this | ||
self._handleIdleError = function (err) { | ||
var connection = this | ||
self._maybeDestroy(connection, err) | ||
self.emit('error', err, connection) | ||
} | ||
} | ||
@@ -74,24 +97,60 @@ | ||
, query = this.adapter.createQuery(statement, params, callback) | ||
, connection = null | ||
this.acquire(function (err, conn) { | ||
if (err) { | ||
if (typeof params === 'function') { | ||
return params(err) | ||
} else if (callback) { | ||
return callback(err); | ||
} else { | ||
debugger | ||
return query.emit('error', err); | ||
if (query.callback) { | ||
callback = query.callback | ||
query.callback = function (err, result) { | ||
self._maybeDestroy(connection, err) | ||
callback(err, result) | ||
} | ||
} else { | ||
var finished = false | ||
query.once('end', function () { | ||
if (!finished) { | ||
finished = true | ||
self.release(connection) | ||
} | ||
} | ||
conn.query(query); | ||
self.emit('query', query) | ||
var release = once(self.release.bind(self, conn)) | ||
query.once('end', release).once('error', function (err) { | ||
release() | ||
}) | ||
query.once('error', function (err) { | ||
if (!finished) { | ||
finished = true | ||
self._maybeDestroy(connection, err) | ||
} | ||
// If this was the only error listener, re-emit the error from the pool. | ||
if (!this.listeners('error').length) { | ||
self.emit('error', err) | ||
self.emit('error', err, query) | ||
} | ||
}) | ||
} | ||
/** | ||
* if a connection cannot be acquired, or emits an 'error' event while a | ||
* query is in progress, the error should be handled by the query object. | ||
*/ | ||
var handleConnectionError = function (error) { | ||
self._maybeDestroy(connection, error) | ||
if (query.callback) { | ||
query.callback(error) | ||
} else { | ||
query.emit('error', error) | ||
} | ||
} | ||
this.acquire(function (err, connection_) { | ||
if (err) { | ||
return handleConnectionError(err) | ||
} | ||
// expose the connection to everything else in the outer scope | ||
connection = connection_ | ||
// attach error event listener to the connection | ||
connection.on('error', handleConnectionError) | ||
query.once('end', function () { | ||
connection.removeListener('error', handleConnectionError) | ||
}) | ||
connection.query(query); | ||
self.emit('query', query) | ||
}) | ||
@@ -103,11 +162,18 @@ | ||
ConnectionPool.prototype.acquire = function (callback) { | ||
this.emit('acquire') | ||
this._pool.acquire(callback); | ||
var self = this | ||
self._pool.acquire(function (err, connection) { | ||
if (err) return callback(err); | ||
connection.removeListener('error', self._handleIdleError) | ||
self.emit('acquire', connection) | ||
callback(null, connection) | ||
}); | ||
} | ||
ConnectionPool.prototype.release = function (connection) { | ||
this.emit('release') | ||
var self = this | ||
this._reset(connection, function (err) { | ||
if (err) return self.destroy(connection) | ||
self.emit('release', connection) | ||
connection.removeAllListeners() | ||
self._reset(connection, function (err) { | ||
if (err) return self.destroy(connection); | ||
connection.on('error', self._handleIdleError) | ||
self._pool.release(connection) | ||
@@ -124,6 +190,17 @@ }) | ||
this._pool.drain(function () { | ||
self._pool.destroyAllNow() | ||
self.emit('close') | ||
if (callback) callback() | ||
self._pool.destroyAllNow(function () { | ||
self.emit('close') | ||
if (callback) callback() | ||
}) | ||
}) | ||
} | ||
ConnectionPool.prototype._maybeDestroy = function (connection, error) { | ||
if (connection) { | ||
if (error && this._shouldDestroyConnection(error)) { | ||
this.destroy(connection) | ||
} else { | ||
this.release(connection) | ||
} | ||
} | ||
} |
{ | ||
"name": "any-db-pool", | ||
"version": "2.0.1", | ||
"version": "2.1.0", | ||
"description": "Any-DB connection pool", | ||
@@ -25,4 +25,5 @@ "main": "index.js", | ||
"any-db-fake": "~0.0.3", | ||
"covert": "~0.1.1" | ||
"covert": "~0.1.1", | ||
"extend": "~1.2.1" | ||
} | ||
} |
@@ -51,4 +51,8 @@ # any-db-pool - database agnostic connection pool | ||
max: Number?, | ||
idleTimeout: Number?, | ||
reapInterval: Number?, | ||
refreshIdle: Boolean?, | ||
onConnect: (Connection, ready: Continuation<Connection>) => void | ||
reset: (Connection, done: Continuation<void>) => void | ||
shouldDestroyConnection: (error: Error) => Boolean | ||
} | ||
@@ -63,4 +67,11 @@ ``` | ||
- `max` (default `10`) The maximum number of connections to keep open in the pool. When this limit is reached further requests for connections will queue waiting for an existing connection to be released back into the pool. | ||
- `refreshIdle` (default `true`) When this is true, the pool will reap connections that have been idle for more than `idleTimeout` milliseconds. | ||
- `idleTimeout` (default `30000`) The maximum amount of time a connection can sit idle in the pool before being reaped. | ||
- `reapInterval` (default `1000`) How frequently the pool should check for connections that are old enough to be reaped. | ||
- `onConnect` Called immediately after a connection is first established. Use this to do one-time setup of new connections. The supplied `Connection` will not be added to the pool until you pass it to the `done` continuation. | ||
- `reset` Called each time a connection is returned to the pool. Use this to restore a connection to it's original state (e.g. rollback transactions, set the database session vars). If `reset` fails to call the `done` continuation the connection will be lost in limbo. | ||
- `shouldDestroyConnection` (default `function (err) { return true }`) - Called | ||
when an error is encountered by `pool.query` or emitted by an idle | ||
connection. If `shouldDestroyConnection(error)` is truthy the connection will | ||
be destroyed, otherwise it will be reset. | ||
@@ -67,0 +78,0 @@ ### ConnectionPool.query |
var EventEmitter = require('events').EventEmitter | ||
var extend = require('extend') | ||
@@ -18,3 +19,3 @@ var test = require('tape') | ||
createConnection: function (_, callback) { | ||
var connection = { | ||
var connection = extend(new EventEmitter, { | ||
id: ++connectionCount, | ||
@@ -27,3 +28,3 @@ query: function (q) { | ||
end: function () {} | ||
} | ||
}) | ||
callback(null, connection) | ||
@@ -30,0 +31,0 @@ }, |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23062
19
506
169
4