Comparing version 1.0.0-rc1 to 1.0.0-rc2
351
API.md
@@ -1,3 +0,350 @@ | ||
# Moved | ||
# Any-DB API | ||
This file has moved to [any-db/API.md](any-db/API.md) | ||
This document gives a comprehensive overview of the API guaranteed by any-db. | ||
## exports.createConnection | ||
`require('any-db').createConnection(dbURL, [callback])` | ||
Create a new connection object. `dbURL` may be a URL string of the form | ||
_adapter://user:password@host:port/database_ or an adapter-specific config | ||
object, in which case it must have an "adapter" property. In either case, | ||
adapter must which must be one of "mysql", "postgres", or "sqlite3". If a | ||
callback is given, it will be called with either an error or the established | ||
connection: `callback(error, conn)`. Additional connection settings can be | ||
included as query parameters in the URL. The returned object will conform to | ||
the [Connection API](#connection) detailed below. | ||
See also: README notes for your chosen adapter | ||
([MySQL](../any-db-mysql/README.md#api-extensions), | ||
[Postgres](../any-db-postgres/README.md#api-extensions), and | ||
[SQLite3](../any-db-sqlite3/README.md#api-extensions)) | ||
## exports.createPool | ||
`require('any-db').createPool(dbUrl, [poolOpts])` | ||
Create a new [ConnectionPool](#connectionpool) and return it immediately. See | ||
the [createConnection](#exportscreateconnection) docs for an explanation of the | ||
`dbURL` parameter. `poolOpts` may be an object with any of the following keys: | ||
* `min: 2` | ||
The minimum number of connections to keep open in the pool. | ||
* `max: 10` | ||
The maximum number of connections to allow in the pool. | ||
* `onConnect: function (conn, done) { done(null, conn) }` | ||
Called immediately after a connection is first established. Use this to do | ||
one-time setup of new connections. You must call `done(error, connection)` | ||
for the connection to actually make it into the pool. | ||
* `reset: function (conn, done) { done(null) }`, | ||
Called each time the connection is returned to the pool. Use this to restore | ||
your connection to it's original state (e.g. rollback transactions, set the | ||
user or encoding). | ||
See [ConnectionPool](#connectionpool) below for the API of the returned object. | ||
## Connection | ||
Connection objects returned by [createConnection](#exportscreateconnection) or | ||
[ConnectionPool.acquire](#connectionpoolacquire) are guaranteed to have the | ||
methods and events listed here, but the connection objects of various drivers | ||
may have additional methods or emit additional events. If you need to access a | ||
feature of your database is not described here (such as Postgres' server-side | ||
prepared statements), consult the documentation for the database driver. | ||
### Connection Events | ||
* `'error', err` - Emitted when there is a connection-level error. | ||
* `'close'` - Emitted when the connection has been closed. | ||
### Connection.query | ||
Execute a SQL statement, using bound parameters if they are given, and return a | ||
[Query](#query) object for the in-progress query. If `callback` is given it will | ||
be called with any errors or an object representing the query results | ||
(`callback(error, results)`). The returned Query object and the result object | ||
passed to the callback may have extra driver-specific properties and events. | ||
*Callback-style* | ||
```javascript | ||
conn.query('SELECT * FROM my_table', function (err, res) { | ||
if (err) return console.error(err) | ||
res.rows.forEach(console.log) | ||
console.log('All done!') | ||
}) | ||
``` | ||
*EventEmitter-style* | ||
```javascript | ||
conn.query('SELECT * FROM my_table') | ||
.on('error', console.error) | ||
.on('row', console.log) | ||
.on('end', function () { console.log('All done!') }) | ||
``` | ||
### Connection.begin | ||
`var tx = conn.begin([statement="BEGIN"], [callback])` | ||
Start a new transaction and return a [Transaction](#transaction) object to | ||
manage it. If a string `statement` is given it will be used in place of the | ||
default statement (`BEGIN`). If `callback` is given it will be called with any | ||
errors encountered starting the transaction and the transaction object itself: | ||
`callback(error, transaction)`. See also: the [Transaction](#transaction) API. | ||
*Callback-style* | ||
```javascript | ||
conn.begin(function (err, transaction) { | ||
if (err) return console.error(err) | ||
// Do work using transaction | ||
}) | ||
``` | ||
*Synchronous-style* | ||
```javascript | ||
var transaction = conn.begin() | ||
transaction.on('error', console.error) | ||
// Do work using transaction, queries are queued until transaction successfully | ||
// starts. | ||
``` | ||
### Connection.end | ||
`conn.end([callback])` | ||
Close the database connection. If `callback` is given it will be called after | ||
the connection has closed. | ||
## Query | ||
Query objects are returned by the `.query(...)` methods of | ||
[connections](#connection), [pools](#connectionpool), and | ||
[transctions](#transaction). Like connections, query objects are created by the | ||
drivers themselves and may have more methods and events than are described here. | ||
### Query properties | ||
* `.text` - The string query submitted. If you are using MySQL this will | ||
contain interpolated values *after* the query has been enqueued by a | ||
connection. | ||
* `.values` - The parameter values submitted to the backend. | ||
### Query Events | ||
* `'error', err` - Emitted if the query results in an error. | ||
* `'row', row` - Emitted for each row in the queries result set. | ||
* `'end', [res]` - Emitted when the query completes. | ||
## ConnectionPool | ||
ConnectionPool instances are created with [createPool](#exportscreatepool). | ||
### ConnectionPool.query | ||
`var query = pool.query(stmt, [params], [callback])` | ||
Acts exactly like [Connection.query](#connectionquery) by automatically | ||
acquiring a connection and releasing it when the query completes. | ||
### ConnectionPool.begin | ||
`var tx = pool.begin([callback])` | ||
Acts exactly like [Connection.begin](#connectionbegin), but the underlying | ||
connection is returned to the pool when the transaction commits or rolls back. | ||
### ConnectionPool.acquire | ||
`pool.acquire(function (err, conn) { ... })` | ||
Remove a connection from the pool. If you use this method you **must** return | ||
the connection back to the pool using [ConnectionPool.release](#connectionpoolrelease). | ||
### ConnectionPool.release | ||
`pool.release(conn)` | ||
Return a connection to the pool. This should only be called with connections | ||
you've manually [acquired](#connectionpoolacquire), and you **must not** | ||
continue to use the connection after releasing it. | ||
### ConnectionPool.close | ||
Stop giving out new connections, and close all existing database connections as | ||
they are returned to the pool. | ||
### ConnectionPool events | ||
* `'acquire'` - emitted whenever `pool.acquire` is called | ||
* `'release'` - emitted whenever `pool.release` is called | ||
* `'query', query` - emitted immediately after `.query` is called on a | ||
connection via `pool.query`. The argument is a [query](#query) object. | ||
* `'close'` - emitted when the connection pool has closed all of it | ||
connections after a call to `close()`. | ||
## Transaction | ||
Transaction objects are simple wrappers around a [Connection](#connection) that | ||
ensure all queries take place within a single database transaction. They are | ||
created by [Connection.begin](#connectionbegin) and [Pool.begin](#poolbegin) and | ||
implement the same API as connections. This includes implementing their own | ||
[begin method](#transactionbegin) that creates nested transactions using | ||
savepoints. A nested transaction can safely rollback without rolling back the | ||
entire parent transaction. | ||
Any queries that error during a transaction will cause an automatic rollback. If | ||
a query has no callback, the transaction will also handle (and re-emit) | ||
`'error'` events for that query. This enables handling errors for an entire | ||
transaction in a single place. | ||
### Transaction states | ||
Transactions are finite state machines with 4 states: `disconnected`, | ||
`connected`, `open`, and `closed`: | ||
[disconnected] | ||
↓ | ||
[connected] | ||
↓ ↓ ↑ | ||
↓ [open] | ||
↓ ↓ | ||
[closed] | ||
Every transaction starts out in the `disconnected` state, in which it will queue | ||
all tasks (queries, child transactions, commits and rollbacks) in the order they | ||
are received. | ||
Once the transaction acquires a connection\* it will transition to the | ||
`connected` state and begin processing it's internal task queue. While in this | ||
state any new tasks will still be added to the end of the queue. There are two | ||
possible transitions from the `connected` state: | ||
* `connected → open` - When queued queries have finished. | ||
* `connected → closed` - When a rollback or commit is encountered in the queue. | ||
This includes automatic rollbacks caused by query errors. | ||
`closed` is a terminal state in which all further database operations result in | ||
errors. (The errors will either be sent to any callback provided or emitted as | ||
`error` events on the next tick). | ||
In the `open` state, all database operations will be performed immediately. If a | ||
child transaction is started with [Transaction.begin](#transactionbegin), the | ||
parent transaction will move back into the `connected` state (queueing | ||
any queries it receives) until the child completes, at which point it will resume processing it's internal queue. | ||
*\ * - Transactions started from [Connection.begin](#connectionbegin) transition | ||
to `connected` before the transaction is returned from `.begin`.* | ||
### Transaction.query | ||
`var q = tx.query(stmt, [params], [callback])` | ||
Acts exactly like [Connection.query](#connectionquery) except queries are | ||
guaranteed to be performed within the transaction. If the transaction has been | ||
committed or rolled back further calls to `query` will fail. | ||
### Transaction.commit | ||
`tx.commit([callback])` | ||
Issue a `COMMIT` statement to the database. If a callback is given it will be | ||
called with any errors after the `COMMIT` statement completes. The transaction | ||
object itself will be unusable after calling `commit()`. | ||
### Transaction.rollback | ||
`tx.rollback([callback])` | ||
The same as [Transaction.commit](#transactioncommit) but issues a `ROLLBACK`. | ||
Again, the transaction will be unusable after calling this method. | ||
### Transaction.begin | ||
`tx.begin([callback])` | ||
Starts a nested transaction (by creating a savepoint) within this transaction | ||
and returns a new transaction object. Unlike [Connection.begin](#connectionbegin), | ||
there is no option to replace the statement used to begin the transaction, this | ||
is because the statement must use a known savepoint name. | ||
While the child transaction is in progress the parent transaction will queue any | ||
queries it receives until the child transaction either commits or rolls back, at | ||
which point it will process the queue. Be careful: it's quite possible to write | ||
code that deadlocks by waiting for a query in the parent transaction before | ||
committing the child transaction. For example: | ||
// Do not do this! it won't work! | ||
var parent = conn.begin(); // starts the transaction | ||
var child = parent.begin(); // creates a savepoint | ||
parent.query('SELECT 1', function (err) { | ||
child.commit(); | ||
}); | ||
### Transaction events | ||
* `'query', query` - emitted immediately after `.query` is called on a | ||
connection via `tx.query`. The argument is a [query](#query) object. | ||
* `'commit:start'` - Emitted when `.commit()` is called. | ||
* `'commit:complete'` - Emitted after the transaction has committed. | ||
* `'rollback:start'` - Emitted when `.rollback()` is called. | ||
* `'rollback:complete'` - Emitted after the transaction has rolled back. | ||
* `'close'` - Emitted after `rollback` or `commit` completes. | ||
* `'error', err` - Emitted under three conditions: | ||
1. There was an error acquiring a connection. | ||
2. Any query performed in this transaction emits an error that would otherwise | ||
go unhandled. | ||
3. Any of `query`, `begin`, `commit`, or `rollback` are called after the | ||
connection has already been committed or rolled back. | ||
Note that the `'error'` event **may be emitted multiple times!** depending on | ||
the callback you are registering, you way want to wrap it using [once][once]. | ||
[once]: http://npm.im/once | ||
### Transaction Example | ||
Here's an example where we stream all of our user ids, check them against an | ||
external abuse-monitoring service, and flag or delete users as necessary, if | ||
for any reason we only get part way through, the entire transaction is rolled | ||
back and nobody is flagged or deleted: | ||
var tx = pool.begin() | ||
tx.on('error', finished) | ||
/* | ||
Why query with the pool and not the transaction? | ||
Because it allows the transaction queries to begin executing immediately, | ||
rather than queueing them all up behind the initial SELECT. | ||
*/ | ||
pool.query('SELECT id FROM users').on('row', function (user) { | ||
if (tx.state().match('rollback')) return | ||
abuseService.checkUser(user.id, function (err, result) { | ||
if (err) return tx.handleError(err) | ||
// Errors from these queries will propagate up to the transaction object | ||
if (result.flag) { | ||
tx.query('UPDATE users SET abuse_flag = 1 WHERE id = $1', [user.id]) | ||
} else if (result.destroy) { | ||
tx.query('DELETE FROM users WHERE id = $1', [user.id]) | ||
} | ||
}) | ||
}).on('error', function (err) { | ||
tx.handleError(err) | ||
}).on('end', function () { | ||
tx.commit(finished) | ||
}) | ||
function finished (err) { | ||
if (err) console.error(err) | ||
else console.log('All done!') | ||
} |
26
index.js
@@ -46,19 +46,19 @@ var ConnectionPool = require('any-db-pool') | ||
pool.begin = function (stmt, callback) { | ||
if (stmt && typeof stmt == 'function') { | ||
callback = stmt | ||
stmt = undefined | ||
} | ||
var t = new Transaction(adapter.createQuery) | ||
var begin = Transaction.createBeginMethod( | ||
adapter.createQuery, pool.acquire.bind(pool) | ||
); | ||
pool.begin = function (beginStatement, callback) { | ||
var tx = begin(beginStatement, callback); | ||
// Proxy query events from the transaction to the pool | ||
t.on('query', pool.emit.bind(this, 'query')) | ||
tx.on('query', pool.emit.bind(this, 'query')) | ||
pool.acquire(function (err, conn) { | ||
if (err) return callback ? callback(err) : t.emit('error', err) | ||
t.begin(conn, stmt, callback) | ||
if (err) return callback ? callback(err) : tx.emit('error', err) | ||
var release = pool.release.bind(pool, conn) | ||
t.once('rollback:complete', release) | ||
t.once('commit:complete', release) | ||
}.bind(pool)) | ||
return t | ||
tx.setConnection(conn) | ||
.once('rollback:complete', release) | ||
.once('commit:complete', release) | ||
}) | ||
return tx | ||
} | ||
@@ -65,0 +65,0 @@ return pool |
@@ -7,11 +7,10 @@ var EventEmitter = require('events').EventEmitter | ||
module.exports.UndefinedMethodError = UndefinedMethodError; | ||
module.exports.nullImplementation = nullImplementation; | ||
inherits(StateMachine, EventEmitter) | ||
function StateMachine (initialState, methods, transitions, onError) { | ||
function StateMachine (initialState, prototypes, transitions) { | ||
EventEmitter.call(this) | ||
var currentState = initialState; | ||
var currentState = null; | ||
var self = this; | ||
this.state = function (to) { | ||
@@ -22,12 +21,18 @@ if (!to) return currentState; | ||
if (typeof prototypes[to] !== 'function') { | ||
} | ||
var extra = Array.prototype.slice.call(arguments, 1) | ||
, legal = transitions[currentState] | ||
, legal = currentState ? transitions[currentState] : [initialState] | ||
if (to === 'errored' || legal && legal.indexOf(to) > -1) { | ||
if (legal && legal.indexOf(to) > -1) { | ||
if (this.log) { | ||
this.log("Transition from:'" + currentState + "' to:'" + to + "'"); | ||
} | ||
removeMethods(); | ||
currentState = to; | ||
assignMethods(); | ||
if (!prototypes[to]) { | ||
throw new Error('unknown state:' + to); | ||
} | ||
this.__proto__ = prototypes[to].prototype | ||
this.emit(currentState) | ||
@@ -37,3 +42,3 @@ return true | ||
extra.unshift(new IllegalTransitionError(currentState, to)) | ||
onError.apply(this, extra) | ||
this.handleError.apply(this, extra) | ||
return false | ||
@@ -43,29 +48,3 @@ } | ||
function assignMethods () { | ||
for (var methodName in methods) { | ||
if (currentState in methods[methodName]) { | ||
self[methodName] = methods[methodName][currentState] | ||
} | ||
} | ||
} | ||
function removeMethods () { | ||
for (var methodName in methods) { | ||
if (currentState in methods[methodName]) { | ||
self[methodName] = methods[methodName][null] || nullImpl(methodName); | ||
} | ||
} | ||
} | ||
function nullImpl (methodName) { | ||
return function () { | ||
var lastArg = [].slice.call(arguments).pop(); | ||
var error = new UndefinedMethodError(methodName, currentState) | ||
if (typeof lastArg == 'function') { | ||
lastArg(error); | ||
} else { | ||
this.emit('error', error); | ||
} | ||
} | ||
} | ||
this.state(initialState); | ||
} | ||
@@ -86,1 +65,14 @@ | ||
} | ||
function nullImplementation (methodName) { | ||
return function () { | ||
var lastArg = [].slice.call(arguments).pop(); | ||
var error = new StateMachine.UndefinedMethodError(methodName, this.state()) | ||
if (typeof lastArg == 'function') { | ||
debugger | ||
lastArg(error); | ||
} else { | ||
this.emit('error', error); | ||
} | ||
} | ||
} |
@@ -7,33 +7,23 @@ var inherits = require('util').inherits | ||
inherits(Transaction, StateMachine) | ||
function Transaction(createQuery) { | ||
if (typeof createQuery != 'function') { | ||
throw new Error('createQuery is not a function!'); | ||
function Transaction(opts) { | ||
opts = opts || {}; | ||
if (typeof opts.createQuery != 'function') { | ||
throw new Error('opts.createQuery is not a function!'); | ||
} | ||
this._createQuery = opts.createQuery | ||
this._statements = { | ||
begin: opts.begin || 'BEGIN', | ||
commit: opts.commit || 'COMMIT', | ||
rollback: opts.rollback || 'ROLLBACK' | ||
} | ||
this._queue = [] | ||
this._createQuery = createQuery | ||
this.log = false | ||
this._nestingLevel = opts.nestingLevel || 0 | ||
this.handleError = forwardOrEmitError.bind(this) | ||
this.handleError = this.handleError.bind(this); | ||
StateMachine.call(this, 'pending', { | ||
'query': { | ||
null: rejectQuery, | ||
'pending': queueQuery, | ||
'opening': queueQuery, | ||
'dequeueing': queueQuery, | ||
'open': doQuery, | ||
'rollback:start': rejectQuery, | ||
'commit:start': rejectQuery, | ||
'errored': rejectQuery | ||
}, | ||
'rollback': { | ||
'open': doRollback, | ||
'errored': doRollback, | ||
// Allow rollback to be called repeatedly | ||
'rollback:start': function (cb) { if (cb) cb() }, | ||
'rollback:complete': function (cb) { if (cb) cb() } | ||
}, | ||
'commit': { | ||
'open': doCommit | ||
} | ||
StateMachine.call(this, 'disconnected', { | ||
'disconnected': DisconnectedTransaction, | ||
'connected': ConnectedTransaction, | ||
'open': OpenTransaction, | ||
'closed': ClosedTransaction | ||
}, | ||
@@ -43,35 +33,38 @@ // The valid state transitions. | ||
{ | ||
'pending': [ 'opening' ], | ||
'opening': [ 'dequeueing' ], | ||
'dequeueing': [ 'open', 'rollback:start', 'commit:start' ], | ||
'open': [ 'open', 'rollback:start', 'commit:start' ], | ||
'errored': [ 'rollback:start' ], | ||
'rollback:start': [ 'rollback:complete' ], | ||
'commit:start': [ 'commit:complete' ] | ||
}, this.handleError) | ||
'disconnected': [ 'connected' ], | ||
'connected': [ 'open', 'closed' ], | ||
'open': [ 'connected', 'closed' ] | ||
}) | ||
} | ||
// create a .begin method that can be patched on to connection objects | ||
Transaction.createBeginMethod = function (createQuery) { | ||
return function (stmt, callback) { | ||
if (stmt && typeof stmt == 'function') { | ||
callback = stmt | ||
stmt = undefined | ||
Transaction.createBeginMethod = function (createQuery, asyncConnection) { | ||
return function (beginStatement, callback) { | ||
if (beginStatement && typeof beginStatement == 'function') { | ||
callback = beginStatement; | ||
beginStatement = undefined; | ||
} | ||
var t = new Transaction(createQuery) | ||
t.begin(this, stmt, callback) | ||
return t | ||
var tx = new Transaction({ | ||
createQuery: createQuery, | ||
begin: beginStatement | ||
}) | ||
if (callback) { | ||
tx.once('error', callback) | ||
.once('open', function () { | ||
tx.removeListener('error', callback); | ||
callback(null, tx); | ||
}) | ||
} | ||
if (asyncConnection) return tx; | ||
return tx.setConnection(this) | ||
} | ||
} | ||
function forwardOrEmitError(err, callback) { | ||
Transaction.prototype.handleError = function (err, callback) { | ||
var propagate = callback || this.emit.bind(this, 'error') | ||
var rolledBack = this.state().match('rollback') | ||
this.state('errored') | ||
if (!rolledBack) { | ||
this.rollback(function (rollbackErr) { | ||
var ended = /^clos/.test(this.state()) | ||
if (!ended && this._connection) { | ||
OpenTransaction.prototype.rollback.call(this, function (rollbackErr) { | ||
if (rollbackErr) { | ||
err = new Error('Failed to rollback transaction: ' + rollbackErr + | ||
'\nError causing rollback: ' + err) | ||
err = new RollbackFailedError(rollbackErr, err); | ||
} | ||
@@ -84,19 +77,72 @@ propagate(err) | ||
Transaction.prototype.begin = function (conn, statement, callback) { | ||
if (typeof statement == 'function') { | ||
callback = statement | ||
statement = 'begin' | ||
Transaction.prototype._createChildTransaction = function (callback) { | ||
var nestingLevel = this._nestingLevel + 1; | ||
var savepointName = 'sp_' + nestingLevel; | ||
var tx = new Transaction({ | ||
createQuery: this._createQuery, | ||
nestingLevel: nestingLevel, | ||
begin: 'SAVEPOINT ' + savepointName, | ||
commit: 'RELEASE ' + savepointName, | ||
rollback: 'ROLLBACK TO ' + savepointName | ||
}) | ||
tx.on('query', this.emit.bind(this, 'query')) | ||
.once('connected', this.state.bind(this, 'connected')) | ||
.once('close', this._runQueue.bind(this)) | ||
if (callback) { | ||
tx.once('error', callback) | ||
.once('open', function () { | ||
tx.removeListener('error', callback) | ||
callback(null, tx) | ||
}) | ||
} | ||
statement = statement || 'begin'; | ||
if (!this.state('opening', callback)) return this | ||
var self = this | ||
if (this.log) this.log('starting transaction') | ||
var queryObject = conn.query(statement, function (err) { | ||
if (err) return self.handleError(err, callback) | ||
this._connection = conn | ||
conn.on('error', this.handleError) | ||
this.once('rollback:complete', unsubErrors.bind(this)) | ||
this.once('commit:complete', unsubErrors.bind(this)) | ||
this._runQueue(callback) | ||
return tx | ||
} | ||
inherits(ConnectedTransaction, Transaction) | ||
function ConnectedTransaction () {} | ||
ConnectedTransaction.prototype.query = function (stmt, params, callback) { | ||
return this._queueTask(this._createQuery(stmt, params, callback)) | ||
} | ||
ConnectedTransaction.prototype.begin = function (callback) { | ||
return this._queueTask(this._createChildTransaction(callback)); | ||
} | ||
ConnectedTransaction.prototype.commit = queueMethodCall('commit'); | ||
ConnectedTransaction.prototype.rollback = queueMethodCall('rollback'); | ||
ConnectedTransaction.prototype._queueTask = function (task) { | ||
this._queue.push(task); | ||
return task; | ||
} | ||
function queueMethodCall (method) { | ||
return function () { | ||
this._queue.push([method, [].slice.call(arguments)]) | ||
} | ||
} | ||
/** | ||
* Transactions start in the Disconnected state, this is identical to the | ||
* Connected state *except* there is an additional setConnection method | ||
* available. | ||
*/ | ||
inherits(DisconnectedTransaction, ConnectedTransaction) | ||
function DisconnectedTransaction () {} | ||
DisconnectedTransaction.prototype.setConnection = function (connection) { | ||
if (!this.state('connected')) return this | ||
connection.on('error', this.handleError) | ||
this._connection = connection | ||
var queryObject = connection.query(this._statements.begin, function (err) { | ||
if (err) return this.handleError(err) | ||
this._runQueue() | ||
}.bind(this)) | ||
this.emit('query', queryObject) | ||
@@ -106,55 +152,96 @@ return this | ||
function unsubErrors() { | ||
this._connection.removeListener('error', this.handleError); | ||
delete this._connection | ||
ConnectedTransaction.prototype._runQueue = function () { | ||
var self = this | ||
return next(); | ||
function next (err) { | ||
if (/^clos/.test(self.state())) { | ||
debugger; | ||
return | ||
} | ||
if (err) { | ||
self._queue.splice(0, self._queue.length); | ||
return self.handleError(err); | ||
} | ||
if (!self._queue.length) { | ||
// The queue is empty, transition to fully open state | ||
self.state('open'); | ||
return | ||
} | ||
var task = self._queue.shift() | ||
if (Array.isArray(task)) self._runQueuedMethod(task, next) | ||
else if (task instanceof Transaction) self._runQueuedTransaction(task, next) | ||
else self._runQueuedQuery(task, next); | ||
} | ||
} | ||
Transaction.prototype._runQueue = function (callback) { | ||
if (!this.state('dequeueing', callback)) return | ||
var next = function () { | ||
var state = this.state() | ||
if (state == 'errored' || state.match('rollback')) return | ||
var conn = this._connection | ||
, handleError = this.handleError | ||
, query = this._queue.shift() | ||
if (query) { | ||
// TODO - ask @brianc about changing pg.Query to use '_callback' | ||
var cbName = query._callback ? '_callback' : 'callback' | ||
var queryCb = query[cbName] | ||
if (queryCb) { | ||
query[cbName] = function (err, res) { | ||
if (err) return handleError(err, queryCb) | ||
else if (queryCb) queryCb(err, res) | ||
} | ||
} else { | ||
query.once('error', function (err) { | ||
if (!query.listeners('error').length) handleError(err) | ||
}) | ||
} | ||
this.emit('query', query); | ||
conn.query(query) | ||
ConnectedTransaction.prototype._runQueuedMethod = function (task, next) { | ||
var method = task.shift() | ||
, args = task.shift() | ||
, last = args[args.length - 1] | ||
; | ||
// Node 0.10 changed the behaviour of EventEmitter.listeners, so we need | ||
// to do a little poking at internals here. | ||
query.on('end', next); | ||
if (query.listeners('end').length > 1) { | ||
var listeners = query._events.end | ||
listeners.unshift(listeners.pop()) | ||
if (typeof last == 'function') { | ||
args[args.length - 1] = function (err) { | ||
if (err) return last(err); | ||
last.apply(this, arguments); | ||
next(); | ||
} | ||
} else { | ||
args.push(next) | ||
} | ||
OpenTransaction.prototype[method].apply(this, args); | ||
} | ||
ConnectedTransaction.prototype._runQueuedTransaction = function (childTx) { | ||
if (!childTx.listeners('error').length) { | ||
childTx.on('error', this.handleError) | ||
} | ||
childTx.setConnection(this._connection) | ||
} | ||
ConnectedTransaction.prototype._runQueuedQuery = function (query, callback) { | ||
var self = this; | ||
var cbName = query._callback ? '_callback' : 'callback' | ||
var queryCb = query[cbName] | ||
if (queryCb) { | ||
query[cbName] = function (err, res) { | ||
if (err) return self.handleError(err, queryCb) | ||
else { | ||
queryCb(null, res); | ||
callback(); | ||
} | ||
} else { | ||
// The queue is empty, queries can now go directly to the connection. | ||
this._queue = null | ||
if (this.state('open', callback) && callback) callback(null, this) | ||
} | ||
}.bind(this) | ||
next() | ||
} else { | ||
query.once('error', function (err) { | ||
if (!query.listeners('error').length) self.handleError(err) | ||
}) | ||
// Node 0.10 changed the behaviour of EventEmitter.listeners, so we need | ||
// to do a little poking at internals here. | ||
query.on('end', callback.bind(null, null)) | ||
if (query.listeners('end').length > 1) { | ||
var listeners = query._events.end | ||
listeners.unshift(listeners.pop()) | ||
} | ||
} | ||
self.emit('query', query) | ||
self._connection.query(query) | ||
} | ||
var queueQuery = function (stmt, params, callback) { | ||
var query = this._createQuery(stmt, params, callback) | ||
this._queue.push(query) | ||
return query | ||
/** | ||
* A transaction transitions to 'open' when it has completed all queued tasks. | ||
*/ | ||
inherits(OpenTransaction, Transaction) | ||
function OpenTransaction () {} | ||
OpenTransaction.prototype.begin = function (callback) { | ||
return this._createChildTransaction(callback).setConnection(this._connection); | ||
} | ||
var doQuery = function (stmt, params, callback) { | ||
OpenTransaction.prototype.query = function (stmt, params, callback) { | ||
if (typeof params == 'function') { | ||
@@ -168,37 +255,16 @@ callback = params | ||
return queryObject | ||
} | ||
}; | ||
var rejectQuery = function (stmt, params, callback) { | ||
var q = this._createQuery(stmt, params, callback) | ||
, msg = "Cannot query in '" + this.state() + "' state. Query: " + stmt | ||
, err = new Error(msg) | ||
; | ||
process.nextTick(function () { | ||
if (callback) callback(err) | ||
else q.emit('error', err) | ||
}) | ||
return q | ||
} | ||
OpenTransaction.prototype.commit = closeVia('commit') | ||
OpenTransaction.prototype.rollback = closeVia('rollback') | ||
// Calling 'commit' or 'rollback' on a newly created transaction must wait | ||
// until the query queue has been cleared before doing anything. | ||
; | ||
['commit', 'rollback'].forEach(function (method) { | ||
Transaction.prototype[method] = function (callback) { | ||
this.once('open', function () { | ||
this[method](callback) | ||
}) | ||
} | ||
}) | ||
function sqlTransition(stmt) { | ||
function closeVia (action) { | ||
return function (callback) { | ||
if (this.state(stmt + ':start', callback)) { | ||
var queryObject = this._connection.query(stmt, function (err) { | ||
if (err) return this.handleError(err, callback) | ||
if (this.state(stmt + ':complete', callback)) { | ||
if (callback) callback() | ||
} | ||
if (this.state('closed', callback)) { | ||
this.emit(action + ':start'); | ||
var q = this._createQuery(this._statements[action], function (err) { | ||
this._close(err, action, callback) | ||
}.bind(this)) | ||
this.emit('query', queryObject) | ||
this.emit('query', q); | ||
this._connection.query(q); | ||
} | ||
@@ -209,3 +275,27 @@ return this | ||
var doCommit = sqlTransition('commit') | ||
var doRollback = sqlTransition('rollback') | ||
inherits(ClosedTransaction, Transaction) | ||
function ClosedTransaction () {} | ||
['query', 'begin', 'rollback', 'commit'].forEach(function (name) { | ||
ClosedTransaction.prototype[name] = StateMachine.nullImplementation(name); | ||
}) | ||
ClosedTransaction.prototype._close = function (err, action, callback) { | ||
this.state('closed') | ||
this._connection.removeListener('error', this.handleError) | ||
delete this._connection | ||
if (err) { | ||
this.handleError(new CloseFailedError(action, err), callback) | ||
} else { | ||
this.emit(action + ':complete'); | ||
this.emit('close'); | ||
if (callback) callback() | ||
} | ||
} | ||
inherits(CloseFailedError, Error); | ||
function CloseFailedError(err, action, previous) { | ||
Error.captureStackTrace(this, RollbackFailedError); | ||
this.name = action + ' failed'; | ||
this.message = rollbackErr + "\nError causing rollback: " + previous; | ||
} |
{ | ||
"name": "any-db", | ||
"version": "1.0.0-rc1", | ||
"version": "1.0.0-rc2", | ||
"description": "Database-agnostic connection pooling, querying, and result sets", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
111
README.md
@@ -1,111 +0,16 @@ | ||
# Any-DB Project | ||
# Any-DB - a less-opinionated database abstraction layer. | ||
[![Build Status](https://secure.travis-ci.org/grncdr/node-any-db.png?branch=master)](http://travis-ci.org/grncdr/node-any-db) | ||
This is the main entry point for Any-DB. Users of the library will | ||
`require('any-db')` to make use of the [API](API.md) it exposes. | ||
_The less-opinionated Node.js database abstraction layer_ | ||
## Installation | ||
## [ANN] v1.0.0-alpha1 released | ||
Do not install this library directly. Instead, install one or more of the | ||
database adapters, which will pull in `any-db` as a peerDependency. For example: | ||
Any-DB 1.0 significantly restructures the various `any-db-*` modules. If you | ||
are updating from a previous version you *will* need to update `package.json`. | ||
npm install --save any-db-mysql | ||
npm install --save-dev any-db-sqlite3 | ||
**Applications** should replace an `any-db` dependency with one or more | ||
dependencies on `any-db-<adapter>` where `adapter` can be `mysql`, `postgres`, | ||
or `sqlite3`. With this change, a direct dependency on an database driver | ||
package (such as `mysql`) is no longer *required*, though you can continue to | ||
use one if you like. | ||
All of the adapter libraries have `any-db` as a *peerDependency* which means | ||
that `any-db` will be pulled in transitively as a dependency on the same level | ||
as the adapter. | ||
**Libraries** should move their `any-db` dependency to `peerDependencies`, | ||
even though things may appear to operate correctly without doing so. If your | ||
library depends on a database connection (e.g. for tests) you should also add | ||
a *devDependency* on the corresponding `any-db-<adapter>` library. | ||
## Synopsis | ||
(There's also detailed [API][API] documentation available) | ||
var anyDB = require('any-db') | ||
var dbURL = 'driver://user:pass@hostname/database' | ||
Establish a connection: | ||
var conn = anyDB.createConnection(dbURL) // Takes an optional callback | ||
Make queries: | ||
var sql = 'SELECT * FROM my_table' | ||
conn.query(sql).on('row', function (row) {}) // evented | ||
conn.query(sql, function (error, result) {}) // or callback | ||
Use bound parameters: | ||
sql += ' WHERE my_column = ?' | ||
conn.query(sql, [42]).on('row', ...) // again, evented | ||
conn.query(sql, [42], function (err, res) {}) // or callback | ||
Close a connection: | ||
conn.end() | ||
Start a transaction: | ||
var tx = conn.begin() // Can also take a callback | ||
tx.on('error', function (err) {}) // Emitted for unhandled query errors | ||
tx.query(...) // same interface as connections, plus... | ||
tx.commit() // takes an optional callback for errors | ||
tx.rollback() // this too | ||
Create a connection pool that maintains 2-20 connections | ||
var pool = anyDB.createPool(dbURL, {min: 2, max: 20}) | ||
pool.query(...) // perform a single query, same API as connection | ||
var tx = pool.begin() // start a transaction, again, same API as connection | ||
pool.close() // close the pool (call when your app should exit) | ||
## Description | ||
The purpose of this library is to provide a consistent API for the commonly used | ||
functionality of SQL database drivers, while avoiding altering driver behaviour | ||
as much as possible. | ||
### Things it does | ||
* Supports MySQL, Postgres, and SQLite3 as equally as possible. (More driver | ||
support is very much welcomed!) | ||
* Parses connection parameters from URLs: `driver://user:pass@host/database` | ||
* Streams results or gets them all at once, using an [api][query] almost | ||
identical to the existing interfaces of the MySQL and Postgres drivers. | ||
* A simple, solid, [connection pool][pool] with the ability to execute queries | ||
directly on a pool for auto-release behaviour. E.g. - this will never leak | ||
connections: `pool.query("SELECT 1", function (err, results) { ... })` | ||
* Stateful [transaction objects][tx] for managing database transactions. | ||
### Things it might do (feedback needed!) | ||
* Provide a common result set API. | ||
### Things it will never do | ||
* Add it's own query helper methods like `.first` or `.fetchAll` | ||
* Include any sort of SQL string building. You might want to try my other library | ||
[gesundheit](https://github.com/BetSmartMedia/gesundheit), or one of the many | ||
[alternatives](https://encrypted.google.com/search?q=sql&q=site:npmjs.org&hl=en) | ||
for that. _(send me pull requests to list your libs here)_ | ||
## Installation | ||
npm install --save any-db-{pg,mysql,sqlite3} | ||
## License | ||
MIT | ||
[API]: any-db/API.md | ||
[query]: any-db/API.md#query | ||
[pool]: any-db/API.md#exportscreatepool | ||
[tx]: any-db/API.md#transaction |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
27859
402
7
17
1