Comparing version 1.0.0-rc2 to 1.0.0
24
index.js
@@ -22,3 +22,3 @@ var ConnectionPool = require('any-db-pool') | ||
exports.createPool = function getPool (dbUrl, poolConfig) { | ||
exports.createPool = function createPool (dbUrl, poolConfig) { | ||
poolConfig = poolConfig || {} | ||
@@ -44,23 +44,17 @@ if (poolConfig.create || poolConfig.destroy) { | ||
var adapter = getAdapter(adapterConfig.adapter); | ||
var pool = new ConnectionPool(adapter, adapterConfig, poolConfig) | ||
var begin = Transaction.createBeginMethod( | ||
adapter.createQuery, pool.acquire.bind(pool) | ||
); | ||
pool.begin = function (beginStatement, callback) { | ||
var tx = begin(beginStatement, callback); | ||
pool.begin = Transaction.createBeginMethod(adapter.createQuery, function (tx) { | ||
// Proxy query events from the transaction to the pool | ||
tx.on('query', pool.emit.bind(this, 'query')) | ||
tx.on('query', this.emit.bind(this, 'query')) | ||
pool.acquire(function (err, conn) { | ||
if (err) return callback ? callback(err) : tx.emit('error', err) | ||
var release = pool.release.bind(pool, conn) | ||
tx.setConnection(conn) | ||
.once('rollback:complete', release) | ||
this.acquire(function (err, connection) { | ||
if (err) return tx.emit('error', err); | ||
var release = pool.release.bind(pool, connection) | ||
tx.once('rollback:complete', release) | ||
.once('commit:complete', release) | ||
.setConnection(connection) | ||
}) | ||
}); | ||
return tx | ||
} | ||
return pool | ||
@@ -67,0 +61,0 @@ } |
@@ -7,9 +7,8 @@ var EventEmitter = require('events').EventEmitter | ||
module.exports.UndefinedMethodError = UndefinedMethodError; | ||
module.exports.nullImplementation = nullImplementation; | ||
inherits(StateMachine, EventEmitter) | ||
function StateMachine (initialState, prototypes, transitions) { | ||
function StateMachine (initialState, transitions) { | ||
EventEmitter.call(this) | ||
var currentState = null; | ||
var currentState = initialState; | ||
@@ -19,32 +18,49 @@ this.state = function (to) { | ||
if (to === currentState) return true; | ||
if (to === currentState) return; | ||
if (typeof prototypes[to] !== 'function') { | ||
} | ||
var extra = Array.prototype.slice.call(arguments, 1) | ||
, legal = currentState ? transitions[currentState] : [initialState] | ||
, legal = transitions[currentState] | ||
; | ||
if (legal && legal.indexOf(to) > -1) { | ||
if (this.log) { | ||
this.log("Transition from:'" + currentState + "' to:'" + to + "'"); | ||
} | ||
this.emit('transition', currentState, to); | ||
currentState = to; | ||
if (!prototypes[to]) { | ||
throw new Error('unknown state:' + to); | ||
} | ||
this.__proto__ = prototypes[to].prototype | ||
this.emit(currentState) | ||
return true | ||
} else { | ||
extra.unshift(new IllegalTransitionError(currentState, to)) | ||
this.handleError.apply(this, extra) | ||
return false | ||
return new IllegalTransitionError(currentState, to); | ||
} | ||
} | ||
} | ||
this.state(initialState); | ||
StateMachine.method = function (name, implementations) { | ||
dispatch.implementations = {}; | ||
for (var key in implementations) { | ||
var states = key.split('|'); | ||
while (states.length) { | ||
var state = states.shift(); | ||
dispatch.implementations[state] = implementations[key]; | ||
} | ||
} | ||
return dispatch; | ||
function dispatch () { | ||
var implementation = dispatch.implementations[this.state()]; | ||
if (typeof implementation !== 'function') { | ||
var error = new StateMachine.UndefinedMethodError(name, this.state()); | ||
var lastArg = [].slice.call(arguments).pop(); | ||
if (typeof lastArg === 'function') { | ||
lastArg.call(this, error); | ||
} else { | ||
var self = this; | ||
process.nextTick(function () { | ||
self.emit('error', error); | ||
}) | ||
} | ||
return; | ||
} | ||
return implementation.apply(this, arguments); | ||
} | ||
} | ||
inherits(UndefinedMethodError, Error); | ||
@@ -63,14 +79,1 @@ function UndefinedMethodError(method, state) { | ||
} | ||
function nullImplementation (methodName) { | ||
return function () { | ||
var lastArg = [].slice.call(arguments).pop(); | ||
var error = new StateMachine.UndefinedMethodError(methodName, this.state()) | ||
if (typeof lastArg == 'function') { | ||
debugger | ||
lastArg(error); | ||
} else { | ||
this.emit('error', error); | ||
} | ||
} | ||
} |
@@ -14,4 +14,4 @@ var inherits = require('util').inherits | ||
this._statements = { | ||
begin: opts.begin || 'BEGIN', | ||
commit: opts.commit || 'COMMIT', | ||
begin: opts.begin || 'BEGIN', | ||
commit: opts.commit || 'COMMIT', | ||
rollback: opts.rollback || 'ROLLBACK' | ||
@@ -22,21 +22,26 @@ } | ||
this.handleError = this.handleError.bind(this); | ||
StateMachine.call(this, 'disconnected', { | ||
'disconnected': DisconnectedTransaction, | ||
'connected': ConnectedTransaction, | ||
'open': OpenTransaction, | ||
'closed': ClosedTransaction | ||
}, | ||
// The valid state transitions. | ||
// A transition to 'errored' is *always* allowed. | ||
{ | ||
'disconnected': [ 'connected' ], | ||
'connected': [ 'open', 'closed' ], | ||
'connected': [ 'open', 'closed' ], | ||
'open': [ 'connected', 'closed' ] | ||
}) | ||
if (opts.callback) { | ||
var callback = opts.callback; | ||
this | ||
.once('error', callback) | ||
.once('begin:complete', function () { | ||
this.removeListener('error', callback); | ||
callback(null, this); | ||
}) | ||
} | ||
} | ||
// create a .begin method that can be patched on to connection objects | ||
Transaction.createBeginMethod = function (createQuery, asyncConnection) { | ||
Transaction.createBeginMethod = function (createQuery, setConnection) { | ||
// default setConnection implementation just assumes that begin is called | ||
// on a connection object. | ||
if (typeof setConnection == 'undefined') { | ||
setConnection = function (tx) { tx.setConnection(this) }; | ||
} | ||
return function (beginStatement, callback) { | ||
@@ -49,13 +54,7 @@ if (beginStatement && typeof beginStatement == 'function') { | ||
createQuery: createQuery, | ||
begin: beginStatement | ||
begin: beginStatement, | ||
callback: callback | ||
}) | ||
if (callback) { | ||
tx.once('error', callback) | ||
.once('open', function () { | ||
tx.removeListener('error', callback); | ||
callback(null, tx); | ||
}) | ||
} | ||
if (asyncConnection) return tx; | ||
return tx.setConnection(this) | ||
setConnection.call(this, tx); | ||
return tx; | ||
} | ||
@@ -65,15 +64,52 @@ } | ||
Transaction.prototype.handleError = function (err, callback) { | ||
var propagate = callback || this.emit.bind(this, 'error') | ||
var ended = /^clos/.test(this.state()) | ||
if (!ended && this._connection) { | ||
OpenTransaction.prototype.rollback.call(this, function (rollbackErr) { | ||
if (rollbackErr) { | ||
err = new RollbackFailedError(rollbackErr, err); | ||
} | ||
var self = this; | ||
var propagate = callback || function (err) { self.emit('error', err) }; | ||
if (this.state() !== 'closed' && this._connection) { | ||
Transaction.prototype.rollback.implementations.open.call(this, function (rollbackErr) { | ||
propagate(err) | ||
}) | ||
} | ||
else process.nextTick(propagate.bind(this, err)) | ||
else propagate(err); | ||
} | ||
Transaction.prototype.query = StateMachine.method('query', { | ||
'connected|disconnected': function (text, params, callback) { | ||
return this._queueTask(this._createQuery(text, params, callback)); | ||
}, | ||
'open': function (stmt, params, callback) { | ||
if (typeof params == 'function') { | ||
callback = params | ||
params = undefined | ||
} | ||
var queryObject = this._connection.query(stmt, params, callback) | ||
this.emit('query', queryObject) | ||
if (!callback) queryObject.on('error', this.handleError) | ||
return queryObject | ||
} | ||
}) | ||
Transaction.prototype.begin = StateMachine.method('begin', { | ||
'open': function (callback) { | ||
return this._createChildTransaction(callback).setConnection(this._connection); | ||
}, | ||
'connected|disconnected': function (callback) { | ||
return this._queueTask(this._createChildTransaction(callback)); | ||
} | ||
}); | ||
['commit', 'rollback'].forEach(function (methodName) { | ||
Transaction.prototype[methodName] = StateMachine.method(methodName, { | ||
'open': closeVia(methodName), | ||
'connected|disconnected': function (callback) { | ||
this._queue.push([methodName, [callback]]); | ||
return this; | ||
} | ||
}); | ||
}); | ||
Transaction.prototype._queueTask = function (task) { | ||
this._queue.push(task); | ||
return task; | ||
} | ||
Transaction.prototype._createChildTransaction = function (callback) { | ||
@@ -84,6 +120,7 @@ var nestingLevel = this._nestingLevel + 1; | ||
var tx = new Transaction({ | ||
createQuery: this._createQuery, | ||
createQuery: this._createQuery, | ||
nestingLevel: nestingLevel, | ||
callback: callback, | ||
begin: 'SAVEPOINT ' + savepointName, | ||
commit: 'RELEASE ' + savepointName, | ||
commit: 'RELEASE SAVEPOINT ' + savepointName, | ||
rollback: 'ROLLBACK TO ' + savepointName | ||
@@ -96,69 +133,33 @@ }) | ||
if (callback) { | ||
tx.once('error', callback) | ||
.once('open', function () { | ||
tx.removeListener('error', callback) | ||
callback(null, tx) | ||
}) | ||
} | ||
return tx | ||
} | ||
inherits(ConnectedTransaction, Transaction) | ||
function ConnectedTransaction () {} | ||
Transaction.prototype.setConnection = StateMachine.method('setConnection', { | ||
'disconnected': function (connection) { | ||
var self = this; | ||
self.state('connected'); | ||
ConnectedTransaction.prototype.query = function (stmt, params, callback) { | ||
return this._queueTask(this._createQuery(stmt, params, callback)) | ||
} | ||
self._onConnectionError = self.handleError.bind(self); | ||
connection.on('error', self._onConnectionError); | ||
ConnectedTransaction.prototype.begin = function (callback) { | ||
return this._queueTask(this._createChildTransaction(callback)); | ||
} | ||
self._connection = connection | ||
ConnectedTransaction.prototype.commit = queueMethodCall('commit'); | ||
ConnectedTransaction.prototype.rollback = queueMethodCall('rollback'); | ||
self.emit('begin:start'); | ||
var beginQuery = connection.query(self._statements.begin, function (err) { | ||
if (err) return self.handleError(err); | ||
self.emit('begin:complete'); // removes error listener | ||
self._runQueue() | ||
}) | ||
ConnectedTransaction.prototype._queueTask = function (task) { | ||
this._queue.push(task); | ||
return task; | ||
} | ||
function queueMethodCall (method) { | ||
return function () { | ||
this._queue.push([method, [].slice.call(arguments)]) | ||
self.emit('query', beginQuery) | ||
return self | ||
} | ||
} | ||
}); | ||
/** | ||
* Transactions start in the Disconnected state, this is identical to the | ||
* Connected state *except* there is an additional setConnection method | ||
* available. | ||
*/ | ||
inherits(DisconnectedTransaction, ConnectedTransaction) | ||
function DisconnectedTransaction () {} | ||
DisconnectedTransaction.prototype.setConnection = function (connection) { | ||
if (!this.state('connected')) return this | ||
connection.on('error', this.handleError) | ||
this._connection = connection | ||
var queryObject = connection.query(this._statements.begin, function (err) { | ||
if (err) return this.handleError(err) | ||
this._runQueue() | ||
}.bind(this)) | ||
this.emit('query', queryObject) | ||
return this | ||
} | ||
ConnectedTransaction.prototype._runQueue = function () { | ||
Transaction.prototype._runQueue = function () { | ||
var self = this | ||
return next(); | ||
var counter = 0; | ||
function next (err) { | ||
if (/^clos/.test(self.state())) { | ||
debugger; | ||
return | ||
} | ||
if (err) { | ||
@@ -169,4 +170,5 @@ self._queue.splice(0, self._queue.length); | ||
if (!self._queue.length) { | ||
// The queue is empty, transition to fully open state | ||
self.state('open'); | ||
if (self.state() !== 'closed' && (err = self.state('open'))) { | ||
self.handleError(err); | ||
} | ||
return | ||
@@ -183,3 +185,3 @@ } | ||
ConnectedTransaction.prototype._runQueuedMethod = function (task, next) { | ||
Transaction.prototype._runQueuedMethod = function (task, next) { | ||
var method = task.shift() | ||
@@ -200,8 +202,8 @@ , args = task.shift() | ||
OpenTransaction.prototype[method].apply(this, args); | ||
this[method].implementations.open.apply(this, args); | ||
} | ||
ConnectedTransaction.prototype._runQueuedTransaction = function (childTx) { | ||
Transaction.prototype._runQueuedTransaction = function (childTx) { | ||
if (!childTx.listeners('error').length) { | ||
childTx.on('error', this.handleError) | ||
childTx.on('error', this.handleError.bind(this)) | ||
} | ||
@@ -211,3 +213,3 @@ childTx.setConnection(this._connection) | ||
ConnectedTransaction.prototype._runQueuedQuery = function (query, callback) { | ||
Transaction.prototype._runQueuedQuery = function (query, callback) { | ||
var self = this; | ||
@@ -231,3 +233,3 @@ var cbName = query._callback ? '_callback' : 'callback' | ||
// to do a little poking at internals here. | ||
query.on('end', callback.bind(null, null)) | ||
query.on('end', function () { callback() }) | ||
if (query.listeners('end').length > 1) { | ||
@@ -242,59 +244,28 @@ var listeners = query._events.end | ||
/** | ||
* A transaction transitions to 'open' when it has completed all queued tasks. | ||
*/ | ||
inherits(OpenTransaction, Transaction) | ||
function OpenTransaction () {} | ||
OpenTransaction.prototype.begin = function (callback) { | ||
return this._createChildTransaction(callback).setConnection(this._connection); | ||
} | ||
OpenTransaction.prototype.query = function (stmt, params, callback) { | ||
if (typeof params == 'function') { | ||
callback = params | ||
params = undefined | ||
} | ||
var queryObject = this._connection.query(stmt, params, callback) | ||
this.emit('query', queryObject) | ||
if (!callback) queryObject.on('error', this.handleError) | ||
return queryObject | ||
}; | ||
OpenTransaction.prototype.commit = closeVia('commit') | ||
OpenTransaction.prototype.rollback = closeVia('rollback') | ||
function closeVia (action) { | ||
return function (callback) { | ||
if (this.state('closed', callback)) { | ||
this.emit(action + ':start'); | ||
var q = this._createQuery(this._statements[action], function (err) { | ||
this._close(err, action, callback) | ||
}.bind(this)) | ||
this.emit('query', q); | ||
this._connection.query(q); | ||
var self = this; | ||
var err = self.state('closed'); | ||
if (err) { | ||
return self.handleError(err, callback); | ||
} | ||
return this | ||
self.emit(action + ':start'); | ||
var q = self._connection.query(self._statements[action], function (err) { | ||
self._close(err, action, callback) | ||
self._connection.removeListener('error', self._onConnectionError); | ||
delete self._connection | ||
if (err) { | ||
self.handleError(new CloseFailedError(action, err), callback) | ||
} else { | ||
self.emit(action + ':complete'); | ||
self.emit('close'); | ||
if (callback) callback() | ||
} | ||
}) | ||
self.emit('query', q); | ||
return self; | ||
} | ||
} | ||
inherits(ClosedTransaction, Transaction) | ||
function ClosedTransaction () {} | ||
['query', 'begin', 'rollback', 'commit'].forEach(function (name) { | ||
ClosedTransaction.prototype[name] = StateMachine.nullImplementation(name); | ||
}) | ||
ClosedTransaction.prototype._close = function (err, action, callback) { | ||
this.state('closed') | ||
this._connection.removeListener('error', this.handleError) | ||
delete this._connection | ||
if (err) { | ||
this.handleError(new CloseFailedError(action, err), callback) | ||
} else { | ||
this.emit(action + ':complete'); | ||
this.emit('close'); | ||
if (callback) callback() | ||
} | ||
Transaction.prototype._close = function (err, action, callback) { | ||
} | ||
@@ -304,5 +275,5 @@ | ||
function CloseFailedError(err, action, previous) { | ||
Error.captureStackTrace(this, RollbackFailedError); | ||
Error.captureStackTrace(this, CloseFailedError); | ||
this.name = action + ' failed'; | ||
this.message = rollbackErr + "\nError causing rollback: " + previous; | ||
this.message = err + "\nError causing rollback: " + previous; | ||
} |
{ | ||
"name": "any-db", | ||
"version": "1.0.0-rc2", | ||
"version": "1.0.0", | ||
"description": "Database-agnostic connection pooling, querying, and result sets", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
1
0
26956
380