Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

db-watch

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

db-watch - npm Package Compare versions

Comparing version 0.0.4 to 0.0.5

lib/cache.fixture.d.ts

13

lib/dump.d.ts

@@ -5,17 +5,6 @@ /// <reference types="node" />

export declare const sorted: any;
export interface Shard {
index: number;
modulus: number;
}
export interface StreamRowsOptions {
shard?: Shard;
shardKey?: string;
shardInverse?: boolean;
version?: Date;
versionKey?: string;
}
export declare function newArraySink<X>(output: X[], transform?: (x: X) => X): (item: X) => void;
export declare function newSortedArraySink<X>(output: X[], compare: (a: X, b: X) => number, transform?: (x: X) => X, mergeExisting?: (existing: X, addItem: X) => void): (item: X) => void;
export declare function newSortedArraySink<X>(output: X[], compare: (a: X, b: X) => number, transform?: (x: X) => X, mergeLeft?: (existing: X, addItem: X) => void): (item: X) => void;
export declare function dump<X>(stream: ReadableStreamTree, sink: (x: X) => void): Promise<void>;
export declare function dumpStream(stream: ReadableStreamTree, output: Transform): Promise<undefined>;
//# sourceMappingURL=dump.d.ts.map

@@ -16,11 +16,11 @@ "use strict";

exports.newArraySink = newArraySink;
function newSortedArraySink(output, compare, transform, mergeExisting) {
function newSortedArraySink(output, compare, transform, mergeLeft) {
return function (item) {
var added = false;
var addItem = transform ? transform(item) : item;
if (mergeExisting) {
if (mergeLeft) {
var existing = exports.sorted.eq(output, item, compare);
added = existing >= 0;
if (added)
mergeExisting(output[existing], addItem);
mergeLeft(output[existing], addItem);
}

@@ -27,0 +27,0 @@ if (!added)

@@ -1,18 +0,12 @@

import { SplitQueryStream } from 'dbgate-query-splitter/lib/splitQueryStream';
import Knex from 'knex';
import StreamTree, { ReadableStreamTree } from 'tree-stream';
import { StreamRowsOptions } from './dump';
import { InsertRowsOptions } from './restore';
export declare function streamFromKnex(knex: Knex, query: Knex.QueryBuilder, options?: StreamRowsOptions): ReadableStreamTree;
export declare function streamToKnex(source: {
knex?: Knex;
transaction?: Knex.Transaction;
}, options: InsertRowsOptions): StreamTree.WritableStreamTree;
export declare function streamToKnexRaw(source: {
knex?: Knex;
transaction?: Knex.Transaction;
}, options?: {
returning?: boolean;
}): StreamTree.WritableStreamTree;
export declare function newStatementSplitterStream(type?: any): SplitQueryStream;
import { ReadableStreamTree } from 'tree-stream';
import { DatabaseLoaderSource, DatabaseLoaderSourceOptions, DatabaseLoadOptions, DatabaseTable, UpdatedRow } from './load';
export declare class KnexLoaderSource extends DatabaseLoaderSource {
knex: Knex;
options?: DatabaseLoaderSourceOptions | undefined;
constructor(knex: Knex, options?: DatabaseLoaderSourceOptions | undefined);
close(): Promise<void>;
fetch(table: DatabaseTable, update: UpdatedRow): Promise<any>;
load(table: DatabaseTable, options: DatabaseLoadOptions): ReadableStreamTree;
}
//# sourceMappingURL=knex.d.ts.map
"use strict";
var __extends = (this && this.__extends) || (function () {
var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
if (typeof b !== "function" && b !== null)
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null");
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
default:
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
}
op = body.call(thisArg, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
var __importDefault = (this && this.__importDefault) || function (mod) {

@@ -6,78 +57,64 @@ return (mod && mod.__esModule) ? mod : { "default": mod };

Object.defineProperty(exports, "__esModule", { value: true });
exports.newStatementSplitterStream = exports.streamToKnexRaw = exports.streamToKnex = exports.streamFromKnex = void 0;
var byline_1 = __importDefault(require("byline"));
var options_1 = require("dbgate-query-splitter/lib/options");
var splitQueryStream_1 = require("dbgate-query-splitter/lib/splitQueryStream");
var through2_1 = __importDefault(require("through2"));
exports.KnexLoaderSource = void 0;
var tree_stream_1 = __importDefault(require("tree-stream"));
var batch2 = require('batch2');
function streamFromKnex(knex, query, options) {
if (options === null || options === void 0 ? void 0 : options.shard) {
query = query.where(knex.raw("shard(" + (options.shardKey || 'id') + ", " + options.shard.modulus + ")"), options.shardInverse ? '!=' : '=', options.shard.index);
var load_1 = require("./load");
var KnexLoaderSource = /** @class */ (function (_super) {
__extends(KnexLoaderSource, _super);
function KnexLoaderSource(knex, options) {
var _this = _super.call(this) || this;
_this.knex = knex;
_this.options = options;
return _this;
}
if (options === null || options === void 0 ? void 0 : options.version) {
query = query.where([(options === null || options === void 0 ? void 0 : options.versionKey) || 'updated_at', '>', options.version]);
}
return tree_stream_1.default.readable(query.stream());
}
exports.streamFromKnex = streamFromKnex;
function streamToKnex(source, options) {
var _a;
var stream = tree_stream_1.default.writable(through2_1.default.obj(function (data, _, callback) {
var _this = this;
var query = source.transaction
? source.transaction.batchInsert(options.table, data)
: source.knex.batchInsert(options.table, data);
if (options.returning)
query = query.returning(options.returning);
if (source.transaction)
query = query.transacting(source.transaction);
query
.then(function (result) {
if (options.returning)
_this.push(result);
callback();
})
.catch(function (err) {
throw err;
KnexLoaderSource.prototype.close = function () {
var _a;
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_b) {
switch (_b.label) {
case 0:
if (!((_a = this.options) === null || _a === void 0 ? void 0 : _a.knexOwner)) return [3 /*break*/, 2];
return [4 /*yield*/, this.knex.destroy()];
case 1:
_b.sent();
_b.label = 2;
case 2: return [2 /*return*/];
}
});
});
}));
return stream.pipeFrom(batch2.obj({ size: (_a = options.batchSize) !== null && _a !== void 0 ? _a : 4000 }));
}
exports.streamToKnex = streamToKnex;
function streamToKnexRaw(source, options) {
var stream = tree_stream_1.default.writable(through2_1.default.obj(function (data, _, callback) {
var _this = this;
var text = data.replace(/\?/g, '\\?');
var query = source.transaction ? source.transaction.raw(text) : source.knex.raw(text);
query
.then(function (result) {
if (options === null || options === void 0 ? void 0 : options.returning)
_this.push(result);
callback();
})
.catch(function (err) {
throw err;
};
KnexLoaderSource.prototype.fetch = function (table, update) {
return __awaiter(this, void 0, void 0, function () {
var query, _i, _a, keyField, ret;
return __generator(this, function (_b) {
switch (_b.label) {
case 0:
query = this.knex(table.table);
for (_i = 0, _a = table.keyFields; _i < _a.length; _i++) {
keyField = _a[_i];
query = query.where(keyField, update.key[keyField]);
}
if (update.updated && update.updated.length > 0) {
query = query.returning(update.updated.filter(function (x) { return x !== 'archiving'; }));
}
return [4 /*yield*/, query];
case 1:
ret = _b.sent();
return [2 /*return*/, ret[0]];
}
});
});
}));
stream = stream.pipeFrom(newStatementSplitterStream((source.transaction || source.knex).context.client.config.client));
stream = stream.pipeFrom(byline_1.default.createStream());
return stream;
}
exports.streamToKnexRaw = streamToKnexRaw;
function newStatementSplitterStream(type) {
switch (type) {
case 'postgresql':
return new splitQueryStream_1.SplitQueryStream(options_1.postgreSplitterOptions);
case 'mysql':
return new splitQueryStream_1.SplitQueryStream(options_1.mysqlSplitterOptions);
case 'mssql':
return new splitQueryStream_1.SplitQueryStream(options_1.mssqlSplitterOptions);
case 'sqlite':
return new splitQueryStream_1.SplitQueryStream(options_1.sqliteSplitterOptions);
default:
return new splitQueryStream_1.SplitQueryStream(options_1.defaultSplitterOptions);
}
}
exports.newStatementSplitterStream = newStatementSplitterStream;
};
KnexLoaderSource.prototype.load = function (table, options) {
var query = this.knex(table.table);
if (options === null || options === void 0 ? void 0 : options.shard) {
query = query.where(this.knex.raw("shard(" + (table.shardField || 'id') + ", " + options.shard.modulus + ")"), options.shardInverse ? '!=' : '=', options.shard.index);
}
if (options === null || options === void 0 ? void 0 : options.version) {
query = query.where([table.versionField || 'updated_at', '>', options.version]);
}
return tree_stream_1.default.readable(query.stream());
};
return KnexLoaderSource;
}(load_1.DatabaseLoaderSource));
exports.KnexLoaderSource = KnexLoaderSource;
//# sourceMappingURL=knex.js.map

@@ -0,1 +1,3 @@

export declare function shardIntegerSQL(column: string, modulus: string | number): string;
export declare function shardTextSQL(column: string, modulus: string | number): string;
export declare const setupQueries: string[];

@@ -2,0 +4,0 @@ /**

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.dropTrigger = exports.createNotifyTrigger = exports.dropNotifyFunction = exports.createNotifyRowFieldsFunction = exports.createNotifyRowFunction = exports.createNotifyFunction = exports.setupQueries = void 0;
exports.dropTrigger = exports.createNotifyTrigger = exports.dropNotifyFunction = exports.createNotifyRowFieldsFunction = exports.createNotifyRowFunction = exports.createNotifyFunction = exports.setupQueries = exports.shardTextSQL = exports.shardIntegerSQL = void 0;
function shardIntegerSQL(column, modulus) {
return "mod(" + column + ", " + modulus + ")";
}
exports.shardIntegerSQL = shardIntegerSQL;
function shardTextSQL(column, modulus) {
return "mod(('x' || right(md5(" + column + "), 4))::bit(16)::int, " + modulus + ")";
}
exports.shardTextSQL = shardTextSQL;
exports.setupQueries = [
'CREATE EXTENSION hstore',
'CREATE EXTENSION IF NOT EXISTS hstore',
"CREATE OR REPLACE FUNCTION jsonb_to_hstore(jsonb)\n RETURNS hstore AS\n $func$\n SELECT COALESCE(hstore(array_agg(key), array_agg(value)), hstore(''))\n FROM jsonb_each_text($1)\n $func$ LANGUAGE sql IMMUTABLE STRICT",
"CREATE OR REPLACE FUNCTION shard(text, int)\n RETURNS int AS\n $func$\n SELECT mod(('x' || right(md5($1), 4))::bit(16)::int, $2)\n $func$ LANGUAGE sql IMMUTABLE STRICT",
"CREATE OR REPLACE FUNCTION link_from_url(text)\n RETURNS text AS\n $func$\n SELECT SUBSTR($1, STRPOS($1, '://') + 3)\n $func$ LANGUAGE sql IMMUTABLE STRICT",
"CREATE OR REPLACE FUNCTION shard(text, int)\n RETURNS int AS\n $func$\n " + shardTextSQL('$1', '$2') + "\n $func$ LANGUAGE sql IMMUTABLE STRICT",
];

@@ -10,0 +17,0 @@ /**

import { ClientConfig } from 'pg';
import { Subscriber } from 'pg-listen';
import { DatabaseListener, UpdatedRow } from '../watch';
export declare class PostgresListener implements DatabaseListener {
import { UpdatedRow } from '../load';
import { DatabaseWatcherSource, UpdateType } from '../watch';
export declare class PostgresTriggerWatcher extends DatabaseWatcherSource {
config: ClientConfig;
subscriber: Subscriber | undefined;
constructor(config: ClientConfig);
constructor(config: ClientConfig, updateType: UpdateType);
close(): Promise<void>;
watch(table: string, callback: (payload: UpdatedRow) => void): Promise<void>;
getSubscriber(): Promise<Subscriber<{
[channel: string]: any;
}>>;
listen(channel: string, callback: (payload: UpdatedRow) => void): Promise<void>;
}
//# sourceMappingURL=watch.d.ts.map
"use strict";
var __extends = (this && this.__extends) || (function () {
var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
if (typeof b !== "function" && b !== null)
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null");
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {

@@ -42,9 +57,13 @@ function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }

Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgresListener = void 0;
exports.PostgresTriggerWatcher = void 0;
var pg_listen_1 = __importDefault(require("pg-listen"));
var PostgresListener = /** @class */ (function () {
function PostgresListener(config) {
this.config = config;
var watch_1 = require("../watch");
var PostgresTriggerWatcher = /** @class */ (function (_super) {
__extends(PostgresTriggerWatcher, _super);
function PostgresTriggerWatcher(config, updateType) {
var _this = _super.call(this, watch_1.WatchMethod.Trigger, updateType) || this;
_this.config = config;
return _this;
}
PostgresListener.prototype.getSubscriber = function () {
PostgresTriggerWatcher.prototype.close = function () {
return __awaiter(this, void 0, void 0, function () {

@@ -54,12 +73,9 @@ return __generator(this, function (_a) {

case 0:
if (!!this.subscriber) return [3 /*break*/, 2];
this.subscriber = pg_listen_1.default(this.config);
this.subscriber.events.on('error', function (error) {
throw error;
});
return [4 /*yield*/, this.subscriber.connect()];
if (!this.subscriber)
return [2 /*return*/];
return [4 /*yield*/, this.subscriber.close()];
case 1:
_a.sent();
_a.label = 2;
case 2: return [2 /*return*/, this.subscriber];
this.subscriber = undefined;
return [2 /*return*/];
}

@@ -69,8 +85,10 @@ });

};
PostgresListener.prototype.listen = function (channel, callback) {
PostgresTriggerWatcher.prototype.watch = function (table, callback) {
return __awaiter(this, void 0, void 0, function () {
var subscriber;
var channel, subscriber;
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, this.getSubscriber()];
case 0:
channel = table + "_updated";
return [4 /*yield*/, this.getSubscriber()];
case 1:

@@ -87,5 +105,24 @@ subscriber = _a.sent();

};
return PostgresListener;
}());
exports.PostgresListener = PostgresListener;
PostgresTriggerWatcher.prototype.getSubscriber = function () {
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
if (!!this.subscriber) return [3 /*break*/, 2];
this.subscriber = pg_listen_1.default(this.config);
this.subscriber.events.on('error', function (error) {
throw error;
});
return [4 /*yield*/, this.subscriber.connect()];
case 1:
_a.sent();
_a.label = 2;
case 2: return [2 /*return*/, this.subscriber];
}
});
});
};
return PostgresTriggerWatcher;
}(watch_1.DatabaseWatcherSource));
exports.PostgresTriggerWatcher = PostgresTriggerWatcher;
//# sourceMappingURL=watch.js.map

@@ -1,25 +0,75 @@

export interface DatabaseListener {
listen(channel: string, callback: (payload: UpdatedRow) => void): Promise<void>;
import { DatabaseLoaderSource, DatabaseTable, DatabaseTableParser, Shard, UpdatedRow } from './load';
export declare enum WatchMethod {
Log = 0,
Poll = 1,
Trigger = 2
}
export interface DatabaseSink<Item> {
keyFields: string[];
insert: (item: Item) => Item | null;
find: (update: UpdatedRow) => Item | null;
remove: (update: UpdatedRow) => Item | null;
update?: (record: Item, itemRecord: Record<string, any>) => void;
updated?: (current: Item, row: Record<string, any>) => void;
validate?: (row: Record<string, any>) => Item;
export declare enum AntiShardBehavior {
None = 0,
Interleaved = 1,
ShardFirst = 2,
ShardLast = 3
}
export interface UpdatedRow {
op: UpdateOp;
table?: string;
key: Record<string, any>;
row?: Record<string, any>;
updated?: string[];
export declare enum UpdateType {
Delta = 0,
Full = 1,
Key = 2
}
export declare enum UpdateOp {
Insert = "INSERT",
Update = "UPDATE",
Delete = "DELETE"
export interface IdempotentDatabaseSink<Item> extends DatabaseTableParser<Item> {
load: (item: Item) => void;
upsert: (key: Partial<Item>, value: Partial<Item> | null) => Item | null;
}
export interface MemoryDatabaseSink<Item> extends Partial<DatabaseTableParser<Item>> {
insert: (item: Item) => Item;
find: (key: Partial<Item>) => Item | null;
remove: (key: Partial<Item>) => Item | null;
update?: (current: Item, update: Partial<Item>) => void;
updated?: (current: Item | null, update: Partial<Item> | null) => void;
}
export interface DatabaseWatcherSink<Item> extends IdempotentDatabaseSink<Item> {
filterUpdate?: (item: UpdatedRow) => boolean;
}
export declare abstract class DatabaseWatcherSource {
watchMethod: WatchMethod;
updateType: UpdateType;
constructor(watchMethod: WatchMethod, updateType: UpdateType);
abstract watch(table: string, callback: (payload: UpdatedRow) => void): Promise<void>;
}
export interface DatabaseWatcherOptions {
antiShardBehavior?: AntiShardBehavior;
concurrency: number;
debug?: boolean;
shard?: Shard;
shardField?: string;
version?: Date;
}
export declare class DatabaseLoader<Item> {
source: DatabaseLoaderSource;
table: DatabaseTable;
sink: IdempotentDatabaseSink<Item>;
updateQueue: UpdatedRow[];
loading: boolean;
loaded: boolean;
version?: Date;
constructor(source: DatabaseLoaderSource, table: DatabaseTable, sink: IdempotentDatabaseSink<Item>);
startLoading(): void;
doneLoading(): UpdatedRow[];
delayLoading(updateFn: (update: UpdatedRow) => void): (update: UpdatedRow) => void;
delayLoadingWithFetchQueue(dequeue: () => void): (update: UpdatedRow) => void;
}
export declare class DatabaseWatcher<Item> extends DatabaseLoader<Item> {
source: DatabaseLoaderSource;
watcher: DatabaseWatcherSource;
table: DatabaseTable;
sink: DatabaseWatcherSink<Item>;
options: DatabaseWatcherOptions;
constructor(source: DatabaseLoaderSource, watcher: DatabaseWatcherSource, table: DatabaseTable, sink: DatabaseWatcherSink<Item>, options: DatabaseWatcherOptions);
watch(): Promise<void>;
init(): Promise<void>;
initialLoad(): Promise<void>;
fetchAndUpdate(): Promise<void>;
handleUpdatedRow(update: UpdatedRow): Item | null;
}
export declare function idempotentSinkFromMemorySink<Item>(table: DatabaseTable, sink: MemoryDatabaseSink<Item>, watcherOptions?: Partial<DatabaseWatcherSink<Item>>): DatabaseWatcherSink<Item>;
export declare function assignProps(record: Record<string, any>, itemRecord: Record<string, any>): void;
//# sourceMappingURL=watch.d.ts.map
"use strict";
// import pSettle from 'p-settle'
var __extends = (this && this.__extends) || (function () {
var extendStatics = function (d, b) {
extendStatics = Object.setPrototypeOf ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; };
return extendStatics(d, b);
};
return function (d, b) {
if (typeof b !== "function" && b !== null)
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null");
extendStatics(d, b);
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
})();
var __assign = (this && this.__assign) || function () {
__assign = Object.assign || function(t) {
for (var s, i = 1, n = arguments.length; i < n; i++) {
s = arguments[i];
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p))
t[p] = s[p];
}
return t;
};
return __assign.apply(this, arguments);
};
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
default:
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
}
op = body.call(thisArg, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.UpdateOp = void 0;
var UpdateOp;
(function (UpdateOp) {
UpdateOp["Insert"] = "INSERT";
UpdateOp["Update"] = "UPDATE";
UpdateOp["Delete"] = "DELETE";
})(UpdateOp = exports.UpdateOp || (exports.UpdateOp = {}));
/*
export class DatabaseLoader<X> {
loading = false
loaded = false
updateQueue: UpdatedRow[] = []
startLoading() {
this.loading = true
}
doneLoading() {
const updates = this.updateQueue
this.updateQueue = []
this.loading = false
this.loaded = true
return updates
}
delayLoading(updateFn: (update: UpdatedRow) => void) {
return (update: UpdatedRow) => {
if (this.loading) this.updateQeue.push(update)
else updateFn(update)
exports.assignProps = exports.idempotentSinkFromMemorySink = exports.DatabaseWatcher = exports.DatabaseLoader = exports.DatabaseWatcherSource = exports.UpdateType = exports.AntiShardBehavior = exports.WatchMethod = void 0;
var p_settle_1 = __importDefault(require("p-settle"));
var dump_1 = require("./dump");
var load_1 = require("./load");
var WatchMethod;
(function (WatchMethod) {
WatchMethod[WatchMethod["Log"] = 0] = "Log";
WatchMethod[WatchMethod["Poll"] = 1] = "Poll";
WatchMethod[WatchMethod["Trigger"] = 2] = "Trigger";
})(WatchMethod = exports.WatchMethod || (exports.WatchMethod = {}));
var AntiShardBehavior;
(function (AntiShardBehavior) {
AntiShardBehavior[AntiShardBehavior["None"] = 0] = "None";
AntiShardBehavior[AntiShardBehavior["Interleaved"] = 1] = "Interleaved";
AntiShardBehavior[AntiShardBehavior["ShardFirst"] = 2] = "ShardFirst";
AntiShardBehavior[AntiShardBehavior["ShardLast"] = 3] = "ShardLast";
})(AntiShardBehavior = exports.AntiShardBehavior || (exports.AntiShardBehavior = {}));
var UpdateType;
(function (UpdateType) {
UpdateType[UpdateType["Delta"] = 0] = "Delta";
UpdateType[UpdateType["Full"] = 1] = "Full";
UpdateType[UpdateType["Key"] = 2] = "Key";
})(UpdateType = exports.UpdateType || (exports.UpdateType = {}));
var DatabaseWatcherSource = /** @class */ (function () {
function DatabaseWatcherSource(watchMethod, updateType) {
this.watchMethod = watchMethod;
this.updateType = updateType;
}
}
delayLoadingWithFetchQueue(dequeue: () => void) {
return (update: UpdatedRow) => {
this.updateQueue.push(update)
dequeue()
return DatabaseWatcherSource;
}());
exports.DatabaseWatcherSource = DatabaseWatcherSource;
var DatabaseLoader = /** @class */ (function () {
function DatabaseLoader(source, table, sink) {
this.source = source;
this.table = table;
this.sink = sink;
this.updateQueue = [];
this.loading = false;
this.loaded = false;
}
}
}
export class DatabaseWatcher<X> extends DatabaseLoader<X> {
constructor(public listener: DatabaseListener, public sink: DatabaseSink<X>) {}
async listen(channel: string) {
this.listener.listen(
'event_updated',
this.delayLoadingWithFetchQueue(() => this.fetchAndUpdateEvents())
),
async fetchAndUpdate(
filterFn: (update: UpdatedRow) => boolean,
parseFn: (update: UpdatedRow) => X,
fetchFn: (update: UpdatedRow) => Promise<X>,
updateFn: (update: UpdatedRow) => void,
concurrency = 2,
debug = false
) {
if (this.loading || !this.updateQueue.length) return
this.loading = true
while (this.updateQueue.length) {
const updates = this.updateQueue
if (debug) console.log(`fetchAndUpdate: queue=${updates.length}`)
this.updateQueue = []
const ready = await pSettle(
updates
.filter(filterFn)
.map((x) => () => (x.op === 'DELETE' ? Promise.resolve(parseFn(x)) : fetchFn(x))),
{ concurrency }
)
ready.forEach((x, i) =>
updateFn({
op: updates[i].op,
key: updates[i].key,
row: x.isFulfilled ? x.value : undefined,
})
)
DatabaseLoader.prototype.startLoading = function () {
this.loading = true;
};
DatabaseLoader.prototype.doneLoading = function () {
var updates = this.updateQueue;
this.updateQueue = [];
this.loading = false;
this.loaded = true;
return updates;
};
DatabaseLoader.prototype.delayLoading = function (updateFn) {
var _this = this;
return function (update) {
if (_this.loading)
_this.updateQueue.push(update);
else
updateFn(update);
};
};
DatabaseLoader.prototype.delayLoadingWithFetchQueue = function (dequeue) {
var _this = this;
return function (update) {
_this.updateQueue.push(update);
dequeue();
};
};
return DatabaseLoader;
}());
exports.DatabaseLoader = DatabaseLoader;
var DatabaseWatcher = /** @class */ (function (_super) {
__extends(DatabaseWatcher, _super);
function DatabaseWatcher(source, watcher, table, sink, options) {
var _this = _super.call(this, source, table, sink) || this;
_this.source = source;
_this.watcher = watcher;
_this.table = table;
_this.sink = sink;
_this.options = options;
return _this;
}
this.loading = false
}
DatabaseWatcher.prototype.watch = function () {
return __awaiter(this, void 0, void 0, function () {
var _this = this;
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, this.watcher.watch(this.table.table, this.watcher.updateType === UpdateType.Key
? this.delayLoadingWithFetchQueue(function () { return _this.fetchAndUpdate(); })
: this.delayLoading(function (x) { return _this.handleUpdatedRow(x); }))];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
};
DatabaseWatcher.prototype.init = function () {
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
// const start = Date.now()
this.startLoading();
return [4 /*yield*/, this.watch()];
case 1:
_a.sent();
return [4 /*yield*/, this.initialLoad()];
case 2:
_a.sent();
return [2 /*return*/];
}
});
});
};
DatabaseWatcher.prototype.initialLoad = function () {
return __awaiter(this, void 0, void 0, function () {
var _i, _a, delayedUpdate;
return __generator(this, function (_b) {
switch (_b.label) {
case 0: return [4 /*yield*/, dump_1.dump(this.source.load(this.table, {
shard: this.options.shard,
version: this.options.version,
}), this.sink.load)];
case 1:
_b.sent();
for (_i = 0, _a = this.doneLoading(); _i < _a.length; _i++) {
delayedUpdate = _a[_i];
this.handleUpdatedRow(delayedUpdate);
}
return [2 /*return*/];
}
});
});
};
DatabaseWatcher.prototype.fetchAndUpdate = function () {
var _a, _b;
return __awaiter(this, void 0, void 0, function () {
var _loop_1, this_1;
var _this = this;
return __generator(this, function (_c) {
switch (_c.label) {
case 0:
if (this.loading || !this.updateQueue.length)
return [2 /*return*/];
this.loading = true;
_loop_1 = function () {
var updates, ready;
return __generator(this, function (_d) {
switch (_d.label) {
case 0:
updates = this_1.updateQueue;
if (this_1.options.debug)
console.log("fetchAndUpdate: queue=" + updates.length);
this_1.updateQueue = [];
return [4 /*yield*/, p_settle_1.default(updates
.filter((_b = (_a = this_1.sink) === null || _a === void 0 ? void 0 : _a.filterUpdate) !== null && _b !== void 0 ? _b : (function (x) { return x; }))
.map(function (update) { return function () {
return update.op === load_1.UpdateOp.Delete
? Promise.resolve(_this.sink.parseUpdate(update))
: _this.source.fetch(_this.table, update);
}; }), { concurrency: this_1.options.concurrency })];
case 1:
ready = _d.sent();
ready.forEach(function (x, i) {
return _this.handleUpdatedRow({
op: updates[i].op,
key: updates[i].key,
row: x.isFulfilled ? x.value : undefined,
});
});
return [2 /*return*/];
}
});
};
this_1 = this;
_c.label = 1;
case 1:
if (!this.updateQueue.length) return [3 /*break*/, 3];
return [5 /*yield**/, _loop_1()];
case 2:
_c.sent();
return [3 /*break*/, 1];
case 3:
this.loading = false;
return [2 /*return*/];
}
});
});
};
DatabaseWatcher.prototype.handleUpdatedRow = function (update) {
var _a, _b;
if (!update.row)
return null;
if (this.options.shard &&
!load_1.shardMatchText((_b = update.key[(_a = this.table.shardField) !== null && _a !== void 0 ? _a : '']) !== null && _b !== void 0 ? _b : '', this.options.shard)) {
return null;
}
if (this.sink.filterUpdate && !this.sink.filterUpdate(update))
return null;
var key = this.sink.parseRow(update.key);
var row = this.sink.parseUpdate(update);
switch (update.op) {
case load_1.UpdateOp.Insert:
case load_1.UpdateOp.Update:
return this.sink.upsert(key, row);
case load_1.UpdateOp.Delete:
return this.sink.upsert(key, null);
}
};
return DatabaseWatcher;
}(DatabaseLoader));
exports.DatabaseWatcher = DatabaseWatcher;
function idempotentSinkFromMemorySink(table, sink, watcherOptions) {
var _a, _b, _c, _d, _e;
var update = (_a = sink.update) !== null && _a !== void 0 ? _a : assignProps;
return __assign(__assign({}, watcherOptions), { upsert: function (key, row) {
var item = sink.find(key);
if (sink.updated)
sink.updated(item, row);
if (!row)
return sink.remove(key);
if (item) {
var rowkeys = Object.keys(row);
var rekey = rowkeys.some(function (k) { return table.keyFields.includes(k); });
if (rekey) {
var removedItem = sink.remove(key);
return sink.insert(__assign(__assign({}, (removedItem !== null && removedItem !== void 0 ? removedItem : item)), row));
}
else {
update(item, row);
return item;
}
}
else {
return sink.insert(row);
}
}, load: sink.insert, parseKey: (_c = (_b = watcherOptions === null || watcherOptions === void 0 ? void 0 : watcherOptions.parseKey) !== null && _b !== void 0 ? _b : sink.parseKey) !== null && _c !== void 0 ? _c : (function (x) { return x; }), parseRow: (_e = (_d = watcherOptions === null || watcherOptions === void 0 ? void 0 : watcherOptions.parseRow) !== null && _d !== void 0 ? _d : sink.parseRow) !== null && _e !== void 0 ? _e : (function (x) { return x; }), parseUpdate: function (x) { return x.row; } });
}
export function applyUpdatedRow<Item>(sink: DatabaseSink<Item>, update: UpdatedRow) {
if (!update.row) return null
switch (update.op) {
case UpdateOp.Insert:
return sink.insert(update.row as Item)
case UpdateOp.Update:
const item = sink.find(update.key)
if (item === undefined) {
return null
} else if (isUpdate) {
const keys = Object.keys(update.row)
const rekey = keyFields.some((k) => keys.includes(k))
if (rekey) {
const item = remove(key)
return insert({ ...item, ...update.row })
} else {
const currentItem = sink.getFn(key)
const record = currentItem as Record<string, any>
if (updated) updated(currentItem, update.row)
sink.update(record, update.row)
return currentItem
exports.idempotentSinkFromMemorySink = idempotentSinkFromMemorySink;
function assignProps(record, itemRecord) {
Object.keys(itemRecord).forEach(function (k) {
var _a;
var propk = itemRecord[k];
if (propk instanceof Object) {
var outk_1 = (_a = record[k]) !== null && _a !== void 0 ? _a : (record[k] = {});
Object.keys(propk).forEach(function (l) { return (outk_1[l] = propk[l]); });
}
}
break
case UpdateOp.Remove:
remove(key)
}
return true
else {
record[k] = propk;
}
});
}
export function handleUpdatedRowItem<Item, Sink>(
op: string,
item: X,
sink: DatabaseSink<Item, Key>,
filter?: (x: X) => boolean,
) {
const itemRecord = item as Record<string, any>
const isUpdate = op === 'UPDATE'
if (isUpdate || op === 'DELETE') {
const index = findIndex(item)
if (index < 0) {
if (!filter || filter(item)) return null
} else if (isUpdate) {
const currentItem = sink.getFn(key)
const record = currentItem as Record<string, any>
let rekey = false
keyFields.forEach(
(k) => (rekey = rekey || record[k] < itemRecord[k] || itemRecord[k] < record[k])
)
if (rekey) {
remove(index)
assignUpdate(record, itemRecord)
insert(currentItem)
} else {
if (updated) updated(currentItem, itemRecord)
assignUpdate(record, itemRecord)
}
return currentItem
} else return remove(index)
} else if (op === 'INSERT') {
if (!filter || filter(item)) {
insert(item)
return item
} else {
return null
}
} else {
return null
}
}
export function assignProps(record: Record<string, any>, itemRecord: Record<string, any>) {
Object.keys(itemRecord).forEach((k) => {
const propk = itemRecord[k]
if (propk instanceof Object) {
const outk = record[k] ?? (record[k] = {})
Object.keys(propk).forEach((l) => (outk[l] = propk[l]))
} else {
record[k] = propk
}
})
}
*/
exports.assignProps = assignProps;
//# sourceMappingURL=watch.js.map
{
"name": "db-watch",
"version": "0.0.4",
"description": "Create Postgres triggers and watch notify listeners",
"version": "0.0.5",
"description": "Database replication, inlcuding Postgres triggers and monitoring with notify listeners",
"author": "wholebuzz",

@@ -19,2 +19,3 @@ "license": "Apache-2.0",

"test:coverage": "jest --runInBand --ci --passWithNoTests --coverage --no-cache",
"badge:coverage": "istanbul-cobertura-badger -v -b coverage",
"lint": "tslint -c tslint.json --project .",

@@ -29,19 +30,22 @@ "fix": "yarn lint --fix",

"dependencies": {
"batch2": "^2.0.0",
"byline": "^5.0.0",
"dbgate-query-splitter": "4.4.0-alpha.2",
"p-settle": "^4.1.1",
"sorted-array-functions": "^1.3.0",
"through2": "^3.0.1",
"tree-stream": "^1.0.13"
"tree-stream": "^1.0.14"
},
"devDependencies": {
"@types/byline": "^4.2.33",
"@types/jest": "^26.0.22",
"@types/node": "^13.13.5",
"@types/pg": "^8.6.1",
"@types/rimraf": "^3.0.0",
"@types/through2": "^2.0.36",
"dbcp": "^1.0.2",
"dotenv": "^10.0.0",
"jest": "^26.6.3",
"hasha": "^5.2.2",
"istanbul-cobertura-badger": "^1.3.1",
"knex": "^0.21.1",
"pg-listen": "^1.5.1",
"prettier": "^2.3.2",
"rimraf": "^3.0.2",
"ts-jest": "^26.5.4",

@@ -48,0 +52,0 @@ "tslint": "^5.20.0",

@@ -1,18 +0,18 @@

# pg-watch
# db-watch [![image](https://img.shields.io/npm/v/db-watch)](https://www.npmjs.com/package/db-watch) [![test](https://github.com/wholebuzz/db-watch/actions/workflows/test.yaml/badge.svg)](https://github.com/wholebuzz/db-watch/actions/workflows/test.yaml)
Create Postgres triggers and watch notify listeners.
Various implementation of database replication, including creating Postgres triggers and monitoring with notify listeners.
## Example
```
```typescript
// Add to migrations
// for (const query of setupQueries) await knex.raw(query)
for (const query of setupQueries) await knex.raw(query)
// After CREATE TABLE users
// await knex.raw(createNotifyRowFunction('users', 'updated', "'username', orig.username"))
// await knex.raw(createNotifyTrigger('users', 'updated'))
await knex.raw(createNotifyRowFunction('users', 'updated', "'username', orig.username"))
await knex.raw(createNotifyTrigger('users', 'updated'))
// Maximum PostgreSQL NOTIFY payload is 8192 bytes.
// await knex.raw(createNotifyRowFieldsFunction('event', 'updated', "'date', orig.date, 'guid', orig.guid))
// await knex.raw(createNotifyTrigger('event', 'updated'))
await knex.raw(createNotifyRowFieldsFunction('event', 'updated', "'date', orig.date, 'guid', orig.guid))
await knex.raw(createNotifyTrigger('event', 'updated'))
```

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc