Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pure-live-pg

Package Overview
Dependencies
Maintainers
1
Versions
57
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pure-live-pg - npm Package Compare versions

Comparing version 1.0.6 to 1.0.7

162

lib/pg-database.js

@@ -34,24 +34,11 @@ 'use strict';

// _queriesByTable = {
// "public.people": [12, 34, 78, 4542, ...], --> Id's that reference to the queryCache
// "public.hobbies": [32, 38, 2378, 23242, ...] --> Id's that reference to the queryCache
// "public.people": [ // --> client and queryId's that reference to the queryCache
// { clientId: "cvbcsh", queryId: 56 },
// { clientId: "cvbcsh", queryId: 34 },
// { clientId: "324rew", queryId: 442 }
// ],
// "public.hobbies": [ ... ]
// }
this._queriesByTable = {};
// cache each queryId by Client and Table to perform a rerun on an reactive change event
//
// example:
// _queriesByClient = {
// "xf65as67": {
// "queries": [2, 17, 45],
// "tables" : {
// "public.people": [17, 45],
// "public.hobbies": [2]
// }
// }
// "a6a5axd3": {
// ...
// }
// }
this._queriesByClientTable = {};
// hash table with all currently running insert, update and delete's called by

@@ -97,17 +84,4 @@ // the table-object to wait for the notify-message to be done.

CREATE TRIGGER "${triggerName}" AFTER INSERT OR UPDATE OR DELETE ON "${schemaAndTable}"
FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes();
FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes();`;
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_insert" ON "${schemaAndTable}";
-- DROP TRIGGER IF EXISTS "${triggerNameStmt}_update" ON "${schemaAndTable}";
-- CREATE TRIGGER "${triggerNameStmt}_insert" AFTER INSERT ON "${schemaAndTable}"
-- REFERENCING NEW TABLE AS new_table
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt();
-- CREATE TRIGGER "${triggerNameStmt}_update" AFTER UPDATE ON "${schemaAndTable}"
-- REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table
-- FOR EACH STATEMENT EXECUTE PROCEDURE core_notify_changes_stmt();`;
this.getConnection((error, connection) => {

@@ -143,25 +117,4 @@ if (error) return callback(error);

TRUNCATE "${this.defaultSchema}"."core_reactive";
TRUNCATE "${this.defaultSchema}"."core_reactive";`;
/* wird aktuell nicht benötig! läuft alles über den EACH STATEMENt TRIGGER
*
CREATE SEQUENCE IF NOT EXISTS "${this.defaultSchema}".core_notifications_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
CREATE TABLE IF NOT EXISTS "${this.defaultSchema}".core_notifications
(
_id bigint NOT NULL DEFAULT nextval('core_notifications_id_seq'::regclass),
details json NOT NULL,
CONSTRAINT core_notifications_pkey PRIMARY KEY (_id)
)
WITH (
OIDS = FALSE
)
TABLESPACE ${this.tableSpace};
*/`;
this.getConnection((error, connection) => {

@@ -180,63 +133,24 @@ if (error) return callback(error);

CREATE OR REPLACE FUNCTION core_notify_changes() RETURNS TRIGGER AS $$
/* unused -------------------->
DECLARE
id VARCHAR(45);
optimistic_id VARCHAR(45);
olddata json;
newdata json;
notification json;
notify_id INTEGER;
--------------------------------> end unused */
client_id VARCHAR(45);
statement_id VARCHAR(45);
statement_target VARCHAR(255);
BEGIN
-- Convert the old and new row to JSON, based on the kind of action.
-- Action = DELETE? -> OLD row
-- Action = INSERT or UPDATE? -> NEW row
BEGIN
client_id = current_setting('core_settings.client_id')::text;
statement_id = current_setting('core_settings.statement_id')::text;
statement_target = current_setting('core_settings.statement_target')::text;
EXCEPTION WHEN others THEN
client_id = NULL;
statement_id = NULL;
statement_target = NULL;
END;
/* -----------------------------> Aktuell nicht genuzt
IF (TG_OP = 'DELETE') THEN
olddata = row_to_json(OLD);
id = OLD._id;
optimistic_id = OLD._optimistic_id;
ELSE
id = NEW._id;
optimistic_id = NEW._optimistic_id;
newdata = row_to_json(NEW);
IF (TG_OP = 'UPDATE') THEN
olddata = row_to_json(OLD);
END IF;
IF ( CONCAT(TG_TABLE_SCHEMA, '.', TG_TABLE_NAME) != statement_target ) THEN
statement_id = NULL;
END IF;
-- Contruct the notification as a JSON string.
notification = json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'oldrow', olddata,
'newrow', newdata
);
INSERT INTO core_notifications(details) VALUES (notification) RETURNING _id INTO notify_id;
-- Execute pg_notify(channel, notification)
-- PERFORM pg_notify('core_reactive_event', notify_id::text);
PERFORM pg_notify(
'core_reactive_event',
json_build_object(
'trigger', 'EACH ROW',
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'action', TG_OP,
'_id', id,
'optimistic_id', optimistic_id,
'notify_id', notify_id
)::text
);
---------> Ende */
PERFORM pg_notify(
'core_reactive_event',
json_build_object(
'type', 'STATEMENT',

@@ -246,8 +160,8 @@ 'schema', TG_TABLE_SCHEMA,

'action', TG_OP,
'client_id', current_setting('core_settings.client_id')::text,
'statement_id', current_setting('core_settings.statement_id')::text
'client_id', client_id,
'statement_id', statement_id,
'statement_target', statement_target
)::text
);
-- Result is ignored since this is an AFTER trigger

@@ -257,24 +171,4 @@ RETURN NULL;

$$ LANGUAGE plpgsql;
$$ LANGUAGE plpgsql;`;
/* ------------------------------------------------------------------------------> currently unused
CREATE OR REPLACE FUNCTION core_notify_changes_stmt() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'core_reactive_event',
json_build_object(
'trigger', 'STATEMENT',
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'action', TG_OP
)::text
);
-- Result is ignored since this is an AFTER trigger
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------- unused end */`;
this.getConnection((error, connection) => {

@@ -376,3 +270,3 @@ if (error) return callback(error);

if (firstQueries.length == 0) {
if (this._runningStatements[data.statement_id]){
if (data.statement_id && this._runningStatements[data.statement_id]){
delete this._runningStatements[data.statement_id];

@@ -389,3 +283,3 @@ }

}
if (this._runningStatements[data.statement_id]){
if (data.statement_id && this._runningStatements[data.statement_id]){
delete this._runningStatements[data.statement_id];

@@ -392,0 +286,0 @@ }

@@ -329,3 +329,2 @@ 'use strict';

if (this._database._staledQueries.indexOf(this._queryId) == -1) {
console.log('Registered as staled');
this._database._staledQueries.push(this._queryId);

@@ -332,0 +331,0 @@ }

50

lib/pg-table.js

@@ -103,3 +103,3 @@ 'use strict';

}*/
_execStmt(stmt, clientId, callback){
_execStmt(stmt, clientId, schemaAndTable, callback){

@@ -127,2 +127,3 @@ if (!_.isFunction(callback) && _.isFunction(clientId)) {

connection.query(`SELECT set_config('core_settings.statement_id', $1, false);`, [statementId]);
connection.query(`SELECT set_config('core_settings.statement_target', $1, false);`, [schemaAndTable]);
if (clientId != PGTable.NO_CLIENT_ID_SUPPORTED) {

@@ -169,36 +170,39 @@ this._database._runningStatements[statementId] = true;

insert(docOrDocs, clientId, callback) {
let query = {
$insert: {
$into: this._schemaName + '.' + this._baseName,
$documents: docOrDocs
}
};
let schemaAndTable = this._schemaName + '.' + this._baseName,
query = {
$insert: {
$into: schemaAndTable,
$documents: docOrDocs
}
};
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
this._execStmt(stmt, clientId, schemaAndTable, callback);
}
update(filter, document, clientId, callback){
let query = {
$update: {
$table: this._schemaName + '.' + this._baseName,
$set: document,
$where: filter
}
};
let schemaAndTable = this._schemaName + '.' + this._baseName,
query = {
$update: {
$table: schemaAndTable,
$set: document,
$where: filter
}
};
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
this._execStmt(stmt, clientId, schemaAndTable, callback);
}
remove(filter, clientId, callback){
let query = {
$delete: {
$table: this._schemaName + '.' + this._baseName,
$where: filter
}
};
let schemaAndTable = this._schemaName + '.' + this._baseName,
query = {
$delete: {
$table: schemaAndTable,
$where: filter
}
};
let stmt = sqlBuilder.build(query);
this._execStmt(stmt, clientId, callback);
this._execStmt(stmt, clientId, schemaAndTable, callback);
}

@@ -205,0 +209,0 @@ }

{
"name": "pure-live-pg",
"version": "1.0.6",
"version": "1.0.7",
"description": "PostgreSQL package to support realtime applications",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc