Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pure-live-pg

Package Overview
Dependencies
Maintainers
1
Versions
57
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pure-live-pg - npm Package Compare versions

Comparing version 1.0.4 to 1.0.5

132

lib/pg-base-table.js

@@ -8,9 +8,13 @@ 'use strict';

const md5 = require('md5');
const async = require('async');
const _ = require('lodash');
class PGBaseTable {
constructor(dbInstance, baseName, options) {
this._baseName = baseName;
constructor(dbInstance, baseName, options, callback) {
callback = callback || function(){};
this._database = dbInstance;
this._tableDependencies = [baseName];
this._schemaName = baseName.indexOf('.') > -1 ? baseName.split('.')[0] : this._database.defaultSchema;
this._baseName = baseName.indexOf('.') > -1 ? baseName.split('.')[1] : baseName;
this._tableDependencies = [this._schemaName + '.' + this._baseName];
this._alias = options && (options.as || options.alias);

@@ -22,34 +26,8 @@

this._options._removeUnknownCols = options && (options.removeUnknownCols === true || options.removeUnknownCols === false) ? options.removeUnknownCols : true;
this._options._idGenerator = (doc) => {
// generate the _id for the document
// for this - by default - we check the primaryKey cols
// and return the value of this column(s). If the pk contains on
// more then one column we concat each and return a hashed value
var _id = '';
if (this._pkColumns){
for (var i=0, max=this._pkColumns.length; i<max; i++){
_id += doc[this._pkColumns[i]] && doc[this._pkColumns[i]].toString() || '';
}
} else {
_id = doc.id || doc.aid;
}
if (!_id || _id.length == 0){
throw new Error('Invalid _id: Can\'t create an _id column using the default idGenerator.');
}
this._getColumnInformations(this._baseName, (error, columnInfos) => {
if (error) return callback(error);
return md5(_id);
};
if (options && _.isFunction(options.idGenerator)) {
this._options._idGenerator = options.idGenerator;
}
this._columnInfos = this._getColumnInformations(this._baseName);
this._pkColumns = null; // set by the mysql-table constructor
this._initNotifyTriggerSync = new Synchron(function(dbInstance, tableName){
dbInstance._initNotifyTrigger(tableName, (error, result) => {
if (error) return this.throw(error);
return this.done();
});
this._columnInfos = columnInfos;
return callback();
});

@@ -72,25 +50,48 @@ }

_getColumnInformations(baseName){
let connection = this._database.getConnectionSync();
let sql=`SELECT
column_name AS columnname,
is_nullable AS isnullable,
data_type AS datatype,
character_maximum_length AS charactermaximumlength
FROM
information_schema.columns
WHERE
table_name = $1
AND table_catalog = current_database();`;
_getColumnInformations(baseName, callback) {
var tableName = baseName,
schemaName = this._database.defaultSchema || 'public';
var result = this._database.querySync(connection, sql, [baseName]);
this._database.releaseConnection(connection);
if (baseName.indexOf('.') > -1) {
var schemaAndTable = baseName.split('.')[1];
schemaName = schemaAndTable[0];
tableName = schemaAndTable[1];
}
// transform the columns array into an object
var transformedResult = {};
for (var i=0, max=result.rows.length; i < max; i++){
var column = result.rows[i];
transformedResult[column.columnname] = column;
}
return transformedResult;
async.waterfall([
(callback) => {
this._database.getConnection( (error, connection) => {
callback(error, connection);
});
},
(connection, callback) => {
let sql=`SELECT
column_name AS columnname,
is_nullable AS isnullable,
data_type AS datatype,
character_maximum_length AS charactermaximumlength
FROM
information_schema.columns
WHERE
table_name = $1
AND table_schema = $2
AND table_catalog = current_database();`;
this._database.query(connection, sql, [tableName, schemaName], (error, result) => {
this._database.releaseConnection(connection);
if (error) return callback(error);
// transform the columns array into an object
var transformedResult = {};
for (var i=0, max=result.rows.length; i < max; i++){
var column = result.rows[i];
transformedResult[column.columnname] = column;
}
return callback(null, transformedResult);
});
}
], (error, result) => {
callback(error, result);
});
}

@@ -100,3 +101,3 @@

var queryObject = {
tableOrViewName: this._baseName,
tableOrViewName: this._schemaName + '.' + this._baseName,
tableDependencies: this._tableDependencies,

@@ -108,15 +109,8 @@ selector: selector,

// cache the query for a later rerun done by the the notification events
this._database._queryCache[query.queryId] = query;
// register the query for each related table it depends on
_.forEach(query._queryObject.tableDependencies, (tableName) => {
if (!this._database._queriesByTable[tableName]) {
this._database._queriesByTable[tableName] = [];
}
this._database._queriesByTable[tableName].push(query.queryId);
});
this._database._cacheQuery(query);
return query;
}
select(selector, options) {
select(selector, options, callback) {
var query = new PGQuery(this, {

@@ -128,6 +122,8 @@ tableOrViewName: this._baseName,

var result = query.run();
query.destroy();
query.run((error, result) => {
query.destroy();
query = null;
return result;
return callback(error, result);
});
}

@@ -134,0 +130,0 @@ }

@@ -13,18 +13,53 @@ 'use strict';

let dbCounter = 0;
class PGDatabase {
constructor(options) {
this._queryCount = 0;
this.id = dbCounter++;
this._pool = null;
this._client = null;
this._connected = false;
this.defaultSchema = 'public';
this.tableSpace = 'pg_default';
this._queryPreparators = [];
// cache each query Object itself, the prop is equal to queryId
this._queryCache = {};
// cache each queryId by Table to perform a rerun on change
// example:
// _queriesByTable = {
// "core_users": [12, 34, 78, 4542], --> Id's that reference to the queryCache
// "core_apps": [32, 38, 2378, 23242] --> Id's that reference to the queryCache
// "public.people": [12, 34, 78, 4542, ...], --> Id's that reference to the queryCache
// "public.hobbies": [32, 38, 2378, 23242, ...] --> Id's that reference to the queryCache
// }
this._queriesByTable = {};
// cache each queryId by Client and Table to perform a rerun on an reactive change event
//
// example:
// _queriesByClient = {
// "xf65as67": {
// "queries": [2, 17, 45],
// "tables" : {
// "public.people": [17, 45],
// "public.hobbies": [2]
// }
// }
// "a6a5axd3": {
// ...
// }
// }
this._queriesByClientTable = {};
// hash table with all currently running insert, update and delete's called by
// the table-object to wait for the notify-message to be done.
// This is used for an optimistic ui because if there are reactive queries for a table where
// the insert, update or delete is performed we have to wait for reactivity is done
this._runningStatements = {};
this._staledQueries = [];
// client-connection to listen on notifications

@@ -38,7 +73,7 @@ this._listnerClient = null;

// cache all tablenames that have already a trigger installed
// cache all tablenames that have already the trigger to notify changes
// example:
// this._installedNotifyTriggers = {
// core_users: true,
// core_roles: true
// "public.people": true,
// "public.hobbies": true
// };

@@ -52,8 +87,28 @@ this._installedNotifyTriggers = {};

var triggerName = tableName + '_notify',
sql = `DROP TRIGGER IF EXISTS "${triggerName}" ON "${tableName}";
// maybe there was a schema explicit defined
// so replace the tablename's <schema>.<tablename> with an underscore
// --> tiggername like then: public_mytable_notify, where 'public' is the schema
var schemaAndTable = tableName.indexOf('.') > -1 ? tableName : this.defaultSchema + '.' + tableName,
triggerName = schemaAndTable.replace('.', '_') + '_notify',
triggerNameStmt = schemaAndTable.replace('.', '_') + '_notify_stmt',
schemaAndTable = schemaAndTable.replace('.', '"."');
CREATE TRIGGER "${triggerName}" AFTER INSERT OR UPDATE OR DELETE ON "${tableName}"
FOR EACH ROW EXECUTE PROCEDURE core_notify_changes();`;
var sql = `DROP TRIGGER IF EXISTS "${triggerName}" ON "${schemaAndTable}";
CREATE TRIGGER "${triggerName}" AFTER INSERT OR UPDATE OR DELETE ON "${schemaAndTable}"
FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes();
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_insert" ON "${schemaAndTable}";
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_update" ON "${schemaAndTable}";
-- CREATE TRIGGER "${triggerNameStmt}_insert" AFTER INSERT ON "${schemaAndTable}"
-- REFERENCING NEW TABLE AS new_table
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt();
-- CREATE TRIGGER "${triggerNameStmt}_update" AFTER UPDATE ON "${schemaAndTable}"
-- REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt();`;
this.getConnection((error, connection) => {

@@ -65,3 +120,2 @@ if (error) return callback(error);

if (!error){

@@ -78,3 +132,3 @@ this._installedNotifyTriggers[tableName] = true;

var sql = `
CREATE TABLE IF NOT EXISTS "public"."core_reactive"
CREATE TABLE IF NOT EXISTS "${this.defaultSchema}"."core_reactive"
(

@@ -90,6 +144,27 @@ query_id BIGINT NOT NULL,

)
TABLESPACE pg_default;
TABLESPACE ${this.tableSpace};
TRUNCATE "public"."core_reactive";`;
TRUNCATE "${this.defaultSchema}"."core_reactive";
/* wird aktuell nicht benötig! läuft alles über den EACH STATEMENt TRIGGER
*
CREATE SEQUENCE IF NOT EXISTS "${this.defaultSchema}".core_notifications_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
CREATE TABLE IF NOT EXISTS "${this.defaultSchema}".core_notifications
(
_id bigint NOT NULL DEFAULT nextval('core_notifications_id_seq'::regclass),
details json NOT NULL,
CONSTRAINT core_notifications_pkey PRIMARY KEY (_id)
)
WITH (
OIDS = FALSE
)
TABLESPACE ${this.tableSpace};
*/`;
this.getConnection((error, connection) => {

@@ -108,7 +183,10 @@ if (error) return callback(error);

CREATE OR REPLACE FUNCTION core_notify_changes() RETURNS TRIGGER AS $$
DECLARE
olddata json;
newdata json;
notification json;
notify_id INTEGER;
/* unused --------------------> DECLARE
id VARCHAR(45);
optimistic_id VARCHAR(45);
olddata json;
newdata json;
notification json;
notify_id INTEGER;
--------------------------------> end unused */

@@ -119,5 +197,11 @@ BEGIN

-- Action = INSERT or UPDATE? -> NEW row
/* -----------------------------> Aktuell nicht genuzt
IF (TG_OP = 'DELETE') THEN
olddata = row_to_json(OLD);
id = OLD._id;
optimistic_id = OLD._optimistic_id;
ELSE
id = NEW._id;
optimistic_id = NEW._optimistic_id;
newdata = row_to_json(NEW);

@@ -144,4 +228,8 @@ IF (TG_OP = 'UPDATE') THEN

json_build_object(
'trigger', 'EACH ROW',
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'action', TG_OP,
'_id', id,
'optimistic_id', optimistic_id,
'notify_id', notify_id

@@ -151,2 +239,17 @@ )::text

---------> Ende */
PERFORM pg_notify(
'core_reactive_event',
json_build_object(
'type', 'STATEMENT',
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'action', TG_OP,
'client_id', current_setting('core_settings.client_id')::text,
'statement_id', current_setting('core_settings.statement_id')::text
)::text
);
-- Result is ignored since this is an AFTER trigger

@@ -156,4 +259,24 @@ RETURN NULL;

$$ LANGUAGE plpgsql;`;
$$ LANGUAGE plpgsql;
/* ------------------------------------------------------------------------------> currently unused
CREATE OR REPLACE FUNCTION core_notify_changes_stmt() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'core_reactive_event',
json_build_object(
'trigger', 'STATEMENT',
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'action', TG_OP
)::text
);
-- Result is ignored since this is an AFTER trigger
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------- unused end */`;
this.getConnection((error, connection) => {

@@ -169,2 +292,59 @@ if (error) return callback(error);

_resolveTableName(tableName) {
// the tableName is always used with the schema
// like schema.table
// if there was no schema define we use the default schema
if (tableName.indexOf('.') == -1){
return this.defaultSchema + '.' + tableName;
}
// the schema was already defined.
return tableName;
}
_cacheQuery(query){
// cache the query for a later rerun done by the the notification events
this._queryCache[query.queryId] = query;
// register the query for each related table it depends on
_.forEach(query._queryObject.tableDependencies, (tableName) => {
tableName = this._resolveTableName(tableName);
if (!this._queriesByTable[tableName]) {
this._queriesByTable[tableName] = [];
}
this._queriesByTable[tableName].push({
queryId: query.queryId,
clientId: query._queryObject.options.clientId
});
});
}
_unCacheQuery(queryId, tableDependencies){
// remove the references from the _queriesByTable
_.forEach(tableDependencies, (tableName) => {
tableName = this._resolveTableName(tableName);
if (this._queriesByTable[tableName]){
let getArrIndex = (tableName, queryId) => {
let arr = this._queriesByTable[tableName];
for (var i=0, max=arr.length; i<max; i++){
if (arr[i].queryId == queryId)
return i;
}
return -1;
}
var index = getArrIndex(tableName, queryId);
if (index > -1) {
this._queriesByTable[tableName].splice(index, 1);
}
//this._arrayIndexthis._queriesByTable[tableName].indexOf(query._queryId);
}
});
this._queryCache[queryId] = null;
//console.log('uncache/delete', queryId, tableDependencies);
delete this._queryCache[queryId];
}
_onNotify(notification){

@@ -175,24 +355,75 @@ if (notification.channel !== 'core_reactive_event') return;

var data = JSON.parse(notification.payload);
var queryIds = this._queriesByTable[data.table];
var queryRunners = queryIds.map(id => {
var self = this,
queryId = id;
return function(callback) {
try {
self._queryCache[queryId].reRun();
} catch(e) {
return callback(e);
}
return callback();
//console.log(data.trigger, data.action);
//console.log(data.statement_id + ' - ' + data.client_id + ' - ' + data.schema + '.' + data.table);
//console.log(this._queriesByTable);
//console.log('onNotify:', data);
var relatedTable = data.schema + '.' + data.table;
// get all queries for the related table
var firstQueries = [];
var secondQueries = [];
var max = (this._queriesByTable[relatedTable] && this._queriesByTable[relatedTable].length) || -1;
//console.log("max relatedTable:", max);
for (var i=0; i<max; i++){
var q = this._queriesByTable[relatedTable][i];
if (q.clientId && q.clientId == data.client_id) {
firstQueries.push(q);
} else {
secondQueries.push(q);
}
}
// check if there are currently reactive queries registered
// if not, still exit!
if (firstQueries.length == 0) {
if (this._runningStatements[data.statement_id]){
delete this._runningStatements[data.statement_id];
}
}
// excute all prior/first queries in parallel
async.each(firstQueries, (q, callback) => {
this._queryCache[q.queryId].reRun(callback);
}, (error)=>{
if (error){
console.log('FIRST ERROR!!!!!!!!!!!!', error);
}
if (this._runningStatements[data.statement_id]){
delete this._runningStatements[data.statement_id];
}
for(var i=0, max=secondQueries.length; i<max; i++){
var qid = secondQueries[i].queryId;
if (this._staledQueries.indexOf(qid) == -1)
this._staledQueries.push(qid);
}
});
}
async.parallel(queryRunners, (error) => {
//console.log(error);
})
_initStaledQueriesInterval(){
this._staledRunning = false;
var staledInterval = setInterval(()=>{
if (this._staledRunning) return;
/*_.forEach(queryIds, (queryId) => {
this._queryCache[queryId].reRun();
});*/
this._staledRunning=true;
var staledQueries = [];
while (this._staledQueries.length > 0 && staledQueries.length < 200) {
staledQueries.push(this._staledQueries.shift());
}
async.eachLimit(staledQueries, 50, (qid, callback) => {
if (this._queryCache[qid]) {
this._queryCache[qid].reRun(callback);
}
}, (error)=>{
// still ignore errors
if (error){
console.log('RUN STALED ERROR!!!!!!!!!!!!', error);
}
//console.log('Finished staledQueries', staledQueries.length);
this._staledRunning = false;
});
}, 10);
}

@@ -237,3 +468,8 @@

}
], function (error) {
], (error) => {
if (!error) this._connected = true;
// enable rerunStaled Queries
this._initStaledQueriesInterval();
return callback(error);

@@ -257,2 +493,4 @@ });

end(){
if (!this._connected) return;
if (!this._options.enablePooling) {

@@ -263,89 +501,41 @@ this._client.end();

}
this._listnerClient.query('UNLISTEN core_reactive_event', (error, result) => {
this._listnerClient.end();
});
}
/**
* @method Table
* @memberOf PGDatabase
* @locus Server
*
* @summary Creates a new instance of a PGTable object.
*
* @param {String} tableName
* Name of the table in the postgreSQL database.
* @param {Object} [options]
* Specifies the options for the new table instances. Details see [new PGTable()](Api-PGTable.html#PGTable).
*/
Table(tableName, options) {
return new PGTable(this, tableName, options);
Table(tableName, options, callback) {
var __callback = callback,
__options = options
if (!_.isString(tableName)) {
throw new Error('First argument "tableName" must be type of string.');
}
if (!_.isFunction(callback) && _.isFunction(options)) {
// no options supported
__callback = options;
__options = {};
}
return new PGTable(this, tableName, __options, __callback);
}
/**
* @method View
* @memberOf PGDatabase
* @locus Anywhere
*
* @summary Creates a new instance of a PGView object.
*
* @param {String} tableName
* Name of the view in the postgreSQL database.
* @param {Object} [options]
* | Option | Type | Description |
* |---------------------|----------|-------------------------------------------------------------------------------|
* | `alias` | String | Defines an alias for this table. |
* | `as` | String | See alias |
* | `idGenerator` | Function | **Server-side only** A callback function to define an `_id` column if the table does not provide a column named `_id` or `id`. |
* | `removeUnknownCols` | Boolean | **Server-side only** If `true` this option will remove all unknown columns used within an object before trying to insert or update a record. |
*
*/
View(viewName, options){
return new PGView(this, viewName, options);
View(viewName, options, callback){
var __callback = callback,
__options = options
if (!_.isString(viewName)) {
throw new Error('First argument "viewName" must be type of string.');
}
if (!_.isFunction(callback) && _.isFunction(options)) {
// no options supported
__callback = options;
__options = {};
}
return new PGView(this, viewName, __options, __callback);
}
/**
* @method queryPreparations
* @memberOf PGDatabase
* @locus Server
*
* @summary Registers a new callback-function that will be called each time a query will be executed.
*
* This will be helpful if you have todo some generall preparations before executing the target query.
*
* @after
* # Using Query-Preparations
*
* In our project we are working with a huge count of views. Most of them should be executed in the context of the current user.
* Here we have the possibility to set a session variable @currentUser that will be consumed from most of the views.
*
* To avoid having to set this variable again with each query, and especially to think about it, there is a central way to do this.
*
* **Example**
*
* ```javascript
* import { PGDatabase } from 'meteor/planetarydev:mysql';
*
* export const MyDb = new PGDatabase({
* enablePooling: true
* });
*
* // connect to the database
* [...]
*
* // declare the query-preparator
* MyDb.queryPreparations(function(connection, queryObjToRun, currentInvocation) {
* if (currentInvocation) {
* var userId = currentInvocation && currentInvocation.userId;
* // the function runs in the context of the PGDatabase object, so we can use this to access each method
* this.querySync(connection, 'SET @currentUser = ?;', [userId]);
* }
* });
* ```
*
* @param {Function} callback Function to call each time before the query will be executed. `function(connection, queryObject, currentInvocation) { ... }`
* | Parameter | Type | Description |
* |---------------------|----------|-------------------------------------------------------------------------------|
* | `connection` | Object | Specifies the connection for the query to be executed on. |
* | `queryObject` | Object | Specifies the query object that will be executed after the function returns. |
* | `currentInvocation` | Array | Specifies the Meteors method- or publication-context that gives access to the `userId` and some other stuff. |
*
*/
queryPreparations(preperatorCallback){

@@ -379,43 +569,2 @@ this._queryPreparators.push(preperatorCallback);

/**
* @method getConnectionSync
* @memberOf PGDatabase
* @locus Server
*
* @summary Returns a new connection depending on if you are using pooling or not.
*
* @return {Object} Connection to use for your next query operations.
*
* @after
* # Connections
*
* To execute one or more database operations like `querySync` you need a valid connection.
* The `getConnection` and `getConnectionSync` will give you a connection depending on the option if you are using pooling or not.
* If you are not using connection pooling you will receive the current established connection.
*
* > **Important**
* >
* > After you finished work be sure that you release the connection you have received.
*
* ```javascript
* import { PGDatabase } from 'meteor/planetarydev:mysql';
*
* export const MyDb = new PGDatabase({
* enablePooling: true
* });
*
* // connect to the database
* [...]
*
* // get a new or the established connection
* var connection = MyDb.getConnectionSync();
*
* // do some query
* MyDb.querySync(connection, 'UPDATE mytable SET mycol = 1 WHERE id = 2');
* MyDb.querySync(connection, 'DELETE FROM mytable WHERE id = 3');
*
* // this is the important thing - release the connection after you have finished work
* MyDb.releaseConnection(connection);
* ```
*/
getConnectionSync(){

@@ -428,11 +577,2 @@ if (!this._options.enablePooling) {

/**
* @method _getConnectionFromPoolSync
* @memberOf PGDatabase
* @locus Server
*
* @summary Returns a new connection from the pool.
*
* @return {Object} Connection to use for your next query operations.
*/
_getConnectionFromPoolSync(){

@@ -486,3 +626,3 @@ var self = this;

*/
wpQuerySync(connection, sql, values){
/*wpQuerySync(connection, sql, values){
// call each global query-preparator

@@ -501,3 +641,3 @@ var queryObj = {

return this.querySync(connection, queryObj.sql, queryObj.values);
}
}*/

@@ -521,2 +661,3 @@ /**

};
for (var i=0, max=this._queryPreparators.length; i<max; i++){

@@ -529,3 +670,3 @@ var preperator = this._queryPreparators[i];

this.query(connection, sql, values, callback);
this.query(connection, queryObj.sql, queryObj.values, callback);
//connection.query(sql, values, callback);

@@ -546,4 +687,8 @@ }

query(connection, sql, values, callback) {
this._queryCount++;
//console.log(sql, values);
connection.query(sql, values, callback);
connection.query(sql, values, (error, result)=>{
if (error) console.log('onQuery ERROR:', error);
return callback(error, result);
});
}

@@ -550,0 +695,0 @@

@@ -5,2 +5,3 @@ 'use strict';

const sqlBuilder = new SQLBuilder('postgreSQL');
const async = require('async');
const md5 = require('md5');

@@ -43,3 +44,5 @@ const EventEmitter = require('events');

run(){
run(callback){
callback = callback || function(){};
let query = {

@@ -53,3 +56,3 @@ preparation: true,

// override the basic query-options, they couldnt modified by the user
// override the basic query-options, they could'nt modified by the user
query = {

@@ -112,9 +115,48 @@ $select: {

var results = this.executeQuery(this._queryObject.invocation); // invocation is only available in the MySQLReactiveQuery, else it's undefined
return results;
this.executeQuery(false, callback);
}
executeQuery(reRun){
var connection = this._database.getConnectionSync();
executeQuery(reRun, callback){
async.waterfall([
(callback) => {
this._database.getConnection((error, connection) => {
return callback(error, connection);
});
},
(connection, callback) => {
var query = { sql: this._stmt.sql, values: this._stmt.values.slice() };
var queryFn;
if (reRun && this._lazy.enabled) {
// if we are on a rerun and lazyload option is active
// we have to rest the offset with 0, otherwise if there
// are changes made on records before the offset the client subscription
// will not be updated
if (this._sql.indexOf(' offset $') > -1) { // check explict for offset, because on the firstrun we only have the limit
// offset is always the last parameter value
// overwrite the current offset with 0 to check all records loaded before
query.values[query.values.length - 1 /*offset*/] = 0;
// set the limit of records to the current offset
// because we need all records till the end of the currentOffset
query.values[query.values.length - 2 /*limit*/] = this._parameters[query.values.length - 1 /*offset*/];
}
}
if (this._queryObject.options.preparation) {
queryFn = this._database.wpQuery;
} else {
queryFn = this._database.query;
}
queryFn.call(this._database, connection, query.sql, query.values, (error, result) => {
this._database.releaseConnection(connection);
return callback(error, result);
});
}
], (error, result) => {
callback(error, result);
});
/*var connection = this._database.getConnectionSync();
var query = { sql: this._stmt.sql, values: this._stmt.values.slice() };

@@ -130,6 +172,6 @@

// overwrite the current offset with 0 to check all records loaded before
query.values[query.values.length - 1 /*offset*/] = 0;
query.values[query.values.length - 1 ] = 0;
// set the limit of records to the current offset
// because we need all records till the end of the currentOffset
query.values[query.values.length - 2 /*limit*/] = this._parameters[query.values.length - 1 /*offset*/];
query.values[query.values.length - 2 ] = this._parameters[query.values.length - 1 ];
}

@@ -149,5 +191,5 @@ }

this._database.releaseConnection(connection);
return results;
return results;*/
}
}
module.exports.PGQuery = PGQuery;

@@ -56,3 +56,5 @@ 'use strict';

DELETE FROM core_reactive
WHERE NOT EXISTS (
WHERE
core_reactive.query_id = @@QueryId
AND NOT EXISTS (
SELECT 1 FROM hashed_query

@@ -99,5 +101,3 @@ WHERE

//this._publishedIds = null; // cache the _id's from the result to use on binlog-change
//this._oldPublishedIds = null;
this._queryId = queryIdCounter++;
this._salt = null;

@@ -126,20 +126,21 @@

this._bindedExecuteQuery = null;
this._queryId = queryIdCounter++;
}
destroy(done) {
var self = this;
self._stoped = true;
this._stoped = true;
var superDestroy = super.destroy.bind(this);
if (!_.isFunction(done)) {
done = function(){};
};
// delete the hashed reactive data from the core_reactive for this query
async.series([
function(callback){
self._database.getConnection(function(error, connection){
(callback) => {
this._database.getConnection((error, connection) => {
if (error) return callback(error);
connection.query('DELETE FROM core_reactive WHERE query_id = ' + self.queryId, [], function(error, result){
self._database.releaseConnection(connection);
// console.log('DELETE FROM core_reactive WHERE query_id = $1', self.queryId);
this._database.query(connection, 'DELETE FROM core_reactive WHERE query_id = $1', [this._queryId], (error, result) => {
this._database.releaseConnection(connection);

@@ -151,6 +152,7 @@ if (error) return callback(error);

},
function(callback){
(callback) => {
// delete the published Info documents
var infoCollection = self._getInfoCollection();
/*var infoCollection = self._getInfoCollection();
if (self._pagination.published) {

@@ -164,13 +166,7 @@ var docId = 'P' + self._queryObject.invocation._subscriptionId;

}
*/
// remove the query from the query-cache
self._database._queryCache[self._queryId] = null;
delete self._database._queryCache[self._queryId];
// remove the references from the _queriesByTable
_.forEach(self._queryObject.tableDependencies, function(tableName){
var index = self._database._queriesByTable[tableName].indexOf(self._queryId);
self._database._queriesByTable[tableName].splice(index, 1);
});
this._database._unCacheQuery(this._queryId, this._queryObject.tableDependencies);
superDestroy();

@@ -180,2 +176,3 @@ return callback();

], function(error){
if (error) console.log('ERROR DESTROY:', error);
return done(error);

@@ -190,21 +187,19 @@ });

run(){
let query = {
preparation: true,
salt: true
},
run(callback) {
this._running = true;
callback = callback || function(){};
let query = {},
options = this._queryObject.options;
query.$select = _.extend(query.$select, options);
// override the basic query-options, they couldnt modified by the user
query = {
$select: {
$from: this._queryObject.tableOrViewName,
$where: this._queryObject.selector
}
};
query.$select.$from = this._queryObject.tableOrViewName;
query.$select.$where = this._queryObject.selector
query.$select = _.extend(query.$select, options);
// remove internal options that should not used for the query itself
delete query.$select.salt;
delete query.$select.preparation;
delete query.$select.clientId;

@@ -253,3 +248,3 @@ // the server-side pagination option exclude the use of offset and limit

let stmt = sqlBuilder.build(query);
//console.log(stmt.sql, stmt.values);
this._stmt = {

@@ -261,6 +256,9 @@ // after building the SQL Stamtent we have to inject this into the rective query template

this.executeQuery(false/*rerun*/);
this.executeQuery(false/*rerun*/, (error, result)=>{
this._running=false;
return callback(error, result);
});
}
executeQuery(reRun){
executeQuery(reRun, callback){
if (this._pagination.enabled){

@@ -276,72 +274,95 @@ this._pagination.ready = false;

var result = super.executeQuery(reRun);
//console.log('reactive.executeQuery', reRun);
super.executeQuery(reRun, (error, result) => {
if (error) return callback(error);
if (result.rowCount > 0) {
_.forEach(result.rows, (row) => {
//console.log('ROW:', row);
if (result.rowCount > 0) {
_.forEach(result.rows, (row) => {
if (row.action == 'added') {
return this._added(row._id, row.data);
}
if (row.action == 'added') {
return this._added(row._id, row.data);
}
if (row.action == 'changed') {
return this._changed(row._id, row.data);
}
if (row.action == 'changed') {
return this._changed(row._id, row.data);
}
if (row.action == 'removed') {
return this._removed(row._id);
}
});
}
if (row.action == 'removed') {
return this._removed(row._id);
this.ready();
/*
if (this._pagination.enabled) {
this.removeAllOldPublishedDocuments();
}
// publish the results only when not in lazy mode
// by using the lazy option we will always add the new docs
// BUT! remember the reRun -> in case of a reRun the lazyload option will get all records
if (reRun || !this._lazy.enabled){
this.publishQueryResults(results);
} else {
// using the lazyload option we only add new documents
if (!this._publishedIds) {
this._publishedIds = {};
}
});
}
this.ready();
this.iterateDocuments(results, (id, document) => {
// add new document
this._added(id, document);
});
this.clonePublishedIds();
this.ready();
}
*/
/*
if (this._pagination.enabled) {
this.removeAllOldPublishedDocuments();
}
if (this._pagination.enabled){
this._pagination.ready = true;
this.publishPaginationInfo();
}
// publish the results only when not in lazy mode
// by using the lazy option we will always add the new docs
// BUT! remember the reRun -> in case of a reRun the lazyload option will get all records
if (reRun || !this._lazy.enabled){
this.publishQueryResults(results);
} else {
// using the lazyload option we only add new documents
if (!this._publishedIds) {
this._publishedIds = {};
if (this._lazy.enabled){
this._lazy.ready = true;
this.publishLazyInfo();
}
this.iterateDocuments(results, (id, document) => {
// add new document
this._added(id, document);
});
this.clonePublishedIds();
this.ready();
}
*/
return callback();
});
}
if (this._pagination.enabled){
this._pagination.ready = true;
this.publishPaginationInfo();
reRun(callback) {
if (this._running) {
if (this._database._staledQueries.indexOf(this._queryId) == -1)
this._database._staledQueries.push(this._queryId);
return callback();
}
if (this._lazy.enabled){
this._lazy.ready = true;
this.publishLazyInfo();
}
}
/*if (this._running) {
var intv = setInterval(()=>{
if (!this._running){
clearInterval(intv);
reRun(eventName, tablename, rows, evt) {
if (this._stoped) return;
if (this.lazyLoading) return;
this._running = true;
this.executeQuery(true, (error, result) => {
this._running = false;
return callback(error, result);
});
}
}, 50);
//console.log('out');
//this._database._staledQueries.push(this._queryId);
return; // callback();
}*/
if (this._stoped) return callback();
if (this.lazyLoading) return callback();
this.executeQuery(true/*reRun*/);
//TODO: check for bindEnvironment and chage the executeQuery into an async function with a callback
//this._bindedExecuteQuery(this._queryObject.invocation, true/*reRun*/);
//var f = Fiber(()=>{
// this.executeQuery(this._queryObject.invocation, true/*reRun*/);
//}).run();
// from the docs --> https://www.npmjs.com/package/fibers#garbage-collection
//f = undefined;
this._running = true;
this.executeQuery(true/*reRun*/, (error, result) => {
this._running = false;
return callback(error, result);
});
}

@@ -372,2 +393,3 @@

/*
_addedOptimistic(id, doc){

@@ -390,3 +412,5 @@ // register the new id as a published document

}
*/
ready(){

@@ -393,0 +417,0 @@ this.emit('state', 'ready');

@@ -8,39 +8,17 @@ 'use strict';

const async = require('async');
const _ = require('lodash');
const LOOKUP_INTERVAL = 5;
class PGTable extends PGBaseTable {
constructor(database, tableName, options) {
super(database, tableName, options);
constructor(database, tableName, options, callback) {
super(database, tableName, options, (error) => {
database._initNotifyTrigger(this._schemaName + '.' + this._baseName, (error, result) => {
if (error) return callback(error);
this._pkColumns = this._getPkColumns(this._baseName);
this._initNotifyTriggerSync(this._database, this._baseName);
return callback();
});
});
}
_getPkColumns(baseName){
let connection = this._database.getConnectionSync();
let sql=`SELECT
string_agg(k.column_name, ',') AS pk_columns
FROM
INFORMATION_SCHEMA.TABLE_CONSTRAINTS t
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE k
USING (CONSTRAINT_NAME, TABLE_SCHEMA, TABLE_NAME)
WHERE
t.constraint_type = 'PRIMARY KEY'
AND t.table_catalog = current_database()
AND t.table_name = $1
GROUP BY
k.ordinal_position
ORDER BY
k.ordinal_position;`;
var results = this._database.querySync(connection, sql, [baseName]);
this._database.releaseConnection(connection);
if (results.rowCount == 0){
return null;
} else {
return results.rows[0].pk_columns.split(',');
}
}
_removeUnknownColumns(doc){

@@ -61,2 +39,3 @@ var columns = Object.keys(doc),

/*
_insert(docOrDocs, optimistic, subscriptionId, callback){

@@ -115,3 +94,3 @@ let methodInvocation = DDP._CurrentMethodInvocation.get();

var results = this._insert(doc, true/*optimistic*/, subscriptionId, callback);
var results = this._insert(doc, true, subscriptionId, callback);
} catch (ex) {

@@ -126,31 +105,31 @@ if (published){

return results;
}
}*/
_execStmt(stmt, clientId, callback){
insert(docOrDocs, callback) {
return this._insert(docOrDocs, false/*optimistic*/, null/*subscriptionId*/, callback);
}
if (!_.isFunction(callback) && _.isFunction(clientId)) {
callback = clientId;
}
if (!_.isString(clientId)) {
// no client or session id suppoerted
// take a pseudo client-id, because postgreSQL
// will thrown an exeption by using current_config()
clientId = PGTable.NO_CLIENT_ID_SUPPORTED;
}
update(filter, document, callback){
callback = callback || function(){};
async.waterfall([
(callback) => {
let query = {
$update: {
$table: this._baseName,
$set: document,
$where: filter
}
};
let stmt = sqlBuilder.build(query);
return callback(null, stmt)
},
(stmt, callback) => {
this._database.getConnection((error, connection) => {
return callback(error, stmt, connection);
return callback(error, connection);
});
},
(stmt, connection, callback) => {
this._database.wpQuery(connection, stmt.sql, stmt.values, (error, results) => {
(connection, callback) => {
// set the unique id for this statement
var statementId = Math.random().toString(26).slice(2);
connection.query(`SELECT set_config('core_settings.client_id', $1, false);`, [clientId]);
connection.query(`SELECT set_config('core_settings.statement_id', $1, false);`, [statementId]);
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED) {
this._database._runningStatements[statementId] = true;
}
//console.log('RUN Action:', clientId);
this._database.wpQuery(connection, stmt.sql, stmt.values, (error, result) => {
this._database.releaseConnection(connection);

@@ -160,4 +139,24 @@

return callback(null, results);
return callback(null, result, statementId);
});
},
(result, statementId, callback) => {
//console.log('EXECUTED');
// waiting till clientId was received for the notify-listner and all reactive queries are performed
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED && result.rowCount > 0) {
var interv = setInterval(()=>{
if (!this._database._runningStatements[statementId]) {
clearInterval(interv);
//console.log('DONE Action:', clientId);
return callback(null, result);
}
}, LOOKUP_INTERVAL);
} else {
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED){
delete this._database._runningStatements[statementId];
}
//console.log('OUT');
return callback(null, result);
}
}

@@ -172,14 +171,42 @@ ],

remove(filter){
let methodInvocation = DDP._CurrentMethodInvocation.get();
insert(docOrDocs, clientId, callback) {
let query = {
type: 'delete',
table: this._baseName,
where: filter
$insert: {
$into: this._schemaName + '.' + this._baseName,
$documents: docOrDocs
}
};
let stmt = SQLBuilder.buildStatement(query);
return this._database.wpQuerySync(stmt.toString(), stmt.values, methodInvocation);
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
}
update(filter, document, clientId, callback){
let query = {
$update: {
$table: this._schemaName + '.' + this._baseName,
$set: document,
$where: filter
}
};
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
}
remove(filter, clientId, callback){
let query = {
$delete: {
$table: this._schemaName + '.' + this._baseName,
$where: filter
}
};
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
}
}
PGTable.NO_CLIENT_ID_SUPPORTED = '$$NO-CLIENT-ID-SUPPORTED$$';
module.exports.PGTable = PGTable;
'use strict';
const PGBaseTable = require('./pg-base-table').PGBaseTable;
const async = require('async');
const _ = require('lodash');
class PGView extends PGBaseTable {
constructor(mysqlDatabase, viewName, options) {
super(mysqlDatabase, viewName, options);
constructor(database, viewName, options, callback) {
super(database, viewName, options, (error) => {
this._getViewDependencies(this._schemaName, this._baseName, (error, dependencies) => {
if (error) return callback(error);
this.setViewDependencies(this._baseName);
// after setting the view dependencies we have to check for the notify triggers
// that are relevant to listen for changes
_.forEach(this._tableDependencies, tableName => {
this._initNotifyTriggerSync(this._database, tableName);
this._tableDependencies = dependencies;
// after setting the view dependencies we have to check for the notify triggers
// that are relevant to listen for changes
async.each(this._tableDependencies, (tableName, callback) => { // the tablename includes always the schema done by getViewDependencies
database._initNotifyTrigger(tableName, (error, result) => {
if (error) return callback(error);
return callback();
});
}, callback);
});
});
}
setViewDependencies(viewName) {
let depTables = [],
connection = this._database.getConnectionSync();
_getViewDependencies(schemaName, viewName, callback) {
let depTables = [];
let getDepsFromInfoSchema = (connection, schemaName, objectName, callback) => {
var sql = '';
let getDepsFromInfoSchema = (objectName) => {
var sql = '';
sql += 'SELECT ';
sql += ' tab.TABLE_NAME AS table_name, ';
sql += ' tab.TABLE_TYPE AS table_type ';
sql += ' tab.TABLE_NAME AS table_name, ';
sql += ' tab.TABLE_SCHEMA AS table_schema, ';
sql += ' tab.TABLE_TYPE AS table_type ';
sql += 'FROM ';

@@ -37,6 +46,28 @@ sql += ' INFORMATION_SCHEMA.TABLES AS tab ';

sql += ' views.TABLE_NAME = $1 ';
sql += 'AND views.TABLE_SCHEMA = $2 ';
sql += 'AND tab.TABLE_CATALOG = current_database();';
var depResult = this._database.querySync(connection, sql, [objectName]);
this._database.query(connection, sql, [objectName, schemaName], (error, result) => {
if (error) return callback(error);
async.each(result.rows, (dep, callback) => {
if (dep.table_type == 'VIEW') {
getDepsFromInfoSchema(connection, dep.table_schema, dep.table_name, callback);
} else { // BASE TABLE
// add only if the table-dependency is unknown at this time
if (depTables.indexOf(dep.table_schema + '.' + dep.table_name) == -1) {
depTables.push(dep.table_schema + '.' + dep.table_name);
}
return callback();
}
}, (error) => {
if (error) return callback(error);
return callback(null, depTables);
});
});
/*
var depResult = this._database.querySync(connection, sql, [objectName, schemaName]);
for (var i=0, max=depResult.rows.length; i<max; i++){

@@ -54,12 +85,20 @@ var dep = depResult.rows[i];

}
}
}*/
};
getDepsFromInfoSchema(viewName);
this._database.releaseConnection(connection);
// overwrite the dependencies from viewName with the dependend tables
this._tableDependencies = depTables;
async.waterfall([
(callback) => {
this._database.getConnection((error, connection)=>{
return callback(error, connection);
});
},
(connection, callback) => {
getDepsFromInfoSchema(connection, this._schemaName, this._baseName, (error, tableDependencies) => {
this._database.releaseConnection(connection);
return callback(error, tableDependencies);
});
}
], callback);
}
}
module.exports.PGView = PGView;
{
"name": "pure-live-pg",
"version": "1.0.4",
"version": "1.0.5",
"description": "PostgreSQL package to support realtime applications",

@@ -21,3 +21,3 @@ "main": "index.js",

"fibers": "^2.0.0",
"json-sql-builder": "^1.0.7",
"json-sql-builder": "^1.0.8",
"lodash": "^4.17.4",

@@ -24,0 +24,0 @@ "md5": "^2.2.1",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc