Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

any-db

Package Overview
Dependencies
Maintainers
1
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

any-db - npm Package Compare versions

Comparing version 1.0.0-rc2 to 1.0.0

24

index.js

@@ -22,3 +22,3 @@ var ConnectionPool = require('any-db-pool')

exports.createPool = function getPool (dbUrl, poolConfig) {
exports.createPool = function createPool (dbUrl, poolConfig) {
poolConfig = poolConfig || {}

@@ -44,23 +44,17 @@ if (poolConfig.create || poolConfig.destroy) {

var adapter = getAdapter(adapterConfig.adapter);
var pool = new ConnectionPool(adapter, adapterConfig, poolConfig)
var begin = Transaction.createBeginMethod(
adapter.createQuery, pool.acquire.bind(pool)
);
pool.begin = function (beginStatement, callback) {
var tx = begin(beginStatement, callback);
pool.begin = Transaction.createBeginMethod(adapter.createQuery, function (tx) {
// Proxy query events from the transaction to the pool
tx.on('query', pool.emit.bind(this, 'query'))
tx.on('query', this.emit.bind(this, 'query'))
pool.acquire(function (err, conn) {
if (err) return callback ? callback(err) : tx.emit('error', err)
var release = pool.release.bind(pool, conn)
tx.setConnection(conn)
.once('rollback:complete', release)
this.acquire(function (err, connection) {
if (err) return tx.emit('error', err);
var release = pool.release.bind(pool, connection)
tx.once('rollback:complete', release)
.once('commit:complete', release)
.setConnection(connection)
})
});
return tx
}
return pool

@@ -67,0 +61,0 @@ }

@@ -7,9 +7,8 @@ var EventEmitter = require('events').EventEmitter

module.exports.UndefinedMethodError = UndefinedMethodError;
module.exports.nullImplementation = nullImplementation;
inherits(StateMachine, EventEmitter)
function StateMachine (initialState, prototypes, transitions) {
function StateMachine (initialState, transitions) {
EventEmitter.call(this)
var currentState = null;
var currentState = initialState;

@@ -19,32 +18,49 @@ this.state = function (to) {

if (to === currentState) return true;
if (to === currentState) return;
if (typeof prototypes[to] !== 'function') {
}
var extra = Array.prototype.slice.call(arguments, 1)
, legal = currentState ? transitions[currentState] : [initialState]
, legal = transitions[currentState]
;
if (legal && legal.indexOf(to) > -1) {
if (this.log) {
this.log("Transition from:'" + currentState + "' to:'" + to + "'");
}
this.emit('transition', currentState, to);
currentState = to;
if (!prototypes[to]) {
throw new Error('unknown state:' + to);
}
this.__proto__ = prototypes[to].prototype
this.emit(currentState)
return true
} else {
extra.unshift(new IllegalTransitionError(currentState, to))
this.handleError.apply(this, extra)
return false
return new IllegalTransitionError(currentState, to);
}
}
}
this.state(initialState);
StateMachine.method = function (name, implementations) {
dispatch.implementations = {};
for (var key in implementations) {
var states = key.split('|');
while (states.length) {
var state = states.shift();
dispatch.implementations[state] = implementations[key];
}
}
return dispatch;
function dispatch () {
var implementation = dispatch.implementations[this.state()];
if (typeof implementation !== 'function') {
var error = new StateMachine.UndefinedMethodError(name, this.state());
var lastArg = [].slice.call(arguments).pop();
if (typeof lastArg === 'function') {
lastArg.call(this, error);
} else {
var self = this;
process.nextTick(function () {
self.emit('error', error);
})
}
return;
}
return implementation.apply(this, arguments);
}
}
inherits(UndefinedMethodError, Error);

@@ -63,14 +79,1 @@ function UndefinedMethodError(method, state) {

}
function nullImplementation (methodName) {
return function () {
var lastArg = [].slice.call(arguments).pop();
var error = new StateMachine.UndefinedMethodError(methodName, this.state())
if (typeof lastArg == 'function') {
debugger
lastArg(error);
} else {
this.emit('error', error);
}
}
}

@@ -14,4 +14,4 @@ var inherits = require('util').inherits

this._statements = {
begin: opts.begin || 'BEGIN',
commit: opts.commit || 'COMMIT',
begin: opts.begin || 'BEGIN',
commit: opts.commit || 'COMMIT',
rollback: opts.rollback || 'ROLLBACK'

@@ -22,21 +22,26 @@ }

this.handleError = this.handleError.bind(this);
StateMachine.call(this, 'disconnected', {
'disconnected': DisconnectedTransaction,
'connected': ConnectedTransaction,
'open': OpenTransaction,
'closed': ClosedTransaction
},
// The valid state transitions.
// A transition to 'errored' is *always* allowed.
{
'disconnected': [ 'connected' ],
'connected': [ 'open', 'closed' ],
'connected': [ 'open', 'closed' ],
'open': [ 'connected', 'closed' ]
})
if (opts.callback) {
var callback = opts.callback;
this
.once('error', callback)
.once('begin:complete', function () {
this.removeListener('error', callback);
callback(null, this);
})
}
}
// create a .begin method that can be patched on to connection objects
Transaction.createBeginMethod = function (createQuery, asyncConnection) {
Transaction.createBeginMethod = function (createQuery, setConnection) {
// default setConnection implementation just assumes that begin is called
// on a connection object.
if (typeof setConnection == 'undefined') {
setConnection = function (tx) { tx.setConnection(this) };
}
return function (beginStatement, callback) {

@@ -49,13 +54,7 @@ if (beginStatement && typeof beginStatement == 'function') {

createQuery: createQuery,
begin: beginStatement
begin: beginStatement,
callback: callback
})
if (callback) {
tx.once('error', callback)
.once('open', function () {
tx.removeListener('error', callback);
callback(null, tx);
})
}
if (asyncConnection) return tx;
return tx.setConnection(this)
setConnection.call(this, tx);
return tx;
}

@@ -65,15 +64,52 @@ }

Transaction.prototype.handleError = function (err, callback) {
var propagate = callback || this.emit.bind(this, 'error')
var ended = /^clos/.test(this.state())
if (!ended && this._connection) {
OpenTransaction.prototype.rollback.call(this, function (rollbackErr) {
if (rollbackErr) {
err = new RollbackFailedError(rollbackErr, err);
}
var self = this;
var propagate = callback || function (err) { self.emit('error', err) };
if (this.state() !== 'closed' && this._connection) {
Transaction.prototype.rollback.implementations.open.call(this, function (rollbackErr) {
propagate(err)
})
}
else process.nextTick(propagate.bind(this, err))
else propagate(err);
}
Transaction.prototype.query = StateMachine.method('query', {
'connected|disconnected': function (text, params, callback) {
return this._queueTask(this._createQuery(text, params, callback));
},
'open': function (stmt, params, callback) {
if (typeof params == 'function') {
callback = params
params = undefined
}
var queryObject = this._connection.query(stmt, params, callback)
this.emit('query', queryObject)
if (!callback) queryObject.on('error', this.handleError)
return queryObject
}
})
Transaction.prototype.begin = StateMachine.method('begin', {
'open': function (callback) {
return this._createChildTransaction(callback).setConnection(this._connection);
},
'connected|disconnected': function (callback) {
return this._queueTask(this._createChildTransaction(callback));
}
});
['commit', 'rollback'].forEach(function (methodName) {
Transaction.prototype[methodName] = StateMachine.method(methodName, {
'open': closeVia(methodName),
'connected|disconnected': function (callback) {
this._queue.push([methodName, [callback]]);
return this;
}
});
});
Transaction.prototype._queueTask = function (task) {
this._queue.push(task);
return task;
}
Transaction.prototype._createChildTransaction = function (callback) {

@@ -84,6 +120,7 @@ var nestingLevel = this._nestingLevel + 1;

var tx = new Transaction({
createQuery: this._createQuery,
createQuery: this._createQuery,
nestingLevel: nestingLevel,
callback: callback,
begin: 'SAVEPOINT ' + savepointName,
commit: 'RELEASE ' + savepointName,
commit: 'RELEASE SAVEPOINT ' + savepointName,
rollback: 'ROLLBACK TO ' + savepointName

@@ -96,69 +133,33 @@ })

if (callback) {
tx.once('error', callback)
.once('open', function () {
tx.removeListener('error', callback)
callback(null, tx)
})
}
return tx
}
inherits(ConnectedTransaction, Transaction)
function ConnectedTransaction () {}
Transaction.prototype.setConnection = StateMachine.method('setConnection', {
'disconnected': function (connection) {
var self = this;
self.state('connected');
ConnectedTransaction.prototype.query = function (stmt, params, callback) {
return this._queueTask(this._createQuery(stmt, params, callback))
}
self._onConnectionError = self.handleError.bind(self);
connection.on('error', self._onConnectionError);
ConnectedTransaction.prototype.begin = function (callback) {
return this._queueTask(this._createChildTransaction(callback));
}
self._connection = connection
ConnectedTransaction.prototype.commit = queueMethodCall('commit');
ConnectedTransaction.prototype.rollback = queueMethodCall('rollback');
self.emit('begin:start');
var beginQuery = connection.query(self._statements.begin, function (err) {
if (err) return self.handleError(err);
self.emit('begin:complete'); // removes error listener
self._runQueue()
})
ConnectedTransaction.prototype._queueTask = function (task) {
this._queue.push(task);
return task;
}
function queueMethodCall (method) {
return function () {
this._queue.push([method, [].slice.call(arguments)])
self.emit('query', beginQuery)
return self
}
}
});
/**
* Transactions start in the Disconnected state, this is identical to the
* Connected state *except* there is an additional setConnection method
* available.
*/
inherits(DisconnectedTransaction, ConnectedTransaction)
function DisconnectedTransaction () {}
DisconnectedTransaction.prototype.setConnection = function (connection) {
if (!this.state('connected')) return this
connection.on('error', this.handleError)
this._connection = connection
var queryObject = connection.query(this._statements.begin, function (err) {
if (err) return this.handleError(err)
this._runQueue()
}.bind(this))
this.emit('query', queryObject)
return this
}
ConnectedTransaction.prototype._runQueue = function () {
Transaction.prototype._runQueue = function () {
var self = this
return next();
var counter = 0;
function next (err) {
if (/^clos/.test(self.state())) {
debugger;
return
}
if (err) {

@@ -169,4 +170,5 @@ self._queue.splice(0, self._queue.length);

if (!self._queue.length) {
// The queue is empty, transition to fully open state
self.state('open');
if (self.state() !== 'closed' && (err = self.state('open'))) {
self.handleError(err);
}
return

@@ -183,3 +185,3 @@ }

ConnectedTransaction.prototype._runQueuedMethod = function (task, next) {
Transaction.prototype._runQueuedMethod = function (task, next) {
var method = task.shift()

@@ -200,8 +202,8 @@ , args = task.shift()

OpenTransaction.prototype[method].apply(this, args);
this[method].implementations.open.apply(this, args);
}
ConnectedTransaction.prototype._runQueuedTransaction = function (childTx) {
Transaction.prototype._runQueuedTransaction = function (childTx) {
if (!childTx.listeners('error').length) {
childTx.on('error', this.handleError)
childTx.on('error', this.handleError.bind(this))
}

@@ -211,3 +213,3 @@ childTx.setConnection(this._connection)

ConnectedTransaction.prototype._runQueuedQuery = function (query, callback) {
Transaction.prototype._runQueuedQuery = function (query, callback) {
var self = this;

@@ -231,3 +233,3 @@ var cbName = query._callback ? '_callback' : 'callback'

// to do a little poking at internals here.
query.on('end', callback.bind(null, null))
query.on('end', function () { callback() })
if (query.listeners('end').length > 1) {

@@ -242,59 +244,28 @@ var listeners = query._events.end

/**
* A transaction transitions to 'open' when it has completed all queued tasks.
*/
inherits(OpenTransaction, Transaction)
function OpenTransaction () {}
OpenTransaction.prototype.begin = function (callback) {
return this._createChildTransaction(callback).setConnection(this._connection);
}
OpenTransaction.prototype.query = function (stmt, params, callback) {
if (typeof params == 'function') {
callback = params
params = undefined
}
var queryObject = this._connection.query(stmt, params, callback)
this.emit('query', queryObject)
if (!callback) queryObject.on('error', this.handleError)
return queryObject
};
OpenTransaction.prototype.commit = closeVia('commit')
OpenTransaction.prototype.rollback = closeVia('rollback')
function closeVia (action) {
return function (callback) {
if (this.state('closed', callback)) {
this.emit(action + ':start');
var q = this._createQuery(this._statements[action], function (err) {
this._close(err, action, callback)
}.bind(this))
this.emit('query', q);
this._connection.query(q);
var self = this;
var err = self.state('closed');
if (err) {
return self.handleError(err, callback);
}
return this
self.emit(action + ':start');
var q = self._connection.query(self._statements[action], function (err) {
self._close(err, action, callback)
self._connection.removeListener('error', self._onConnectionError);
delete self._connection
if (err) {
self.handleError(new CloseFailedError(action, err), callback)
} else {
self.emit(action + ':complete');
self.emit('close');
if (callback) callback()
}
})
self.emit('query', q);
return self;
}
}
inherits(ClosedTransaction, Transaction)
function ClosedTransaction () {}
['query', 'begin', 'rollback', 'commit'].forEach(function (name) {
ClosedTransaction.prototype[name] = StateMachine.nullImplementation(name);
})
ClosedTransaction.prototype._close = function (err, action, callback) {
this.state('closed')
this._connection.removeListener('error', this.handleError)
delete this._connection
if (err) {
this.handleError(new CloseFailedError(action, err), callback)
} else {
this.emit(action + ':complete');
this.emit('close');
if (callback) callback()
}
Transaction.prototype._close = function (err, action, callback) {
}

@@ -304,5 +275,5 @@

function CloseFailedError(err, action, previous) {
Error.captureStackTrace(this, RollbackFailedError);
Error.captureStackTrace(this, CloseFailedError);
this.name = action + ' failed';
this.message = rollbackErr + "\nError causing rollback: " + previous;
this.message = err + "\nError causing rollback: " + previous;
}
{
"name": "any-db",
"version": "1.0.0-rc2",
"version": "1.0.0",
"description": "Database-agnostic connection pooling, querying, and result sets",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc