Socket
Socket
Sign inDemoInstall

pg-native

Package Overview
Dependencies
Maintainers
1
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-native - npm Package Compare versions

Comparing version 0.3.0 to 0.4.0

bench/leaks.js

136

index.js
var Libpq = require('libpq');
var consumeResults = require('./lib/consume-results');
var EventEmitter = require('events').EventEmitter;

@@ -17,13 +16,9 @@ var util = require('util');

this.types = types;
this._reading = false;
this._read = this._read.bind(this);
var self = this;
this.on('newListener', function(event) {
self.pq.startReader();
self.pq.once('readable', function() {
self.pq.consumeInput();
var notice;
while(notice = self.pq.notifies()) {
self.emit('notification', notice);
}
});
});
if(event != 'notification') return;
self._startReading();
})
};

@@ -42,3 +37,3 @@

Client.prototype.end = function(cb) {
this.pq.stopReader();
this._stopReading();
this.pq.finish();

@@ -48,2 +43,68 @@ if(cb) setImmediate(cb);

Client.prototype._readError = function(message) {
this._stopReading();
var err = new Error(message || this.pq.errorMessage());
this.emit('error', err);
};
Client.prototype._stopReading = function() {
this._reading = false;
this.pq.stopReader();
this.pq.removeListener('readable', this._read);
};
//called when libpq is readable
Client.prototype._read = function() {
var pq = this.pq;
//read waiting data from the socket
//e.g. clear the pending 'select'
if(!pq.consumeInput()) {
return this._readError();
}
//check if there is still outstanding data
//if so, wait for it all to come in
if(pq.isBusy()) {
return;
}
//load our result object
pq.getResult();
//"read until results return null"
//or in our case ensure we only have one result
if(pq.getResult() && pq.resultStatus() != 'PGRES_COPY_OUT') {
return this._readError('Only one result at a time is accepted:' + pq.resultStatus());
}
var status = pq.resultStatus();
switch(status) {
case 'PGRES_FATAL_ERROR':
return this._readError();
case 'PGRES_COMMAND_OK':
case 'PGRES_TUPLES_OK':
case 'PGRES_COPY_OUT': {
this.emit('result');
break;
}
default:
return this._readError('unrecognized cmmand status: ' + status);
}
var notice;
while(notice = this.pq.notifies()) {
this.emit('notification', notice);
}
};
//ensures the client is reading and
//everything is set up for async io
Client.prototype._startReading = function() {
if(this._reading) return;
this._reading = true;
this.pq.setNonBlocking(true);
this.pq.startReader();
this.pq.on('readable', this._read);
};
var throwIfError = function(pq) {

@@ -79,2 +140,21 @@ var err = pq.resultErrorMessage() || pq.errorMessage();

Client.prototype._awaitResult = function(cb) {
this._startReading();
var self = this;
var onError = function(e) {
self.removeListener('error', onError);
self.removeListener('result', onResult);
cb(e);
};
var onResult = function() {
self.removeListener('error', onError);
self.removeListener('result', onResult);
cb(null);
};
this.once('error', onError);
this.once('result', onResult);
}
//wait for the writable socket to drain
var waitForDrain = function(pq, cb) {

@@ -89,2 +169,4 @@ var res = pq.flush();

//send an async query to libpq and wait for it to
//finish writing query text to the socket
var dispatchQuery = function(pq, fn, cb) {

@@ -94,3 +176,3 @@ var success = pq.setNonBlocking(true);

var sent = fn();
if(!sent) return cb(new Error(pq.errorMessage()));
if(!sent) return cb(new Error(pq.errorMessage() || 'Something went wrong dispatching the query'));
return waitForDrain(pq, cb);

@@ -105,10 +187,11 @@ };

cb = values;
queryFn = pq.sendQuery.bind(pq, text);
queryFn = function() { return pq.sendQuery(text); };
} else {
queryFn = pq.sendQueryParams.bind(pq, text, values);
queryFn = function() { return pq.sendQueryParams(text, values); };
}
var self = this
dispatchQuery(pq, queryFn, function(err) {
if(err) return cb(err);
consumeResults(pq, function(err) {
self._awaitResult(function(err) {
return cb(err, err ? null : mapResults(pq, types));

@@ -120,7 +203,10 @@ });

Client.prototype.prepare = function(statementName, text, nParams, cb) {
var self = this;
var pq = this.pq;
var fn = pq.sendPrepare.bind(pq, statementName, text, nParams);
var fn = function() {
return pq.sendPrepare(statementName, text, nParams);
}
dispatchQuery(pq, fn, function(err) {
if(err) return cb(err);
consumeResults(pq, cb);
self._awaitResult(cb);
});

@@ -131,2 +217,3 @@ };

var pq = this.pq;
var self = this;
var types = this.types;

@@ -136,3 +223,3 @@ var fn = pq.sendQueryPrepared.bind(pq, statementName, parameters);

if(err) return cb(err);
consumeResults(pq, function(err) {
self._awaitResult(function(err) {
return cb(err, err ? null : mapResults(pq, types));

@@ -143,14 +230,9 @@ });

var CopyFromStream = require('./lib/copy-from-stream');
Client.prototype.getCopyFromStream = function() {
var CopyStream = require('./lib/copy-stream');
Client.prototype.getCopyStream = function() {
this.pq.setNonBlocking(true);
return new CopyFromStream(this.pq);
this._stopReading();
return new CopyStream(this.pq);
};
var CopyToStream = require('./lib/copy-to-stream');
Client.prototype.getCopyToStream = function() {
this.pq.setNonBlocking(true);
return new CopyToStream(this.pq);
};
Client.prototype.querySync = function(text, values) {

@@ -157,0 +239,0 @@ var queryFn;

5

package.json
{
"name": "pg-native",
"version": "0.3.0",
"version": "0.4.0",
"description": "A slightly nicer interface to Postgres over node-libpq",

@@ -34,4 +34,5 @@ "main": "index.js",

"mocha": "^1.21.4",
"okay": "^0.3.0"
"okay": "^0.3.0",
"pg": "^3.4.2"
}
}

@@ -18,3 +18,3 @@ var assert = require('assert');

this.client.querySync('COPY blah FROM stdin');
var stream = this.client.getCopyFromStream();
var stream = this.client.getCopyStream();
stream.write(Buffer('Brian\t32\n', 'utf8'));

@@ -37,3 +37,3 @@ stream.write(Buffer('Aaron\t30\n', 'utf8'));

this.client.querySync('COPY boom FROM stdin');
var stream = this.client.getCopyFromStream();
var stream = this.client.getCopyStream();
stream.write(Buffer('Brian\t32\n', 'utf8'));

@@ -40,0 +40,0 @@ stream.write(Buffer('Aaron\t30\n', 'utf8'), function() {

@@ -22,3 +22,3 @@ var assert = require('assert');

if(err) return done(err);
var stream = self.client.getCopyToStream();
var stream = self.client.getCopyStream();
stream.pipe(concat(function(buff) {

@@ -25,0 +25,0 @@ var res = buff.toString('utf8')

var Client = require('../')
var ok = require('okay')
describe('LISTEN/NOTIFY', function() {
var notify = function(channel, payload) {
var client = new Client();
client.connectSync();
client.querySync("NOTIFY " + channel + ", '" + payload + "'");
client.end();
};
describe('simple LISTEN/NOTIFY', function() {
before(function(done) {

@@ -9,10 +17,3 @@ var client = this.client = new Client();

var notify = function(channel, payload) {
var client = new Client();
client.connectSync();
client.querySync("NOTIFY " + channel + ", '" + payload + "'");
client.end();
};
it('works in a simple case', function(done) {
it('works', function(done) {
var client = this.client;

@@ -30,1 +31,29 @@ client.querySync('LISTEN boom');

});
describe('async LISTEN/NOTIFY', function() {
before(function(done) {
var client = this.client = new Client();
client.connect(done);
});
it('works', function(done) {
var client = this.client;
var count = 0;
var check = function() {
count++;
if(count >= 3) return done();
}
client.on('notification', check);
client.query('LISTEN test', ok(done, function() {
notify('test', 'bot');
client.query('SELECT pg_sleep(.05)', ok(done, function() {
check();
}));
notify('test', 'bot');
}));
});
after(function(done) {
this.client.end(done);
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc