Comparing version 0.0.4 to 0.0.5
@@ -5,17 +5,6 @@ /// <reference types="node" /> | ||
export declare const sorted: any; | ||
export interface Shard { | ||
index: number; | ||
modulus: number; | ||
} | ||
export interface StreamRowsOptions { | ||
shard?: Shard; | ||
shardKey?: string; | ||
shardInverse?: boolean; | ||
version?: Date; | ||
versionKey?: string; | ||
} | ||
export declare function newArraySink<X>(output: X[], transform?: (x: X) => X): (item: X) => void; | ||
export declare function newSortedArraySink<X>(output: X[], compare: (a: X, b: X) => number, transform?: (x: X) => X, mergeExisting?: (existing: X, addItem: X) => void): (item: X) => void; | ||
export declare function newSortedArraySink<X>(output: X[], compare: (a: X, b: X) => number, transform?: (x: X) => X, mergeLeft?: (existing: X, addItem: X) => void): (item: X) => void; | ||
export declare function dump<X>(stream: ReadableStreamTree, sink: (x: X) => void): Promise<void>; | ||
export declare function dumpStream(stream: ReadableStreamTree, output: Transform): Promise<undefined>; | ||
//# sourceMappingURL=dump.d.ts.map |
@@ -16,11 +16,11 @@ "use strict"; | ||
exports.newArraySink = newArraySink; | ||
function newSortedArraySink(output, compare, transform, mergeExisting) { | ||
function newSortedArraySink(output, compare, transform, mergeLeft) { | ||
return function (item) { | ||
var added = false; | ||
var addItem = transform ? transform(item) : item; | ||
if (mergeExisting) { | ||
if (mergeLeft) { | ||
var existing = exports.sorted.eq(output, item, compare); | ||
added = existing >= 0; | ||
if (added) | ||
mergeExisting(output[existing], addItem); | ||
mergeLeft(output[existing], addItem); | ||
} | ||
@@ -27,0 +27,0 @@ if (!added) |
@@ -1,18 +0,12 @@ | ||
import { SplitQueryStream } from 'dbgate-query-splitter/lib/splitQueryStream'; | ||
import Knex from 'knex'; | ||
import StreamTree, { ReadableStreamTree } from 'tree-stream'; | ||
import { StreamRowsOptions } from './dump'; | ||
import { InsertRowsOptions } from './restore'; | ||
export declare function streamFromKnex(knex: Knex, query: Knex.QueryBuilder, options?: StreamRowsOptions): ReadableStreamTree; | ||
export declare function streamToKnex(source: { | ||
knex?: Knex; | ||
transaction?: Knex.Transaction; | ||
}, options: InsertRowsOptions): StreamTree.WritableStreamTree; | ||
export declare function streamToKnexRaw(source: { | ||
knex?: Knex; | ||
transaction?: Knex.Transaction; | ||
}, options?: { | ||
returning?: boolean; | ||
}): StreamTree.WritableStreamTree; | ||
export declare function newStatementSplitterStream(type?: any): SplitQueryStream; | ||
import { ReadableStreamTree } from 'tree-stream'; | ||
import { DatabaseLoaderSource, DatabaseLoaderSourceOptions, DatabaseLoadOptions, DatabaseTable, UpdatedRow } from './load'; | ||
export declare class KnexLoaderSource extends DatabaseLoaderSource { | ||
knex: Knex; | ||
options?: DatabaseLoaderSourceOptions | undefined; | ||
constructor(knex: Knex, options?: DatabaseLoaderSourceOptions | undefined); | ||
close(): Promise<void>; | ||
fetch(table: DatabaseTable, update: UpdatedRow): Promise<any>; | ||
load(table: DatabaseTable, options: DatabaseLoadOptions): ReadableStreamTree; | ||
} | ||
//# sourceMappingURL=knex.d.ts.map |
181
lib/knex.js
"use strict"; | ||
var __extends = (this && this.__extends) || (function () { | ||
var extendStatics = function (d, b) { | ||
extendStatics = Object.setPrototypeOf || | ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || | ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; }; | ||
return extendStatics(d, b); | ||
}; | ||
return function (d, b) { | ||
if (typeof b !== "function" && b !== null) | ||
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null"); | ||
extendStatics(d, b); | ||
function __() { this.constructor = d; } | ||
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); | ||
}; | ||
})(); | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (_) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -6,78 +57,64 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.newStatementSplitterStream = exports.streamToKnexRaw = exports.streamToKnex = exports.streamFromKnex = void 0; | ||
var byline_1 = __importDefault(require("byline")); | ||
var options_1 = require("dbgate-query-splitter/lib/options"); | ||
var splitQueryStream_1 = require("dbgate-query-splitter/lib/splitQueryStream"); | ||
var through2_1 = __importDefault(require("through2")); | ||
exports.KnexLoaderSource = void 0; | ||
var tree_stream_1 = __importDefault(require("tree-stream")); | ||
var batch2 = require('batch2'); | ||
function streamFromKnex(knex, query, options) { | ||
if (options === null || options === void 0 ? void 0 : options.shard) { | ||
query = query.where(knex.raw("shard(" + (options.shardKey || 'id') + ", " + options.shard.modulus + ")"), options.shardInverse ? '!=' : '=', options.shard.index); | ||
var load_1 = require("./load"); | ||
var KnexLoaderSource = /** @class */ (function (_super) { | ||
__extends(KnexLoaderSource, _super); | ||
function KnexLoaderSource(knex, options) { | ||
var _this = _super.call(this) || this; | ||
_this.knex = knex; | ||
_this.options = options; | ||
return _this; | ||
} | ||
if (options === null || options === void 0 ? void 0 : options.version) { | ||
query = query.where([(options === null || options === void 0 ? void 0 : options.versionKey) || 'updated_at', '>', options.version]); | ||
} | ||
return tree_stream_1.default.readable(query.stream()); | ||
} | ||
exports.streamFromKnex = streamFromKnex; | ||
function streamToKnex(source, options) { | ||
var _a; | ||
var stream = tree_stream_1.default.writable(through2_1.default.obj(function (data, _, callback) { | ||
var _this = this; | ||
var query = source.transaction | ||
? source.transaction.batchInsert(options.table, data) | ||
: source.knex.batchInsert(options.table, data); | ||
if (options.returning) | ||
query = query.returning(options.returning); | ||
if (source.transaction) | ||
query = query.transacting(source.transaction); | ||
query | ||
.then(function (result) { | ||
if (options.returning) | ||
_this.push(result); | ||
callback(); | ||
}) | ||
.catch(function (err) { | ||
throw err; | ||
KnexLoaderSource.prototype.close = function () { | ||
var _a; | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_b) { | ||
switch (_b.label) { | ||
case 0: | ||
if (!((_a = this.options) === null || _a === void 0 ? void 0 : _a.knexOwner)) return [3 /*break*/, 2]; | ||
return [4 /*yield*/, this.knex.destroy()]; | ||
case 1: | ||
_b.sent(); | ||
_b.label = 2; | ||
case 2: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
})); | ||
return stream.pipeFrom(batch2.obj({ size: (_a = options.batchSize) !== null && _a !== void 0 ? _a : 4000 })); | ||
} | ||
exports.streamToKnex = streamToKnex; | ||
function streamToKnexRaw(source, options) { | ||
var stream = tree_stream_1.default.writable(through2_1.default.obj(function (data, _, callback) { | ||
var _this = this; | ||
var text = data.replace(/\?/g, '\\?'); | ||
var query = source.transaction ? source.transaction.raw(text) : source.knex.raw(text); | ||
query | ||
.then(function (result) { | ||
if (options === null || options === void 0 ? void 0 : options.returning) | ||
_this.push(result); | ||
callback(); | ||
}) | ||
.catch(function (err) { | ||
throw err; | ||
}; | ||
KnexLoaderSource.prototype.fetch = function (table, update) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var query, _i, _a, keyField, ret; | ||
return __generator(this, function (_b) { | ||
switch (_b.label) { | ||
case 0: | ||
query = this.knex(table.table); | ||
for (_i = 0, _a = table.keyFields; _i < _a.length; _i++) { | ||
keyField = _a[_i]; | ||
query = query.where(keyField, update.key[keyField]); | ||
} | ||
if (update.updated && update.updated.length > 0) { | ||
query = query.returning(update.updated.filter(function (x) { return x !== 'archiving'; })); | ||
} | ||
return [4 /*yield*/, query]; | ||
case 1: | ||
ret = _b.sent(); | ||
return [2 /*return*/, ret[0]]; | ||
} | ||
}); | ||
}); | ||
})); | ||
stream = stream.pipeFrom(newStatementSplitterStream((source.transaction || source.knex).context.client.config.client)); | ||
stream = stream.pipeFrom(byline_1.default.createStream()); | ||
return stream; | ||
} | ||
exports.streamToKnexRaw = streamToKnexRaw; | ||
function newStatementSplitterStream(type) { | ||
switch (type) { | ||
case 'postgresql': | ||
return new splitQueryStream_1.SplitQueryStream(options_1.postgreSplitterOptions); | ||
case 'mysql': | ||
return new splitQueryStream_1.SplitQueryStream(options_1.mysqlSplitterOptions); | ||
case 'mssql': | ||
return new splitQueryStream_1.SplitQueryStream(options_1.mssqlSplitterOptions); | ||
case 'sqlite': | ||
return new splitQueryStream_1.SplitQueryStream(options_1.sqliteSplitterOptions); | ||
default: | ||
return new splitQueryStream_1.SplitQueryStream(options_1.defaultSplitterOptions); | ||
} | ||
} | ||
exports.newStatementSplitterStream = newStatementSplitterStream; | ||
}; | ||
KnexLoaderSource.prototype.load = function (table, options) { | ||
var query = this.knex(table.table); | ||
if (options === null || options === void 0 ? void 0 : options.shard) { | ||
query = query.where(this.knex.raw("shard(" + (table.shardField || 'id') + ", " + options.shard.modulus + ")"), options.shardInverse ? '!=' : '=', options.shard.index); | ||
} | ||
if (options === null || options === void 0 ? void 0 : options.version) { | ||
query = query.where([table.versionField || 'updated_at', '>', options.version]); | ||
} | ||
return tree_stream_1.default.readable(query.stream()); | ||
}; | ||
return KnexLoaderSource; | ||
}(load_1.DatabaseLoaderSource)); | ||
exports.KnexLoaderSource = KnexLoaderSource; | ||
//# sourceMappingURL=knex.js.map |
@@ -0,1 +1,3 @@ | ||
export declare function shardIntegerSQL(column: string, modulus: string | number): string; | ||
export declare function shardTextSQL(column: string, modulus: string | number): string; | ||
export declare const setupQueries: string[]; | ||
@@ -2,0 +4,0 @@ /** |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.dropTrigger = exports.createNotifyTrigger = exports.dropNotifyFunction = exports.createNotifyRowFieldsFunction = exports.createNotifyRowFunction = exports.createNotifyFunction = exports.setupQueries = void 0; | ||
exports.dropTrigger = exports.createNotifyTrigger = exports.dropNotifyFunction = exports.createNotifyRowFieldsFunction = exports.createNotifyRowFunction = exports.createNotifyFunction = exports.setupQueries = exports.shardTextSQL = exports.shardIntegerSQL = void 0; | ||
function shardIntegerSQL(column, modulus) { | ||
return "mod(" + column + ", " + modulus + ")"; | ||
} | ||
exports.shardIntegerSQL = shardIntegerSQL; | ||
function shardTextSQL(column, modulus) { | ||
return "mod(('x' || right(md5(" + column + "), 4))::bit(16)::int, " + modulus + ")"; | ||
} | ||
exports.shardTextSQL = shardTextSQL; | ||
exports.setupQueries = [ | ||
'CREATE EXTENSION hstore', | ||
'CREATE EXTENSION IF NOT EXISTS hstore', | ||
"CREATE OR REPLACE FUNCTION jsonb_to_hstore(jsonb)\n RETURNS hstore AS\n $func$\n SELECT COALESCE(hstore(array_agg(key), array_agg(value)), hstore(''))\n FROM jsonb_each_text($1)\n $func$ LANGUAGE sql IMMUTABLE STRICT", | ||
"CREATE OR REPLACE FUNCTION shard(text, int)\n RETURNS int AS\n $func$\n SELECT mod(('x' || right(md5($1), 4))::bit(16)::int, $2)\n $func$ LANGUAGE sql IMMUTABLE STRICT", | ||
"CREATE OR REPLACE FUNCTION link_from_url(text)\n RETURNS text AS\n $func$\n SELECT SUBSTR($1, STRPOS($1, '://') + 3)\n $func$ LANGUAGE sql IMMUTABLE STRICT", | ||
"CREATE OR REPLACE FUNCTION shard(text, int)\n RETURNS int AS\n $func$\n " + shardTextSQL('$1', '$2') + "\n $func$ LANGUAGE sql IMMUTABLE STRICT", | ||
]; | ||
@@ -10,0 +17,0 @@ /** |
import { ClientConfig } from 'pg'; | ||
import { Subscriber } from 'pg-listen'; | ||
import { DatabaseListener, UpdatedRow } from '../watch'; | ||
export declare class PostgresListener implements DatabaseListener { | ||
import { UpdatedRow } from '../load'; | ||
import { DatabaseWatcherSource, UpdateType } from '../watch'; | ||
export declare class PostgresTriggerWatcher extends DatabaseWatcherSource { | ||
config: ClientConfig; | ||
subscriber: Subscriber | undefined; | ||
constructor(config: ClientConfig); | ||
constructor(config: ClientConfig, updateType: UpdateType); | ||
close(): Promise<void>; | ||
watch(table: string, callback: (payload: UpdatedRow) => void): Promise<void>; | ||
getSubscriber(): Promise<Subscriber<{ | ||
[channel: string]: any; | ||
}>>; | ||
listen(channel: string, callback: (payload: UpdatedRow) => void): Promise<void>; | ||
} | ||
//# sourceMappingURL=watch.d.ts.map |
"use strict"; | ||
var __extends = (this && this.__extends) || (function () { | ||
var extendStatics = function (d, b) { | ||
extendStatics = Object.setPrototypeOf || | ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || | ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; }; | ||
return extendStatics(d, b); | ||
}; | ||
return function (d, b) { | ||
if (typeof b !== "function" && b !== null) | ||
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null"); | ||
extendStatics(d, b); | ||
function __() { this.constructor = d; } | ||
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); | ||
}; | ||
})(); | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
@@ -42,9 +57,13 @@ function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.PostgresListener = void 0; | ||
exports.PostgresTriggerWatcher = void 0; | ||
var pg_listen_1 = __importDefault(require("pg-listen")); | ||
var PostgresListener = /** @class */ (function () { | ||
function PostgresListener(config) { | ||
this.config = config; | ||
var watch_1 = require("../watch"); | ||
var PostgresTriggerWatcher = /** @class */ (function (_super) { | ||
__extends(PostgresTriggerWatcher, _super); | ||
function PostgresTriggerWatcher(config, updateType) { | ||
var _this = _super.call(this, watch_1.WatchMethod.Trigger, updateType) || this; | ||
_this.config = config; | ||
return _this; | ||
} | ||
PostgresListener.prototype.getSubscriber = function () { | ||
PostgresTriggerWatcher.prototype.close = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
@@ -54,12 +73,9 @@ return __generator(this, function (_a) { | ||
case 0: | ||
if (!!this.subscriber) return [3 /*break*/, 2]; | ||
this.subscriber = pg_listen_1.default(this.config); | ||
this.subscriber.events.on('error', function (error) { | ||
throw error; | ||
}); | ||
return [4 /*yield*/, this.subscriber.connect()]; | ||
if (!this.subscriber) | ||
return [2 /*return*/]; | ||
return [4 /*yield*/, this.subscriber.close()]; | ||
case 1: | ||
_a.sent(); | ||
_a.label = 2; | ||
case 2: return [2 /*return*/, this.subscriber]; | ||
this.subscriber = undefined; | ||
return [2 /*return*/]; | ||
} | ||
@@ -69,8 +85,10 @@ }); | ||
}; | ||
PostgresListener.prototype.listen = function (channel, callback) { | ||
PostgresTriggerWatcher.prototype.watch = function (table, callback) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var subscriber; | ||
var channel, subscriber; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, this.getSubscriber()]; | ||
case 0: | ||
channel = table + "_updated"; | ||
return [4 /*yield*/, this.getSubscriber()]; | ||
case 1: | ||
@@ -87,5 +105,24 @@ subscriber = _a.sent(); | ||
}; | ||
return PostgresListener; | ||
}()); | ||
exports.PostgresListener = PostgresListener; | ||
PostgresTriggerWatcher.prototype.getSubscriber = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
if (!!this.subscriber) return [3 /*break*/, 2]; | ||
this.subscriber = pg_listen_1.default(this.config); | ||
this.subscriber.events.on('error', function (error) { | ||
throw error; | ||
}); | ||
return [4 /*yield*/, this.subscriber.connect()]; | ||
case 1: | ||
_a.sent(); | ||
_a.label = 2; | ||
case 2: return [2 /*return*/, this.subscriber]; | ||
} | ||
}); | ||
}); | ||
}; | ||
return PostgresTriggerWatcher; | ||
}(watch_1.DatabaseWatcherSource)); | ||
exports.PostgresTriggerWatcher = PostgresTriggerWatcher; | ||
//# sourceMappingURL=watch.js.map |
@@ -1,25 +0,75 @@ | ||
export interface DatabaseListener { | ||
listen(channel: string, callback: (payload: UpdatedRow) => void): Promise<void>; | ||
import { DatabaseLoaderSource, DatabaseTable, DatabaseTableParser, Shard, UpdatedRow } from './load'; | ||
export declare enum WatchMethod { | ||
Log = 0, | ||
Poll = 1, | ||
Trigger = 2 | ||
} | ||
export interface DatabaseSink<Item> { | ||
keyFields: string[]; | ||
insert: (item: Item) => Item | null; | ||
find: (update: UpdatedRow) => Item | null; | ||
remove: (update: UpdatedRow) => Item | null; | ||
update?: (record: Item, itemRecord: Record<string, any>) => void; | ||
updated?: (current: Item, row: Record<string, any>) => void; | ||
validate?: (row: Record<string, any>) => Item; | ||
export declare enum AntiShardBehavior { | ||
None = 0, | ||
Interleaved = 1, | ||
ShardFirst = 2, | ||
ShardLast = 3 | ||
} | ||
export interface UpdatedRow { | ||
op: UpdateOp; | ||
table?: string; | ||
key: Record<string, any>; | ||
row?: Record<string, any>; | ||
updated?: string[]; | ||
export declare enum UpdateType { | ||
Delta = 0, | ||
Full = 1, | ||
Key = 2 | ||
} | ||
export declare enum UpdateOp { | ||
Insert = "INSERT", | ||
Update = "UPDATE", | ||
Delete = "DELETE" | ||
export interface IdempotentDatabaseSink<Item> extends DatabaseTableParser<Item> { | ||
load: (item: Item) => void; | ||
upsert: (key: Partial<Item>, value: Partial<Item> | null) => Item | null; | ||
} | ||
export interface MemoryDatabaseSink<Item> extends Partial<DatabaseTableParser<Item>> { | ||
insert: (item: Item) => Item; | ||
find: (key: Partial<Item>) => Item | null; | ||
remove: (key: Partial<Item>) => Item | null; | ||
update?: (current: Item, update: Partial<Item>) => void; | ||
updated?: (current: Item | null, update: Partial<Item> | null) => void; | ||
} | ||
export interface DatabaseWatcherSink<Item> extends IdempotentDatabaseSink<Item> { | ||
filterUpdate?: (item: UpdatedRow) => boolean; | ||
} | ||
export declare abstract class DatabaseWatcherSource { | ||
watchMethod: WatchMethod; | ||
updateType: UpdateType; | ||
constructor(watchMethod: WatchMethod, updateType: UpdateType); | ||
abstract watch(table: string, callback: (payload: UpdatedRow) => void): Promise<void>; | ||
} | ||
export interface DatabaseWatcherOptions { | ||
antiShardBehavior?: AntiShardBehavior; | ||
concurrency: number; | ||
debug?: boolean; | ||
shard?: Shard; | ||
shardField?: string; | ||
version?: Date; | ||
} | ||
export declare class DatabaseLoader<Item> { | ||
source: DatabaseLoaderSource; | ||
table: DatabaseTable; | ||
sink: IdempotentDatabaseSink<Item>; | ||
updateQueue: UpdatedRow[]; | ||
loading: boolean; | ||
loaded: boolean; | ||
version?: Date; | ||
constructor(source: DatabaseLoaderSource, table: DatabaseTable, sink: IdempotentDatabaseSink<Item>); | ||
startLoading(): void; | ||
doneLoading(): UpdatedRow[]; | ||
delayLoading(updateFn: (update: UpdatedRow) => void): (update: UpdatedRow) => void; | ||
delayLoadingWithFetchQueue(dequeue: () => void): (update: UpdatedRow) => void; | ||
} | ||
export declare class DatabaseWatcher<Item> extends DatabaseLoader<Item> { | ||
source: DatabaseLoaderSource; | ||
watcher: DatabaseWatcherSource; | ||
table: DatabaseTable; | ||
sink: DatabaseWatcherSink<Item>; | ||
options: DatabaseWatcherOptions; | ||
constructor(source: DatabaseLoaderSource, watcher: DatabaseWatcherSource, table: DatabaseTable, sink: DatabaseWatcherSink<Item>, options: DatabaseWatcherOptions); | ||
watch(): Promise<void>; | ||
init(): Promise<void>; | ||
initialLoad(): Promise<void>; | ||
fetchAndUpdate(): Promise<void>; | ||
handleUpdatedRow(update: UpdatedRow): Item | null; | ||
} | ||
export declare function idempotentSinkFromMemorySink<Item>(table: DatabaseTable, sink: MemoryDatabaseSink<Item>, watcherOptions?: Partial<DatabaseWatcherSink<Item>>): DatabaseWatcherSink<Item>; | ||
export declare function assignProps(record: Record<string, any>, itemRecord: Record<string, any>): void; | ||
//# sourceMappingURL=watch.d.ts.map |
472
lib/watch.js
"use strict"; | ||
// import pSettle from 'p-settle' | ||
var __extends = (this && this.__extends) || (function () { | ||
var extendStatics = function (d, b) { | ||
extendStatics = Object.setPrototypeOf || | ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || | ||
function (d, b) { for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p]; }; | ||
return extendStatics(d, b); | ||
}; | ||
return function (d, b) { | ||
if (typeof b !== "function" && b !== null) | ||
throw new TypeError("Class extends value " + String(b) + " is not a constructor or null"); | ||
extendStatics(d, b); | ||
function __() { this.constructor = d; } | ||
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); | ||
}; | ||
})(); | ||
var __assign = (this && this.__assign) || function () { | ||
__assign = Object.assign || function(t) { | ||
for (var s, i = 1, n = arguments.length; i < n; i++) { | ||
s = arguments[i]; | ||
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p)) | ||
t[p] = s[p]; | ||
} | ||
return t; | ||
}; | ||
return __assign.apply(this, arguments); | ||
}; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (_) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.UpdateOp = void 0; | ||
var UpdateOp; | ||
(function (UpdateOp) { | ||
UpdateOp["Insert"] = "INSERT"; | ||
UpdateOp["Update"] = "UPDATE"; | ||
UpdateOp["Delete"] = "DELETE"; | ||
})(UpdateOp = exports.UpdateOp || (exports.UpdateOp = {})); | ||
/* | ||
export class DatabaseLoader<X> { | ||
loading = false | ||
loaded = false | ||
updateQueue: UpdatedRow[] = [] | ||
startLoading() { | ||
this.loading = true | ||
} | ||
doneLoading() { | ||
const updates = this.updateQueue | ||
this.updateQueue = [] | ||
this.loading = false | ||
this.loaded = true | ||
return updates | ||
} | ||
delayLoading(updateFn: (update: UpdatedRow) => void) { | ||
return (update: UpdatedRow) => { | ||
if (this.loading) this.updateQeue.push(update) | ||
else updateFn(update) | ||
exports.assignProps = exports.idempotentSinkFromMemorySink = exports.DatabaseWatcher = exports.DatabaseLoader = exports.DatabaseWatcherSource = exports.UpdateType = exports.AntiShardBehavior = exports.WatchMethod = void 0; | ||
var p_settle_1 = __importDefault(require("p-settle")); | ||
var dump_1 = require("./dump"); | ||
var load_1 = require("./load"); | ||
var WatchMethod; | ||
(function (WatchMethod) { | ||
WatchMethod[WatchMethod["Log"] = 0] = "Log"; | ||
WatchMethod[WatchMethod["Poll"] = 1] = "Poll"; | ||
WatchMethod[WatchMethod["Trigger"] = 2] = "Trigger"; | ||
})(WatchMethod = exports.WatchMethod || (exports.WatchMethod = {})); | ||
var AntiShardBehavior; | ||
(function (AntiShardBehavior) { | ||
AntiShardBehavior[AntiShardBehavior["None"] = 0] = "None"; | ||
AntiShardBehavior[AntiShardBehavior["Interleaved"] = 1] = "Interleaved"; | ||
AntiShardBehavior[AntiShardBehavior["ShardFirst"] = 2] = "ShardFirst"; | ||
AntiShardBehavior[AntiShardBehavior["ShardLast"] = 3] = "ShardLast"; | ||
})(AntiShardBehavior = exports.AntiShardBehavior || (exports.AntiShardBehavior = {})); | ||
var UpdateType; | ||
(function (UpdateType) { | ||
UpdateType[UpdateType["Delta"] = 0] = "Delta"; | ||
UpdateType[UpdateType["Full"] = 1] = "Full"; | ||
UpdateType[UpdateType["Key"] = 2] = "Key"; | ||
})(UpdateType = exports.UpdateType || (exports.UpdateType = {})); | ||
var DatabaseWatcherSource = /** @class */ (function () { | ||
function DatabaseWatcherSource(watchMethod, updateType) { | ||
this.watchMethod = watchMethod; | ||
this.updateType = updateType; | ||
} | ||
} | ||
delayLoadingWithFetchQueue(dequeue: () => void) { | ||
return (update: UpdatedRow) => { | ||
this.updateQueue.push(update) | ||
dequeue() | ||
return DatabaseWatcherSource; | ||
}()); | ||
exports.DatabaseWatcherSource = DatabaseWatcherSource; | ||
var DatabaseLoader = /** @class */ (function () { | ||
function DatabaseLoader(source, table, sink) { | ||
this.source = source; | ||
this.table = table; | ||
this.sink = sink; | ||
this.updateQueue = []; | ||
this.loading = false; | ||
this.loaded = false; | ||
} | ||
} | ||
} | ||
export class DatabaseWatcher<X> extends DatabaseLoader<X> { | ||
constructor(public listener: DatabaseListener, public sink: DatabaseSink<X>) {} | ||
async listen(channel: string) { | ||
this.listener.listen( | ||
'event_updated', | ||
this.delayLoadingWithFetchQueue(() => this.fetchAndUpdateEvents()) | ||
), | ||
async fetchAndUpdate( | ||
filterFn: (update: UpdatedRow) => boolean, | ||
parseFn: (update: UpdatedRow) => X, | ||
fetchFn: (update: UpdatedRow) => Promise<X>, | ||
updateFn: (update: UpdatedRow) => void, | ||
concurrency = 2, | ||
debug = false | ||
) { | ||
if (this.loading || !this.updateQueue.length) return | ||
this.loading = true | ||
while (this.updateQueue.length) { | ||
const updates = this.updateQueue | ||
if (debug) console.log(`fetchAndUpdate: queue=${updates.length}`) | ||
this.updateQueue = [] | ||
const ready = await pSettle( | ||
updates | ||
.filter(filterFn) | ||
.map((x) => () => (x.op === 'DELETE' ? Promise.resolve(parseFn(x)) : fetchFn(x))), | ||
{ concurrency } | ||
) | ||
ready.forEach((x, i) => | ||
updateFn({ | ||
op: updates[i].op, | ||
key: updates[i].key, | ||
row: x.isFulfilled ? x.value : undefined, | ||
}) | ||
) | ||
DatabaseLoader.prototype.startLoading = function () { | ||
this.loading = true; | ||
}; | ||
DatabaseLoader.prototype.doneLoading = function () { | ||
var updates = this.updateQueue; | ||
this.updateQueue = []; | ||
this.loading = false; | ||
this.loaded = true; | ||
return updates; | ||
}; | ||
DatabaseLoader.prototype.delayLoading = function (updateFn) { | ||
var _this = this; | ||
return function (update) { | ||
if (_this.loading) | ||
_this.updateQueue.push(update); | ||
else | ||
updateFn(update); | ||
}; | ||
}; | ||
DatabaseLoader.prototype.delayLoadingWithFetchQueue = function (dequeue) { | ||
var _this = this; | ||
return function (update) { | ||
_this.updateQueue.push(update); | ||
dequeue(); | ||
}; | ||
}; | ||
return DatabaseLoader; | ||
}()); | ||
exports.DatabaseLoader = DatabaseLoader; | ||
var DatabaseWatcher = /** @class */ (function (_super) { | ||
__extends(DatabaseWatcher, _super); | ||
function DatabaseWatcher(source, watcher, table, sink, options) { | ||
var _this = _super.call(this, source, table, sink) || this; | ||
_this.source = source; | ||
_this.watcher = watcher; | ||
_this.table = table; | ||
_this.sink = sink; | ||
_this.options = options; | ||
return _this; | ||
} | ||
this.loading = false | ||
} | ||
DatabaseWatcher.prototype.watch = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var _this = this; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, this.watcher.watch(this.table.table, this.watcher.updateType === UpdateType.Key | ||
? this.delayLoadingWithFetchQueue(function () { return _this.fetchAndUpdate(); }) | ||
: this.delayLoading(function (x) { return _this.handleUpdatedRow(x); }))]; | ||
case 1: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
DatabaseWatcher.prototype.init = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
// const start = Date.now() | ||
this.startLoading(); | ||
return [4 /*yield*/, this.watch()]; | ||
case 1: | ||
_a.sent(); | ||
return [4 /*yield*/, this.initialLoad()]; | ||
case 2: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
DatabaseWatcher.prototype.initialLoad = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var _i, _a, delayedUpdate; | ||
return __generator(this, function (_b) { | ||
switch (_b.label) { | ||
case 0: return [4 /*yield*/, dump_1.dump(this.source.load(this.table, { | ||
shard: this.options.shard, | ||
version: this.options.version, | ||
}), this.sink.load)]; | ||
case 1: | ||
_b.sent(); | ||
for (_i = 0, _a = this.doneLoading(); _i < _a.length; _i++) { | ||
delayedUpdate = _a[_i]; | ||
this.handleUpdatedRow(delayedUpdate); | ||
} | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
DatabaseWatcher.prototype.fetchAndUpdate = function () { | ||
var _a, _b; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var _loop_1, this_1; | ||
var _this = this; | ||
return __generator(this, function (_c) { | ||
switch (_c.label) { | ||
case 0: | ||
if (this.loading || !this.updateQueue.length) | ||
return [2 /*return*/]; | ||
this.loading = true; | ||
_loop_1 = function () { | ||
var updates, ready; | ||
return __generator(this, function (_d) { | ||
switch (_d.label) { | ||
case 0: | ||
updates = this_1.updateQueue; | ||
if (this_1.options.debug) | ||
console.log("fetchAndUpdate: queue=" + updates.length); | ||
this_1.updateQueue = []; | ||
return [4 /*yield*/, p_settle_1.default(updates | ||
.filter((_b = (_a = this_1.sink) === null || _a === void 0 ? void 0 : _a.filterUpdate) !== null && _b !== void 0 ? _b : (function (x) { return x; })) | ||
.map(function (update) { return function () { | ||
return update.op === load_1.UpdateOp.Delete | ||
? Promise.resolve(_this.sink.parseUpdate(update)) | ||
: _this.source.fetch(_this.table, update); | ||
}; }), { concurrency: this_1.options.concurrency })]; | ||
case 1: | ||
ready = _d.sent(); | ||
ready.forEach(function (x, i) { | ||
return _this.handleUpdatedRow({ | ||
op: updates[i].op, | ||
key: updates[i].key, | ||
row: x.isFulfilled ? x.value : undefined, | ||
}); | ||
}); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}; | ||
this_1 = this; | ||
_c.label = 1; | ||
case 1: | ||
if (!this.updateQueue.length) return [3 /*break*/, 3]; | ||
return [5 /*yield**/, _loop_1()]; | ||
case 2: | ||
_c.sent(); | ||
return [3 /*break*/, 1]; | ||
case 3: | ||
this.loading = false; | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
DatabaseWatcher.prototype.handleUpdatedRow = function (update) { | ||
var _a, _b; | ||
if (!update.row) | ||
return null; | ||
if (this.options.shard && | ||
!load_1.shardMatchText((_b = update.key[(_a = this.table.shardField) !== null && _a !== void 0 ? _a : '']) !== null && _b !== void 0 ? _b : '', this.options.shard)) { | ||
return null; | ||
} | ||
if (this.sink.filterUpdate && !this.sink.filterUpdate(update)) | ||
return null; | ||
var key = this.sink.parseRow(update.key); | ||
var row = this.sink.parseUpdate(update); | ||
switch (update.op) { | ||
case load_1.UpdateOp.Insert: | ||
case load_1.UpdateOp.Update: | ||
return this.sink.upsert(key, row); | ||
case load_1.UpdateOp.Delete: | ||
return this.sink.upsert(key, null); | ||
} | ||
}; | ||
return DatabaseWatcher; | ||
}(DatabaseLoader)); | ||
exports.DatabaseWatcher = DatabaseWatcher; | ||
function idempotentSinkFromMemorySink(table, sink, watcherOptions) { | ||
var _a, _b, _c, _d, _e; | ||
var update = (_a = sink.update) !== null && _a !== void 0 ? _a : assignProps; | ||
return __assign(__assign({}, watcherOptions), { upsert: function (key, row) { | ||
var item = sink.find(key); | ||
if (sink.updated) | ||
sink.updated(item, row); | ||
if (!row) | ||
return sink.remove(key); | ||
if (item) { | ||
var rowkeys = Object.keys(row); | ||
var rekey = rowkeys.some(function (k) { return table.keyFields.includes(k); }); | ||
if (rekey) { | ||
var removedItem = sink.remove(key); | ||
return sink.insert(__assign(__assign({}, (removedItem !== null && removedItem !== void 0 ? removedItem : item)), row)); | ||
} | ||
else { | ||
update(item, row); | ||
return item; | ||
} | ||
} | ||
else { | ||
return sink.insert(row); | ||
} | ||
}, load: sink.insert, parseKey: (_c = (_b = watcherOptions === null || watcherOptions === void 0 ? void 0 : watcherOptions.parseKey) !== null && _b !== void 0 ? _b : sink.parseKey) !== null && _c !== void 0 ? _c : (function (x) { return x; }), parseRow: (_e = (_d = watcherOptions === null || watcherOptions === void 0 ? void 0 : watcherOptions.parseRow) !== null && _d !== void 0 ? _d : sink.parseRow) !== null && _e !== void 0 ? _e : (function (x) { return x; }), parseUpdate: function (x) { return x.row; } }); | ||
} | ||
export function applyUpdatedRow<Item>(sink: DatabaseSink<Item>, update: UpdatedRow) { | ||
if (!update.row) return null | ||
switch (update.op) { | ||
case UpdateOp.Insert: | ||
return sink.insert(update.row as Item) | ||
case UpdateOp.Update: | ||
const item = sink.find(update.key) | ||
if (item === undefined) { | ||
return null | ||
} else if (isUpdate) { | ||
const keys = Object.keys(update.row) | ||
const rekey = keyFields.some((k) => keys.includes(k)) | ||
if (rekey) { | ||
const item = remove(key) | ||
return insert({ ...item, ...update.row }) | ||
} else { | ||
const currentItem = sink.getFn(key) | ||
const record = currentItem as Record<string, any> | ||
if (updated) updated(currentItem, update.row) | ||
sink.update(record, update.row) | ||
return currentItem | ||
exports.idempotentSinkFromMemorySink = idempotentSinkFromMemorySink; | ||
function assignProps(record, itemRecord) { | ||
Object.keys(itemRecord).forEach(function (k) { | ||
var _a; | ||
var propk = itemRecord[k]; | ||
if (propk instanceof Object) { | ||
var outk_1 = (_a = record[k]) !== null && _a !== void 0 ? _a : (record[k] = {}); | ||
Object.keys(propk).forEach(function (l) { return (outk_1[l] = propk[l]); }); | ||
} | ||
} | ||
break | ||
case UpdateOp.Remove: | ||
remove(key) | ||
} | ||
return true | ||
else { | ||
record[k] = propk; | ||
} | ||
}); | ||
} | ||
export function handleUpdatedRowItem<Item, Sink>( | ||
op: string, | ||
item: X, | ||
sink: DatabaseSink<Item, Key>, | ||
filter?: (x: X) => boolean, | ||
) { | ||
const itemRecord = item as Record<string, any> | ||
const isUpdate = op === 'UPDATE' | ||
if (isUpdate || op === 'DELETE') { | ||
const index = findIndex(item) | ||
if (index < 0) { | ||
if (!filter || filter(item)) return null | ||
} else if (isUpdate) { | ||
const currentItem = sink.getFn(key) | ||
const record = currentItem as Record<string, any> | ||
let rekey = false | ||
keyFields.forEach( | ||
(k) => (rekey = rekey || record[k] < itemRecord[k] || itemRecord[k] < record[k]) | ||
) | ||
if (rekey) { | ||
remove(index) | ||
assignUpdate(record, itemRecord) | ||
insert(currentItem) | ||
} else { | ||
if (updated) updated(currentItem, itemRecord) | ||
assignUpdate(record, itemRecord) | ||
} | ||
return currentItem | ||
} else return remove(index) | ||
} else if (op === 'INSERT') { | ||
if (!filter || filter(item)) { | ||
insert(item) | ||
return item | ||
} else { | ||
return null | ||
} | ||
} else { | ||
return null | ||
} | ||
} | ||
export function assignProps(record: Record<string, any>, itemRecord: Record<string, any>) { | ||
Object.keys(itemRecord).forEach((k) => { | ||
const propk = itemRecord[k] | ||
if (propk instanceof Object) { | ||
const outk = record[k] ?? (record[k] = {}) | ||
Object.keys(propk).forEach((l) => (outk[l] = propk[l])) | ||
} else { | ||
record[k] = propk | ||
} | ||
}) | ||
} | ||
*/ | ||
exports.assignProps = assignProps; | ||
//# sourceMappingURL=watch.js.map |
{ | ||
"name": "db-watch", | ||
"version": "0.0.4", | ||
"description": "Create Postgres triggers and watch notify listeners", | ||
"version": "0.0.5", | ||
"description": "Database replication, inlcuding Postgres triggers and monitoring with notify listeners", | ||
"author": "wholebuzz", | ||
@@ -19,2 +19,3 @@ "license": "Apache-2.0", | ||
"test:coverage": "jest --runInBand --ci --passWithNoTests --coverage --no-cache", | ||
"badge:coverage": "istanbul-cobertura-badger -v -b coverage", | ||
"lint": "tslint -c tslint.json --project .", | ||
@@ -29,19 +30,22 @@ "fix": "yarn lint --fix", | ||
"dependencies": { | ||
"batch2": "^2.0.0", | ||
"byline": "^5.0.0", | ||
"dbgate-query-splitter": "4.4.0-alpha.2", | ||
"p-settle": "^4.1.1", | ||
"sorted-array-functions": "^1.3.0", | ||
"through2": "^3.0.1", | ||
"tree-stream": "^1.0.13" | ||
"tree-stream": "^1.0.14" | ||
}, | ||
"devDependencies": { | ||
"@types/byline": "^4.2.33", | ||
"@types/jest": "^26.0.22", | ||
"@types/node": "^13.13.5", | ||
"@types/pg": "^8.6.1", | ||
"@types/rimraf": "^3.0.0", | ||
"@types/through2": "^2.0.36", | ||
"dbcp": "^1.0.2", | ||
"dotenv": "^10.0.0", | ||
"jest": "^26.6.3", | ||
"hasha": "^5.2.2", | ||
"istanbul-cobertura-badger": "^1.3.1", | ||
"knex": "^0.21.1", | ||
"pg-listen": "^1.5.1", | ||
"prettier": "^2.3.2", | ||
"rimraf": "^3.0.2", | ||
"ts-jest": "^26.5.4", | ||
@@ -48,0 +52,0 @@ "tslint": "^5.20.0", |
@@ -1,18 +0,18 @@ | ||
# pg-watch | ||
# db-watch [![image](https://img.shields.io/npm/v/db-watch)](https://www.npmjs.com/package/db-watch) [![test](https://github.com/wholebuzz/db-watch/actions/workflows/test.yaml/badge.svg)](https://github.com/wholebuzz/db-watch/actions/workflows/test.yaml) | ||
Create Postgres triggers and watch notify listeners. | ||
Various implementation of database replication, including creating Postgres triggers and monitoring with notify listeners. | ||
## Example | ||
``` | ||
```typescript | ||
// Add to migrations | ||
// for (const query of setupQueries) await knex.raw(query) | ||
for (const query of setupQueries) await knex.raw(query) | ||
// After CREATE TABLE users | ||
// await knex.raw(createNotifyRowFunction('users', 'updated', "'username', orig.username")) | ||
// await knex.raw(createNotifyTrigger('users', 'updated')) | ||
await knex.raw(createNotifyRowFunction('users', 'updated', "'username', orig.username")) | ||
await knex.raw(createNotifyTrigger('users', 'updated')) | ||
// Maximum PostgreSQL NOTIFY payload is 8192 bytes. | ||
// await knex.raw(createNotifyRowFieldsFunction('event', 'updated', "'date', orig.date, 'guid', orig.guid)) | ||
// await knex.raw(createNotifyTrigger('event', 'updated')) | ||
await knex.raw(createNotifyRowFieldsFunction('event', 'updated', "'date', orig.date, 'guid', orig.guid)) | ||
await knex.raw(createNotifyTrigger('event', 'updated')) | ||
``` |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 10 instances in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
128447
4
46
1721
23
10
1
+ Addedp-settle@^4.1.1
+ Addedp-limit@2.3.0(transitive)
+ Addedp-reflect@2.1.0(transitive)
+ Addedp-settle@4.1.1(transitive)
+ Addedp-try@2.2.0(transitive)
- Removedbatch2@^2.0.0
- Removedbyline@^5.0.0
- Removeddbgate-query-splitter@4.4.0-alpha.2
- Removedbatch2@2.0.0(transitive)
- Removedbyline@5.0.0(transitive)
- Removeddbgate-query-splitter@4.4.0-alpha.2(transitive)
- Removedthrough2@4.0.2(transitive)
Updatedtree-stream@^1.0.14