pure-live-pg
Comparing version 1.0.4 to 1.0.5
@@ -8,9 +8,13 @@ 'use strict'; | ||
const md5 = require('md5'); | ||
const async = require('async'); | ||
const _ = require('lodash'); | ||
class PGBaseTable { | ||
constructor(dbInstance, baseName, options) { | ||
this._baseName = baseName; | ||
constructor(dbInstance, baseName, options, callback) { | ||
callback = callback || function(){}; | ||
this._database = dbInstance; | ||
this._tableDependencies = [baseName]; | ||
this._schemaName = baseName.indexOf('.') > -1 ? baseName.split('.')[0] : this._database.defaultSchema; | ||
this._baseName = baseName.indexOf('.') > -1 ? baseName.split('.')[1] : baseName; | ||
this._tableDependencies = [this._schemaName + '.' + this._baseName]; | ||
this._alias = options && (options.as || options.alias); | ||
@@ -22,34 +26,8 @@ | ||
this._options._removeUnknownCols = options && (options.removeUnknownCols === true || options.removeUnknownCols === false) ? options.removeUnknownCols : true; | ||
this._options._idGenerator = (doc) => { | ||
// generate the _id for the document | ||
// for this - by default - we check the primary-key columns | ||
// and return the value of these column(s). If the pk consists of | ||
// more than one column, we concat each value and return a hashed value | ||
var _id = ''; | ||
if (this._pkColumns){ | ||
for (var i=0, max=this._pkColumns.length; i<max; i++){ | ||
_id += doc[this._pkColumns[i]] && doc[this._pkColumns[i]].toString() || ''; | ||
} | ||
} else { | ||
_id = doc.id || doc.aid; | ||
} | ||
if (!_id || _id.length == 0){ | ||
throw new Error('Invalid _id: Can\'t create an _id column using the default idGenerator.'); | ||
} | ||
this._getColumnInformations(this._baseName, (error, columnInfos) => { | ||
if (error) return callback(error); | ||
return md5(_id); | ||
}; | ||
if (options && _.isFunction(options.idGenerator)) { | ||
this._options._idGenerator = options.idGenerator; | ||
} | ||
this._columnInfos = this._getColumnInformations(this._baseName); | ||
this._pkColumns = null; // set by the mysql-table constructor | ||
this._initNotifyTriggerSync = new Synchron(function(dbInstance, tableName){ | ||
dbInstance._initNotifyTrigger(tableName, (error, result) => { | ||
if (error) return this.throw(error); | ||
return this.done(); | ||
}); | ||
this._columnInfos = columnInfos; | ||
return callback(); | ||
}); | ||
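A minimal usage sketch of the `idGenerator` option handled above (the table and column names are hypothetical, and `db` stands for an already connected `PGDatabase` instance):

```javascript
const md5 = require('md5');

// table without a usable single id column: build the _id from two columns instead
const People = db.Table('public.people', {
    idGenerator: (doc) => md5(doc.firstname + '|' + doc.lastname)
}, (error) => {
    if (error) throw error;
    // at this point the column information is loaded and the notify trigger is installed
});
```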
@@ -72,25 +50,48 @@ } | ||
_getColumnInformations(baseName){ | ||
let connection = this._database.getConnectionSync(); | ||
let sql=`SELECT | ||
column_name AS columnname, | ||
is_nullable AS isnullable, | ||
data_type AS datatype, | ||
character_maximum_length AS charactermaximumlength | ||
FROM | ||
information_schema.columns | ||
WHERE | ||
table_name = $1 | ||
AND table_catalog = current_database();`; | ||
_getColumnInformations(baseName, callback) { | ||
var tableName = baseName, | ||
schemaName = this._database.defaultSchema || 'public'; | ||
var result = this._database.querySync(connection, sql, [baseName]); | ||
this._database.releaseConnection(connection); | ||
if (baseName.indexOf('.') > -1) { | ||
var schemaAndTable = baseName.split('.')[1]; | ||
schemaName = schemaAndTable[0]; | ||
tableName = schemaAndTable[1]; | ||
} | ||
// transform the columns array into an object | ||
var transformedResult = {}; | ||
for (var i=0, max=result.rows.length; i < max; i++){ | ||
var column = result.rows[i]; | ||
transformedResult[column.columnname] = column; | ||
} | ||
return transformedResult; | ||
async.waterfall([ | ||
(callback) => { | ||
this._database.getConnection( (error, connection) => { | ||
callback(error, connection); | ||
}); | ||
}, | ||
(connection, callback) => { | ||
let sql=`SELECT | ||
column_name AS columnname, | ||
is_nullable AS isnullable, | ||
data_type AS datatype, | ||
character_maximum_length AS charactermaximumlength | ||
FROM | ||
information_schema.columns | ||
WHERE | ||
table_name = $1 | ||
AND table_schema = $2 | ||
AND table_catalog = current_database();`; | ||
this._database.query(connection, sql, [tableName, schemaName], (error, result) => { | ||
this._database.releaseConnection(connection); | ||
if (error) return callback(error); | ||
// transform the columns array into an object | ||
var transformedResult = {}; | ||
for (var i=0, max=result.rows.length; i < max; i++){ | ||
var column = result.rows[i]; | ||
transformedResult[column.columnname] = column; | ||
} | ||
return callback(null, transformedResult); | ||
}); | ||
} | ||
], (error, result) => { | ||
callback(error, result); | ||
}); | ||
} | ||
@@ -100,3 +101,3 @@ | ||
var queryObject = { | ||
tableOrViewName: this._baseName, | ||
tableOrViewName: this._schemaName + '.' + this._baseName, | ||
tableDependencies: this._tableDependencies, | ||
@@ -108,15 +109,8 @@ selector: selector, | ||
// cache the query for a later rerun triggered by the notification events | ||
this._database._queryCache[query.queryId] = query; | ||
// register the query for each related table it depends on | ||
_.forEach(query._queryObject.tableDependencies, (tableName) => { | ||
if (!this._database._queriesByTable[tableName]) { | ||
this._database._queriesByTable[tableName] = []; | ||
} | ||
this._database._queriesByTable[tableName].push(query.queryId); | ||
}); | ||
this._database._cacheQuery(query); | ||
return query; | ||
} | ||
select(selector, options) { | ||
select(selector, options, callback) { | ||
var query = new PGQuery(this, { | ||
@@ -128,6 +122,8 @@ tableOrViewName: this._baseName, | ||
var result = query.run(); | ||
query.destroy(); | ||
query.run((error, result) => { | ||
query.destroy(); | ||
query = null; | ||
return result; | ||
return callback(error, result); | ||
}); | ||
} | ||
@@ -134,0 +130,0 @@ } |
@@ -13,18 +13,53 @@ 'use strict'; | ||
let dbCounter = 0; | ||
class PGDatabase { | ||
constructor(options) { | ||
this._queryCount = 0; | ||
this.id = dbCounter++; | ||
this._pool = null; | ||
this._client = null; | ||
this._connected = false; | ||
this.defaultSchema = 'public'; | ||
this.tableSpace = 'pg_default'; | ||
this._queryPreparators = []; | ||
// cache each query object itself; the property name equals the queryId | ||
this._queryCache = {}; | ||
// cache each queryId by Table to perform a rerun on change | ||
// example: | ||
// _queriesByTable = { | ||
// "core_users": [12, 34, 78, 4542], --> Id's that reference to the queryCache | ||
// "core_apps": [32, 38, 2378, 23242] --> Id's that reference to the queryCache | ||
// "public.people": [12, 34, 78, 4542, ...], --> Id's that reference to the queryCache | ||
// "public.hobbies": [32, 38, 2378, 23242, ...] --> Id's that reference to the queryCache | ||
// } | ||
this._queriesByTable = {}; | ||
// cache each queryId by Client and Table to perform a rerun on a reactive change event | ||
// | ||
// example: | ||
// _queriesByClient = { | ||
// "xf65as67": { | ||
// "queries": [2, 17, 45], | ||
// "tables" : { | ||
// "public.people": [17, 45], | ||
// "public.hobbies": [2] | ||
// } | ||
// } | ||
// "a6a5axd3": { | ||
// ... | ||
// } | ||
// } | ||
this._queriesByClientTable = {}; | ||
// hash table with all currently running inserts, updates and deletes, maintained by | ||
// the table-object to wait for the notify-message to be done. | ||
// This is used for an optimistic UI: if there are reactive queries for a table where | ||
// the insert, update or delete is performed, we have to wait until reactivity is done | ||
this._runningStatements = {}; | ||
this._staledQueries = []; | ||
// client connection to listen for notifications | ||
@@ -38,7 +73,7 @@ this._listnerClient = null; | ||
// cache all tablenames that already have a trigger installed | ||
// cache all tablenames that already have the trigger to notify changes | ||
// example: | ||
// this._installedNotifyTriggers = { | ||
// core_users: true, | ||
// core_roles: true | ||
// "public.people": true, | ||
// "public.hobbies": true | ||
// }; | ||
@@ -52,8 +87,28 @@ this._installedNotifyTriggers = {}; | ||
var triggerName = tableName + '_notify', | ||
sql = `DROP TRIGGER IF EXISTS "${triggerName}" ON "${tableName}"; | ||
// maybe a schema was explicitly defined | ||
// so replace the dot in the tablename's <schema>.<tablename> with an underscore | ||
// --> the trigger name then looks like: public_mytable_notify, where 'public' is the schema | ||
var schemaAndTable = tableName.indexOf('.') > -1 ? tableName : this.defaultSchema + '.' + tableName, | ||
triggerName = schemaAndTable.replace('.', '_') + '_notify', | ||
triggerNameStmt = schemaAndTable.replace('.', '_') + '_notify_stmt', | ||
schemaAndTable = schemaAndTable.replace('.', '"."'); | ||
CREATE TRIGGER "${triggerName}" AFTER INSERT OR UPDATE OR DELETE ON "${tableName}" | ||
FOR EACH ROW EXECUTE PROCEDURE core_notify_changes();`; | ||
var sql = `DROP TRIGGER IF EXISTS "${triggerName}" ON "${schemaAndTable}"; | ||
CREATE TRIGGER "${triggerName}" AFTER INSERT OR UPDATE OR DELETE ON "${schemaAndTable}" | ||
FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes(); | ||
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_insert" ON "${schemaAndTable}"; | ||
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_update" ON "${schemaAndTable}"; | ||
-- CREATE TRIGGER "${triggerNameStmt}_insert" AFTER INSERT ON "${schemaAndTable}" | ||
-- REFERENCING NEW TABLE AS new_table | ||
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt(); | ||
-- CREATE TRIGGER "${triggerNameStmt}_update" AFTER UPDATE ON "${schemaAndTable}" | ||
-- REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table | ||
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt();`; | ||
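Worked example of the naming scheme described in the comments above, assuming `defaultSchema` is `'public'` and the table was registered as `people`:

```javascript
const defaultSchema = 'public';
const tableName = 'people';

// 'people' -> 'public.people' -> trigger "public_people_notify" on "public"."people"
const schemaAndTable = tableName.indexOf('.') > -1 ? tableName : defaultSchema + '.' + tableName;
const triggerName = schemaAndTable.replace('.', '_') + '_notify';
const quotedRelation = '"' + schemaAndTable.replace('.', '"."') + '"';

console.log(triggerName);    // public_people_notify
console.log(quotedRelation); // "public"."people"
```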
this.getConnection((error, connection) => { | ||
@@ -65,3 +120,2 @@ if (error) return callback(error); | ||
if (!error){ | ||
@@ -78,3 +132,3 @@ this._installedNotifyTriggers[tableName] = true; | ||
var sql = ` | ||
CREATE TABLE IF NOT EXISTS "public"."core_reactive" | ||
CREATE TABLE IF NOT EXISTS "${this.defaultSchema}"."core_reactive" | ||
( | ||
@@ -90,6 +144,27 @@ query_id BIGINT NOT NULL, | ||
) | ||
TABLESPACE pg_default; | ||
TABLESPACE ${this.tableSpace}; | ||
TRUNCATE "public"."core_reactive";`; | ||
TRUNCATE "${this.defaultSchema}"."core_reactive"; | ||
/* currently not needed! everything is handled by the FOR EACH STATEMENT trigger | ||
* | ||
CREATE SEQUENCE IF NOT EXISTS "${this.defaultSchema}".core_notifications_id_seq | ||
INCREMENT 1 | ||
START 1 | ||
MINVALUE 1 | ||
MAXVALUE 9223372036854775807 | ||
CACHE 1; | ||
CREATE TABLE IF NOT EXISTS "${this.defaultSchema}".core_notifications | ||
( | ||
_id bigint NOT NULL DEFAULT nextval('core_notifications_id_seq'::regclass), | ||
details json NOT NULL, | ||
CONSTRAINT core_notifications_pkey PRIMARY KEY (_id) | ||
) | ||
WITH ( | ||
OIDS = FALSE | ||
) | ||
TABLESPACE ${this.tableSpace}; | ||
*/`; | ||
this.getConnection((error, connection) => { | ||
@@ -108,7 +183,10 @@ if (error) return callback(error); | ||
CREATE OR REPLACE FUNCTION core_notify_changes() RETURNS TRIGGER AS $$ | ||
DECLARE | ||
olddata json; | ||
newdata json; | ||
notification json; | ||
notify_id INTEGER; | ||
/* unused --------------------> DECLARE | ||
id VARCHAR(45); | ||
optimistic_id VARCHAR(45); | ||
olddata json; | ||
newdata json; | ||
notification json; | ||
notify_id INTEGER; | ||
--------------------------------> end unused */ | ||
@@ -119,5 +197,11 @@ BEGIN | ||
-- Action = INSERT or UPDATE? -> NEW row | ||
/* -----------------------------> Currently not used | ||
IF (TG_OP = 'DELETE') THEN | ||
olddata = row_to_json(OLD); | ||
id = OLD._id; | ||
optimistic_id = OLD._optimistic_id; | ||
ELSE | ||
id = NEW._id; | ||
optimistic_id = NEW._optimistic_id; | ||
newdata = row_to_json(NEW); | ||
@@ -144,4 +228,8 @@ IF (TG_OP = 'UPDATE') THEN | ||
json_build_object( | ||
'trigger', 'EACH ROW', | ||
'schema', TG_TABLE_SCHEMA, | ||
'table', TG_TABLE_NAME, | ||
'action', TG_OP, | ||
'_id', id, | ||
'optimistic_id', optimistic_id, | ||
'notify_id', notify_id | ||
@@ -151,2 +239,17 @@ )::text | ||
---------> End of unused block */ | ||
PERFORM pg_notify( | ||
'core_reactive_event', | ||
json_build_object( | ||
'type', 'STATEMENT', | ||
'schema', TG_TABLE_SCHEMA, | ||
'table', TG_TABLE_NAME, | ||
'action', TG_OP, | ||
'client_id', current_setting('core_settings.client_id')::text, | ||
'statement_id', current_setting('core_settings.statement_id')::text | ||
)::text | ||
); | ||
-- Result is ignored since this is an AFTER trigger | ||
@@ -156,4 +259,24 @@ RETURN NULL; | ||
$$ LANGUAGE plpgsql;`; | ||
$$ LANGUAGE plpgsql; | ||
/* ------------------------------------------------------------------------------> currently unused | ||
CREATE OR REPLACE FUNCTION core_notify_changes_stmt() RETURNS TRIGGER AS $$ | ||
BEGIN | ||
PERFORM pg_notify( | ||
'core_reactive_event', | ||
json_build_object( | ||
'trigger', 'STATEMENT', | ||
'schema', TG_TABLE_SCHEMA, | ||
'table', TG_TABLE_NAME, | ||
'action', TG_OP | ||
)::text | ||
); | ||
-- Result is ignored since this is an AFTER trigger | ||
RETURN NULL; | ||
END; | ||
$$ LANGUAGE plpgsql; | ||
----------------------------------------------- unused end */`; | ||
this.getConnection((error, connection) => { | ||
@@ -169,2 +292,59 @@ if (error) return callback(error); | ||
_resolveTableName(tableName) { | ||
// the tableName is always used with the schema | ||
// like schema.table | ||
// if there was no schema define we use the default schema | ||
if (tableName.indexOf('.') == -1){ | ||
return this.defaultSchema + '.' + tableName; | ||
} | ||
// the schema was already defined. | ||
return tableName; | ||
} | ||
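For illustration, `_resolveTableName` only prepends the default schema when none was given (a small sketch; the import path and export shape are assumptions):

```javascript
const { PGDatabase } = require('pure-live-pg'); // assumed export shape

const db = new PGDatabase({ enablePooling: true }); // defaultSchema is 'public' by default

console.log(db._resolveTableName('people'));       // 'public.people'
console.log(db._resolveTableName('sales.orders')); // 'sales.orders' (schema already defined)
```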
_cacheQuery(query){ | ||
// cache the query for a later rerun triggered by the notification events | ||
this._queryCache[query.queryId] = query; | ||
// register the query for each related table it depends on | ||
_.forEach(query._queryObject.tableDependencies, (tableName) => { | ||
tableName = this._resolveTableName(tableName); | ||
if (!this._queriesByTable[tableName]) { | ||
this._queriesByTable[tableName] = []; | ||
} | ||
this._queriesByTable[tableName].push({ | ||
queryId: query.queryId, | ||
clientId: query._queryObject.options.clientId | ||
}); | ||
}); | ||
} | ||
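Note the shape change here: `_queriesByTable` now stores objects instead of bare query ids, so the notify handler can tell the initiating client's queries apart from all others. An illustrative resulting structure:

```javascript
// after caching one reactive query registered by client 'xf65as67'
const queriesByTable = {
    'public.people': [
        { queryId: 17, clientId: 'xf65as67' }
    ]
};
```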
_unCacheQuery(queryId, tableDependencies){ | ||
// remove the references from the _queriesByTable | ||
_.forEach(tableDependencies, (tableName) => { | ||
tableName = this._resolveTableName(tableName); | ||
if (this._queriesByTable[tableName]){ | ||
let getArrIndex = (tableName, queryId) => { | ||
let arr = this._queriesByTable[tableName]; | ||
for (var i=0, max=arr.length; i<max; i++){ | ||
if (arr[i].queryId == queryId) | ||
return i; | ||
} | ||
return -1; | ||
} | ||
var index = getArrIndex(tableName, queryId); | ||
if (index > -1) { | ||
this._queriesByTable[tableName].splice(index, 1); | ||
} | ||
//this._arrayIndexthis._queriesByTable[tableName].indexOf(query._queryId); | ||
} | ||
}); | ||
this._queryCache[queryId] = null; | ||
//console.log('uncache/delete', queryId, tableDependencies); | ||
delete this._queryCache[queryId]; | ||
} | ||
_onNotify(notification){ | ||
@@ -175,24 +355,75 @@ if (notification.channel !== 'core_reactive_event') return; | ||
var data = JSON.parse(notification.payload); | ||
var queryIds = this._queriesByTable[data.table]; | ||
var queryRunners = queryIds.map(id => { | ||
var self = this, | ||
queryId = id; | ||
return function(callback) { | ||
try { | ||
self._queryCache[queryId].reRun(); | ||
} catch(e) { | ||
return callback(e); | ||
} | ||
return callback(); | ||
//console.log(data.trigger, data.action); | ||
//console.log(data.statement_id + ' - ' + data.client_id + ' - ' + data.schema + '.' + data.table); | ||
//console.log(this._queriesByTable); | ||
//console.log('onNotify:', data); | ||
var relatedTable = data.schema + '.' + data.table; | ||
// get all queries for the related table | ||
var firstQueries = []; | ||
var secondQueries = []; | ||
var max = (this._queriesByTable[relatedTable] && this._queriesByTable[relatedTable].length) || -1; | ||
//console.log("max relatedTable:", max); | ||
for (var i=0; i<max; i++){ | ||
var q = this._queriesByTable[relatedTable][i]; | ||
if (q.clientId && q.clientId == data.client_id) { | ||
firstQueries.push(q); | ||
} else { | ||
secondQueries.push(q); | ||
} | ||
} | ||
// check if there are reactive queries registered for the initiating client | ||
// if not, release the running statement right away | ||
if (firstQueries.length == 0) { | ||
if (this._runningStatements[data.statement_id]){ | ||
delete this._runningStatements[data.statement_id]; | ||
} | ||
} | ||
// execute all prior/first queries in parallel | ||
async.each(firstQueries, (q, callback) => { | ||
this._queryCache[q.queryId].reRun(callback); | ||
}, (error)=>{ | ||
if (error){ | ||
console.log('FIRST ERROR!!!!!!!!!!!!', error); | ||
} | ||
if (this._runningStatements[data.statement_id]){ | ||
delete this._runningStatements[data.statement_id]; | ||
} | ||
for(var i=0, max=secondQueries.length; i<max; i++){ | ||
var qid = secondQueries[i].queryId; | ||
if (this._staledQueries.indexOf(qid) == -1) | ||
this._staledQueries.push(qid); | ||
} | ||
}); | ||
} | ||
async.parallel(queryRunners, (error) => { | ||
//console.log(error); | ||
}) | ||
_initStaledQueriesInterval(){ | ||
this._staledRunning = false; | ||
var staledInterval = setInterval(()=>{ | ||
if (this._staledRunning) return; | ||
/*_.forEach(queryIds, (queryId) => { | ||
this._queryCache[queryId].reRun(); | ||
});*/ | ||
this._staledRunning=true; | ||
var staledQueries = []; | ||
while (this._staledQueries.length > 0 && staledQueries.length < 200) { | ||
staledQueries.push(this._staledQueries.shift()); | ||
} | ||
async.eachLimit(staledQueries, 50, (qid, callback) => { | ||
if (this._queryCache[qid]) { | ||
this._queryCache[qid].reRun(callback); | ||
} | ||
}, (error)=>{ | ||
// errors are still ignored here | ||
if (error){ | ||
console.log('RUN STALED ERROR!!!!!!!!!!!!', error); | ||
} | ||
//console.log('Finished staledQueries', staledQueries.length); | ||
this._staledRunning = false; | ||
}); | ||
}, 10); | ||
} | ||
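The drain loop above can be read as: every 10 ms take at most 200 queued query ids and rerun them with a concurrency of 50. A minimal stand-alone sketch of that batching pattern (the queue content is illustrative):

```javascript
const async = require('async');

const staleQueue = [12, 34, 78, 4542];          // queued query ids (illustrative)
const batch = staleQueue.splice(0, 200);        // take at most 200 ids per tick

async.eachLimit(batch, 50, (queryId, done) => { // rerun at most 50 in parallel
    console.log('rerunning query', queryId);    // stands in for _queryCache[queryId].reRun(done)
    done();
}, (error) => {
    if (error) console.error(error);
});
```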
@@ -237,3 +468,8 @@ | ||
} | ||
], function (error) { | ||
], (error) => { | ||
if (!error) this._connected = true; | ||
// enable rerunning of staled queries | ||
this._initStaledQueriesInterval(); | ||
return callback(error); | ||
@@ -257,2 +493,4 @@ }); | ||
end(){ | ||
if (!this._connected) return; | ||
if (!this._options.enablePooling) { | ||
@@ -263,89 +501,41 @@ this._client.end(); | ||
} | ||
this._listnerClient.query('UNLISTEN core_reactive_event', (error, result) => { | ||
this._listnerClient.end(); | ||
}); | ||
} | ||
/** | ||
* @method Table | ||
* @memberOf PGDatabase | ||
* @locus Server | ||
* | ||
* @summary Creates a new instance of a PGTable object. | ||
* | ||
* @param {String} tableName | ||
* Name of the table in the postgreSQL database. | ||
* @param {Object} [options] | ||
* Specifies the options for the new table instances. Details see [new PGTable()](Api-PGTable.html#PGTable). | ||
*/ | ||
Table(tableName, options) { | ||
return new PGTable(this, tableName, options); | ||
Table(tableName, options, callback) { | ||
var __callback = callback, | ||
__options = options | ||
if (!_.isString(tableName)) { | ||
throw new Error('First argument "tableName" must be type of string.'); | ||
} | ||
if (!_.isFunction(callback) && _.isFunction(options)) { | ||
// no options supplied | ||
__callback = options; | ||
__options = {}; | ||
} | ||
return new PGTable(this, tableName, __options, __callback); | ||
} | ||
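Since table initialization is now asynchronous, `Table()` takes a callback and the options argument may be omitted. A hedged usage sketch (table names are hypothetical, `db` is a connected `PGDatabase`):

```javascript
// options omitted - the callback moves into their place
const People = db.Table('public.people', (error) => {
    if (error) return console.error('table init failed:', error);
    // People is fully initialized here (column infos, pk columns, notify trigger)
});

// or with options
const Users = db.Table('core.users', { alias: 'users' }, (error) => {
    if (error) return console.error(error);
});
```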
/** | ||
* @method View | ||
* @memberOf PGDatabase | ||
* @locus Anywhere | ||
* | ||
* @summary Creates a new instance of a PGView object. | ||
* | ||
* @param {String} viewName | ||
* Name of the view in the postgreSQL database. | ||
* @param {Object} [options] | ||
* | Option | Type | Description | | ||
* |---------------------|----------|-------------------------------------------------------------------------------| | ||
* | `alias` | String | Defines an alias for this table. | | ||
* | `as` | String | See alias | | ||
* | `idGenerator` | Function | **Server-side only** A callback function to define an `_id` column if the table does not provide a column named `_id` or `id`. | | ||
* | `removeUnknownCols` | Boolean | **Server-side only** If `true` this option will remove all unknown columns used within an object before trying to insert or update a record. | | ||
* | ||
*/ | ||
View(viewName, options){ | ||
return new PGView(this, viewName, options); | ||
View(viewName, options, callback){ | ||
var __callback = callback, | ||
__options = options | ||
if (!_.isString(viewName)) { | ||
throw new Error('First argument "viewName" must be type of string.'); | ||
} | ||
if (!_.isFunction(callback) && _.isFunction(options)) { | ||
// no options supplied | ||
__callback = options; | ||
__options = {}; | ||
} | ||
return new PGView(this, viewName, __options, __callback); | ||
} | ||
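The same pattern applies to `View()`; the view name below is hypothetical:

```javascript
const PeopleWithHobbies = db.View('public.people_with_hobbies', { as: 'pwh' }, (error) => {
    if (error) return console.error('view init failed:', error);
    // all dependent base tables are resolved and their notify triggers installed
});
```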
/** | ||
* @method queryPreparations | ||
* @memberOf PGDatabase | ||
* @locus Server | ||
* | ||
* @summary Registers a new callback function that will be called each time a query is executed. | ||
* | ||
* This is helpful if you have to do some general preparations before executing the target query. | ||
* | ||
* @after | ||
* # Using Query-Preparations | ||
* | ||
* In our project we are working with a large number of views. Most of them should be executed in the context of the current user. | ||
* Here we have the possibility to set a session variable @currentUser that is consumed by most of the views. | ||
* | ||
* To avoid having to set this variable again with each query - and having to remember to do so - there is a central way to do this. | ||
* | ||
* **Example** | ||
* | ||
* ```javascript | ||
* import { PGDatabase } from 'meteor/planetarydev:mysql'; | ||
* | ||
* export const MyDb = new PGDatabase({ | ||
* enablePooling: true | ||
* }); | ||
* | ||
* // connect to the database | ||
* [...] | ||
* | ||
* // declare the query-preparator | ||
* MyDb.queryPreparations(function(connection, queryObjToRun, currentInvocation) { | ||
* if (currentInvocation) { | ||
* var userId = currentInvocation && currentInvocation.userId; | ||
* // the function runs in the context of the PGDatabase object, so we can use this to access each method | ||
* this.querySync(connection, 'SET @currentUser = ?;', [userId]); | ||
* } | ||
* }); | ||
* ``` | ||
* | ||
* @param {Function} callback Function called each time before a query is executed. `function(connection, queryObject, currentInvocation) { ... }` | ||
* | Parameter | Type | Description | | ||
* |---------------------|----------|-------------------------------------------------------------------------------| | ||
* | `connection` | Object | Specifies the connection for the query to be executed on. | | ||
* | `queryObject` | Object | Specifies the query object that will be executed after the function returns. | | ||
* | `currentInvocation` | Array | Specifies Meteor's method or publication context that gives access to the `userId` and some other information. | | ||
* | ||
*/ | ||
queryPreparations(preperatorCallback){ | ||
@@ -379,43 +569,2 @@ this._queryPreparators.push(preperatorCallback); | ||
/** | ||
* @method getConnectionSync | ||
* @memberOf PGDatabase | ||
* @locus Server | ||
* | ||
* @summary Returns a connection, depending on whether you are using pooling or not. | ||
* | ||
* @return {Object} Connection to use for your next query operations. | ||
* | ||
* @after | ||
* # Connections | ||
* | ||
* To execute one or more database operations like `querySync` you need a valid connection. | ||
* The `getConnection` and `getConnectionSync` methods will give you a connection, depending on whether you are using pooling or not. | ||
* If you are not using connection pooling you will receive the currently established connection. | ||
* | ||
* > **Important** | ||
* > | ||
* > After you have finished work, be sure to release the connection you received. | ||
* | ||
* ```javascript | ||
* import { PGDatabase } from 'meteor/planetarydev:mysql'; | ||
* | ||
* export const MyDb = new PGDatabase({ | ||
* enablePooling: true | ||
* }); | ||
* | ||
* // connect to the database | ||
* [...] | ||
* | ||
* // get a new or the established connection | ||
* var connection = MyDb.getConnectionSync(); | ||
* | ||
* // do some query | ||
* MyDb.querySync(connection, 'UPDATE mytable SET mycol = 1 WHERE id = 2'); | ||
* MyDb.querySync(connection, 'DELETE FROM mytable WHERE id = 3'); | ||
* | ||
* // this is the important thing - release the connection after you have finished work | ||
* MyDb.releaseConnection(connection); | ||
* ``` | ||
*/ | ||
getConnectionSync(){ | ||
@@ -428,11 +577,2 @@ if (!this._options.enablePooling) { | ||
/** | ||
* @method _getConnectionFromPoolSync | ||
* @memberOf PGDatabase | ||
* @locus Server | ||
* | ||
* @summary Returns a new connection from the pool. | ||
* | ||
* @return {Object} Connection to use for your next query operations. | ||
*/ | ||
_getConnectionFromPoolSync(){ | ||
@@ -486,3 +626,3 @@ var self = this; | ||
*/ | ||
wpQuerySync(connection, sql, values){ | ||
/*wpQuerySync(connection, sql, values){ | ||
// call each global query-preparator | ||
@@ -501,3 +641,3 @@ var queryObj = { | ||
return this.querySync(connection, queryObj.sql, queryObj.values); | ||
} | ||
}*/ | ||
@@ -521,2 +661,3 @@ /** | ||
}; | ||
for (var i=0, max=this._queryPreparators.length; i<max; i++){ | ||
@@ -529,3 +670,3 @@ var preperator = this._queryPreparators[i]; | ||
this.query(connection, sql, values, callback); | ||
this.query(connection, queryObj.sql, queryObj.values, callback); | ||
//connection.query(sql, values, callback); | ||
@@ -546,4 +687,8 @@ } | ||
query(connection, sql, values, callback) { | ||
this._queryCount++; | ||
//console.log(sql, values); | ||
connection.query(sql, values, callback); | ||
connection.query(sql, values, (error, result)=>{ | ||
if (error) console.log('onQuery ERROR:', error); | ||
return callback(error, result); | ||
}); | ||
} | ||
@@ -550,0 +695,0 @@ |
@@ -5,2 +5,3 @@ 'use strict'; | ||
const sqlBuilder = new SQLBuilder('postgreSQL'); | ||
const async = require('async'); | ||
const md5 = require('md5'); | ||
@@ -43,3 +44,5 @@ const EventEmitter = require('events'); | ||
run(){ | ||
run(callback){ | ||
callback = callback || function(){}; | ||
let query = { | ||
@@ -53,3 +56,3 @@ preparation: true, | ||
// override the basic query-options, they couldnt modified by the user | ||
// override the basic query-options, they can't be modified by the user | ||
query = { | ||
@@ -112,9 +115,48 @@ $select: { | ||
var results = this.executeQuery(this._queryObject.invocation); // invocation is only available in the MySQLReactiveQuery, else it's undefined | ||
return results; | ||
this.executeQuery(false, callback); | ||
} | ||
executeQuery(reRun){ | ||
var connection = this._database.getConnectionSync(); | ||
executeQuery(reRun, callback){ | ||
async.waterfall([ | ||
(callback) => { | ||
this._database.getConnection((error, connection) => { | ||
return callback(error, connection); | ||
}); | ||
}, | ||
(connection, callback) => { | ||
var query = { sql: this._stmt.sql, values: this._stmt.values.slice() }; | ||
var queryFn; | ||
if (reRun && this._lazy.enabled) { | ||
// if we are on a rerun and lazyload option is active | ||
// we have to reset the offset to 0, otherwise if there | ||
// are changes made to records before the offset the client subscription | ||
// will not be updated | ||
if (this._sql.indexOf(' offset $') > -1) { // check explicitly for offset, because on the first run we only have the limit | ||
// offset is always the last parameter value | ||
// overwrite the current offset with 0 to check all records loaded before | ||
query.values[query.values.length - 1 /*offset*/] = 0; | ||
// set the limit of records to the current offset | ||
// because we need all records till the end of the currentOffset | ||
query.values[query.values.length - 2 /*limit*/] = this._parameters[query.values.length - 1 /*offset*/]; | ||
} | ||
} | ||
if (this._queryObject.options.preparation) { | ||
queryFn = this._database.wpQuery; | ||
} else { | ||
queryFn = this._database.query; | ||
} | ||
queryFn.call(this._database, connection, query.sql, query.values, (error, result) => { | ||
this._database.releaseConnection(connection); | ||
return callback(error, result); | ||
}); | ||
} | ||
], (error, result) => { | ||
callback(error, result); | ||
}); | ||
/*var connection = this._database.getConnectionSync(); | ||
var query = { sql: this._stmt.sql, values: this._stmt.values.slice() }; | ||
@@ -130,6 +172,6 @@ | ||
// overwrite the current offset with 0 to check all records loaded before | ||
query.values[query.values.length - 1 /*offset*/] = 0; | ||
query.values[query.values.length - 1 ] = 0; | ||
// set the limit of records to the current offset | ||
// because we need all records till the end of the currentOffset | ||
query.values[query.values.length - 2 /*limit*/] = this._parameters[query.values.length - 1 /*offset*/]; | ||
query.values[query.values.length - 2 ] = this._parameters[query.values.length - 1 ]; | ||
} | ||
@@ -149,5 +191,5 @@ } | ||
this._database.releaseConnection(connection); | ||
return results; | ||
return results;*/ | ||
} | ||
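To make the offset/limit handling above concrete, a small stand-alone sketch of how the parameter array is rewritten on a lazy-load rerun (positions and values are illustrative):

```javascript
// original statement ends with ... LIMIT $3 OFFSET $4
let values = ['open', 'abc', 20, 40];        // [status, search, limit, offset]
const originalOffset = values[values.length - 1];

values[values.length - 1] = 0;               // offset -> 0: re-check everything loaded so far
values[values.length - 2] = originalOffset;  // limit  -> old offset: fetch all rows up to it

console.log(values); // ['open', 'abc', 40, 0]
```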
} | ||
module.exports.PGQuery = PGQuery; |
@@ -56,3 +56,5 @@ 'use strict'; | ||
DELETE FROM core_reactive | ||
WHERE NOT EXISTS ( | ||
WHERE | ||
core_reactive.query_id = @@QueryId | ||
AND NOT EXISTS ( | ||
SELECT 1 FROM hashed_query | ||
@@ -99,5 +101,3 @@ WHERE | ||
//this._publishedIds = null; // cache the _id's from the result to use on binlog-change | ||
//this._oldPublishedIds = null; | ||
this._queryId = queryIdCounter++; | ||
this._salt = null; | ||
@@ -126,20 +126,21 @@ | ||
this._bindedExecuteQuery = null; | ||
this._queryId = queryIdCounter++; | ||
} | ||
destroy(done) { | ||
var self = this; | ||
self._stoped = true; | ||
this._stoped = true; | ||
var superDestroy = super.destroy.bind(this); | ||
if (!_.isFunction(done)) { | ||
done = function(){}; | ||
}; | ||
// delete the hashed reactive data from the core_reactive for this query | ||
async.series([ | ||
function(callback){ | ||
self._database.getConnection(function(error, connection){ | ||
(callback) => { | ||
this._database.getConnection((error, connection) => { | ||
if (error) return callback(error); | ||
connection.query('DELETE FROM core_reactive WHERE query_id = ' + self.queryId, [], function(error, result){ | ||
self._database.releaseConnection(connection); | ||
// console.log('DELETE FROM core_reactive WHERE query_id = $1', self.queryId); | ||
this._database.query(connection, 'DELETE FROM core_reactive WHERE query_id = $1', [this._queryId], (error, result) => { | ||
this._database.releaseConnection(connection); | ||
@@ -151,6 +152,7 @@ if (error) return callback(error); | ||
}, | ||
function(callback){ | ||
(callback) => { | ||
// delete the published Info documents | ||
var infoCollection = self._getInfoCollection(); | ||
/*var infoCollection = self._getInfoCollection(); | ||
if (self._pagination.published) { | ||
@@ -164,13 +166,7 @@ var docId = 'P' + self._queryObject.invocation._subscriptionId; | ||
} | ||
*/ | ||
// remove the query from the query-cache | ||
self._database._queryCache[self._queryId] = null; | ||
delete self._database._queryCache[self._queryId]; | ||
// remove the references from the _queriesByTable | ||
_.forEach(self._queryObject.tableDependencies, function(tableName){ | ||
var index = self._database._queriesByTable[tableName].indexOf(self._queryId); | ||
self._database._queriesByTable[tableName].splice(index, 1); | ||
}); | ||
this._database._unCacheQuery(this._queryId, this._queryObject.tableDependencies); | ||
superDestroy(); | ||
@@ -180,2 +176,3 @@ return callback(); | ||
], function(error){ | ||
if (error) console.log('ERROR DESTROY:', error); | ||
return done(error); | ||
@@ -190,21 +187,19 @@ }); | ||
run(){ | ||
let query = { | ||
preparation: true, | ||
salt: true | ||
}, | ||
run(callback) { | ||
this._running = true; | ||
callback = callback || function(){}; | ||
let query = {}, | ||
options = this._queryObject.options; | ||
query.$select = _.extend(query.$select, options); | ||
// override the basic query-options, they can't be modified by the user | ||
query = { | ||
$select: { | ||
$from: this._queryObject.tableOrViewName, | ||
$where: this._queryObject.selector | ||
} | ||
}; | ||
query.$select.$from = this._queryObject.tableOrViewName; | ||
query.$select.$where = this._queryObject.selector | ||
query.$select = _.extend(query.$select, options); | ||
// remove internal options that should not used for the query itself | ||
delete query.$select.salt; | ||
delete query.$select.preparation; | ||
delete query.$select.clientId; | ||
@@ -253,3 +248,3 @@ // the server-side pagination option exclude the use of offset and limit | ||
let stmt = sqlBuilder.build(query); | ||
//console.log(stmt.sql, stmt.values); | ||
this._stmt = { | ||
@@ -261,6 +256,9 @@ // after building the SQL statement we have to inject this into the reactive query template | ||
this.executeQuery(false/*rerun*/); | ||
this.executeQuery(false/*rerun*/, (error, result)=>{ | ||
this._running=false; | ||
return callback(error, result); | ||
}); | ||
} | ||
executeQuery(reRun){ | ||
executeQuery(reRun, callback){ | ||
if (this._pagination.enabled){ | ||
@@ -276,72 +274,95 @@ this._pagination.ready = false; | ||
var result = super.executeQuery(reRun); | ||
//console.log('reactive.executeQuery', reRun); | ||
super.executeQuery(reRun, (error, result) => { | ||
if (error) return callback(error); | ||
if (result.rowCount > 0) { | ||
_.forEach(result.rows, (row) => { | ||
//console.log('ROW:', row); | ||
if (result.rowCount > 0) { | ||
_.forEach(result.rows, (row) => { | ||
if (row.action == 'added') { | ||
return this._added(row._id, row.data); | ||
} | ||
if (row.action == 'added') { | ||
return this._added(row._id, row.data); | ||
} | ||
if (row.action == 'changed') { | ||
return this._changed(row._id, row.data); | ||
} | ||
if (row.action == 'changed') { | ||
return this._changed(row._id, row.data); | ||
} | ||
if (row.action == 'removed') { | ||
return this._removed(row._id); | ||
} | ||
}); | ||
} | ||
if (row.action == 'removed') { | ||
return this._removed(row._id); | ||
this.ready(); | ||
/* | ||
if (this._pagination.enabled) { | ||
this.removeAllOldPublishedDocuments(); | ||
} | ||
// publish the results only when not in lazy mode | ||
// by using the lazy option we will always add the new docs | ||
// BUT! remember the reRun -> in case of a reRun the lazyload option will get all records | ||
if (reRun || !this._lazy.enabled){ | ||
this.publishQueryResults(results); | ||
} else { | ||
// using the lazyload option we only add new documents | ||
if (!this._publishedIds) { | ||
this._publishedIds = {}; | ||
} | ||
}); | ||
} | ||
this.ready(); | ||
this.iterateDocuments(results, (id, document) => { | ||
// add new document | ||
this._added(id, document); | ||
}); | ||
this.clonePublishedIds(); | ||
this.ready(); | ||
} | ||
*/ | ||
/* | ||
if (this._pagination.enabled) { | ||
this.removeAllOldPublishedDocuments(); | ||
} | ||
if (this._pagination.enabled){ | ||
this._pagination.ready = true; | ||
this.publishPaginationInfo(); | ||
} | ||
// publish the results only when not in lazy mode | ||
// by using the lazy option we will always add the new docs | ||
// BUT! remember the reRun -> in case of a reRun the lazyload option will get all records | ||
if (reRun || !this._lazy.enabled){ | ||
this.publishQueryResults(results); | ||
} else { | ||
// using the lazyload option we only add new documents | ||
if (!this._publishedIds) { | ||
this._publishedIds = {}; | ||
if (this._lazy.enabled){ | ||
this._lazy.ready = true; | ||
this.publishLazyInfo(); | ||
} | ||
this.iterateDocuments(results, (id, document) => { | ||
// add new document | ||
this._added(id, document); | ||
}); | ||
this.clonePublishedIds(); | ||
this.ready(); | ||
} | ||
*/ | ||
return callback(); | ||
}); | ||
} | ||
if (this._pagination.enabled){ | ||
this._pagination.ready = true; | ||
this.publishPaginationInfo(); | ||
reRun(callback) { | ||
if (this._running) { | ||
if (this._database._staledQueries.indexOf(this._queryId) == -1) | ||
this._database._staledQueries.push(this._queryId); | ||
return callback(); | ||
} | ||
if (this._lazy.enabled){ | ||
this._lazy.ready = true; | ||
this.publishLazyInfo(); | ||
} | ||
} | ||
/*if (this._running) { | ||
var intv = setInterval(()=>{ | ||
if (!this._running){ | ||
clearInterval(intv); | ||
reRun(eventName, tablename, rows, evt) { | ||
if (this._stoped) return; | ||
if (this.lazyLoading) return; | ||
this._running = true; | ||
this.executeQuery(true, (error, result) => { | ||
this._running = false; | ||
return callback(error, result); | ||
}); | ||
} | ||
}, 50); | ||
//console.log('out'); | ||
//this._database._staledQueries.push(this._queryId); | ||
return; // callback(); | ||
}*/ | ||
if (this._stoped) return callback(); | ||
if (this.lazyLoading) return callback(); | ||
this.executeQuery(true/*reRun*/); | ||
//TODO: check for bindEnvironment and chage the executeQuery into an async function with a callback | ||
//this._bindedExecuteQuery(this._queryObject.invocation, true/*reRun*/); | ||
//var f = Fiber(()=>{ | ||
// this.executeQuery(this._queryObject.invocation, true/*reRun*/); | ||
//}).run(); | ||
// from the docs --> https://www.npmjs.com/package/fibers#garbage-collection | ||
//f = undefined; | ||
this._running = true; | ||
this.executeQuery(true/*reRun*/, (error, result) => { | ||
this._running = false; | ||
return callback(error, result); | ||
}); | ||
} | ||
@@ -372,2 +393,3 @@ | ||
/* | ||
_addedOptimistic(id, doc){ | ||
@@ -390,3 +412,5 @@ // register the new id as a published document | ||
} | ||
*/ | ||
ready(){ | ||
@@ -393,0 +417,0 @@ this.emit('state', 'ready'); |
@@ -8,39 +8,17 @@ 'use strict'; | ||
const async = require('async'); | ||
const _ = require('lodash'); | ||
const LOOKUP_INTERVAL = 5; | ||
class PGTable extends PGBaseTable { | ||
constructor(database, tableName, options) { | ||
super(database, tableName, options); | ||
constructor(database, tableName, options, callback) { | ||
super(database, tableName, options, (error) => { | ||
database._initNotifyTrigger(this._schemaName + '.' + this._baseName, (error, result) => { | ||
if (error) return callback(error); | ||
this._pkColumns = this._getPkColumns(this._baseName); | ||
this._initNotifyTriggerSync(this._database, this._baseName); | ||
return callback(); | ||
}); | ||
}); | ||
} | ||
_getPkColumns(baseName){ | ||
let connection = this._database.getConnectionSync(); | ||
let sql=`SELECT | ||
string_agg(k.column_name, ',') AS pk_columns | ||
FROM | ||
INFORMATION_SCHEMA.TABLE_CONSTRAINTS t | ||
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE k | ||
USING (CONSTRAINT_NAME, TABLE_SCHEMA, TABLE_NAME) | ||
WHERE | ||
t.constraint_type = 'PRIMARY KEY' | ||
AND t.table_catalog = current_database() | ||
AND t.table_name = $1 | ||
GROUP BY | ||
k.ordinal_position | ||
ORDER BY | ||
k.ordinal_position;`; | ||
var results = this._database.querySync(connection, sql, [baseName]); | ||
this._database.releaseConnection(connection); | ||
if (results.rowCount == 0){ | ||
return null; | ||
} else { | ||
return results.rows[0].pk_columns.split(','); | ||
} | ||
} | ||
_removeUnknownColumns(doc){ | ||
@@ -61,2 +39,3 @@ var columns = Object.keys(doc), | ||
/* | ||
_insert(docOrDocs, optimistic, subscriptionId, callback){ | ||
@@ -115,3 +94,3 @@ let methodInvocation = DDP._CurrentMethodInvocation.get(); | ||
var results = this._insert(doc, true/*optimistic*/, subscriptionId, callback); | ||
var results = this._insert(doc, true, subscriptionId, callback); | ||
} catch (ex) { | ||
@@ -126,31 +105,31 @@ if (published){ | ||
return results; | ||
} | ||
}*/ | ||
_execStmt(stmt, clientId, callback){ | ||
insert(docOrDocs, callback) { | ||
return this._insert(docOrDocs, false/*optimistic*/, null/*subscriptionId*/, callback); | ||
} | ||
if (!_.isFunction(callback) && _.isFunction(clientId)) { | ||
callback = clientId; | ||
} | ||
if (!_.isString(clientId)) { | ||
// no client or session id supplied | ||
// take a pseudo client-id, because postgreSQL | ||
// would throw an exception when using current_config() | ||
clientId = PGTable.NO_CLIENT_ID_SUPPORTED; | ||
} | ||
update(filter, document, callback){ | ||
callback = callback || function(){}; | ||
async.waterfall([ | ||
(callback) => { | ||
let query = { | ||
$update: { | ||
$table: this._baseName, | ||
$set: document, | ||
$where: filter | ||
} | ||
}; | ||
let stmt = sqlBuilder.build(query); | ||
return callback(null, stmt) | ||
}, | ||
(stmt, callback) => { | ||
this._database.getConnection((error, connection) => { | ||
return callback(error, stmt, connection); | ||
return callback(error, connection); | ||
}); | ||
}, | ||
(stmt, connection, callback) => { | ||
this._database.wpQuery(connection, stmt.sql, stmt.values, (error, results) => { | ||
(connection, callback) => { | ||
// set the unique id for this statement | ||
var statementId = Math.random().toString(26).slice(2); | ||
connection.query(`SELECT set_config('core_settings.client_id', $1, false);`, [clientId]); | ||
connection.query(`SELECT set_config('core_settings.statement_id', $1, false);`, [statementId]); | ||
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED) { | ||
this._database._runningStatements[statementId] = true; | ||
} | ||
//console.log('RUN Action:', clientId); | ||
this._database.wpQuery(connection, stmt.sql, stmt.values, (error, result) => { | ||
this._database.releaseConnection(connection); | ||
@@ -160,4 +139,24 @@ | ||
return callback(null, results); | ||
return callback(null, result, statementId); | ||
}); | ||
}, | ||
(result, statementId, callback) => { | ||
//console.log('EXECUTED'); | ||
// wait until the notify listener has processed this statement and all reactive queries have been performed | ||
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED && result.rowCount > 0) { | ||
var interv = setInterval(()=>{ | ||
if (!this._database._runningStatements[statementId]) { | ||
clearInterval(interv); | ||
//console.log('DONE Action:', clientId); | ||
return callback(null, result); | ||
} | ||
}, LOOKUP_INTERVAL); | ||
} else { | ||
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED){ | ||
delete this._database._runningStatements[statementId]; | ||
} | ||
//console.log('OUT'); | ||
return callback(null, result); | ||
} | ||
} | ||
@@ -172,14 +171,42 @@ ], | ||
remove(filter){ | ||
let methodInvocation = DDP._CurrentMethodInvocation.get(); | ||
insert(docOrDocs, clientId, callback) { | ||
let query = { | ||
type: 'delete', | ||
table: this._baseName, | ||
where: filter | ||
$insert: { | ||
$into: this._schemaName + '.' + this._baseName, | ||
$documents: docOrDocs | ||
} | ||
}; | ||
let stmt = SQLBuilder.buildStatement(query); | ||
return this._database.wpQuerySync(stmt.toString(), stmt.values, methodInvocation); | ||
let stmt = sqlBuilder.build(query); | ||
this._execStmt(stmt, clientId, callback); | ||
} | ||
update(filter, document, clientId, callback){ | ||
let query = { | ||
$update: { | ||
$table: this._schemaName + '.' + this._baseName, | ||
$set: document, | ||
$where: filter | ||
} | ||
}; | ||
let stmt = sqlBuilder.build(query); | ||
this._execStmt(stmt, clientId, callback); | ||
} | ||
remove(filter, clientId, callback){ | ||
let query = { | ||
$delete: { | ||
$table: this._schemaName + '.' + this._baseName, | ||
$where: filter | ||
} | ||
}; | ||
let stmt = sqlBuilder.build(query); | ||
this._execStmt(stmt, clientId, callback); | ||
} | ||
} | ||
PGTable.NO_CLIENT_ID_SUPPORTED = '$$NO-CLIENT-ID-SUPPORTED$$'; | ||
module.exports.PGTable = PGTable; |
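All three write methods now share the `(…, clientId, callback)` shape handled by `_execStmt`: when a client id is passed and the statement affects rows, the callback is held back until the corresponding notify event has been processed. A usage sketch with hypothetical table and column names (`db` is a connected `PGDatabase`):

```javascript
const People = db.Table('public.people', (error) => {
    if (error) throw error;

    People.insert({ firstname: 'Ada', lastname: 'Lovelace' }, 'client-abc123', (error, result) => {
        if (error) return console.error(error);
        console.log('inserted rows:', result.rowCount);
    });

    People.update({ lastname: 'Lovelace' }, { firstname: 'Augusta Ada' }, 'client-abc123', (error) => {
        if (error) return console.error(error);
    });

    // clientId omitted - the callback simply shifts into its place
    People.remove({ lastname: 'Lovelace' }, (error) => {
        if (error) return console.error(error);
    });
});
```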
'use strict'; | ||
const PGBaseTable = require('./pg-base-table').PGBaseTable; | ||
const async = require('async'); | ||
const _ = require('lodash'); | ||
class PGView extends PGBaseTable { | ||
constructor(mysqlDatabase, viewName, options) { | ||
super(mysqlDatabase, viewName, options); | ||
constructor(database, viewName, options, callback) { | ||
super(database, viewName, options, (error) => { | ||
this._getViewDependencies(this._schemaName, this._baseName, (error, dependencies) => { | ||
if (error) return callback(error); | ||
this.setViewDependencies(this._baseName); | ||
// after setting the view dependencies we have to check for the notify triggers | ||
// that are relevant to listen for changes | ||
_.forEach(this._tableDependencies, tableName => { | ||
this._initNotifyTriggerSync(this._database, tableName); | ||
this._tableDependencies = dependencies; | ||
// after setting the view dependencies we have to check for the notify triggers | ||
// that are relevant to listen for changes | ||
async.each(this._tableDependencies, (tableName, callback) => { // the tablename always includes the schema, set by getViewDependencies | ||
database._initNotifyTrigger(tableName, (error, result) => { | ||
if (error) return callback(error); | ||
return callback(); | ||
}); | ||
}, callback); | ||
}); | ||
}); | ||
} | ||
setViewDependencies(viewName) { | ||
let depTables = [], | ||
connection = this._database.getConnectionSync(); | ||
_getViewDependencies(schemaName, viewName, callback) { | ||
let depTables = []; | ||
let getDepsFromInfoSchema = (connection, schemaName, objectName, callback) => { | ||
var sql = ''; | ||
let getDepsFromInfoSchema = (objectName) => { | ||
var sql = ''; | ||
sql += 'SELECT '; | ||
sql += ' tab.TABLE_NAME AS table_name, '; | ||
sql += ' tab.TABLE_TYPE AS table_type '; | ||
sql += ' tab.TABLE_NAME AS table_name, '; | ||
sql += ' tab.TABLE_SCHEMA AS table_schema, '; | ||
sql += ' tab.TABLE_TYPE AS table_type '; | ||
sql += 'FROM '; | ||
@@ -37,6 +46,28 @@ sql += ' INFORMATION_SCHEMA.TABLES AS tab '; | ||
sql += ' views.TABLE_NAME = $1 '; | ||
sql += 'AND views.TABLE_SCHEMA = $2 '; | ||
sql += 'AND tab.TABLE_CATALOG = current_database();'; | ||
var depResult = this._database.querySync(connection, sql, [objectName]); | ||
this._database.query(connection, sql, [objectName, schemaName], (error, result) => { | ||
if (error) return callback(error); | ||
async.each(result.rows, (dep, callback) => { | ||
if (dep.table_type == 'VIEW') { | ||
getDepsFromInfoSchema(connection, dep.table_schema, dep.table_name, callback); | ||
} else { // BASE TABLE | ||
// add only if the table-dependency is unknown at this time | ||
if (depTables.indexOf(dep.table_schema + '.' + dep.table_name) == -1) { | ||
depTables.push(dep.table_schema + '.' + dep.table_name); | ||
} | ||
return callback(); | ||
} | ||
}, (error) => { | ||
if (error) return callback(error); | ||
return callback(null, depTables); | ||
}); | ||
}); | ||
/* | ||
var depResult = this._database.querySync(connection, sql, [objectName, schemaName]); | ||
for (var i=0, max=depResult.rows.length; i<max; i++){ | ||
@@ -54,12 +85,20 @@ var dep = depResult.rows[i]; | ||
} | ||
} | ||
}*/ | ||
}; | ||
getDepsFromInfoSchema(viewName); | ||
this._database.releaseConnection(connection); | ||
// overwrite the dependencies from viewName with the dependent tables | ||
this._tableDependencies = depTables; | ||
async.waterfall([ | ||
(callback) => { | ||
this._database.getConnection((error, connection)=>{ | ||
return callback(error, connection); | ||
}); | ||
}, | ||
(connection, callback) => { | ||
getDepsFromInfoSchema(connection, this._schemaName, this._baseName, (error, tableDependencies) => { | ||
this._database.releaseConnection(connection); | ||
return callback(error, tableDependencies); | ||
}); | ||
} | ||
], callback); | ||
} | ||
} | ||
module.exports.PGView = PGView; |
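For illustration, the recursive dependency walk resolves nested views down to their base tables (all names below are hypothetical, `db` is a connected `PGDatabase`):

```javascript
// public.people_with_hobbies (VIEW)
//   +- public.active_people (VIEW) -> reads public.people (BASE TABLE)
//   +- public.hobbies (BASE TABLE)
const HobbyView = db.View('people_with_hobbies', (error) => {
    if (error) throw error;
    // only base tables remain, each prefixed with its schema
    console.log(HobbyView._tableDependencies); // ['public.people', 'public.hobbies']
});
```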
{ | ||
"name": "pure-live-pg", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "PostgreSQL package to support realtime applications", | ||
@@ -21,3 +21,3 @@ "main": "index.js", | ||
"fibers": "^2.0.0", | ||
"json-sql-builder": "^1.0.7", | ||
"json-sql-builder": "^1.0.8", | ||
"lodash": "^4.17.4", | ||
@@ -24,0 +24,0 @@ "md5": "^2.2.1", |
Updated json-sql-builder@^1.0.8