Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

any-db-transaction

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

any-db-transaction - npm Package Compare versions

Comparing version 0.0.1 to 2.1.0

.travis.yml

19

package.json
{
"name": "any-db-transaction",
"version": "0.0.1",
"version": "2.1.0",
"description": "Transaction object for Any-DB adapters",

@@ -8,7 +8,18 @@ "main": "transaction.js",

"inherits": "~2.0.1",
"yafsm": "0.0.0"
"yafsm": "0.0.0",
"once": "~1.3.0"
},
"devDependencies": {},
"devDependencies": {
"any-db-postgres": "~2.1.0",
"any-db-sqlite3": "~2.1.0",
"any-db-mysql": "~2.1.0",
"tape": "~2.3.2",
"any-db-fake": "~0.0.3",
"any-db-adapter-spec": "~2.1.0",
"assert-in-order": "0.0.1",
"covert": "~0.2.0"
},
"scripts": {
"test": "node test.js"
"test": "tape tests/*.js",
"covert": "covert tests/*.js"
},

@@ -15,0 +26,0 @@ "repository": {

# any-db-transaction
*note:* Generally you will not want to use this package directly, it is primarily
intended to simplify writing an Any-DB adapter that supports transactions.
[![Build Status](https://travis-ci.org/grncdr/node-any-db-transaction.png)](https://travis-ci.org/grncdr/node-any-db-transaction)
## Description
A simple transaction helper for [any-db][] compliant database adapters.
Transaction objects are created by [Connection.begin][] and [Pool.begin][]. They
are simple wrappers around a [Connection][] that implement the same API, but
ensure that all queries take place within a single database transaction.
## Synopsis
Any queries that error during a transaction will cause an automatic rollback. If
a query has no callback, the transaction will also handle (and re-emit)
`'error'` events for that query. This enables handling errors for an entire
transaction in a single place.
```javascript
var anyDB = require('any-db')
var begin = require('any-db-transaction')
Transactions also implement their own [begin method][] for creating nested
transactions using savepoints. Nested transaction can safely error and rollback
without rolling back their parent transaction.
var connection = anyDB.createConnection(...)
var pool = anyDB.createPool(...)
// Callback-style
begin(connection, function (err, transaction) {
if (err) return console.error(err)
// Do work using transaction
transaction.query(...)
transaction.commit()
})
// Synchronous-style*
var transaction = begin(connection)
transaction.on('error', console.error)
transaction.query(...)
transaction.commit()
// Or use a connection pool
var transaction = begin(pool)
```
## API
```ocaml
Transaction := StateMachine & {
adapter: String
query: (String, Array?, Continuation<ResultSet>) => Query
begin: (String?, Continuation<Transaction>) => Transaction
module.exports := begin(Queryable, statement: String?, Continuation<Transaction>?) => Transaction
Transaction := FSM & Queryable & {
commit: (Continuation?) => void

@@ -33,9 +45,57 @@ rollback: (Continuation?) => void

### Transaction.adapter
### begin
Contains the adapter name used for the transaction, e.g. `'sqlite3'`, etc.
```ocaml
module.exports := begin(Queryable, statement: String?, Continuation<Transaction>?) => Transaction
```
Transaction objects are are simple wrappers around a [Connection][] that also
implement the [Queryable][] API, but guarantee that all queries take place
within a single database transaction or not at all. Note that `begin` also
understands how to acquire (and release) a connection from a [ConnectionPool][]
as well, so you can simply pass a pool to it: `var tx = begin(pool)`
Any queries that error during a transaction will cause an automatic rollback.
If a query has no callback, the transaction will also handle (and re-emit)
`'error'` events for the [Query][] instance. This enables handling errors for
an entire transaction in a single place.
Transactions may also be nested by passing a `Transaction` to `begin` and these
nested transactions can safely error and rollback without rolling back their
parent transaction:
```javascript
var parent = begin(connection)
var child = begin(parent)
child.query("some invalid sql")
child.on('error', function () {
parent.query("select 1") // parent still works
})
```
This feature relies on the `SAVEPOINT` support in your database. (In particular
MySQL will doesn't have good support in very old versions). The use of
savepoints also means there is no option to replace the statement used to begin
the child transaction.
While the child transaction is in progress the parent transaction will queue any
queries it receives until the child transaction either commits or rolls back, at
which point it will process the queue. Be careful: it's quite possible to write
code that deadlocks by waiting for a query in the parent transaction before
committing the child transaction. For example:
```javascript
// Do not do this! it will deadlock!
var parent = begin(connection) // starts the transaction
var child = begin(parent) // creates a savepoint
parent.query('SELECT 1', function (err) {
child.commit();
});
```
### Transaction states
Transactions are finite state machines with 4 states: `disconnected`,
Transactions are [FSM][] instances with 4 states: `disconnected`,
`connected`, `open`, and `closed`:

@@ -69,3 +129,3 @@

In the `open` state, all database operations will be performed immediately. If
a child transaction is started with [Transaction.begin][], the parent
a child transaction is started like `var child = begin(parentTxn)`, the parent
transaction will move back into the `connected` state (queueing any queries it

@@ -75,5 +135,9 @@ receives) until the child completes, at which point it will resume processing

*\ * - Transactions started from [Connection.begin][] transition
to `connected` before the transaction is returned from `.begin`.*
Transactions created from a [Connection][] transition to `connected` before
[begin][] returns.
### Transaction.adapter
Contains the adapter name used for the transaction, e.g. `'sqlite3'`, etc.
### Transaction.query

@@ -85,5 +149,6 @@

Acts exactly like [Connection.query][] except queries are
guaranteed to be performed within the transaction. If the transaction has been
committed or rolled back further calls to `query` will fail.
Maintains the same contract as [Queryable.query][] but adds further guarantees
that queries will be performed within the transaction or not at all. If the
transaction has been committed or rolled back this method will fail by passing
an error to the continuation (if provided) or emitting an `'error'` event.

@@ -97,5 +162,5 @@ ### Transaction.commit

Issue a `COMMIT` (or `RELEASE ...` in the case of nested transactions) statement
to the database. If a callback is given it will be called with any errors after
the `COMMIT` statement completes. The transaction object itself will be unusable
after calling `commit()`.
to the database. If a continuation is provided it will be called (possibly with
an error) after the `COMMIT` statement completes. The transaction object itself
will be unusable after calling `commit()`.

@@ -111,30 +176,2 @@ ### Transaction.rollback

### Transaction.begin
`(Continuation<void>) => void`
Starts a nested transaction (by creating a savepoint) within this transaction
and returns a new transaction object. Unlike [Connection.begin][], there is no
option to replace the statement used to begin the transaction, this is because
the statement must use a known savepoint name.
While the child transaction is in progress the parent transaction will queue any
queries it receives until the child transaction either commits or rolls back, at
which point it will process the queue. Be careful: it's quite possible to write
code that deadlocks by waiting for a query in the parent transaction before
committing the child transaction. For example:
// Do not do this! it won't work!
var parent = conn.begin(); // starts the transaction
var child = parent.begin(); // creates a savepoint
parent.query('SELECT 1', function (err) {
child.commit();
});
### Transaction.adapter
Contains the adapter name used for this transaction, e.g. `'sqlite3'`, etc.
### Transaction events

@@ -157,6 +194,39 @@

Note that the `'error'` event **may be emitted multiple times!** depending on
the callback you are registering, you way want to wrap it using [once][once].
the callback you are registering, you way want to wrap it using [once][].
### Transaction Example
## Examples
### Unit-of-work middleware
A common pattern in web applications is start a transaction for each request and
commit it before sending a response. Here is a simplified [connect][] middleware
that encapsulates this pattern:
```javascript
module.exports = function unitOfWorkMiddleware (pool, errorHandler) {
return function (req, res, next) {
req.tx = pool.begin()
// intercept writeHead to ensure we have completed our transaction before
// responding to the user
var writeHead = res.writeHead
res.writeHead = function () {
if (req.tx.state() != 'closed') {
req.tx.commit(function (err) {
if (err) {
errorHandler(req, res, err)
} else {
writeHead.apply(res, arguments)
}
})
} else {
writeHead.apply(res, arguments)
}
}
next()
}
}
```
### Rolling back
Here's an example where we stream all of our user ids, check them against an

@@ -167,4 +237,5 @@ external abuse-monitoring service, and flag or delete users as necessary, if

var tx = pool.begin()
var pool = require('any-db').createPool(...)
var tx = begin(pool)
tx.on('error', finished)

@@ -193,7 +264,13 @@

})
}).on('error', function (err) {
tx.handleError(err)
}).on('end', function () {
tx.commit(finished)
})
function finished (err) {
if (err) console.error(err)
else console.log('All done!')
}
function finished (err) {
if (err) console.error(err)
else console.log('All done!')
}
```

@@ -203,1 +280,8 @@ # License

2-clause BSD
[begin]: #begin
[Connection]: https://github.com/grncdr/any-db-adapter-spec#connection
[Queryable]: https://github.com/grncdr/any-db-adapter-spec#queryable
[Queryable.query]: https://github.com/grncdr/any-db-adapter-spec#queryablequery
[Query]: https://github.com/grncdr/any-db-adapter-spec#query
[ConnectionPool]: https://github.com/grncdr/any-db-pool#connectionpool

@@ -1,6 +0,41 @@

console.log([
"Where are the tests?",
"",
"For now, any-db-transaction is tested by running the test-suite",
"from any-db-adapter-spec on an actual adapter"
].join('\n'))
var fs = require('fs')
var tape = require('tape')
var anyDB = require('any-db')
var URLS = [
'mysql://root@localhost/any_db_test',
'postgres://postgres@localhost/any_db_test',
'sqlite3:///tmp/any_db_test.db',
]
module.exports = function test (description, opts, callback) {
if (!callback) {
callback = opts
opts = {}
}
var urls = URLS.filter(function (url) {
return true // TODO select database via env or command line?
})
tape(description, function (t) {
t.plan(urls.length)
var pool = opts.pool
urls.forEach(function (url) {
var backend = url.split(':').shift()
if (backend == 'sqlite3') fs.unlink('/tmp/any_db_test.db', function () {})
var queryable, cleanup;
if (pool) {
queryable = anyDB.createPool(url, pool)
cleanup = queryable.close.bind(queryable)
} else {
queryable = anyDB.createConnection(url)
cleanup = queryable.end.bind(queryable)
}
t.test(backend + ' - ' + description, function (t) {
callback(queryable, t)
t.on('end', cleanup)
})
})
})
}
var inherits = require('inherits')
var FSM = require('yafsm')
var once = require('once')
module.exports = Transaction
module.exports = begin
begin.Transaction = Transaction
function begin (queryable, beginStatement, callback) {
if (typeof beginStatement == 'function') {
callback = beginStatement
beginStatement = undefined
}
if (queryable instanceof Transaction) {
return beginWithParent(queryable, callback)
}
var adapter = queryable.adapter;
var tx = new Transaction({
adapter: adapter,
begin: beginStatement,
callback: callback
})
if (typeof adapter.createQuery != 'function' ||
typeof queryable.query != 'function') {
var error = new TypeError(queryable + ' is not a queryable!')
if (callback) {
callback(error)
} else {
throw error
}
}
if (typeof queryable.acquire == 'function') {
// it's a pool
queryable.acquire(function (err, conn) {
if (err) return process.nextTick(function () {
tx.emit('error', err)
})
var release = pool.release.bind(pool, connection)
tx.on('query', pool.emit.bind(pool, 'query'))
tx.once('rollback:complete', release)
.once('commit:complete', release)
.setConnection(connection)
})
}
else {
// it's a connection
tx.setConnection(queryable)
}
return tx
}
inherits(Transaction, FSM)
function Transaction(opts) {
opts = opts || {}
if (typeof opts.createQuery != 'function') {
throw new Error('opts.createQuery is not a function!')
}
this._createQuery = opts.createQuery
this.adapter = opts.adapter
this._connection = null
this._statements = {

@@ -21,2 +70,4 @@ begin: opts.begin || 'BEGIN',

this.handleError = this.handleError.bind(this)
FSM.call(this, 'disconnected', {

@@ -39,23 +90,12 @@ 'disconnected': [ 'connected' ],

Transaction.begin = function (createQuery, beginStatement, callback) {
if (typeof beginStatement == 'function') {
callback = beginStatement
beginStatement = undefined
}
return new Transaction({
createQuery: createQuery,
begin: beginStatement,
callback: callback
})
}
Transaction.prototype.handleError = function (err, callback) {
Transaction.prototype.handleError = function (err, skipEmit) {
var self = this
var propagate = callback || function (err) { self.emit('error', err) }
var rollback = this.rollback.implementations['open']
if (this.state() !== 'closed' && this._connection) {
Transaction.prototype.rollback.implementations.open.call(this, function (rollbackErr) {
propagate(err)
rollback.call(this, function (rollbackErr) {
if (rollbackErr) self.emit('error', rollbackErr)
else if (!skipEmit) self.emit('error', err)
})
}
else propagate(err)
else if (!skipEmit) self.emit('error', err)
}

@@ -65,25 +105,16 @@

'connected|disconnected': function (text, params, callback) {
return this._queueTask(this._createQuery(text, params, callback))
var query = this.adapter.createQuery(text, params, callback)
this._queue.push(query)
return query
},
'open': function (stmt, params, callback) {
if (typeof params == 'function') {
callback = params
params = undefined
}
var queryObject = this._connection.query(stmt, params, callback)
this.emit('query', queryObject)
if (!callback) queryObject.on('error', this.handleError)
return queryObject
'open': function (text, params, callback) {
var self = this
var query = this.adapter.createQuery(text, params, callback)
query.once('error', function (err) {
self.handleError(err, query.listeners('error').length)
})
return this._connection.query(query)
}
})
Transaction.prototype.begin = FSM.method('begin', {
'open': function (callback) {
return this._createChildTransaction(callback).setConnection(this._connection)
},
'connected|disconnected': function (callback) {
return this._queueTask(this._createChildTransaction(callback))
}
})
;['commit', 'rollback'].forEach(function (methodName) {

@@ -93,3 +124,4 @@ Transaction.prototype[methodName] = FSM.method(methodName, {

'connected|disconnected': function (callback) {
this._queue.push([methodName, [callback]])
var fn = this[methodName].implementations['open']
this._queue.push([fn, [callback]])
return this

@@ -100,37 +132,19 @@ }

Transaction.prototype._queueTask = function (task) {
this._queue.push(task)
return task
}
Transaction.prototype._createChildTransaction = function (callback) {
var nestingLevel = this._nestingLevel + 1
var savepointName = 'sp_' + nestingLevel
var tx = new Transaction({
createQuery: this._createQuery,
nestingLevel: nestingLevel,
callback: callback,
begin: 'SAVEPOINT ' + savepointName,
commit: 'RELEASE SAVEPOINT ' + savepointName,
rollback: 'ROLLBACK TO ' + savepointName
})
tx.on('query', this.emit.bind(this, 'query'))
.once('connected', this.state.bind(this, 'connected'))
.once('close', this._runQueue.bind(this))
return tx
}
Transaction.prototype.setConnection = FSM.method('setConnection', {
'disconnected': function (connection) {
var self = this
self.state('connected')
var err = self.state('connected')
if (err) {
process.nextTick(function () {
self.emit('error', err)
})
return
}
self._onConnectionError = self.handleError.bind(self)
connection.on('error', self._onConnectionError)
connection.on('query', self._emitQuery = function (query) {
self.emit('query', query)
})
connection.on('error', self.handleError)
self._connection = connection
self.adapter = self._connection.adapter

@@ -153,7 +167,5 @@ self.emit('begin:start')

var counter = 0
function next (err) {
function next (err, skipEmit) {
if (err) {
self._queue.splice(0, self._queue.length)
return self.handleError(err)
self.handleError(err, skipEmit)
}

@@ -169,11 +181,15 @@ if (!self._queue.length) {

if (Array.isArray(task)) self._runQueuedMethod(task, next)
else if (task instanceof Transaction) self._runQueuedTransaction(task, next)
else self._runQueuedQuery(task, next)
if (Array.isArray(task)) {
runFunctionCall(self, task, next)
} else if (task instanceof Transaction) {
runChildTransaction(self, task)
} else {
runQueuedQuery(self, task, next)
}
}
}
Transaction.prototype._runQueuedMethod = function (task, next) {
var method = task.shift()
, args = task.shift()
function runFunctionCall (ctx, fnAndArgs, next) {
var fn = fnAndArgs[0]
, args = fnAndArgs[1]
, last = args[args.length - 1]

@@ -191,41 +207,83 @@

this[method].implementations.open.apply(this, args)
return fn.apply(ctx, args)
}
Transaction.prototype._runQueuedTransaction = function (childTx) {
if (!childTx.listeners('error').length) {
childTx.on('error', this.handleError.bind(this))
function runQueuedQuery (self, query, next) {
if (self.state() == 'closed') {
self.query(query, function (err) {
query.emit('error', err)
next()
})
return
}
childTx.setConnection(this._connection)
self._connection.query(query)
var onext = once(next) // ensure we only call `next` once
query.once('error', function (err) {
onext(err, this.listeners('error').length)
})
query.once('close', function () {
// let 'error' events have a chance to call `next` first
process.nextTick(onext)
})
}
Transaction.prototype._runQueuedQuery = function (query, callback) {
var self = this
var cbName = query._callback ? '_callback' : 'callback'
var queryCb = query[cbName]
if (queryCb) {
query[cbName] = function (err, res) {
if (err) return self.handleError(err, queryCb)
else {
queryCb(null, res)
callback()
}
}
} else {
query.once('error', function (err) {
if (!query.listeners('error').length) self.handleError(err)
})
function beginWithParent (parent, callback) {
var child = createChildTransaction(parent, callback)
switch (parent.state()) {
case 'disconnected':
case 'connected':
parent._queue.push(child)
break
case 'open':
runChildTransaction(parent, child)
break
case 'closed':
var error = new Error("Cannot start child transaction on parent in state 'closed'")
process.nextTick(function () {
// callback is already attached to error event
child.emit('error', error)
})
}
return child
}
// Node 0.10 changed the behaviour of EventEmitter.listeners, so we need
// to do a little poking at internals here.
query.on('end', function () { callback() })
if (query.listeners('end').length > 1) {
var listeners = query._events.end
listeners.unshift(listeners.pop())
function createChildTransaction (parent, callback) {
var nestingLevel = parent._nestingLevel + 1
var savepointName = 'sp_' + nestingLevel
var child = new Transaction({
adapter: parent.adapter,
nestingLevel: nestingLevel,
callback: callback,
begin: 'SAVEPOINT ' + savepointName,
commit: 'RELEASE SAVEPOINT ' + savepointName,
rollback: 'ROLLBACK TO ' + savepointName,
})
child
.on('query', parent.emit.bind(parent, 'query'))
.once('connected', parent.state.bind(parent, 'connected'))
.once('close', parent._runQueue.bind(parent))
return child
}
function runChildTransaction (parent, child) {
// Child transaction
child.setConnection(parent._connection)
child.on('error', function (err) {
if (child.listeners('error').length == 1) {
// if a child transaction errors, and the parent is the only
// listener, it should re-emit the error, but *not* roll back
parent.emit('error', err)
}
}
self.emit('query', query)
self._connection.query(query)
})
}
Transaction.prototype._removeConnection = function () {
this._connection.removeListener('error', this.handleError)
this._connection.removeListener('query', this._emitQuery)
this._connection = null
}
function closeVia (action) {

@@ -240,5 +298,3 @@ return function (callback) {

var q = self._connection.query(self._statements[action], function (err) {
self._close(err, action, callback)
self._connection.removeListener('error', self._onConnectionError)
delete self._connection
self._removeConnection()
if (err) {

@@ -257,5 +313,2 @@ self.handleError(new CloseFailedError(action, err), callback)

Transaction.prototype._close = function (err, action, callback) {
}
inherits(CloseFailedError, Error)

@@ -262,0 +315,0 @@ function CloseFailedError(err, action, previous) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc