Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

@qbobjx/plugins

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@qbobjx/plugins - npm Package Compare versions

Comparing version
0.4.0
to
0.5.0
+1
dist/postgres.d.ts
export * from './postgres/index.js';
export * from './postgres/index.js';
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresCachePluginOptions extends PostgresPluginBaseOptions {
readonly schema?: string;
readonly autoProvision?: boolean;
readonly defaultTtlSeconds?: number;
readonly table?: string;
}
export interface PostgresCachePluginMetadata {
readonly schema: string;
readonly autoProvision: boolean;
readonly defaultTtlSeconds: number;
readonly table: string;
}
export declare const POSTGRES_CACHE_METADATA_KEY = "postgres.cache";
export declare function createPostgresCachePlugin(options?: PostgresCachePluginOptions): Readonly<ObjxPlugin>;
export declare function buildCacheUpsertSql(schema?: string, table?: string): string;
export declare function buildCacheGetSql(schema?: string, table?: string): string;
export declare function buildCachePruneExpiredSql(schema?: string, table?: string): string;
export declare function createMaterializedViewRefreshSql(name: string, concurrently?: boolean): string;
export declare function createCacheInvalidationTriggerSql(options: {
readonly schema?: string;
readonly sourceTable: string;
readonly cacheTable?: string;
readonly keyExpression: string;
}): string;
export declare function buildCacheMetricsSql(schema?: string, table?: string): string;
import { definePlugin } from '@qbobjx/core';
import { DEFAULT_INTERNAL_SCHEMA, assertSafeSqlIdentifier, quoteQualifiedSqlIdentifier, quoteSqlIdentifier, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_CACHE_METADATA_KEY = 'postgres.cache';
export function createPostgresCachePlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_CACHE_METADATA_KEY);
return definePlugin({
name: 'postgres-cache',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
schema: options.schema ?? DEFAULT_INTERNAL_SCHEMA,
autoProvision: options.autoProvision ?? true,
defaultTtlSeconds: options.defaultTtlSeconds ?? 60,
table: options.table ?? 'cache_entries',
});
},
},
});
}
function resolveCacheTable(schema = DEFAULT_INTERNAL_SCHEMA, table = 'cache_entries') {
const safeSchema = assertSafeSqlIdentifier(schema);
const safeTable = assertSafeSqlIdentifier(table);
return {
qualifiedCacheTable: quoteQualifiedSqlIdentifier(safeSchema, safeTable),
};
}
export function buildCacheUpsertSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'cache_entries') {
const { qualifiedCacheTable } = resolveCacheTable(schema, table);
return `insert into ${qualifiedCacheTable} (cache_key, value, expires_at, tags, updated_at) values ($1, $2::jsonb, $3, $4::text[], now()) on conflict (cache_key) do update set value = excluded.value, expires_at = excluded.expires_at, tags = excluded.tags, updated_at = now();`;
}
export function buildCacheGetSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'cache_entries') {
const { qualifiedCacheTable } = resolveCacheTable(schema, table);
return `update ${qualifiedCacheTable} set hits = hits + 1 where cache_key = $1 and (expires_at is null or expires_at > now()) returning value;`;
}
export function buildCachePruneExpiredSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'cache_entries') {
const { qualifiedCacheTable } = resolveCacheTable(schema, table);
return `delete from ${qualifiedCacheTable} where expires_at is not null and expires_at <= now();`;
}
export function createMaterializedViewRefreshSql(name, concurrently = true) {
const safeName = quoteSqlIdentifier(assertSafeSqlIdentifier(name));
return concurrently
? `refresh materialized view concurrently ${safeName};`
: `refresh materialized view ${safeName};`;
}
export function createCacheInvalidationTriggerSql(options) {
const schema = options.schema ?? DEFAULT_INTERNAL_SCHEMA;
const safeSchema = assertSafeSqlIdentifier(schema);
const safeSourceTable = assertSafeSqlIdentifier(options.sourceTable);
const { qualifiedCacheTable } = resolveCacheTable(schema, options.cacheTable ?? 'cache_entries');
const functionName = quoteQualifiedSqlIdentifier(safeSchema, `invalidate_${safeSourceTable}_cache`);
return `create or replace function ${functionName}() returns trigger as $$ begin delete from ${qualifiedCacheTable} where cache_key = ${options.keyExpression}; return new; end; $$ language plpgsql;`;
}
export function buildCacheMetricsSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'cache_entries') {
const { qualifiedCacheTable } = resolveCacheTable(schema, table);
return `select count(*) as total_entries, coalesce(sum(hits), 0) as total_hits, count(*) filter (where expires_at is not null and expires_at <= now()) as expired_entries from ${qualifiedCacheTable};`;
}
import type { ModelPluginRegistration, ModelRegistry } from '@qbobjx/core';
import { type PostgresCachePluginMetadata } from './cache.js';
import { type PostgresEventsPluginMetadata } from './events.js';
import { type PostgresJsonPluginMetadata } from './json.js';
import { type PostgresObservabilityPluginMetadata } from './observability.js';
import { type PostgresQueuePluginMetadata } from './queue.js';
import { type PostgresSearchPluginMetadata } from './search.js';
import { type PostgresSecurityPluginMetadata } from './security.js';
import { type PostgresTimeseriesPluginMetadata } from './timeseries.js';
import { type PostgresVectorPluginMetadata } from './vector.js';
type AnyRegistration = ModelPluginRegistration;
export interface PostgresRuntimeConfig {
readonly queue?: Readonly<PostgresQueuePluginMetadata>;
readonly events?: Readonly<PostgresEventsPluginMetadata>;
readonly cache?: Readonly<PostgresCachePluginMetadata>;
readonly search?: Readonly<PostgresSearchPluginMetadata>;
readonly json?: Readonly<PostgresJsonPluginMetadata>;
readonly security?: Readonly<PostgresSecurityPluginMetadata>;
readonly observability?: Readonly<PostgresObservabilityPluginMetadata>;
readonly timeseries?: Readonly<PostgresTimeseriesPluginMetadata>;
readonly vector?: Readonly<PostgresVectorPluginMetadata>;
}
export interface PostgresExecutionContextSettingBinding {
readonly setting: string;
readonly contextKey: string;
readonly required?: boolean;
readonly isLocal?: boolean;
readonly applyOnNestedTransactions?: boolean;
}
export interface PostgresExecutionContextSettingsOptions {
readonly bindings: readonly PostgresExecutionContextSettingBinding[];
}
export interface ResolvePostgresConfigOptions {
readonly throwOnConflict?: boolean;
}
export type PostgresRegistrationSource = ModelRegistry | readonly AnyRegistration[];
export interface ResolvePostgresIntegrationOptions extends ResolvePostgresConfigOptions {
readonly tenantContextKey?: string;
readonly required?: boolean;
readonly isLocal?: boolean;
readonly applyOnNestedTransactions?: boolean;
}
export interface PostgresIntegration {
readonly config: PostgresRuntimeConfig;
readonly executionContextSettings?: PostgresExecutionContextSettingsOptions;
}
export declare function resolvePostgresConfig(source: PostgresRegistrationSource, options?: ResolvePostgresConfigOptions): PostgresRuntimeConfig;
export declare function createPostgresExecutionContextSettingsFromConfig(config: PostgresRuntimeConfig, options?: {
readonly tenantContextKey?: string;
readonly required?: boolean;
readonly isLocal?: boolean;
readonly applyOnNestedTransactions?: boolean;
}): PostgresExecutionContextSettingsOptions | undefined;
export declare function createPostgresExecutionContextSettingsFromRegistrations(source: PostgresRegistrationSource, options?: ResolvePostgresIntegrationOptions): PostgresExecutionContextSettingsOptions | undefined;
export declare function resolvePostgresIntegration(source: PostgresRegistrationSource, options?: ResolvePostgresIntegrationOptions): PostgresIntegration;
/**
* @deprecated Use `PostgresRuntimeConfig`.
*/
export type PostgresSpecialistResolvedConfig = PostgresRuntimeConfig;
/**
* @deprecated Use `PostgresExecutionContextSettingBinding`.
*/
export type PostgresSpecialistExecutionContextSettingBinding = PostgresExecutionContextSettingBinding;
/**
* @deprecated Use `PostgresExecutionContextSettingsOptions`.
*/
export type PostgresSpecialistExecutionContextSettingsOptions = PostgresExecutionContextSettingsOptions;
/**
* @deprecated Use `ResolvePostgresConfigOptions`.
*/
export type ResolvePostgresSpecialistConfigOptions = ResolvePostgresConfigOptions;
/**
* @deprecated Use `PostgresRegistrationSource`.
*/
export type PostgresSpecialistRegistrationSource = PostgresRegistrationSource;
/**
* @deprecated Use `resolvePostgresConfig`.
*/
export declare const resolvePostgresSpecialistConfig: typeof resolvePostgresConfig;
/**
* @deprecated Use `resolvePostgresIntegration`.
*/
export declare const resolvePostgresSpecialistIntegration: typeof resolvePostgresIntegration;
export {};
import { POSTGRES_CACHE_METADATA_KEY } from './cache.js';
import { POSTGRES_EVENTS_METADATA_KEY } from './events.js';
import { POSTGRES_JSON_METADATA_KEY } from './json.js';
import { POSTGRES_OBSERVABILITY_METADATA_KEY, } from './observability.js';
import { POSTGRES_QUEUE_METADATA_KEY } from './queue.js';
import { POSTGRES_SEARCH_METADATA_KEY } from './search.js';
import { POSTGRES_SECURITY_METADATA_KEY, } from './security.js';
import { POSTGRES_TIMESERIES_METADATA_KEY, } from './timeseries.js';
import { POSTGRES_VECTOR_METADATA_KEY } from './vector.js';
function isRegistryLike(value) {
return typeof value === 'object' && value !== null && 'all' in value && typeof value.all === 'function';
}
function freezeIfDefined(value) {
return value === undefined ? undefined : Object.freeze({ ...value });
}
function registrationsFromSource(source) {
if (Array.isArray(source)) {
return source;
}
if (isRegistryLike(source)) {
return source.all();
}
return [];
}
function stableValue(value) {
return JSON.stringify(value, (_key, nextValue) => {
if (nextValue && typeof nextValue === 'object' && !Array.isArray(nextValue)) {
return Object.fromEntries(Object.entries(nextValue).sort(([left], [right]) => left.localeCompare(right)));
}
return nextValue;
});
}
function describeConflict(metadataKey, values) {
const details = values
.map(({ modelName, value }) => `${modelName}=${stableValue(value)}`)
.join(', ');
return `Conflicting PostgreSQL runtime plugin metadata for "${metadataKey}": ${details}`;
}
function resolveUniqueMetadata(registrations, metadataKey, options) {
const values = registrations
.map((registration) => ({
modelName: registration.model.name,
value: registration.metadata.get(metadataKey),
}))
.filter((entry) => entry.value !== undefined);
const firstEntry = values[0];
if (!firstEntry) {
return undefined;
}
const first = stableValue(firstEntry.value);
const hasConflict = values.some((entry) => stableValue(entry.value) !== first);
if (hasConflict && (options.throwOnConflict ?? true)) {
throw new Error(describeConflict(metadataKey, values));
}
return firstEntry.value;
}
function assignIfDefined(target, key, value) {
if (value !== undefined) {
target[key] = value;
}
}
export function resolvePostgresConfig(source, options = {}) {
const registrations = registrationsFromSource(source);
const config = {};
assignIfDefined(config, 'queue', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_QUEUE_METADATA_KEY, options)));
assignIfDefined(config, 'events', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_EVENTS_METADATA_KEY, options)));
assignIfDefined(config, 'cache', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_CACHE_METADATA_KEY, options)));
assignIfDefined(config, 'search', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_SEARCH_METADATA_KEY, options)));
assignIfDefined(config, 'json', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_JSON_METADATA_KEY, options)));
assignIfDefined(config, 'security', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_SECURITY_METADATA_KEY, options)));
assignIfDefined(config, 'observability', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_OBSERVABILITY_METADATA_KEY, options)));
assignIfDefined(config, 'timeseries', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_TIMESERIES_METADATA_KEY, options)));
assignIfDefined(config, 'vector', freezeIfDefined(resolveUniqueMetadata(registrations, POSTGRES_VECTOR_METADATA_KEY, options)));
return Object.freeze(config);
}
export function createPostgresExecutionContextSettingsFromConfig(config, options = {}) {
if (!config.security?.tenantSettingName) {
return undefined;
}
const binding = options.applyOnNestedTransactions !== undefined
? {
setting: config.security.tenantSettingName,
contextKey: options.tenantContextKey ?? 'tenantId',
required: options.required ?? true,
isLocal: options.isLocal ?? true,
applyOnNestedTransactions: options.applyOnNestedTransactions,
}
: {
setting: config.security.tenantSettingName,
contextKey: options.tenantContextKey ?? 'tenantId',
required: options.required ?? true,
isLocal: options.isLocal ?? true,
};
return Object.freeze({
bindings: Object.freeze([Object.freeze(binding)]),
});
}
export function createPostgresExecutionContextSettingsFromRegistrations(source, options = {}) {
const config = resolvePostgresConfig(source, options);
const executionOptions = options.tenantContextKey !== undefined ||
options.required !== undefined ||
options.isLocal !== undefined ||
options.applyOnNestedTransactions !== undefined
? {
...(options.tenantContextKey !== undefined
? { tenantContextKey: options.tenantContextKey }
: {}),
...(options.required !== undefined ? { required: options.required } : {}),
...(options.isLocal !== undefined ? { isLocal: options.isLocal } : {}),
...(options.applyOnNestedTransactions !== undefined
? { applyOnNestedTransactions: options.applyOnNestedTransactions }
: {}),
}
: {};
return createPostgresExecutionContextSettingsFromConfig(config, executionOptions);
}
export function resolvePostgresIntegration(source, options = {}) {
const config = resolvePostgresConfig(source, options);
const executionContextSettings = createPostgresExecutionContextSettingsFromConfig(config, {
...(options.tenantContextKey !== undefined
? { tenantContextKey: options.tenantContextKey }
: {}),
...(options.required !== undefined ? { required: options.required } : {}),
...(options.isLocal !== undefined ? { isLocal: options.isLocal } : {}),
...(options.applyOnNestedTransactions !== undefined
? { applyOnNestedTransactions: options.applyOnNestedTransactions }
: {}),
});
return Object.freeze({
config,
...(executionContextSettings ? { executionContextSettings } : {}),
});
}
/**
* @deprecated Use `resolvePostgresConfig`.
*/
export const resolvePostgresSpecialistConfig = resolvePostgresConfig;
/**
* @deprecated Use `resolvePostgresIntegration`.
*/
export const resolvePostgresSpecialistIntegration = resolvePostgresIntegration;
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresEventsPluginOptions extends PostgresPluginBaseOptions {
readonly schema?: string;
readonly autoProvision?: boolean;
readonly outboxTable?: string;
readonly notifyChannel?: string;
readonly claimTtlMs?: number;
readonly dispatcherId?: string;
}
export interface PostgresEventsPluginMetadata {
readonly schema: string;
readonly autoProvision: boolean;
readonly outboxTable: string;
readonly notifyChannel: string;
readonly claimTtlMs: number;
readonly dispatcherId: string;
}
export interface PostgresOutboxDispatchBatchSqlOptions {
readonly claimTtlMs?: number;
readonly dispatcherId?: string;
}
export declare const POSTGRES_EVENTS_METADATA_KEY = "postgres.events";
export declare function createPostgresEventsPlugin(options?: PostgresEventsPluginOptions): Readonly<ObjxPlugin>;
export declare function buildOutboxPublishSql(schema?: string, table?: string): string;
export declare function buildOutboxDispatchBatchSql(schema?: string, table?: string, options?: PostgresOutboxDispatchBatchSqlOptions): string;
export declare function buildOutboxAckSql(schema?: string, table?: string): string;
export declare function buildOutboxFailSql(schema?: string, table?: string): string;
export declare function buildListenSql(channel?: string): string;
export declare function buildNotifySql(channel?: string): string;
import { definePlugin } from '@qbobjx/core';
import { DEFAULT_INTERNAL_SCHEMA, assertSafeSqlIdentifier, quoteQualifiedSqlIdentifier, quoteSqlLiteral, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_EVENTS_METADATA_KEY = 'postgres.events';
export function createPostgresEventsPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_EVENTS_METADATA_KEY);
return definePlugin({
name: 'postgres-events',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
schema: options.schema ?? DEFAULT_INTERNAL_SCHEMA,
autoProvision: options.autoProvision ?? true,
outboxTable: options.outboxTable ?? 'outbox_events',
notifyChannel: options.notifyChannel ?? 'objx_events',
claimTtlMs: normalizeClaimTtlMs(options.claimTtlMs),
dispatcherId: options.dispatcherId ?? 'objx-dispatcher',
});
},
},
});
}
function resolveOutboxTable(schema = DEFAULT_INTERNAL_SCHEMA, table = 'outbox_events') {
const safeSchema = assertSafeSqlIdentifier(schema);
const safeTable = assertSafeSqlIdentifier(table);
return {
qualifiedOutboxTable: quoteQualifiedSqlIdentifier(safeSchema, safeTable),
};
}
function resolveChannel(channel = 'objx_events') {
return assertSafeSqlIdentifier(channel);
}
function normalizeClaimTtlMs(value) {
const normalized = Math.trunc(value ?? 30_000);
if (!Number.isFinite(normalized) || normalized <= 0) {
throw new Error('Postgres outbox claim TTL must be a finite number greater than zero.');
}
return normalized;
}
export function buildOutboxPublishSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'outbox_events') {
const { qualifiedOutboxTable } = resolveOutboxTable(schema, table);
return `insert into ${qualifiedOutboxTable} (event_name, payload, aggregate_id, aggregate_type, idempotency_key) values ($1, $2::jsonb, $3, $4, $5) on conflict do nothing returning id;`;
}
export function buildOutboxDispatchBatchSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'outbox_events', options = {}) {
const { qualifiedOutboxTable } = resolveOutboxTable(schema, table);
const claimTtlMs = normalizeClaimTtlMs(options.claimTtlMs);
return `with next_events as (select id from ${qualifiedOutboxTable} where dispatched_at is null and (claimed_at is null or claim_expires_at is null or claim_expires_at <= now()) order by occurred_at asc, id asc limit $1 for update skip locked) update ${qualifiedOutboxTable} as e set claimed_at = now(), claim_expires_at = now() + ($3::int || ' milliseconds')::interval, claimed_by = $2, attempts = attempts + 1 from next_events where e.id = next_events.id returning e.*, ${claimTtlMs}::int as claim_ttl_ms;`;
}
export function buildOutboxAckSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'outbox_events') {
const { qualifiedOutboxTable } = resolveOutboxTable(schema, table);
return `update ${qualifiedOutboxTable} set dispatched_at = now(), claimed_at = null, claim_expires_at = null, claimed_by = null where id = $1;`;
}
export function buildOutboxFailSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'outbox_events') {
const { qualifiedOutboxTable } = resolveOutboxTable(schema, table);
return `update ${qualifiedOutboxTable} set claimed_at = null, claim_expires_at = null, claimed_by = null, last_error = $2 where id = $1;`;
}
export function buildListenSql(channel = 'objx_events') {
const safeChannel = resolveChannel(channel);
return `listen ${safeChannel};`;
}
export function buildNotifySql(channel = 'objx_events') {
const safeChannel = resolveChannel(channel);
return `select pg_notify(${quoteSqlLiteral(safeChannel)}, $1);`;
}
export * from './cache.js';
export * from './config.js';
export * from './events.js';
export * from './internal-schema.js';
export * from './json.js';
export * from './observability.js';
export * from './preset.js';
export * from './queue.js';
export * from './search.js';
export * from './security.js';
export * from './shared.js';
export * from './timeseries.js';
export * from './vector.js';
export * from './runtime.js';
export * from './cache.js';
export * from './config.js';
export * from './events.js';
export * from './internal-schema.js';
export * from './json.js';
export * from './observability.js';
export * from './preset.js';
export * from './queue.js';
export * from './search.js';
export * from './security.js';
export * from './shared.js';
export * from './timeseries.js';
export * from './vector.js';
export * from './runtime.js';
export interface PostgresInternalSchemaOptions {
readonly schema?: string;
readonly queueTable?: string;
readonly dlqTable?: string;
readonly outboxTable?: string;
readonly cacheTable?: string;
readonly migrationsTable?: string;
}
export declare function createPostgresInternalSchemaSql(options?: PostgresInternalSchemaOptions): readonly string[];
export declare function createPostgresAdvisoryLockSql(lockKey: number): string;
export declare function createPostgresAdvisoryUnlockSql(lockKey: number): string;
import { DEFAULT_INTERNAL_SCHEMA, assertSafeSqlIdentifier, quoteQualifiedSqlIdentifier, } from './shared.js';
function resolveInternalSchemaNames(options = {}) {
const schema = assertSafeSqlIdentifier(options.schema ?? DEFAULT_INTERNAL_SCHEMA);
const queueTable = assertSafeSqlIdentifier(options.queueTable ?? 'queue_jobs');
const dlqTable = assertSafeSqlIdentifier(options.dlqTable ?? 'queue_dlq');
const outboxTable = assertSafeSqlIdentifier(options.outboxTable ?? 'outbox_events');
const cacheTable = assertSafeSqlIdentifier(options.cacheTable ?? 'cache_entries');
const migrationsTable = assertSafeSqlIdentifier(options.migrationsTable ?? 'runtime_migrations');
return {
schema,
queueTable,
dlqTable,
outboxTable,
cacheTable,
migrationsTable,
qualifiedSchema: quoteQualifiedSqlIdentifier(schema),
qualifiedQueueTable: quoteQualifiedSqlIdentifier(schema, queueTable),
qualifiedDlqTable: quoteQualifiedSqlIdentifier(schema, dlqTable),
qualifiedOutboxTable: quoteQualifiedSqlIdentifier(schema, outboxTable),
qualifiedCacheTable: quoteQualifiedSqlIdentifier(schema, cacheTable),
qualifiedMigrationsTable: quoteQualifiedSqlIdentifier(schema, migrationsTable),
};
}
export function createPostgresInternalSchemaSql(options = {}) {
const names = resolveInternalSchemaNames(options);
return [
`create schema if not exists ${names.qualifiedSchema};`,
`create table if not exists ${names.qualifiedMigrationsTable} (
id bigserial primary key,
plugin_name text not null,
version text not null,
applied_at timestamptz not null default now(),
unique(plugin_name, version)
);`,
`create table if not exists ${names.qualifiedQueueTable} (
id bigserial primary key,
queue_name text not null,
job_name text not null,
payload jsonb not null,
status text not null default 'pending',
priority int not null default 0,
run_at timestamptz not null default now(),
attempts int not null default 0,
max_attempts int not null default 8,
dedupe_key text,
locked_at timestamptz,
locked_by text,
last_error text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ${quoteQualifiedSqlIdentifier(`${names.queueTable}_status_chk`)} check (
status in ('pending', 'running', 'done', 'dead')
)
);`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.queueTable}_pending_idx`)} on ${names.qualifiedQueueTable} (priority desc, run_at asc, id asc) where status = 'pending';`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.queueTable}_running_locked_idx`)} on ${names.qualifiedQueueTable} (locked_at asc, id asc) where status = 'running' and locked_at is not null;`,
`create unique index if not exists ${quoteQualifiedSqlIdentifier(`${names.queueTable}_dedupe_key_uidx`)} on ${names.qualifiedQueueTable} (dedupe_key) where dedupe_key is not null;`,
`create table if not exists ${names.qualifiedDlqTable} (
id bigserial primary key,
job_id bigint,
queue_name text not null,
job_name text not null,
payload jsonb not null,
failed_at timestamptz not null default now(),
error text
);`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.dlqTable}_job_id_idx`)} on ${names.qualifiedDlqTable} (job_id);`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.dlqTable}_failed_at_idx`)} on ${names.qualifiedDlqTable} (failed_at desc);`,
`create table if not exists ${names.qualifiedOutboxTable} (
id bigserial primary key,
event_name text not null,
payload jsonb not null,
aggregate_id text,
aggregate_type text,
idempotency_key text,
occurred_at timestamptz not null default now(),
claimed_at timestamptz,
claim_expires_at timestamptz,
claimed_by text,
dispatched_at timestamptz,
attempts int not null default 0,
last_error text
);`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.outboxTable}_pending_idx`)} on ${names.qualifiedOutboxTable} (occurred_at asc, id asc) where dispatched_at is null and claimed_at is null;`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.outboxTable}_claim_expired_idx`)} on ${names.qualifiedOutboxTable} (claim_expires_at asc, id asc) where dispatched_at is null and claim_expires_at is not null;`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.outboxTable}_claimed_by_idx`)} on ${names.qualifiedOutboxTable} (claimed_by, claim_expires_at desc) where dispatched_at is null and claimed_by is not null;`,
`create unique index if not exists ${quoteQualifiedSqlIdentifier(`${names.outboxTable}_idempotency_uidx`)} on ${names.qualifiedOutboxTable} (idempotency_key) where idempotency_key is not null;`,
`create table if not exists ${names.qualifiedCacheTable} (
cache_key text primary key,
value jsonb not null,
expires_at timestamptz,
tags text[] not null default '{}',
hits bigint not null default 0,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.cacheTable}_expires_at_idx`)} on ${names.qualifiedCacheTable} (expires_at) where expires_at is not null;`,
`create index if not exists ${quoteQualifiedSqlIdentifier(`${names.cacheTable}_tags_gin_idx`)} on ${names.qualifiedCacheTable} using gin (tags);`,
];
}
export function createPostgresAdvisoryLockSql(lockKey) {
return `select pg_advisory_lock(${Math.trunc(lockKey)});`;
}
export function createPostgresAdvisoryUnlockSql(lockKey) {
return `select pg_advisory_unlock(${Math.trunc(lockKey)});`;
}
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresJsonPluginOptions extends PostgresPluginBaseOptions {
readonly defaultPathMode?: 'strict' | 'lax';
readonly suggestIndexes?: boolean;
}
export interface PostgresJsonPluginMetadata {
readonly defaultPathMode: 'strict' | 'lax';
readonly suggestIndexes: boolean;
}
export declare const POSTGRES_JSON_METADATA_KEY = "postgres.json";
export declare function createPostgresJsonPlugin(options?: PostgresJsonPluginOptions): Readonly<ObjxPlugin>;
export declare function buildJsonPathWhereSql(column: string, jsonPath: string): string;
export declare function buildJsonProjectionSql(column: string, projection: readonly string[]): string;
export declare function createJsonIndexesSql(options: {
readonly table: string;
readonly jsonColumn: string;
readonly scalarPaths?: readonly string[];
}): readonly string[];
import { definePlugin } from '@qbobjx/core';
import { assertSafeSqlIdentifier, quoteSqlIdentifier, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_JSON_METADATA_KEY = 'postgres.json';
export function createPostgresJsonPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_JSON_METADATA_KEY);
return definePlugin({
name: 'postgres-json',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
defaultPathMode: options.defaultPathMode ?? 'lax',
suggestIndexes: options.suggestIndexes ?? true,
});
},
},
});
}
function escapeSqlLiteral(value) {
return value.replaceAll("'", "''");
}
function resolveJsonColumn(column) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(column));
}
function resolveJsonPathSegments(path) {
const segments = path
.split(',')
.map((segment) => segment.trim())
.filter((segment) => segment.length > 0);
if (segments.length === 0) {
throw new Error('JSON path must contain at least one segment.');
}
return segments.map((segment) => {
if (!/^[a-zA-Z0-9_]+$/.test(segment)) {
throw new Error(`Unsafe JSON path segment "${segment}".`);
}
return segment;
});
}
function toJsonPathArrayLiteral(path) {
return resolveJsonPathSegments(path)
.map((segment) => `"${segment.replaceAll('"', '""')}"`)
.join(',');
}
export function buildJsonPathWhereSql(column, jsonPath) {
const resolvedColumn = resolveJsonColumn(column);
return `${resolvedColumn} @@ '${escapeSqlLiteral(jsonPath)}'`;
}
export function buildJsonProjectionSql(column, projection) {
const resolvedColumn = resolveJsonColumn(column);
return projection
.map((path) => {
const segments = resolveJsonPathSegments(path);
const alias = quoteSqlIdentifier(segments.join('_'));
const pathLiteral = toJsonPathArrayLiteral(path);
return `${resolvedColumn} #>> '{${pathLiteral}}' as ${alias}`;
})
.join(', ');
}
export function createJsonIndexesSql(options) {
const table = assertSafeSqlIdentifier(options.table);
const jsonColumn = assertSafeSqlIdentifier(options.jsonColumn);
const qualifiedTable = quoteSqlIdentifier(table);
const quotedJsonColumn = quoteSqlIdentifier(jsonColumn);
const scalarPaths = options.scalarPaths ?? [];
return [
`create index if not exists ${quoteSqlIdentifier(`${table}_${jsonColumn}_gin_idx`)} on ${qualifiedTable} using gin (${quotedJsonColumn});`,
...scalarPaths.map((path, index) => {
const pathLiteral = toJsonPathArrayLiteral(path);
return `create index if not exists ${quoteSqlIdentifier(`${table}_${jsonColumn}_scalar_${index}_idx`)} on ${qualifiedTable} (( ${quotedJsonColumn} #>> '{${pathLiteral}}' ));`;
}),
];
}
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresObservabilityPluginOptions extends PostgresPluginBaseOptions {
readonly captureExplainAnalyze?: boolean;
readonly usePgStatStatements?: boolean;
readonly slowQueryThresholdMs?: number;
emit?(event: PostgresObservabilityEvent): void;
}
export interface PostgresObservabilityPluginMetadata {
readonly captureExplainAnalyze: boolean;
readonly usePgStatStatements: boolean;
readonly slowQueryThresholdMs: number;
}
export interface SqlLintIssue {
readonly code: 'SELECT_STAR' | 'MISSING_WHERE' | 'NO_LIMIT';
readonly message: string;
}
export interface PostgresObservabilityBaseEvent {
readonly plugin: 'postgres-observability';
readonly modelName?: string;
readonly tableName?: string;
readonly queryKind?: string;
readonly executionContextId?: string;
readonly transactionId?: string;
readonly sql?: string;
readonly parameterCount?: number;
readonly metadata?: Readonly<Record<string, unknown>>;
readonly lintIssues: readonly SqlLintIssue[];
}
export interface PostgresObservabilityQueryExecuteEvent extends PostgresObservabilityBaseEvent {
readonly type: 'query:execute';
readonly startedAt: Date;
}
export interface PostgresObservabilityQueryResultEvent extends PostgresObservabilityBaseEvent {
readonly type: 'query:result';
readonly startedAt: Date;
readonly finishedAt: Date;
readonly durationMs: number;
readonly rowCount?: number;
readonly isSlowQuery: boolean;
}
export interface PostgresObservabilityQueryErrorEvent extends PostgresObservabilityBaseEvent {
readonly type: 'query:error';
readonly startedAt: Date;
readonly finishedAt: Date;
readonly durationMs: number;
readonly error: unknown;
readonly isSlowQuery: boolean;
}
export type PostgresObservabilityEvent = PostgresObservabilityQueryExecuteEvent | PostgresObservabilityQueryResultEvent | PostgresObservabilityQueryErrorEvent;
export declare const POSTGRES_OBSERVABILITY_METADATA_KEY = "postgres.observability";
export declare function createPostgresObservabilityPlugin(options?: PostgresObservabilityPluginOptions): Readonly<ObjxPlugin>;
export declare function createExplainAnalyzeSql(sqlText: string): string;
export declare function createPgStatStatementsSql(limit?: number): string;
export declare function lintSqlAntiPatterns(sqlText: string): readonly SqlLintIssue[];
import { definePlugin } from '@qbobjx/core';
import { withDefaultMetadataKey } from './shared.js';
export const POSTGRES_OBSERVABILITY_METADATA_KEY = 'postgres.observability';
function isRecord(value) {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function resolveLintIssues(sqlText) {
return sqlText ? lintSqlAntiPatterns(sqlText) : [];
}
function resolveTimingInfo(context) {
return {
...(context.timing?.startedAt ? { startedAt: context.timing.startedAt } : {}),
...(context.timing?.finishedAt ? { finishedAt: context.timing.finishedAt } : {}),
...(context.timing?.durationMs !== undefined
? { durationMs: context.timing.durationMs }
: {}),
};
}
function resolveRowCount(result) {
if (typeof result === 'number' && Number.isFinite(result)) {
return result;
}
if (Array.isArray(result)) {
return result.length;
}
if (isRecord(result)) {
if (typeof result.rowCount === 'number' && Number.isFinite(result.rowCount)) {
return result.rowCount;
}
if (Array.isArray(result.rows)) {
return result.rows.length;
}
}
return undefined;
}
function extractCompiledQueryInfo(context) {
const compiledQuery = context.compiledQuery;
if (!compiledQuery) {
return {};
}
return {
sql: compiledQuery.sql,
parameterCount: compiledQuery.parameterCount,
...(compiledQuery.metadata ? { metadata: compiledQuery.metadata } : {}),
};
}
function extractExecutionInfo(context) {
return {
...(context.executionContext?.id
? { executionContextId: context.executionContext.id }
: {}),
...(context.executionContext?.transaction?.id
? { transactionId: context.executionContext.transaction.id }
: {}),
};
}
function createBaseEvent(context) {
const queryInfo = extractCompiledQueryInfo(context);
const lintIssues = resolveLintIssues(queryInfo.sql);
const executionInfo = extractExecutionInfo(context);
return {
plugin: 'postgres-observability',
...(context.model?.name ? { modelName: context.model.name } : {}),
...(context.model?.dbTable ? { tableName: context.model.dbTable } : {}),
...(context.queryKind ? { queryKind: context.queryKind } : {}),
...(executionInfo.executionContextId
? { executionContextId: executionInfo.executionContextId }
: {}),
...(executionInfo.transactionId ? { transactionId: executionInfo.transactionId } : {}),
...(queryInfo.sql ? { sql: queryInfo.sql } : {}),
...(queryInfo.parameterCount !== undefined
? { parameterCount: queryInfo.parameterCount }
: {}),
...(queryInfo.metadata ? { metadata: queryInfo.metadata } : {}),
lintIssues,
};
}
export function createPostgresObservabilityPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_OBSERVABILITY_METADATA_KEY);
const emitEvent = (event) => {
options.emit?.(event);
};
const slowQueryThresholdMs = options.slowQueryThresholdMs ?? 250;
return definePlugin({
name: 'postgres-observability',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
captureExplainAnalyze: options.captureExplainAnalyze ?? false,
usePgStatStatements: options.usePgStatStatements ?? true,
slowQueryThresholdMs,
});
},
onQueryExecute(context) {
const timing = resolveTimingInfo(context);
const startedAt = timing.startedAt ?? new Date();
emitEvent({
type: 'query:execute',
...createBaseEvent({
model: context.model,
queryKind: context.queryKind,
executionContext: context.executionContext,
compiledQuery: context.compiledQuery,
}),
startedAt,
});
},
onResult(context) {
const timing = resolveTimingInfo(context);
const finishedAt = timing.finishedAt ?? new Date();
const startedAt = timing.startedAt ?? finishedAt;
const durationMs = timing.durationMs ?? finishedAt.getTime() - startedAt.getTime();
const rowCount = resolveRowCount(context.result);
emitEvent({
type: 'query:result',
...createBaseEvent({
model: context.model,
queryKind: context.queryKind,
executionContext: context.executionContext,
compiledQuery: context.compiledQuery,
}),
startedAt,
finishedAt,
durationMs,
...(rowCount !== undefined ? { rowCount } : {}),
isSlowQuery: durationMs >= slowQueryThresholdMs,
});
return undefined;
},
onError(context) {
const timing = resolveTimingInfo(context);
const finishedAt = timing.finishedAt ?? new Date();
const startedAt = timing.startedAt ?? finishedAt;
const durationMs = timing.durationMs ?? finishedAt.getTime() - startedAt.getTime();
emitEvent({
type: 'query:error',
...createBaseEvent({
model: context.model,
queryKind: context.queryKind,
executionContext: context.executionContext,
compiledQuery: context.compiledQuery,
}),
startedAt,
finishedAt,
durationMs,
error: context.error,
isSlowQuery: durationMs >= slowQueryThresholdMs,
});
return undefined;
},
},
});
}
export function createExplainAnalyzeSql(sqlText) {
return `explain (analyze, buffers, format json) ${sqlText}`;
}
export function createPgStatStatementsSql(limit = 20) {
return `select query, calls, total_exec_time, mean_exec_time from pg_stat_statements order by total_exec_time desc limit ${Math.max(1, limit)};`;
}
export function lintSqlAntiPatterns(sqlText) {
const normalized = sqlText.toLowerCase();
const issues = [];
if (normalized.includes('select *')) {
issues.push({ code: 'SELECT_STAR', message: 'Avoid SELECT * in production queries.' });
}
if (normalized.startsWith('update ') && !normalized.includes(' where ')) {
issues.push({ code: 'MISSING_WHERE', message: 'UPDATE without WHERE detected.' });
}
if (normalized.startsWith('select ') && !normalized.includes(' limit ')) {
issues.push({ code: 'NO_LIMIT', message: 'SELECT without LIMIT can be expensive.' });
}
return issues;
}
import type { ObjxPlugin } from '@qbobjx/core';
import { type PostgresEventsPluginOptions } from './events.js';
import { type PostgresQueuePluginOptions } from './queue.js';
export type PostgresFeature = 'search' | 'queue' | 'events' | 'cache' | 'vector' | 'timeseries' | 'json' | 'security' | 'observability';
export interface PostgresPresetOptions {
readonly schema?: string;
readonly include?: readonly PostgresFeature[];
readonly queue?: Omit<PostgresQueuePluginOptions, 'schema'>;
readonly events?: Omit<PostgresEventsPluginOptions, 'schema'>;
}
export declare function createPostgresPreset(options?: PostgresPresetOptions): readonly Readonly<ObjxPlugin>[];
/**
* @deprecated Use `PostgresFeature`.
*/
export type PostgresSpecialistFeature = PostgresFeature;
/**
* @deprecated Use `PostgresPresetOptions`.
*/
export type PostgresPluginPresetOptions = PostgresPresetOptions;
/**
* @deprecated Use `createPostgresPreset`.
*/
export declare const createPostgresSpecialistPreset: typeof createPostgresPreset;
import { createPostgresCachePlugin } from './cache.js';
import { createPostgresEventsPlugin } from './events.js';
import { createPostgresJsonPlugin } from './json.js';
import { createPostgresObservabilityPlugin } from './observability.js';
import { createPostgresQueuePlugin } from './queue.js';
import { createPostgresSearchPlugin } from './search.js';
import { createPostgresSecurityPlugin } from './security.js';
import { DEFAULT_INTERNAL_SCHEMA } from './shared.js';
import { createPostgresTimeseriesPlugin } from './timeseries.js';
import { createPostgresVectorPlugin } from './vector.js';
const DEFAULT_POSTGRES_FEATURES = [
'search',
'queue',
'events',
'cache',
'vector',
'timeseries',
'json',
'security',
'observability',
];
export function createPostgresPreset(options = {}) {
const schema = options.schema ?? DEFAULT_INTERNAL_SCHEMA;
const enabled = new Set(options.include ?? DEFAULT_POSTGRES_FEATURES);
const plugins = [];
if (enabled.has('search')) {
plugins.push(createPostgresSearchPlugin());
}
if (enabled.has('queue')) {
plugins.push(createPostgresQueuePlugin({ schema, ...options.queue }));
}
if (enabled.has('events')) {
plugins.push(createPostgresEventsPlugin({ schema, ...options.events }));
}
if (enabled.has('cache')) {
plugins.push(createPostgresCachePlugin({ schema }));
}
if (enabled.has('vector')) {
plugins.push(createPostgresVectorPlugin());
}
if (enabled.has('timeseries')) {
plugins.push(createPostgresTimeseriesPlugin());
}
if (enabled.has('json')) {
plugins.push(createPostgresJsonPlugin());
}
if (enabled.has('security')) {
plugins.push(createPostgresSecurityPlugin());
}
if (enabled.has('observability')) {
plugins.push(createPostgresObservabilityPlugin());
}
return plugins;
}
/**
* @deprecated Use `createPostgresPreset`.
*/
export const createPostgresSpecialistPreset = createPostgresPreset;
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresQueuePluginOptions extends PostgresPluginBaseOptions {
readonly schema?: string;
readonly autoProvision?: boolean;
readonly defaultQueue?: string;
readonly lockTtlMs?: number;
readonly maxAttempts?: number;
}
export interface PostgresQueuePluginMetadata {
readonly schema: string;
readonly autoProvision: boolean;
readonly defaultQueue: string;
readonly lockTtlMs: number;
readonly maxAttempts: number;
}
export type QueueBackoffStrategy = 'fixed' | 'exponential';
export interface QueueBackoffOptions {
readonly strategy?: QueueBackoffStrategy;
readonly baseMs?: number;
readonly maxMs?: number;
}
export declare const POSTGRES_QUEUE_METADATA_KEY = "postgres.queue";
export declare function createPostgresQueuePlugin(options?: PostgresQueuePluginOptions): Readonly<ObjxPlugin>;
export declare function computeQueueBackoffMs(attempt: number, options?: QueueBackoffOptions): number;
export declare function buildQueueEnqueueSql(schema?: string, table?: string): string;
export declare function buildQueueDequeueSql(schema?: string, table?: string, options?: {
readonly lockTtlMs?: number;
}): string;
export declare function buildQueueCompleteSql(schema?: string, table?: string): string;
export declare function buildQueueFailSql(schema?: string, table?: string, dlqTable?: string): string;
export declare function buildQueueRenewLeaseSql(schema?: string, table?: string, options?: {
readonly lockTtlMs?: number;
}): string;
export declare function buildQueueReclaimExpiredSql(schema?: string, table?: string, options?: {
readonly lockTtlMs?: number;
}): string;
import { definePlugin } from '@qbobjx/core';
import { DEFAULT_INTERNAL_SCHEMA, assertSafeSqlIdentifier, quoteQualifiedSqlIdentifier, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_QUEUE_METADATA_KEY = 'postgres.queue';
export function createPostgresQueuePlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_QUEUE_METADATA_KEY);
return definePlugin({
name: 'postgres-queue',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
schema: options.schema ?? DEFAULT_INTERNAL_SCHEMA,
autoProvision: options.autoProvision ?? true,
defaultQueue: options.defaultQueue ?? 'default',
lockTtlMs: options.lockTtlMs ?? 30_000,
maxAttempts: options.maxAttempts ?? 8,
});
},
},
});
}
export function computeQueueBackoffMs(attempt, options = {}) {
const baseMs = options.baseMs ?? 500;
const maxMs = options.maxMs ?? 30_000;
const strategy = options.strategy ?? 'exponential';
if (attempt <= 0) {
return baseMs;
}
if (strategy === 'fixed') {
return Math.min(baseMs, maxMs);
}
return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}
function normalizeLeaseTtlMs(value) {
const normalized = Math.trunc(value ?? 30_000);
if (!Number.isFinite(normalized) || normalized <= 0) {
throw new Error('Postgres queue lock TTL must be a finite number greater than zero.');
}
return normalized;
}
function resolveQueueTables(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs', dlqTable = 'queue_dlq') {
const safeSchema = assertSafeSqlIdentifier(schema);
const safeTable = assertSafeSqlIdentifier(table);
const safeDlqTable = assertSafeSqlIdentifier(dlqTable);
return {
qualifiedQueueTable: quoteQualifiedSqlIdentifier(safeSchema, safeTable),
qualifiedDlqTable: quoteQualifiedSqlIdentifier(safeSchema, safeDlqTable),
};
}
export function buildQueueEnqueueSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs') {
const { qualifiedQueueTable } = resolveQueueTables(schema, table);
return `insert into ${qualifiedQueueTable} (queue_name, job_name, payload, priority, run_at, max_attempts, dedupe_key) values ($1, $2, $3::jsonb, $4, $5, $6, $7) on conflict do nothing returning id;`;
}
export function buildQueueDequeueSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs', options = {}) {
const { qualifiedQueueTable } = resolveQueueTables(schema, table);
const lockTtlMs = normalizeLeaseTtlMs(options.lockTtlMs);
return `with next_job as (select id from ${qualifiedQueueTable} where (status = 'pending' and run_at <= now()) or (status = 'running' and locked_at is not null and locked_at <= now() - ($2::int || ' milliseconds')::interval) order by priority desc, run_at asc, id asc limit 1 for update skip locked) update ${qualifiedQueueTable} as j set status = 'running', locked_at = now(), locked_by = $1, updated_at = now() from next_job where j.id = next_job.id returning j.*, ${lockTtlMs}::int as lock_ttl_ms;`;
}
export function buildQueueCompleteSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs') {
const { qualifiedQueueTable } = resolveQueueTables(schema, table);
return `update ${qualifiedQueueTable} set status = 'done', locked_at = null, locked_by = null, updated_at = now() where id = $1 and status = 'running';`;
}
export function buildQueueFailSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs', dlqTable = 'queue_dlq') {
const { qualifiedQueueTable, qualifiedDlqTable } = resolveQueueTables(schema, table, dlqTable);
return `with failed_job as (update ${qualifiedQueueTable} set status = case when attempts + 1 >= max_attempts then 'dead' else 'pending' end, attempts = attempts + 1, run_at = case when attempts + 1 >= max_attempts then run_at else now() + ($2::int || ' milliseconds')::interval end, last_error = $3, locked_at = null, locked_by = null, updated_at = now() where id = $1 returning id, queue_name, job_name, payload, status, last_error), inserted_dlq as (insert into ${qualifiedDlqTable} (job_id, queue_name, job_name, payload, error) select id, queue_name, job_name, payload, last_error from failed_job where status = 'dead' returning job_id) select * from failed_job;`;
}
export function buildQueueRenewLeaseSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs', options = {}) {
const { qualifiedQueueTable } = resolveQueueTables(schema, table);
const lockTtlMs = normalizeLeaseTtlMs(options.lockTtlMs);
return `update ${qualifiedQueueTable} set locked_at = now(), updated_at = now() where id = $1 and status = 'running' and locked_by = $2 and locked_at is not null and locked_at > now() - ($3::int || ' milliseconds')::interval returning *, ${lockTtlMs}::int as lock_ttl_ms;`;
}
export function buildQueueReclaimExpiredSql(schema = DEFAULT_INTERNAL_SCHEMA, table = 'queue_jobs', options = {}) {
const { qualifiedQueueTable } = resolveQueueTables(schema, table);
const lockTtlMs = normalizeLeaseTtlMs(options.lockTtlMs);
return `update ${qualifiedQueueTable} set status = 'pending', locked_at = null, locked_by = null, updated_at = now() where status = 'running' and locked_at is not null and locked_at <= now() - ($1::int || ' milliseconds')::interval returning *, ${lockTtlMs}::int as lock_ttl_ms;`;
}
import type { ExecutionContext, ModelPluginRegistration, ModelRegistry } from '@qbobjx/core';
import type { CompiledQuery, ObjxSession, SqlResultSet } from '@qbobjx/sql-engine';
import { type PostgresRegistrationSource, type PostgresRuntimeConfig } from './config.js';
import { type SqlLintIssue } from './observability.js';
import { type QueueBackoffOptions } from './queue.js';
export interface PostgresExecutionRequest {
readonly executionContext?: ExecutionContext;
readonly transactionId?: string;
}
export interface PostgresRuntimeExecutor {
execute<T = unknown>(sql: string, params?: readonly unknown[], request?: PostgresExecutionRequest): Promise<T>;
}
export interface PostgresSessionExecutorOptions {
readonly resultMode?: 'smart' | 'result-set' | 'rows' | 'first-or-null';
}
export interface PostgresSessionLike<TTransaction = unknown> {
execute(query: CompiledQuery, options?: {
readonly executionContext?: ExecutionContext;
readonly transaction?: TTransaction;
}): Promise<SqlResultSet>;
currentExecutionContext?(): ExecutionContext | undefined;
}
export interface QueueEnqueueInput {
readonly queueName?: string;
readonly jobName: string;
readonly payload: unknown;
readonly priority?: number;
readonly runAt?: Date;
readonly maxAttempts?: number;
readonly dedupeKey?: string;
}
export interface QueueFailInput {
readonly jobId: number;
readonly attempt: number;
readonly error: string;
readonly backoff?: QueueBackoffOptions;
}
export interface QueueLeaseRenewInput {
readonly jobId: number;
readonly workerId: string;
readonly leaseMs?: number;
}
export interface EventPublishInput {
readonly eventName: string;
readonly payload: unknown;
readonly aggregateId?: string;
readonly aggregateType?: string;
readonly idempotencyKey?: string;
}
export interface CacheSetInput {
readonly key: string;
readonly value: unknown;
readonly expiresAt?: Date | null;
readonly tags?: readonly string[];
}
export interface SearchInput {
readonly table: string;
readonly query: string;
readonly vectorColumn?: string;
readonly language?: string;
readonly rankFunction?: 'ts_rank' | 'ts_rank_cd';
readonly limit?: number;
readonly highlightColumn?: string;
}
export interface VectorSearchInput {
readonly table: string;
readonly vector: readonly number[];
readonly column?: string;
readonly whereSql?: string;
readonly limit?: number;
}
export interface TimeseriesPartitionInput {
readonly table: string;
readonly partitionName: string;
readonly from: string;
readonly to: string;
}
export interface SecurityPolicyInput {
readonly table: string;
readonly tenantColumn?: string;
readonly settingName?: string;
readonly policyName?: string;
}
export interface QueueWorkerOptions {
readonly workerId?: string;
readonly intervalMs?: number;
readonly idleDelayMs?: number;
readonly maxLoops?: number;
readonly autoComplete?: boolean;
readonly autoFail?: boolean;
readonly autoHeartbeat?: boolean;
readonly heartbeatIntervalMs?: number;
readonly leaseMs?: number;
readonly backoff?: QueueBackoffOptions;
readonly resolveJobId?: (job: unknown) => number | undefined;
readonly formatError?: (error: unknown) => string;
}
export interface EventDispatcherOptions {
readonly batchSize?: number;
readonly intervalMs?: number;
readonly maxLoops?: number;
readonly autoAck?: boolean;
readonly autoFail?: boolean;
readonly dispatcherId?: string;
readonly leaseMs?: number;
readonly resolveEventId?: (event: unknown) => number | undefined;
readonly formatError?: (error: unknown) => string;
}
export interface ProvisionOptions {
readonly lockKey?: number;
readonly pluginName?: string;
readonly version?: string;
readonly strict?: boolean;
}
export interface BackgroundHandle {
stop(): void;
readonly done: Promise<number>;
}
export interface PostgresRuntimeMetrics {
readonly queue: {
readonly enqueued: number;
readonly dequeued: number;
readonly completed: number;
readonly failed: number;
};
readonly events: {
readonly published: number;
readonly acked: number;
readonly failed: number;
readonly dispatchedBatches: number;
};
readonly cache: {
readonly hits: number;
readonly sets: number;
readonly prunes: number;
};
}
export interface PostgresRuntimeOptions {
readonly config?: PostgresRuntimeConfig;
}
export type PostgresRuntimeSource = PostgresRegistrationSource | readonly ModelPluginRegistration[];
export interface CreatePostgresRuntimeFromSessionOptions<TTransaction = unknown> extends PostgresRuntimeOptions, PostgresSessionExecutorOptions {
readonly source?: PostgresRuntimeSource;
readonly executionContext?: ExecutionContext;
readonly transaction?: TTransaction;
}
export declare function createPostgresSessionExecutor<TTransaction = unknown>(session: PostgresSessionLike<TTransaction>, options?: PostgresSessionExecutorOptions & {
readonly executionContext?: ExecutionContext;
readonly transaction?: TTransaction;
}): PostgresRuntimeExecutor;
export declare class PostgresRuntime {
#private;
constructor(executor: PostgresRuntimeExecutor, request?: PostgresExecutionRequest, config?: PostgresRuntimeConfig);
get config(): PostgresRuntimeConfig;
provisionInternalSchema(options?: ProvisionOptions): Promise<void>;
withRequest(request: PostgresExecutionRequest): PostgresRuntime;
metrics(): PostgresRuntimeMetrics;
readonly queue: {
enqueue: (input: QueueEnqueueInput) => Promise<unknown>;
dequeue: (workerId: string) => Promise<unknown>;
renewLease: (input: QueueLeaseRenewInput) => Promise<unknown>;
reclaimExpired: (_workerId: string, leaseMs?: number) => Promise<unknown>;
complete: (jobId: number) => Promise<unknown>;
fail: (input: QueueFailInput) => Promise<unknown>;
};
readonly events: {
publish: (input: EventPublishInput) => Promise<unknown>;
dispatchBatch: (batchSize: number, options?: {
readonly dispatcherId?: string;
readonly leaseMs?: number;
}) => Promise<unknown>;
ack: (id: number) => Promise<unknown>;
fail: (id: number, error: string) => Promise<unknown>;
listen: (channel?: string) => Promise<unknown>;
};
readonly cache: {
get: (key: string) => Promise<unknown>;
set: (input: CacheSetInput) => Promise<unknown>;
getOrCompute: <TValue>(key: string, compute: () => Promise<TValue>, options?: {
expiresAt?: Date | null;
tags?: readonly string[];
}) => Promise<TValue>;
pruneExpired: () => Promise<unknown>;
metrics: () => Promise<unknown>;
};
readonly search: {
query: (input: SearchInput) => Promise<unknown>;
};
readonly vector: {
addColumn: (table: string, column?: string, dimensions?: number) => Promise<unknown>;
createIndex: (table: string, column?: string) => Promise<unknown>;
similarity: (input: VectorSearchInput) => Promise<unknown>;
};
readonly timeseries: {
setupPartitioning: (table: string, timestampColumn: string) => Promise<unknown>;
createPartition: (input: TimeseriesPartitionInput) => Promise<unknown>;
applyRetention: (table: string, timestampColumn: string, retentionDays?: number) => Promise<unknown>;
enableTimescaleCompression: (hypertable: string) => Promise<unknown>;
};
readonly json: {
wherePath: (column: string, path: string) => string;
projection: (column: string, paths: readonly string[]) => string;
};
readonly security: {
enableRls: (table: string) => Promise<unknown>;
createTenantPolicy: (input: SecurityPolicyInput) => Promise<unknown>;
setLocalTenant: (tenantId: string, settingName?: string) => Promise<unknown>;
};
readonly observability: {
explainAnalyze: (sqlText: string) => Promise<unknown>;
topStatements: (limit?: number) => Promise<unknown>;
lint: (sqlText: string) => readonly SqlLintIssue[];
};
runQueueWorker(handler: (job: unknown) => Promise<void>, options?: QueueWorkerOptions): Promise<number>;
runEventDispatcher(handler: (event: unknown) => Promise<void>, options?: EventDispatcherOptions): Promise<number>;
startQueueWorker(handler: (job: unknown) => Promise<void>, options?: QueueWorkerOptions): BackgroundHandle;
startEventDispatcher(handler: (event: unknown) => Promise<void>, options?: EventDispatcherOptions): BackgroundHandle;
}
export declare function createPostgresRuntime(executor: PostgresRuntimeExecutor, options?: PostgresRuntimeOptions): PostgresRuntime;
export declare function createPostgresRuntimeFromConfig(executor: PostgresRuntimeExecutor, config: PostgresRuntimeConfig): PostgresRuntime;
export declare function createPostgresRuntimeFromRegistrations(executor: PostgresRuntimeExecutor, source: PostgresRuntimeSource, options?: PostgresRuntimeOptions): PostgresRuntime;
export declare function createPostgresRuntimeFromRegistry(executor: PostgresRuntimeExecutor, registry: ModelRegistry, options?: PostgresRuntimeOptions): PostgresRuntime;
export declare function createPostgresRuntimeFromSession<TTransaction = unknown>(session: PostgresSessionLike<TTransaction>, options?: CreatePostgresRuntimeFromSessionOptions<TTransaction>): PostgresRuntime;
export declare function createPostgresRuntimeFromObjxSession<TTransaction = unknown>(session: ObjxSession<TTransaction>, options?: CreatePostgresRuntimeFromSessionOptions<TTransaction>): PostgresRuntime;
/**
* @deprecated Use `PostgresRuntimeOptions`.
*/
export type PostgresSpecialistRuntimeOptions = PostgresRuntimeOptions;
/**
* @deprecated Use `PostgresRuntimeSource`.
*/
export type PostgresSpecialistRuntimeSource = PostgresRuntimeSource;
/**
* @deprecated Use `PostgresRuntime`.
*/
export declare const PostgresSpecialistRuntime: typeof PostgresRuntime;
/**
* @deprecated Use `createPostgresRuntime`.
*/
export declare const createPostgresSpecialistRuntime: typeof createPostgresRuntime;
/**
* @deprecated Use `createPostgresRuntimeFromConfig`.
*/
export declare const createPostgresSpecialistRuntimeFromConfig: typeof createPostgresRuntimeFromConfig;
/**
* @deprecated Use `createPostgresRuntimeFromRegistrations`.
*/
export declare const createPostgresSpecialistRuntimeFromRegistrations: typeof createPostgresRuntimeFromRegistrations;
/**
* @deprecated Use `createPostgresRuntimeFromRegistry`.
*/
export declare const createPostgresSpecialistRuntimeFromRegistry: typeof createPostgresRuntimeFromRegistry;
/**
* @deprecated Use `createPostgresRuntimeFromSession`.
*/
export declare const createPostgresSpecialistRuntimeFromSession: typeof createPostgresRuntimeFromSession;
/**
* @deprecated Use `createPostgresRuntimeFromObjxSession`.
*/
export declare const createPostgresSpecialistRuntimeFromObjxSession: typeof createPostgresRuntimeFromObjxSession;
import { buildCacheGetSql, buildCacheMetricsSql, buildCachePruneExpiredSql, buildCacheUpsertSql, } from './cache.js';
import { resolvePostgresConfig, } from './config.js';
import { buildListenSql, buildNotifySql, buildOutboxAckSql, buildOutboxDispatchBatchSql, buildOutboxFailSql, buildOutboxPublishSql, } from './events.js';
import { createPostgresAdvisoryLockSql, createPostgresAdvisoryUnlockSql, createPostgresInternalSchemaSql, } from './internal-schema.js';
import { buildJsonPathWhereSql, buildJsonProjectionSql } from './json.js';
import { createExplainAnalyzeSql, createPgStatStatementsSql, lintSqlAntiPatterns, } from './observability.js';
import { buildQueueCompleteSql, buildQueueDequeueSql, buildQueueEnqueueSql, buildQueueFailSql, buildQueueReclaimExpiredSql, buildQueueRenewLeaseSql, computeQueueBackoffMs, } from './queue.js';
import { buildPostgresSearchQuerySql, } from './search.js';
import { createEnableRlsSql, createSetLocalTenantSql, createTenantIsolationPolicySql, } from './security.js';
import { createPartitionSql, createPartitionedTableSql, createRetentionSql, createTimescaleCompressionSql, } from './timeseries.js';
import { buildVectorSimilarityQuerySql, createVectorColumnSql, createVectorIndexSql, } from './vector.js';
const INITIAL_METRICS = {
queue: { enqueued: 0, dequeued: 0, completed: 0, failed: 0 },
events: { published: 0, acked: 0, failed: 0, dispatchedBatches: 0 },
cache: { hits: 0, sets: 0, prunes: 0 },
};
function isRecord(value) {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function cloneMetrics(metrics) {
return {
queue: { ...metrics.queue },
events: { ...metrics.events },
cache: { ...metrics.cache },
};
}
function normalizeErrorMessage(error) {
if (error instanceof Error && error.message) {
return error.message;
}
return String(error);
}
function resolveEntityId(value, explicitResolver) {
if (explicitResolver) {
return explicitResolver(value);
}
if (!isRecord(value)) {
return undefined;
}
if (typeof value.id === 'number' && Number.isFinite(value.id)) {
return value.id;
}
return undefined;
}
function isRegistrySource(source) {
return typeof source === 'object' && source !== null && 'all' in source;
}
function registrationsFromSource(source) {
if (Array.isArray(source)) {
return source;
}
if (isRegistrySource(source)) {
return source.all();
}
return source;
}
function resolveQueueConfig(config) {
return Object.freeze({
schema: config.queue?.schema ?? config.events?.schema ?? config.cache?.schema ?? 'objx_internal',
autoProvision: config.queue?.autoProvision ?? true,
defaultQueue: config.queue?.defaultQueue ?? 'default',
lockTtlMs: config.queue?.lockTtlMs ?? 30_000,
maxAttempts: config.queue?.maxAttempts ?? 8,
});
}
function resolveEventsConfig(config) {
return Object.freeze({
schema: config.events?.schema ?? config.queue?.schema ?? config.cache?.schema ?? 'objx_internal',
autoProvision: config.events?.autoProvision ?? true,
outboxTable: config.events?.outboxTable ?? 'outbox_events',
notifyChannel: config.events?.notifyChannel ?? 'objx_events',
claimTtlMs: config.events?.claimTtlMs ?? 30_000,
dispatcherId: config.events?.dispatcherId ?? 'objx-dispatcher',
});
}
function resolveCacheConfig(config) {
return Object.freeze({
schema: config.cache?.schema ?? config.queue?.schema ?? config.events?.schema ?? 'objx_internal',
autoProvision: config.cache?.autoProvision ?? true,
defaultTtlSeconds: config.cache?.defaultTtlSeconds ?? 60,
table: config.cache?.table ?? 'cache_entries',
});
}
function resolveSearchConfig(config) {
return Object.freeze({
defaultLanguage: config.search?.defaultLanguage ?? 'simple',
autoMigrateHelpers: config.search?.autoMigrateHelpers ?? true,
rankFunction: config.search?.rankFunction ?? 'ts_rank_cd',
});
}
function resolveSecurityConfig(config) {
return Object.freeze({
tenantSettingName: config.security?.tenantSettingName ?? 'objx.tenant_id',
enforceRls: config.security?.enforceRls ?? true,
});
}
function resolveTimeseriesConfig(config) {
return Object.freeze({
useTimescaleWhenAvailable: config.timeseries?.useTimescaleWhenAvailable ?? true,
defaultRetentionDays: config.timeseries?.defaultRetentionDays ?? 30,
defaultPartitionWindow: config.timeseries?.defaultPartitionWindow ?? 'week',
});
}
function resolveObservabilityConfig(config) {
return Object.freeze({
captureExplainAnalyze: config.observability?.captureExplainAnalyze ?? false,
usePgStatStatements: config.observability?.usePgStatStatements ?? true,
slowQueryThresholdMs: config.observability?.slowQueryThresholdMs ?? 250,
});
}
function resolveVectorConfig(config) {
return Object.freeze({
extensionName: config.vector?.extensionName ?? 'vector',
distance: config.vector?.distance ?? 'cosine',
indexMethod: config.vector?.indexMethod ?? 'hnsw',
});
}
function assignResolvedConfigIfDefined(target, key, value) {
if (value !== undefined) {
target[key] = value;
}
}
function mergeResolvedConfig(resolved, override) {
if (!override) {
return resolved;
}
const merged = {};
assignResolvedConfigIfDefined(merged, 'queue', override.queue ?? resolved.queue);
assignResolvedConfigIfDefined(merged, 'events', override.events ?? resolved.events);
assignResolvedConfigIfDefined(merged, 'cache', override.cache ?? resolved.cache);
assignResolvedConfigIfDefined(merged, 'search', override.search ?? resolved.search);
assignResolvedConfigIfDefined(merged, 'json', override.json ?? resolved.json);
assignResolvedConfigIfDefined(merged, 'security', override.security ?? resolved.security);
assignResolvedConfigIfDefined(merged, 'observability', override.observability ?? resolved.observability);
assignResolvedConfigIfDefined(merged, 'timeseries', override.timeseries ?? resolved.timeseries);
assignResolvedConfigIfDefined(merged, 'vector', override.vector ?? resolved.vector);
return Object.freeze(merged);
}
function createCompiledQuery(sqlText, params = []) {
return {
sql: sqlText,
parameters: params.map((value) => ({ value })),
metadata: Object.freeze({}),
};
}
function normalizeSessionResult(resultSet, mode) {
const resultMode = mode ?? 'smart';
if (resultMode === 'result-set') {
return resultSet;
}
if (resultMode === 'rows') {
return resultSet.rows;
}
if (resultMode === 'first-or-null') {
return resultSet.rows[0] ?? null;
}
if (resultSet.rows.length === 0) {
return null;
}
if (resultSet.rows.length === 1) {
return resultSet.rows[0];
}
return resultSet.rows;
}
export function createPostgresSessionExecutor(session, options = {}) {
return {
async execute(sqlText, params = [], request) {
const compiledQuery = createCompiledQuery(sqlText, params);
const executionContext = request?.executionContext ??
options.executionContext ??
session.currentExecutionContext?.();
const resultSet = await session.execute(compiledQuery, {
...(executionContext ? { executionContext } : {}),
...(options.transaction !== undefined ? { transaction: options.transaction } : {}),
});
return normalizeSessionResult(resultSet, options.resultMode);
},
};
}
export class PostgresRuntime {
#executor;
#request;
#config;
#metrics = INITIAL_METRICS;
constructor(executor, request, config = Object.freeze({})) {
this.#executor = executor;
this.#request = request;
this.#config = config;
}
get config() {
return this.#config;
}
async provisionInternalSchema(options = {}) {
const lockKey = options.lockKey ?? 883_201;
const queueConfig = resolveQueueConfig(this.#config);
const eventsConfig = resolveEventsConfig(this.#config);
const cacheConfig = resolveCacheConfig(this.#config);
const schema = queueConfig.schema;
await this.#executor.execute(createPostgresAdvisoryLockSql(lockKey), [], this.#request);
try {
for (const ddl of createPostgresInternalSchemaSql({
schema,
queueTable: 'queue_jobs',
dlqTable: 'queue_dlq',
outboxTable: eventsConfig.outboxTable,
cacheTable: cacheConfig.table,
migrationsTable: 'runtime_migrations',
})) {
await this.#executor.execute(ddl, [], this.#request);
}
const pluginName = options.pluginName ?? 'postgres-runtime';
const version = options.version ?? '1';
if (options.strict) {
const existing = await this.#executor.execute(`select count(*)::int as "rowCount" from "${schema}"."runtime_migrations" where plugin_name = $1 and version = $2;`, [pluginName, version], this.#request);
const count = typeof existing?.rowCount === 'number' ? existing.rowCount : 0;
if (count === 0) {
throw new Error(`Strict mode enabled and runtime migration ${pluginName}@${version} is not registered.`);
}
}
else {
await this.#executor.execute(`insert into "${schema}"."runtime_migrations" (plugin_name, version) values ($1, $2) on conflict do nothing;`, [pluginName, version], this.#request);
}
}
finally {
await this.#executor.execute(createPostgresAdvisoryUnlockSql(lockKey), [], this.#request);
}
}
withRequest(request) {
const next = new PostgresRuntime(this.#executor, request, this.#config);
next.#metrics = cloneMetrics(this.#metrics);
return next;
}
metrics() {
return cloneMetrics(this.#metrics);
}
queue = {
enqueue: async (input) => {
const queueConfig = resolveQueueConfig(this.#config);
this.#metrics = {
...this.#metrics,
queue: { ...this.#metrics.queue, enqueued: this.#metrics.queue.enqueued + 1 },
};
return this.#executor.execute(buildQueueEnqueueSql(queueConfig.schema), [
input.queueName ?? queueConfig.defaultQueue,
input.jobName,
JSON.stringify(input.payload),
input.priority ?? 0,
input.runAt ?? new Date(),
input.maxAttempts ?? queueConfig.maxAttempts,
input.dedupeKey ?? null,
], this.#request);
},
dequeue: async (workerId) => {
const queueConfig = resolveQueueConfig(this.#config);
this.#metrics = {
...this.#metrics,
queue: { ...this.#metrics.queue, dequeued: this.#metrics.queue.dequeued + 1 },
};
return this.#executor.execute(buildQueueDequeueSql(queueConfig.schema, 'queue_jobs', {
lockTtlMs: queueConfig.lockTtlMs,
}), [workerId, queueConfig.lockTtlMs], this.#request);
},
renewLease: async (input) => {
const queueConfig = resolveQueueConfig(this.#config);
const leaseMs = Math.max(1, Math.trunc(input.leaseMs ?? queueConfig.lockTtlMs));
return this.#executor.execute(buildQueueRenewLeaseSql(queueConfig.schema, 'queue_jobs', {
lockTtlMs: leaseMs,
}), [input.jobId, input.workerId, leaseMs], this.#request);
},
reclaimExpired: async (_workerId, leaseMs) => {
const queueConfig = resolveQueueConfig(this.#config);
const effectiveLeaseMs = Math.max(1, Math.trunc(leaseMs ?? queueConfig.lockTtlMs));
return this.#executor.execute(buildQueueReclaimExpiredSql(queueConfig.schema, 'queue_jobs', {
lockTtlMs: effectiveLeaseMs,
}), [effectiveLeaseMs], this.#request);
},
complete: async (jobId) => {
const queueConfig = resolveQueueConfig(this.#config);
this.#metrics = {
...this.#metrics,
queue: { ...this.#metrics.queue, completed: this.#metrics.queue.completed + 1 },
};
return this.#executor.execute(buildQueueCompleteSql(queueConfig.schema), [jobId], this.#request);
},
fail: async (input) => {
const queueConfig = resolveQueueConfig(this.#config);
this.#metrics = {
...this.#metrics,
queue: { ...this.#metrics.queue, failed: this.#metrics.queue.failed + 1 },
};
const retryDelayMs = computeQueueBackoffMs(input.attempt, input.backoff);
return this.#executor.execute(buildQueueFailSql(queueConfig.schema), [input.jobId, retryDelayMs, input.error], this.#request);
},
};
events = {
publish: async (input) => {
const eventsConfig = resolveEventsConfig(this.#config);
this.#metrics = {
...this.#metrics,
events: { ...this.#metrics.events, published: this.#metrics.events.published + 1 },
};
const row = await this.#executor.execute(buildOutboxPublishSql(eventsConfig.schema, eventsConfig.outboxTable), [
input.eventName,
JSON.stringify(input.payload),
input.aggregateId ?? null,
input.aggregateType ?? null,
input.idempotencyKey ?? null,
], this.#request);
await this.#executor.execute(buildNotifySql(eventsConfig.notifyChannel), [input.eventName], this.#request);
return row;
},
dispatchBatch: async (batchSize, options = {}) => {
const eventsConfig = resolveEventsConfig(this.#config);
const dispatcherId = options.dispatcherId ?? eventsConfig.dispatcherId;
const leaseMs = Math.max(1, Math.trunc(options.leaseMs ?? eventsConfig.claimTtlMs));
this.#metrics = {
...this.#metrics,
events: {
...this.#metrics.events,
dispatchedBatches: this.#metrics.events.dispatchedBatches + 1,
},
};
return this.#executor.execute(buildOutboxDispatchBatchSql(eventsConfig.schema, eventsConfig.outboxTable), [batchSize, dispatcherId, leaseMs], this.#request);
},
ack: async (id) => {
const eventsConfig = resolveEventsConfig(this.#config);
this.#metrics = {
...this.#metrics,
events: { ...this.#metrics.events, acked: this.#metrics.events.acked + 1 },
};
return this.#executor.execute(buildOutboxAckSql(eventsConfig.schema, eventsConfig.outboxTable), [id], this.#request);
},
fail: async (id, error) => {
const eventsConfig = resolveEventsConfig(this.#config);
this.#metrics = {
...this.#metrics,
events: { ...this.#metrics.events, failed: this.#metrics.events.failed + 1 },
};
return this.#executor.execute(buildOutboxFailSql(eventsConfig.schema, eventsConfig.outboxTable), [id, error], this.#request);
},
listen: async (channel = resolveEventsConfig(this.#config).notifyChannel) => {
return this.#executor.execute(buildListenSql(channel), [], this.#request);
},
};
cache = {
get: async (key) => {
const cacheConfig = resolveCacheConfig(this.#config);
this.#metrics = {
...this.#metrics,
cache: { ...this.#metrics.cache, hits: this.#metrics.cache.hits + 1 },
};
return this.#executor.execute(buildCacheGetSql(cacheConfig.schema, cacheConfig.table), [key], this.#request);
},
set: async (input) => {
const cacheConfig = resolveCacheConfig(this.#config);
this.#metrics = {
...this.#metrics,
cache: { ...this.#metrics.cache, sets: this.#metrics.cache.sets + 1 },
};
const expiresAt = input.expiresAt === undefined
? new Date(Date.now() + cacheConfig.defaultTtlSeconds * 1_000)
: input.expiresAt;
return this.#executor.execute(buildCacheUpsertSql(cacheConfig.schema, cacheConfig.table), [input.key, JSON.stringify(input.value), expiresAt ?? null, input.tags ?? []], this.#request);
},
getOrCompute: async (key, compute, options = {}) => {
const cached = await this.cache.get(key);
if (cached && typeof cached === 'object' && 'value' in cached) {
return cached.value;
}
const value = await compute();
await this.cache.set({
key,
value,
...(options.expiresAt !== undefined ? { expiresAt: options.expiresAt } : {}),
...(options.tags !== undefined ? { tags: options.tags } : {}),
});
return value;
},
pruneExpired: async () => {
const cacheConfig = resolveCacheConfig(this.#config);
this.#metrics = {
...this.#metrics,
cache: { ...this.#metrics.cache, prunes: this.#metrics.cache.prunes + 1 },
};
return this.#executor.execute(buildCachePruneExpiredSql(cacheConfig.schema, cacheConfig.table), [], this.#request);
},
metrics: async () => {
const cacheConfig = resolveCacheConfig(this.#config);
return this.#executor.execute(buildCacheMetricsSql(cacheConfig.schema, cacheConfig.table), [], this.#request);
},
};
search = {
query: async (input) => {
const searchConfig = resolveSearchConfig(this.#config);
const sql = buildPostgresSearchQuerySql({
...input,
language: input.language ?? searchConfig.defaultLanguage,
rankFunction: input.rankFunction ?? searchConfig.rankFunction,
});
return this.#executor.execute(sql, [input.query], this.#request);
},
};
vector = {
addColumn: async (table, column = 'embedding', dimensions = 1536) => {
return this.#executor.execute(createVectorColumnSql(table, column, dimensions), [], this.#request);
},
createIndex: async (table, column = 'embedding') => {
const vectorConfig = resolveVectorConfig(this.#config);
const operatorClass = vectorConfig.distance === 'l2'
? 'vector_l2_ops'
: vectorConfig.distance === 'ip'
? 'vector_ip_ops'
: 'vector_cosine_ops';
return this.#executor.execute(createVectorIndexSql({
table,
column,
method: vectorConfig.indexMethod,
operatorClass,
}), [], this.#request);
},
similarity: async (input) => {
const vectorConfig = resolveVectorConfig(this.#config);
const distanceOperator = vectorConfig.distance === 'l2'
? '<->'
: vectorConfig.distance === 'ip'
? '<#>'
: '<=>';
const sql = buildVectorSimilarityQuerySql({
...input,
distanceOperator,
});
return this.#executor.execute(sql, [`[${input.vector.join(',')}]`], this.#request);
},
};
timeseries = {
setupPartitioning: async (table, timestampColumn) => {
return this.#executor.execute(createPartitionedTableSql({ table, timestampColumn }), [], this.#request);
},
createPartition: async (input) => {
return this.#executor.execute(createPartitionSql(input), [], this.#request);
},
applyRetention: async (table, timestampColumn, retentionDays) => {
const timeseriesConfig = resolveTimeseriesConfig(this.#config);
return this.#executor.execute(createRetentionSql({
table,
timestampColumn,
retentionDays: retentionDays ?? timeseriesConfig.defaultRetentionDays,
}), [], this.#request);
},
enableTimescaleCompression: async (hypertable) => {
return this.#executor.execute(createTimescaleCompressionSql(hypertable), [], this.#request);
},
};
json = {
wherePath: (column, path) => buildJsonPathWhereSql(column, path),
projection: (column, paths) => buildJsonProjectionSql(column, paths),
};
security = {
enableRls: async (table) => {
return this.#executor.execute(createEnableRlsSql(table), [], this.#request);
},
createTenantPolicy: async (input) => {
const securityConfig = resolveSecurityConfig(this.#config);
return this.#executor.execute(createTenantIsolationPolicySql({
...input,
settingName: input.settingName ?? securityConfig.tenantSettingName,
}), [], this.#request);
},
setLocalTenant: async (tenantId, settingName = resolveSecurityConfig(this.#config).tenantSettingName) => {
return this.#executor.execute(createSetLocalTenantSql(settingName), [tenantId], this.#request);
},
};
observability = {
explainAnalyze: async (sqlText) => {
return this.#executor.execute(createExplainAnalyzeSql(sqlText), [], this.#request);
},
topStatements: async (limit = 20) => {
const observabilityConfig = resolveObservabilityConfig(this.#config);
if (!observabilityConfig.usePgStatStatements) {
throw new Error('pg_stat_statements support is disabled by runtime plugin config.');
}
return this.#executor.execute(createPgStatStatementsSql(limit), [], this.#request);
},
lint: (sqlText) => lintSqlAntiPatterns(sqlText),
};
async runQueueWorker(handler, options = {}) {
const queueConfig = resolveQueueConfig(this.#config);
const workerId = options.workerId ?? 'objx-worker';
const maxLoops = options.maxLoops ?? Number.POSITIVE_INFINITY;
const idleDelayMs = options.idleDelayMs ?? options.intervalMs ?? 250;
const autoHeartbeat = options.autoHeartbeat ?? false;
const leaseMs = Math.max(1, Math.trunc(options.leaseMs ?? queueConfig.lockTtlMs));
const heartbeatIntervalMs = Math.max(1, Math.trunc(options.heartbeatIntervalMs ?? Math.max(1, Math.floor(leaseMs / 2))));
let loops = 0;
while (loops < maxLoops) {
const job = await this.queue.dequeue(workerId);
if (!job) {
await delay(idleDelayMs);
loops += 1;
continue;
}
const jobId = resolveEntityId(job, options.resolveJobId);
let heartbeatHandle;
try {
if (autoHeartbeat && jobId !== undefined) {
heartbeatHandle = setInterval(() => {
void this.queue.renewLease({
jobId,
workerId,
leaseMs,
});
}, heartbeatIntervalMs);
}
await handler(job);
if (heartbeatHandle) {
clearInterval(heartbeatHandle);
heartbeatHandle = undefined;
}
if (options.autoComplete ?? true) {
if (jobId !== undefined) {
await this.queue.complete(jobId);
}
}
}
catch (error) {
if (heartbeatHandle) {
clearInterval(heartbeatHandle);
heartbeatHandle = undefined;
}
if (options.autoFail ?? true) {
if (jobId !== undefined) {
await this.queue.fail({
jobId,
attempt: isRecord(job) && typeof job.attempts === 'number'
? job.attempts + 1
: 1,
error: options.formatError?.(error) ?? normalizeErrorMessage(error),
...(options.backoff !== undefined ? { backoff: options.backoff } : {}),
});
}
}
throw error;
}
loops += 1;
}
return loops;
}
async runEventDispatcher(handler, options = {}) {
const eventsConfig = resolveEventsConfig(this.#config);
const batchSize = options.batchSize ?? 100;
const intervalMs = options.intervalMs ?? 250;
const maxLoops = options.maxLoops ?? Number.POSITIVE_INFINITY;
const dispatcherId = options.dispatcherId ?? eventsConfig.dispatcherId;
const leaseMs = Math.max(1, Math.trunc(options.leaseMs ?? eventsConfig.claimTtlMs));
let loops = 0;
while (loops < maxLoops) {
const batch = await this.events.dispatchBatch(batchSize, {
dispatcherId,
leaseMs,
});
if (!Array.isArray(batch) || batch.length === 0) {
await delay(intervalMs);
loops += 1;
continue;
}
for (const event of batch) {
try {
await handler(event);
if (options.autoAck ?? true) {
const eventId = resolveEntityId(event, options.resolveEventId);
if (eventId !== undefined) {
await this.events.ack(eventId);
}
}
}
catch (error) {
if (options.autoFail ?? true) {
const eventId = resolveEntityId(event, options.resolveEventId);
if (eventId !== undefined) {
await this.events.fail(eventId, options.formatError?.(error) ?? normalizeErrorMessage(error));
}
}
throw error;
}
}
loops += 1;
}
return loops;
}
startQueueWorker(handler, options = {}) {
let stopped = false;
const done = (async () => {
let loops = 0;
const queueConfig = resolveQueueConfig(this.#config);
const workerId = options.workerId ?? 'objx-worker';
const maxLoops = options.maxLoops ?? Number.POSITIVE_INFINITY;
const idleDelayMs = options.idleDelayMs ?? options.intervalMs ?? 250;
const autoHeartbeat = options.autoHeartbeat ?? false;
const leaseMs = Math.max(1, Math.trunc(options.leaseMs ?? queueConfig.lockTtlMs));
const heartbeatIntervalMs = Math.max(1, Math.trunc(options.heartbeatIntervalMs ?? Math.max(1, Math.floor(leaseMs / 2))));
while (!stopped && loops < maxLoops) {
const job = await this.queue.dequeue(workerId);
if (!job) {
await delay(idleDelayMs);
loops += 1;
continue;
}
const jobId = resolveEntityId(job, options.resolveJobId);
let heartbeatHandle;
try {
if (autoHeartbeat && jobId !== undefined) {
heartbeatHandle = setInterval(() => {
void this.queue.renewLease({
jobId,
workerId,
leaseMs,
});
}, heartbeatIntervalMs);
}
await handler(job);
if (heartbeatHandle) {
clearInterval(heartbeatHandle);
heartbeatHandle = undefined;
}
if (options.autoComplete ?? true) {
if (jobId !== undefined) {
await this.queue.complete(jobId);
}
}
}
catch (error) {
if (heartbeatHandle) {
clearInterval(heartbeatHandle);
heartbeatHandle = undefined;
}
if (options.autoFail ?? true) {
if (jobId !== undefined) {
await this.queue.fail({
jobId,
attempt: isRecord(job) && typeof job.attempts === 'number'
? job.attempts + 1
: 1,
error: options.formatError?.(error) ?? normalizeErrorMessage(error),
...(options.backoff !== undefined ? { backoff: options.backoff } : {}),
});
}
}
throw error;
}
loops += 1;
}
return loops;
})();
return {
stop() {
stopped = true;
},
done,
};
}
startEventDispatcher(handler, options = {}) {
let stopped = false;
const done = (async () => {
let loops = 0;
const eventsConfig = resolveEventsConfig(this.#config);
const batchSize = options.batchSize ?? 100;
const intervalMs = options.intervalMs ?? 250;
const maxLoops = options.maxLoops ?? Number.POSITIVE_INFINITY;
const dispatcherId = options.dispatcherId ?? eventsConfig.dispatcherId;
const leaseMs = Math.max(1, Math.trunc(options.leaseMs ?? eventsConfig.claimTtlMs));
while (!stopped && loops < maxLoops) {
const batch = await this.events.dispatchBatch(batchSize, {
dispatcherId,
leaseMs,
});
if (!Array.isArray(batch) || batch.length === 0) {
await delay(intervalMs);
loops += 1;
continue;
}
for (const event of batch) {
if (stopped) {
break;
}
try {
await handler(event);
if (options.autoAck ?? true) {
const eventId = resolveEntityId(event, options.resolveEventId);
if (eventId !== undefined) {
await this.events.ack(eventId);
}
}
}
catch (error) {
if (options.autoFail ?? true) {
const eventId = resolveEntityId(event, options.resolveEventId);
if (eventId !== undefined) {
await this.events.fail(eventId, options.formatError?.(error) ?? normalizeErrorMessage(error));
}
}
throw error;
}
}
loops += 1;
}
return loops;
})();
return {
stop() {
stopped = true;
},
done,
};
}
}
function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export function createPostgresRuntime(executor, options = {}) {
return new PostgresRuntime(executor, undefined, options.config ?? Object.freeze({}));
}
export function createPostgresRuntimeFromConfig(executor, config) {
return new PostgresRuntime(executor, undefined, config);
}
export function createPostgresRuntimeFromRegistrations(executor, source, options = {}) {
const resolved = resolvePostgresConfig(registrationsFromSource(source));
return new PostgresRuntime(executor, undefined, mergeResolvedConfig(resolved, options.config));
}
export function createPostgresRuntimeFromRegistry(executor, registry, options = {}) {
return createPostgresRuntimeFromRegistrations(executor, registry, options);
}
export function createPostgresRuntimeFromSession(session, options = {}) {
const executorOptions = {
...(options.resultMode !== undefined ? { resultMode: options.resultMode } : {}),
...(options.executionContext ? { executionContext: options.executionContext } : {}),
...(options.transaction !== undefined ? { transaction: options.transaction } : {}),
};
const executor = createPostgresSessionExecutor(session, executorOptions);
const resolvedConfig = options.source
? resolvePostgresConfig(registrationsFromSource(options.source))
: Object.freeze({});
return new PostgresRuntime(executor, options.executionContext ? { executionContext: options.executionContext } : undefined, mergeResolvedConfig(resolvedConfig, options.config));
}
export function createPostgresRuntimeFromObjxSession(session, options = {}) {
return createPostgresRuntimeFromSession(session, options);
}
/**
* @deprecated Use `PostgresRuntime`.
*/
export const PostgresSpecialistRuntime = PostgresRuntime;
/**
* @deprecated Use `createPostgresRuntime`.
*/
export const createPostgresSpecialistRuntime = createPostgresRuntime;
/**
* @deprecated Use `createPostgresRuntimeFromConfig`.
*/
export const createPostgresSpecialistRuntimeFromConfig = createPostgresRuntimeFromConfig;
/**
* @deprecated Use `createPostgresRuntimeFromRegistrations`.
*/
export const createPostgresSpecialistRuntimeFromRegistrations = createPostgresRuntimeFromRegistrations;
/**
* @deprecated Use `createPostgresRuntimeFromRegistry`.
*/
export const createPostgresSpecialistRuntimeFromRegistry = createPostgresRuntimeFromRegistry;
/**
* @deprecated Use `createPostgresRuntimeFromSession`.
*/
export const createPostgresSpecialistRuntimeFromSession = createPostgresRuntimeFromSession;
/**
* @deprecated Use `createPostgresRuntimeFromObjxSession`.
*/
export const createPostgresSpecialistRuntimeFromObjxSession = createPostgresRuntimeFromObjxSession;
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresSearchPluginOptions extends PostgresPluginBaseOptions {
readonly defaultLanguage?: string;
readonly autoMigrateHelpers?: boolean;
readonly rankFunction?: 'ts_rank' | 'ts_rank_cd';
}
export interface PostgresSearchPluginMetadata {
readonly defaultLanguage: string;
readonly autoMigrateHelpers: boolean;
readonly rankFunction: 'ts_rank' | 'ts_rank_cd';
}
export interface PostgresSearchMigrationOptions {
readonly table: string;
readonly sourceColumns: readonly string[];
readonly vectorColumn?: string;
readonly language?: string;
readonly indexName?: string;
}
export interface PostgresSearchQueryOptions {
readonly table: string;
readonly vectorColumn?: string;
readonly query: string;
readonly language?: string;
readonly rankAlias?: string;
readonly rankFunction?: 'ts_rank' | 'ts_rank_cd';
readonly limit?: number;
readonly highlightColumn?: string;
}
export declare const POSTGRES_SEARCH_METADATA_KEY = "postgres.search";
export declare function createPostgresSearchPlugin(options?: PostgresSearchPluginOptions): Readonly<ObjxPlugin>;
export declare function createPostgresSearchMigrationSql(options: PostgresSearchMigrationOptions): readonly string[];
export declare function buildPostgresSearchQuerySql(options: PostgresSearchQueryOptions): string;
import { definePlugin } from '@qbobjx/core';
import { assertSafeSqlIdentifier, quoteSqlIdentifier, quoteSqlLiteral, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_SEARCH_METADATA_KEY = 'postgres.search';
export function createPostgresSearchPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_SEARCH_METADATA_KEY);
return definePlugin({
name: 'postgres-search',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
defaultLanguage: options.defaultLanguage ?? 'simple',
autoMigrateHelpers: options.autoMigrateHelpers ?? true,
rankFunction: options.rankFunction ?? 'ts_rank_cd',
});
},
},
});
}
function resolveSearchTable(table) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(table));
}
function resolveSearchColumn(column) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(column));
}
function resolveSearchLanguage(language) {
return quoteSqlLiteral(language ?? 'simple');
}
function resolveSearchRankFunction(rankFunction) {
return rankFunction ?? 'ts_rank_cd';
}
export function createPostgresSearchMigrationSql(options) {
const qualifiedTable = resolveSearchTable(options.table);
const vectorColumn = resolveSearchColumn(options.vectorColumn ?? 'search_vector');
const language = resolveSearchLanguage(options.language);
const indexName = quoteSqlIdentifier(assertSafeSqlIdentifier(options.indexName ?? `${options.table}_${options.vectorColumn ?? 'search_vector'}_gin_idx`));
const vectorExpression = options.sourceColumns
.map((column) => `coalesce(${resolveSearchColumn(column)}, '')`)
.join(` || ' ' || `);
return [
`alter table ${qualifiedTable} add column if not exists ${vectorColumn} tsvector generated always as (to_tsvector(${language}, ${vectorExpression})) stored;`,
`create index if not exists ${indexName} on ${qualifiedTable} using gin (${vectorColumn});`,
];
}
export function buildPostgresSearchQuerySql(options) {
const qualifiedTable = resolveSearchTable(options.table);
const vectorColumn = resolveSearchColumn(options.vectorColumn ?? 'search_vector');
const language = resolveSearchLanguage(options.language);
const rankAlias = quoteSqlIdentifier(assertSafeSqlIdentifier(options.rankAlias ?? 'search_rank'));
const rankFunction = resolveSearchRankFunction(options.rankFunction);
const limitClause = options.limit ? ` limit ${Math.max(1, Math.trunc(options.limit))}` : '';
const queryExpr = `websearch_to_tsquery(${language}, $1)`;
const highlightColumn = options.highlightColumn
? resolveSearchColumn(options.highlightColumn)
: undefined;
const headline = highlightColumn
? `, ts_headline(${language}, ${highlightColumn}, ${queryExpr}) as "highlight"`
: '';
return `select ${qualifiedTable}.*, ${rankFunction}(${vectorColumn}, ${queryExpr}) as ${rankAlias}${headline} from ${qualifiedTable} where ${vectorColumn} @@ ${queryExpr} order by ${rankAlias} desc${limitClause};`;
}
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresSecurityPluginOptions extends PostgresPluginBaseOptions {
readonly tenantSettingName?: string;
readonly enforceRls?: boolean;
}
export interface PostgresSecurityPluginMetadata {
readonly tenantSettingName: string;
readonly enforceRls: boolean;
}
export declare const POSTGRES_SECURITY_METADATA_KEY = "postgres.security";
export declare function createPostgresSecurityPlugin(options?: PostgresSecurityPluginOptions): Readonly<ObjxPlugin>;
export declare function createEnableRlsSql(table: string): string;
export declare function createTenantIsolationPolicySql(options: {
readonly table: string;
readonly tenantColumn?: string;
readonly settingName?: string;
readonly policyName?: string;
}): string;
export declare function createSetLocalTenantSql(settingName?: string): string;
import { definePlugin } from '@qbobjx/core';
import { assertSafeSqlIdentifier, quoteSqlIdentifier, quoteSqlLiteral, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_SECURITY_METADATA_KEY = 'postgres.security';
export function createPostgresSecurityPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_SECURITY_METADATA_KEY);
return definePlugin({
name: 'postgres-security',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
tenantSettingName: options.tenantSettingName ?? 'objx.tenant_id',
enforceRls: options.enforceRls ?? true,
});
},
},
});
}
function resolveTableIdentifier(table) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(table));
}
function resolveColumnIdentifier(column) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(column));
}
function resolvePolicyIdentifier(policyName) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(policyName));
}
export function createEnableRlsSql(table) {
const qualifiedTable = resolveTableIdentifier(table);
return `alter table ${qualifiedTable} enable row level security;`;
}
export function createTenantIsolationPolicySql(options) {
const table = resolveTableIdentifier(options.table);
const tenantColumn = resolveColumnIdentifier(options.tenantColumn ?? 'tenant_id');
const settingName = quoteSqlLiteral(options.settingName ?? 'objx.tenant_id');
const policyName = resolvePolicyIdentifier(options.policyName ?? `${options.table}_tenant_isolation`);
return `create policy ${policyName} on ${table} using (${tenantColumn} = current_setting(${settingName}, true));`;
}
export function createSetLocalTenantSql(settingName = 'objx.tenant_id') {
return `select set_config(${quoteSqlLiteral(settingName)}, $1, true);`;
}
export declare const DEFAULT_INTERNAL_SCHEMA = "objx_internal";
export interface PostgresPluginBaseOptions {
readonly metadataKey?: string;
}
export declare function withDefaultMetadataKey(value: string | undefined, fallback: string): string;
export declare function assertSafeSqlIdentifier(identifier: string): string;
export declare function quoteSqlIdentifier(identifier: string): string;
export declare function quoteQualifiedSqlIdentifier(...path: readonly string[]): string;
export declare function quoteSqlLiteral(value: string): string;
export declare function resolveSchemaAndTable(schema: string, table: string): {
readonly schema: string;
readonly table: string;
readonly qualifiedName: string;
};
export declare function resolveTableAndColumn(table: string, column: string): {
readonly table: string;
readonly column: string;
readonly qualifiedTable: string;
readonly quotedColumn: string;
};
export const DEFAULT_INTERNAL_SCHEMA = 'objx_internal';
export function withDefaultMetadataKey(value, fallback) {
return value && value.trim().length > 0 ? value : fallback;
}
const IDENTIFIER_REGEX = /^[a-zA-Z_][a-zA-Z0-9_]*$/;
export function assertSafeSqlIdentifier(identifier) {
if (!IDENTIFIER_REGEX.test(identifier)) {
throw new Error(`Unsafe SQL identifier "${identifier}".`);
}
return identifier;
}
export function quoteSqlIdentifier(identifier) {
return `"${assertSafeSqlIdentifier(identifier).replaceAll('"', '""')}"`;
}
export function quoteQualifiedSqlIdentifier(...path) {
if (path.length === 0) {
throw new Error('Expected at least one SQL identifier part.');
}
return path.map((part) => quoteSqlIdentifier(part)).join('.');
}
export function quoteSqlLiteral(value) {
return `'${value.replaceAll("'", "''")}'`;
}
export function resolveSchemaAndTable(schema, table) {
const safeSchema = assertSafeSqlIdentifier(schema);
const safeTable = assertSafeSqlIdentifier(table);
return {
schema: safeSchema,
table: safeTable,
qualifiedName: quoteQualifiedSqlIdentifier(safeSchema, safeTable),
};
}
export function resolveTableAndColumn(table, column) {
const safeTable = assertSafeSqlIdentifier(table);
const safeColumn = assertSafeSqlIdentifier(column);
return {
table: safeTable,
column: safeColumn,
qualifiedTable: quoteSqlIdentifier(safeTable),
quotedColumn: quoteSqlIdentifier(safeColumn),
};
}
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresTimeseriesPluginOptions extends PostgresPluginBaseOptions {
readonly useTimescaleWhenAvailable?: boolean;
readonly defaultRetentionDays?: number;
readonly defaultPartitionWindow?: 'day' | 'week' | 'month';
}
export interface PostgresTimeseriesPluginMetadata {
readonly useTimescaleWhenAvailable: boolean;
readonly defaultRetentionDays: number;
readonly defaultPartitionWindow: 'day' | 'week' | 'month';
}
export declare const POSTGRES_TIMESERIES_METADATA_KEY = "postgres.timeseries";
export declare function createPostgresTimeseriesPlugin(options?: PostgresTimeseriesPluginOptions): Readonly<ObjxPlugin>;
export declare function createPartitionedTableSql(options: {
readonly table: string;
readonly timestampColumn: string;
}): string;
export declare function createPartitionSql(options: {
readonly table: string;
readonly partitionName: string;
readonly from: string;
readonly to: string;
}): string;
export declare function createRetentionSql(options: {
readonly table: string;
readonly timestampColumn: string;
readonly retentionDays: number;
}): string;
export declare function createTimescaleCompressionSql(hypertable: string): string;
import { definePlugin } from '@qbobjx/core';
import { assertSafeSqlIdentifier, quoteSqlIdentifier, quoteSqlLiteral, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_TIMESERIES_METADATA_KEY = 'postgres.timeseries';
export function createPostgresTimeseriesPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_TIMESERIES_METADATA_KEY);
return definePlugin({
name: 'postgres-timeseries',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
useTimescaleWhenAvailable: options.useTimescaleWhenAvailable ?? true,
defaultRetentionDays: options.defaultRetentionDays ?? 30,
defaultPartitionWindow: options.defaultPartitionWindow ?? 'week',
});
},
},
});
}
function resolveTableIdentifier(table) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(table));
}
function resolveColumnIdentifier(column) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(column));
}
function resolvePartitionName(partitionName) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(partitionName));
}
function normalizeRetentionDays(value) {
if (!Number.isFinite(value)) {
throw new Error('Retention days must be a finite number.');
}
const normalized = Math.trunc(value);
if (normalized < 0) {
throw new Error('Retention days must be zero or greater.');
}
return normalized;
}
export function createPartitionedTableSql(options) {
const table = resolveTableIdentifier(options.table);
const timestampColumn = resolveColumnIdentifier(options.timestampColumn);
return `create table if not exists ${table} (${timestampColumn} timestamptz not null) partition by range (${timestampColumn});`;
}
export function createPartitionSql(options) {
const table = resolveTableIdentifier(options.table);
const partitionName = resolvePartitionName(options.partitionName);
const from = quoteSqlLiteral(options.from);
const to = quoteSqlLiteral(options.to);
return `create table if not exists ${partitionName} partition of ${table} for values from (${from}) to (${to});`;
}
export function createRetentionSql(options) {
const table = resolveTableIdentifier(options.table);
const timestampColumn = resolveColumnIdentifier(options.timestampColumn);
const retentionDays = normalizeRetentionDays(options.retentionDays);
return `delete from ${table} where ${timestampColumn} < now() - interval '${retentionDays} days';`;
}
export function createTimescaleCompressionSql(hypertable) {
const table = resolveTableIdentifier(hypertable);
return `alter table ${table} set (timescaledb.compress = true);`;
}
import { type ObjxPlugin } from '@qbobjx/core';
import { type PostgresPluginBaseOptions } from './shared.js';
export interface PostgresVectorPluginOptions extends PostgresPluginBaseOptions {
readonly extensionName?: string;
readonly distance?: 'cosine' | 'l2' | 'ip';
readonly indexMethod?: 'ivfflat' | 'hnsw';
}
export interface PostgresVectorPluginMetadata {
readonly extensionName: string;
readonly distance: 'cosine' | 'l2' | 'ip';
readonly indexMethod: 'ivfflat' | 'hnsw';
}
export declare const POSTGRES_VECTOR_METADATA_KEY = "postgres.vector";
export declare function createPostgresVectorPlugin(options?: PostgresVectorPluginOptions): Readonly<ObjxPlugin>;
export declare function createVectorColumnSql(table: string, column?: string, dimensions?: number): string;
export declare function createVectorIndexSql(options: {
readonly table: string;
readonly column?: string;
readonly method?: 'ivfflat' | 'hnsw';
readonly operatorClass?: 'vector_cosine_ops' | 'vector_l2_ops' | 'vector_ip_ops';
readonly indexName?: string;
}): string;
export declare function buildVectorSimilarityQuerySql(options: {
readonly table: string;
readonly column?: string;
readonly distanceOperator?: '<=>' | '<->' | '<#>';
readonly whereSql?: string;
readonly limit?: number;
}): string;
import { definePlugin } from '@qbobjx/core';
import { assertSafeSqlIdentifier, quoteSqlIdentifier, withDefaultMetadataKey, } from './shared.js';
export const POSTGRES_VECTOR_METADATA_KEY = 'postgres.vector';
export function createPostgresVectorPlugin(options = {}) {
const metadataKey = withDefaultMetadataKey(options.metadataKey, POSTGRES_VECTOR_METADATA_KEY);
return definePlugin({
name: 'postgres-vector',
hooks: {
onModelRegister(context) {
context.setMetadata(metadataKey, {
extensionName: options.extensionName ?? 'vector',
distance: options.distance ?? 'cosine',
indexMethod: options.indexMethod ?? 'hnsw',
});
},
},
});
}
function resolveTable(table) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(table));
}
function resolveColumn(column) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(column));
}
function resolveIndexName(indexName) {
return quoteSqlIdentifier(assertSafeSqlIdentifier(indexName));
}
function resolveOperatorClass(operatorClass) {
return operatorClass ?? 'vector_cosine_ops';
}
function resolveDistanceOperator(distanceOperator) {
return distanceOperator ?? '<=>';
}
export function createVectorColumnSql(table, column = 'embedding', dimensions = 1536) {
const qualifiedTable = resolveTable(table);
const quotedColumn = resolveColumn(column);
const safeDimensions = Math.max(1, Math.trunc(dimensions));
return `alter table ${qualifiedTable} add column if not exists ${quotedColumn} vector(${safeDimensions});`;
}
export function createVectorIndexSql(options) {
const tableName = assertSafeSqlIdentifier(options.table);
const columnName = assertSafeSqlIdentifier(options.column ?? 'embedding');
const method = options.method ?? 'hnsw';
const operatorClass = resolveOperatorClass(options.operatorClass);
const indexName = resolveIndexName(options.indexName ?? `${tableName}_${columnName}_${method}_idx`);
const qualifiedTable = resolveTable(tableName);
const quotedColumn = resolveColumn(columnName);
return `create index if not exists ${indexName} on ${qualifiedTable} using ${method} (${quotedColumn} ${operatorClass});`;
}
export function buildVectorSimilarityQuerySql(options) {
const qualifiedTable = resolveTable(options.table);
const quotedColumn = resolveColumn(options.column ?? 'embedding');
const distanceOperator = resolveDistanceOperator(options.distanceOperator);
const whereClause = options.whereSql ? ` where ${options.whereSql}` : '';
const limit = Math.max(1, Math.trunc(options.limit ?? 20));
return `select ${qualifiedTable}.*, ${quotedColumn} ${distanceOperator} $1::vector as "similarity_distance" from ${qualifiedTable}${whereClause} order by ${quotedColumn} ${distanceOperator} $1::vector asc limit ${limit};`;
}
+1
-0

@@ -72,1 +72,2 @@ import { type ObjxPlugin } from '@qbobjx/core';

export declare function createTenantScopePlugin(options?: TenantScopePluginOptions): Readonly<ObjxPlugin>;
export * from './postgres.js';

@@ -174,1 +174,2 @@ import { definePlugin } from '@qbobjx/core';

}
export * from './postgres.js';
+3
-2
{
"name": "@qbobjx/plugins",
"version": "0.4.0",
"version": "0.5.0",
"private": false,

@@ -30,4 +30,5 @@ "type": "module",

"dependencies": {
"@qbobjx/core": "0.4.0"
"@qbobjx/core": "0.5.0",
"@qbobjx/sql-engine": "0.5.0"
}
}
+276
-0

@@ -89,1 +89,277 @@ # @qbobjx/plugins

protected work inside `session.transaction(...)`.
## PostgreSQL Runtime Suite (experimental)
OBJX now ships a PostgreSQL-focused runtime suite directly in `@qbobjx/plugins` with internal defaults intended for `objx_internal` tables managed by the ORM.
```ts
import {
createPostgresEventsPlugin,
createPostgresPreset,
createPostgresQueuePlugin,
createPostgresSearchPlugin,
} from '@qbobjx/plugins';
const runtimePreset = createPostgresPreset({
schema: 'objx_internal',
queue: { defaultQueue: 'default' },
events: { notifyChannel: 'objx_events' },
});
// habilitar apenas queue (cron/jobs)
const onlyQueue = createPostgresPreset({
include: ['queue'],
queue: { defaultQueue: 'cron' },
});
// habilitar apenas events
const onlyEvents = createPostgresPreset({
include: ['events'],
events: { notifyChannel: 'events_only' },
});
const queuePlugin = createPostgresQueuePlugin({
schema: 'objx_internal',
autoProvision: true,
defaultQueue: 'critical',
});
const eventsPlugin = createPostgresEventsPlugin({
schema: 'objx_internal',
autoProvision: true,
});
const searchPlugin = createPostgresSearchPlugin({
defaultLanguage: 'english',
rankFunction: 'ts_rank_cd',
});
```
Included PostgreSQL runtime plugin factories:
- `createPostgresSearchPlugin()`
- `createPostgresQueuePlugin()`
- `createPostgresEventsPlugin()`
- `createPostgresCachePlugin()`
- `createPostgresVectorPlugin()`
- `createPostgresTimeseriesPlugin()`
- `createPostgresJsonPlugin()`
- `createPostgresSecurityPlugin()`
- `createPostgresObservabilityPlugin()`
- `createPostgresPreset()`
- `createPostgresInternalSchemaSql()`
Observação: cada plugin vive em módulo próprio dentro de `src/postgres/*`; o preset apenas compõe os plugins que você escolher em `include`.
## Runtime API (experimental)
Além dos builders SQL, o módulo PostgreSQL agora expõe um runtime de alto nível para executar queue, events, cache, search, vector, timeseries, json, security e observability.
Você pode usar o runtime com qualquer executor compatível:
```ts
import { createPostgresRuntime } from '@qbobjx/plugins';
const runtime = createPostgresRuntime({
async execute(sql, params = []) {
return db.execute(sql, params);
},
});
await runtime.queue.enqueue({
queueName: 'default',
jobName: 'sync-project',
payload: { projectId: 1 },
});
await runtime.events.publish({
eventName: 'project.created',
payload: { projectId: 1 },
});
```
O runtime também inclui:
- `provisionInternalSchema()` para bootstrap automático de tabelas internas (`runtime_migrations`, queue, outbox, cache).
- `withRequest({ executionContext, transactionId })` para execução com contexto explícito.
- `runQueueWorker(...)` e `runEventDispatcher(...)` para loops básicos de worker/dispatcher.
- `metrics()` com contadores operacionais por domínio.
## Driver session integration
O caminho recomendado com o driver oficial é integrar os plugins PostgreSQL com `createPostgresSession(...)` e criar o runtime a partir da própria sessão.
```ts
import { col, createModelRegistry, defineModel } from '@qbobjx/core';
import { createPostgresSession } from '@qbobjx/postgres-driver';
import {
createPostgresEventsPlugin,
createPostgresIntegration,
createPostgresQueuePlugin,
createPostgresRuntimeFromSession,
} from '@qbobjx/plugins';
const RuntimeConfig = defineModel({
table: 'runtime_config',
columns: {
id: col.int().primary(),
tenantId: col.text(),
},
plugins: [
createPostgresQueuePlugin({
schema: 'objx_internal',
defaultQueue: 'default',
}),
createPostgresEventsPlugin({
schema: 'objx_internal',
notifyChannel: 'objx_events',
}),
],
});
const registry = createModelRegistry();
registry.register(RuntimeConfig);
const integration = createPostgresIntegration(registry);
const session = createPostgresSession({
pool,
executionContextSettings: integration.executionContextSettings,
});
const runtime = createPostgresRuntimeFromSession(session, {
source: registry,
config: integration.config,
});
```
Essa integração garante que:
- o runtime use a configuração real vinda dos plugins
- o driver oficial aplique `set_config(...)` de forma transacional quando necessário
- fila, outbox, cache e demais APIs compartilhem a mesma base de configuração
## PostgreSQL Runtime API guide
A runtime has one required contract: an executor with `execute(sql, params, request?)`.
```ts
type Executor = {
execute<T = unknown>(
sql: string,
params?: readonly unknown[],
request?: { executionContext?: unknown; transactionId?: string },
): Promise<T>;
};
```
Também existe integração pronta com `ObjxSession` / `createPostgresSession(...)`:
- `createPostgresSessionExecutor(session, options?)`
- `createPostgresRuntimeFromSession(session, options?)`
- `createPostgresRuntimeFromObjxSession(session, options?)`
### Bootstrapping and lifecycle
```ts
import { createPostgresRuntime } from '@qbobjx/plugins';
const runtime = createPostgresRuntime(executor);
// Creates objx_internal + runtime_migrations + queue/outbox/cache tables.
await runtime.provisionInternalSchema({
pluginName: 'objx-postgres-runtime',
version: '1',
});
// Bind an explicit request/transaction context for all subsequent calls.
const contextualRuntime = runtime.withRequest({ executionContext });
```
### Queue API
```ts
await runtime.queue.enqueue({
queueName: 'default',
jobName: 'invoice.generate',
payload: { invoiceId: 'inv_1' },
priority: 10,
maxAttempts: 8,
});
const job = await runtime.queue.dequeue('worker-a');
if (job) {
// ...process
await runtime.queue.complete(1);
}
```
- `enqueue`: creates queued job rows.
- `dequeue`: claims a pending job with SKIP LOCKED semantics.
- `complete`: marks running job as done.
- `fail`: applies retry/backoff or dead status depending on attempts.
### Events API (Outbox)
```ts
await runtime.events.publish({
eventName: 'project.created',
payload: { projectId: 'p_1' },
aggregateId: 'p_1',
aggregateType: 'project',
idempotencyKey: 'project.created:p_1',
});
const batch = await runtime.events.dispatchBatch(100);
for (const event of batch as Array<{ id: number }>) {
// ...deliver to webhook/broker
await runtime.events.ack(event.id);
}
```
### Cache API
```ts
const value = await runtime.cache.getOrCompute(
'project:summary:p_1',
async () => ({ id: 'p_1', name: 'API revamp' }),
{ tags: ['project', 'summary'] },
);
```
- `get`, `set`, `getOrCompute`, `pruneExpired`, `metrics`.
### Search, Vector, JSON, Security, Observability
```ts
await runtime.search.query({ table: 'docs', query: 'postgres runtime', limit: 20 });
await runtime.vector.similarity({ table: 'docs', vector: [0.12, 0.44, 0.05] });
const where = runtime.json.wherePath('metadata', '$.tenant == "acme"');
await runtime.security.setLocalTenant('acme');
const top = await runtime.observability.topStatements(10);
```
### Background handles
```ts
const queueHandle = runtime.startQueueWorker(async (job) => {
// business logic
}, { workerId: 'worker-1' });
const eventHandle = runtime.startEventDispatcher(async (event) => {
// delivery logic
}, { batchSize: 200 });
// later
queueHandle.stop();
eventHandle.stop();
await queueHandle.done;
await eventHandle.done;
```
### Runtime metrics
```ts
const snapshot = runtime.metrics();
console.log(snapshot.queue.enqueued, snapshot.events.published, snapshot.cache.hits);
```
Functional project example: `examples/postgres-runtime`.