Comparing version 0.3.0 to 0.4.0
136
index.js
var Libpq = require('libpq'); | ||
var consumeResults = require('./lib/consume-results'); | ||
var EventEmitter = require('events').EventEmitter; | ||
@@ -17,13 +16,9 @@ var util = require('util'); | ||
this.types = types; | ||
this._reading = false; | ||
this._read = this._read.bind(this); | ||
var self = this; | ||
this.on('newListener', function(event) { | ||
self.pq.startReader(); | ||
self.pq.once('readable', function() { | ||
self.pq.consumeInput(); | ||
var notice; | ||
while(notice = self.pq.notifies()) { | ||
self.emit('notification', notice); | ||
} | ||
}); | ||
}); | ||
if(event != 'notification') return; | ||
self._startReading(); | ||
}) | ||
}; | ||
@@ -42,3 +37,3 @@ | ||
Client.prototype.end = function(cb) { | ||
this.pq.stopReader(); | ||
this._stopReading(); | ||
this.pq.finish(); | ||
@@ -48,2 +43,68 @@ if(cb) setImmediate(cb); | ||
Client.prototype._readError = function(message) { | ||
this._stopReading(); | ||
var err = new Error(message || this.pq.errorMessage()); | ||
this.emit('error', err); | ||
}; | ||
Client.prototype._stopReading = function() { | ||
this._reading = false; | ||
this.pq.stopReader(); | ||
this.pq.removeListener('readable', this._read); | ||
}; | ||
//called when libpq is readable | ||
Client.prototype._read = function() { | ||
var pq = this.pq; | ||
//read waiting data from the socket | ||
//e.g. clear the pending 'select' | ||
if(!pq.consumeInput()) { | ||
return this._readError(); | ||
} | ||
//check if there is still outstanding data | ||
//if so, wait for it all to come in | ||
if(pq.isBusy()) { | ||
return; | ||
} | ||
//load our result object | ||
pq.getResult(); | ||
//"read until results return null" | ||
//or in our case ensure we only have one result | ||
if(pq.getResult() && pq.resultStatus() != 'PGRES_COPY_OUT') { | ||
return this._readError('Only one result at a time is accepted:' + pq.resultStatus()); | ||
} | ||
var status = pq.resultStatus(); | ||
switch(status) { | ||
case 'PGRES_FATAL_ERROR': | ||
return this._readError(); | ||
case 'PGRES_COMMAND_OK': | ||
case 'PGRES_TUPLES_OK': | ||
case 'PGRES_COPY_OUT': { | ||
this.emit('result'); | ||
break; | ||
} | ||
default: | ||
return this._readError('unrecognized cmmand status: ' + status); | ||
} | ||
var notice; | ||
while(notice = this.pq.notifies()) { | ||
this.emit('notification', notice); | ||
} | ||
}; | ||
//ensures the client is reading and | ||
//everything is set up for async io | ||
Client.prototype._startReading = function() { | ||
if(this._reading) return; | ||
this._reading = true; | ||
this.pq.setNonBlocking(true); | ||
this.pq.startReader(); | ||
this.pq.on('readable', this._read); | ||
}; | ||
var throwIfError = function(pq) { | ||
@@ -79,2 +140,21 @@ var err = pq.resultErrorMessage() || pq.errorMessage(); | ||
Client.prototype._awaitResult = function(cb) { | ||
this._startReading(); | ||
var self = this; | ||
var onError = function(e) { | ||
self.removeListener('error', onError); | ||
self.removeListener('result', onResult); | ||
cb(e); | ||
}; | ||
var onResult = function() { | ||
self.removeListener('error', onError); | ||
self.removeListener('result', onResult); | ||
cb(null); | ||
}; | ||
this.once('error', onError); | ||
this.once('result', onResult); | ||
} | ||
//wait for the writable socket to drain | ||
var waitForDrain = function(pq, cb) { | ||
@@ -89,2 +169,4 @@ var res = pq.flush(); | ||
//send an async query to libpq and wait for it to | ||
//finish writing query text to the socket | ||
var dispatchQuery = function(pq, fn, cb) { | ||
@@ -94,3 +176,3 @@ var success = pq.setNonBlocking(true); | ||
var sent = fn(); | ||
if(!sent) return cb(new Error(pq.errorMessage())); | ||
if(!sent) return cb(new Error(pq.errorMessage() || 'Something went wrong dispatching the query')); | ||
return waitForDrain(pq, cb); | ||
@@ -105,10 +187,11 @@ }; | ||
cb = values; | ||
queryFn = pq.sendQuery.bind(pq, text); | ||
queryFn = function() { return pq.sendQuery(text); }; | ||
} else { | ||
queryFn = pq.sendQueryParams.bind(pq, text, values); | ||
queryFn = function() { return pq.sendQueryParams(text, values); }; | ||
} | ||
var self = this | ||
dispatchQuery(pq, queryFn, function(err) { | ||
if(err) return cb(err); | ||
consumeResults(pq, function(err) { | ||
self._awaitResult(function(err) { | ||
return cb(err, err ? null : mapResults(pq, types)); | ||
@@ -120,7 +203,10 @@ }); | ||
Client.prototype.prepare = function(statementName, text, nParams, cb) { | ||
var self = this; | ||
var pq = this.pq; | ||
var fn = pq.sendPrepare.bind(pq, statementName, text, nParams); | ||
var fn = function() { | ||
return pq.sendPrepare(statementName, text, nParams); | ||
} | ||
dispatchQuery(pq, fn, function(err) { | ||
if(err) return cb(err); | ||
consumeResults(pq, cb); | ||
self._awaitResult(cb); | ||
}); | ||
@@ -131,2 +217,3 @@ }; | ||
var pq = this.pq; | ||
var self = this; | ||
var types = this.types; | ||
@@ -136,3 +223,3 @@ var fn = pq.sendQueryPrepared.bind(pq, statementName, parameters); | ||
if(err) return cb(err); | ||
consumeResults(pq, function(err) { | ||
self._awaitResult(function(err) { | ||
return cb(err, err ? null : mapResults(pq, types)); | ||
@@ -143,14 +230,9 @@ }); | ||
var CopyFromStream = require('./lib/copy-from-stream'); | ||
Client.prototype.getCopyFromStream = function() { | ||
var CopyStream = require('./lib/copy-stream'); | ||
Client.prototype.getCopyStream = function() { | ||
this.pq.setNonBlocking(true); | ||
return new CopyFromStream(this.pq); | ||
this._stopReading(); | ||
return new CopyStream(this.pq); | ||
}; | ||
var CopyToStream = require('./lib/copy-to-stream'); | ||
Client.prototype.getCopyToStream = function() { | ||
this.pq.setNonBlocking(true); | ||
return new CopyToStream(this.pq); | ||
}; | ||
Client.prototype.querySync = function(text, values) { | ||
@@ -157,0 +239,0 @@ var queryFn; |
{ | ||
"name": "pg-native", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"description": "A slightly nicer interface to Postgres over node-libpq", | ||
@@ -34,4 +34,5 @@ "main": "index.js", | ||
"mocha": "^1.21.4", | ||
"okay": "^0.3.0" | ||
"okay": "^0.3.0", | ||
"pg": "^3.4.2" | ||
} | ||
} |
@@ -18,3 +18,3 @@ var assert = require('assert'); | ||
this.client.querySync('COPY blah FROM stdin'); | ||
var stream = this.client.getCopyFromStream(); | ||
var stream = this.client.getCopyStream(); | ||
stream.write(Buffer('Brian\t32\n', 'utf8')); | ||
@@ -37,3 +37,3 @@ stream.write(Buffer('Aaron\t30\n', 'utf8')); | ||
this.client.querySync('COPY boom FROM stdin'); | ||
var stream = this.client.getCopyFromStream(); | ||
var stream = this.client.getCopyStream(); | ||
stream.write(Buffer('Brian\t32\n', 'utf8')); | ||
@@ -40,0 +40,0 @@ stream.write(Buffer('Aaron\t30\n', 'utf8'), function() { |
@@ -22,3 +22,3 @@ var assert = require('assert'); | ||
if(err) return done(err); | ||
var stream = self.client.getCopyToStream(); | ||
var stream = self.client.getCopyStream(); | ||
stream.pipe(concat(function(buff) { | ||
@@ -25,0 +25,0 @@ var res = buff.toString('utf8') |
var Client = require('../') | ||
var ok = require('okay') | ||
describe('LISTEN/NOTIFY', function() { | ||
var notify = function(channel, payload) { | ||
var client = new Client(); | ||
client.connectSync(); | ||
client.querySync("NOTIFY " + channel + ", '" + payload + "'"); | ||
client.end(); | ||
}; | ||
describe('simple LISTEN/NOTIFY', function() { | ||
before(function(done) { | ||
@@ -9,10 +17,3 @@ var client = this.client = new Client(); | ||
var notify = function(channel, payload) { | ||
var client = new Client(); | ||
client.connectSync(); | ||
client.querySync("NOTIFY " + channel + ", '" + payload + "'"); | ||
client.end(); | ||
}; | ||
it('works in a simple case', function(done) { | ||
it('works', function(done) { | ||
var client = this.client; | ||
@@ -30,1 +31,29 @@ client.querySync('LISTEN boom'); | ||
}); | ||
describe('async LISTEN/NOTIFY', function() { | ||
before(function(done) { | ||
var client = this.client = new Client(); | ||
client.connect(done); | ||
}); | ||
it('works', function(done) { | ||
var client = this.client; | ||
var count = 0; | ||
var check = function() { | ||
count++; | ||
if(count >= 3) return done(); | ||
} | ||
client.on('notification', check); | ||
client.query('LISTEN test', ok(done, function() { | ||
notify('test', 'bot'); | ||
client.query('SELECT pg_sleep(.05)', ok(done, function() { | ||
check(); | ||
})); | ||
notify('test', 'bot'); | ||
})); | ||
}); | ||
after(function(done) { | ||
this.client.end(done); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
24669
793
6