
Security News
Axios Maintainer Confirms Social Engineering Attack Behind npm Compromise
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.
pg-multiverse
Advanced tools
๐ Multi-cluster PostgreSQL manager for Node.js with TypeScript support. Features read/write splitting, intelligent load balancing, distributed caching, database migrations, health monitoring, and seamless multi-schema operations.
Multi-cluster PostgreSQL manager for Node.js with TypeScript support
PG Multiverse is a powerful, enterprise-grade PostgreSQL multi-cluster manager that brings advanced database scaling capabilities to Node.js applications. With full TypeScript support, intelligent load balancing, distributed caching, and seamless multi-schema operations.
# Core library
npm install pg-multiverse
# Optional: For TypeScript migration support
npm install ts-node
# Optional: CLI tools
npm install -g pg-multiverse
# or use: npx pgm <command>
import { PgMultiverse } from 'pg-multiverse';
// Define your data types
interface User {
id: number;
email: string;
name: string;
active: boolean;
}
// Initialize with cluster configuration
const postgres = new PgMultiverse({
enableCache: true,
enableMetrics: true,
enableTransactions: true
});
await postgres.initialize({
users_cluster: {
schemas: ['users', 'auth'],
primary: {
host: 'primary.db.com',
port: 5432,
database: 'app_users',
user: 'postgres',
password: 'password'
},
replicas: [{
host: 'replica.db.com',
port: 5432,
database: 'app_users',
user: 'postgres',
password: 'password'
}]
}
});
// Type-safe queries with automatic cluster routing
const users = await postgres.query<User>(
'SELECT * FROM users WHERE active = $1',
[true],
{
schema: 'users', // Auto-routes to users_cluster
cache: true, // Cache results
cacheTtl: 300000 // 5 minute cache
}
);
// users.rows is automatically typed as User[]
users.rows.forEach(user => {
console.log(`${user.name} <${user.email}>`);
});
// Enable migrations
const postgres = new PgMultiverse({
enableMigrations: true,
migrations: {
migrationsPath: './migrations',
autoCreateMigrationsTable: true,
},
});
await postgres.initialize(config);
// Run pending migrations
await postgres.migrate();
// Check migration status
const status = await postgres.getMigrationStatus();
console.log(`Applied: ${status.appliedMigrations}, Pending: ${status.pendingMigrations}`);
// Rollback last migration
await postgres.rollback({ steps: 1 });
// Cross-cluster transactions
await postgres.withTransaction(['users', 'orders'], async (tx) => {
// Update user in users_cluster
await tx.query(
'UPDATE users SET last_order = NOW() WHERE id = $1',
[userId],
{ schema: 'users' }
);
// Create order in orders_cluster
const order = await tx.query<Order>(
'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *',
[userId, total],
{ schema: 'orders' }
);
return order.rows[0];
});
PG Multiverse manages multiple PostgreSQL clusters, each handling specific schemas:
const clusterConfig = {
// User data cluster
users_cluster: {
schemas: ['users', 'profiles', 'auth'],
primary: { /* primary connection */ },
replicas: [{ /* replica connections */ }],
readPreference: 'replica',
consistencyLevel: 'eventual'
},
// Commerce cluster
commerce_cluster: {
schemas: ['products', 'orders', 'payments'],
primary: { /* primary connection */ },
readPreference: 'primary',
consistencyLevel: 'strong'
}
};
{
loadBalancing: {
strategy: 'health_aware',
weights: {
'replica_1': 2,
'replica_2': 1
},
healthThreshold: 80
}
}
Built-in distributed caching with multiple invalidation strategies:
// Cache with automatic invalidation
await postgres.query('SELECT * FROM products', [], {
schema: 'products',
cache: true,
cacheTtl: 1800000, // 30 minutes
cacheKey: 'all_products',
tags: ['products', 'catalog'] // For bulk invalidation
});
// Invalidate cache
await postgres.invalidateCache({
schema: 'products'
});
await postgres.invalidateCache({
tags: ['products', 'inventory']
});
class PgMultiverse {
// Initialize with cluster configuration
async initialize(config: ClusterConfiguration): Promise<void>
// Execute type-safe queries
async query<T>(sql: string, params?: any[], options?: QueryOptions): Promise<QueryResult<T>>
// Distributed transactions
async withTransaction<T>(schemas: string[], callback: TransactionCallback<T>): Promise<T>
// Database migrations
async migrate(options?: MigrationExecutionOptions): Promise<MigrationStatus>
async rollback(options?: MigrationRollbackOptions): Promise<MigrationStatus>
async getMigrationStatus(): Promise<MigrationStatus>
async createMigration(name: string, options: MigrationCreateOptions): Promise<string>
getMigrations(): Migration[]
addMigration(migration: Migration): void
getMigrationManager(): MigrationManager
// Health and metrics
async healthCheck(): Promise<HealthReport>
getMetrics(): SystemMetrics
// Cache management
async invalidateCache(criteria: InvalidationCriteria): Promise<number>
// Lifecycle
async close(): Promise<void>
}
interface MultiClusterConfig {
enableCache?: boolean; // Enable distributed caching
enableMetrics?: boolean; // Collect performance metrics
enableTransactions?: boolean; // Enable distributed transactions
enableMigrations?: boolean; // Enable database migrations
cache?: {
maxSize?: number; // Max cache entries (1000)
ttl?: number; // Default TTL in ms (300000)
enableCompression?: boolean; // Compress large entries
};
migrations?: {
migrationsPath?: string; // Path to migration files (./migrations)
migrationsTable?: string; // Control table name
lockTable?: string; // Locking table name
autoCreateMigrationsTable?: boolean; // Auto-create control tables
validateChecksums?: boolean; // Validate migration integrity
allowOutOfOrder?: boolean; // Require sequential execution
lockTimeout?: number; // Lock timeout in ms (60000)
batchSize?: number; // Batch size for parallel execution
logger?: MigrationLogger; // Custom logger instance
};
cluster?: {
healthCheckInterval?: number; // Health check frequency (30000ms)
retryAttempts?: number; // Connection retry attempts (3)
maxFailuresBeforeMarkDown?: number; // Failures before marking down (3)
};
}
PG Multiverse includes a powerful migration system that supports multi-cluster and multi-schema databases with both TypeScript and JavaScript.
const postgres = new PgMultiverse({
enableMigrations: true,
migrations: {
migrationsPath: './migrations', // Path to migration files
migrationsTable: 'pg_multiverse_migrations', // Control table name
lockTable: 'pg_multiverse_migration_locks', // Locking table name
autoCreateMigrationsTable: true, // Auto-create control tables
validateChecksums: true, // Validate migration integrity
allowOutOfOrder: false, // Require sequential execution
lockTimeout: 60000, // Lock timeout in ms
batchSize: 100, // Batch size for parallel execution
logger: customLogger // Custom logger instance
}
});
// migrations/20241230120000_create_users.ts
import { Migration, MigrationContext } from 'pg-multiverse';
const migration: Migration = {
version: '20241230120000_create_users',
name: 'create_users',
description: 'Create users table with profiles',
targetSchemas: ['users', 'auth'],
targetClusters: ['users_cluster'], // Optional: specific clusters
dependencies: ['20241230110000_create_base'], // Optional: dependencies
tags: ['users', 'initial'], // Optional: tags for organization
createdAt: new Date(),
async up(context: MigrationContext): Promise<void> {
context.logger.info(`Creating users table in ${context.schema}`);
// Type-safe query execution
await context.query(`
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW()
)
`);
// Insert seed data
await context.query(`
INSERT INTO users (email, name) VALUES
($1, $2), ($3, $4)
`, ['admin@example.com', 'Admin', 'user@example.com', 'User']);
context.logger.info('Users table created successfully');
},
async down(context: MigrationContext): Promise<void> {
context.logger.info(`Dropping users table from ${context.schema}`);
await context.query(`DROP TABLE IF EXISTS users`);
context.logger.info('Users table dropped successfully');
}
};
export default migration;
// migrations/20241230130000_add_profiles.js
const migration = {
version: '20241230130000_add_profiles',
name: 'add_profiles',
description: 'Add user profiles functionality',
targetSchemas: ['users'],
createdAt: new Date(),
async up(context) {
await context.query(`
CREATE TABLE profiles (
user_id INTEGER PRIMARY KEY REFERENCES users(id),
bio TEXT,
avatar_url VARCHAR(500),
settings JSONB DEFAULT '{}'
)
`);
},
async down(context) {
await context.query(`DROP TABLE IF EXISTS profiles`);
}
};
module.exports = migration;
// Run all pending migrations
await postgres.migrate();
// Run migrations with options
await postgres.migrate({
targetVersion: '20241230120000_create_users', // Run up to specific version
targetSchemas: ['users', 'auth'], // Specific schemas only
targetClusters: ['users_cluster'], // Specific clusters only
dryRun: true, // Show what would run without executing
parallel: true, // Run in parallel where possible
maxParallel: 4, // Max parallel executions
continueOnError: false, // Stop on first error
});
// Run with detailed progress monitoring
postgres.getMigrationManager().on('migrationStarted', (data) => {
console.log(`โก Starting: ${data.name} on ${data.schema}@${data.cluster}`);
});
postgres.getMigrationManager().on('migrationCompleted', (data) => {
console.log(`โ
Completed: ${data.name} (${data.duration}ms)`);
});
// Rollback last migration
await postgres.rollback({ steps: 1 });
// Rollback to specific version
await postgres.rollback({
targetVersion: '20241230110000_create_base',
targetSchemas: ['users'],
dryRun: true // Preview rollback without executing
});
// Rollback with error handling
try {
await postgres.rollback({ steps: 2, force: true });
} catch (error) {
console.error('Rollback failed:', error.message);
}
// Get comprehensive status
const status = await postgres.getMigrationStatus();
console.log(`Total: ${status.totalMigrations}`);
console.log(`Applied: ${status.appliedMigrations}`);
console.log(`Pending: ${status.pendingMigrations}`);
// Status by schema
Object.entries(status.bySchema).forEach(([schema, stats]) => {
console.log(`${schema}: ${stats.applied} applied, ${stats.pending} pending`);
if (stats.lastApplied) {
console.log(` Last applied: ${stats.lastApplied}`);
}
});
// Status by cluster
Object.entries(status.byCluster).forEach(([cluster, stats]) => {
console.log(`${cluster}: ${stats.applied} applied, ${stats.pending} pending`);
});
// List all available migrations
const migrations = postgres.getMigrations();
migrations.forEach(migration => {
console.log(`${migration.version}: ${migration.name}`);
console.log(` Schemas: ${migration.targetSchemas.join(', ')}`);
console.log(` Description: ${migration.description}`);
});
// Create new migration file
const filePath = await postgres.createMigration('add_user_permissions', {
targetSchemas: ['users', 'auth'],
targetClusters: ['users_cluster'],
description: 'Add user permissions and roles system',
tags: ['permissions', 'security']
});
console.log(`Migration created: ${filePath}`);
Install the CLI globally or use npx:
npm install -g pg-multiverse
# or use npx pg-multiverse
# Create a new migration
pgm create add_user_roles --schemas users,auth --description "Add role-based permissions"
# Run migrations
pgm migrate # Run all pending
pgm migrate --target 20241230120000 # Run up to specific version
pgm migrate --schemas users,products # Specific schemas only
pgm migrate --parallel # Run in parallel
pgm migrate --dry-run # Preview without executing
# Check status
pgm status # Overall status
pgm status --schemas users # Schema-specific status
pgm status --verbose # Detailed information
# Rollback migrations
pgm rollback --steps 1 # Rollback last migration
pgm rollback --target 20241230110000 # Rollback to specific version
pgm rollback --dry-run # Preview rollback
# List migrations
pgm list # Show all migrations
pgm list --verbose # Show detailed information
# Global options
pgm --config config.json # Use custom config file
pgm --migrations ./db/migrations # Custom migrations path
pgm --verbose # Verbose output
const migration: Migration = {
version: '20241230150000_create_orders',
name: 'create_orders',
targetSchemas: ['orders'],
dependencies: ['20241230120000_create_users'], // Users must exist first
async up(context) {
// Reference data from other clusters is handled automatically
await context.query(`
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_email VARCHAR(255) NOT NULL, -- Reference to users cluster
total DECIMAL(10,2) NOT NULL,
status VARCHAR(50) DEFAULT 'pending'
)
`);
}
};
const migration: Migration = {
version: '20241230160000_conditional_update',
name: 'conditional_update',
targetSchemas: ['users'],
async up(context) {
// Check if column exists before adding
const result = await context.query(`
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'users' AND column_name = 'last_login'
`);
if (result.rows.length === 0) {
await context.query(`
ALTER TABLE users ADD COLUMN last_login TIMESTAMP
`);
context.logger.info('Added last_login column');
} else {
context.logger.info('last_login column already exists, skipping');
}
}
};
const migration: Migration = {
version: '20241230170000_migrate_user_data',
name: 'migrate_user_data',
targetSchemas: ['users'],
async up(context) {
// Process large datasets in batches
let offset = 0;
const batchSize = 1000;
let hasMore = true;
while (hasMore) {
const users = await context.query(`
SELECT id, old_field
FROM users
WHERE new_field IS NULL
LIMIT $1 OFFSET $2
`, [batchSize, offset]);
if (users.rows.length === 0) {
hasMore = false;
break;
}
// Process batch
for (const user of users.rows) {
await context.query(`
UPDATE users
SET new_field = $1
WHERE id = $2
`, [processOldField(user.old_field), user.id]);
}
offset += batchSize;
context.logger.info(`Processed ${offset} users`);
}
}
};
YYYYMMDDHHMMSS_descriptive_name.ts
20241230120000_create_users_table.ts
20241230130000_add_user_permissions.ts
20241230140000_migrate_legacy_data.ts
// Always test with dry-run first
await postgres.migrate({ dryRun: true });
// Use transactions for complex migrations
const migration: Migration = {
async up(context) {
await context.query('BEGIN');
try {
await context.query('-- complex operations');
await context.query('COMMIT');
} catch (error) {
await context.query('ROLLBACK');
throw error;
}
}
};
// Comprehensive error handling
postgres.getMigrationManager().on('migrationFailed', (data) => {
console.error(`โ Migration ${data.name} failed:`, data.error.message);
// Send alerts, log to monitoring system, etc.
});
{
orders_cluster: {
schemas: ['orders'],
sharding: {
strategy: 'hash',
key: 'user_id',
partitions: 4
},
primary: { /* config */ },
replicas: [
{ /* shard 1 replica */ },
{ /* shard 2 replica */ },
{ /* shard 3 replica */ },
{ /* shard 4 replica */ }
]
}
}
// Register schemas at runtime
postgres.registerSchema('analytics', 'analytics_cluster', {
cacheStrategy: 'aggressive',
priority: 3
});
// Query the new schema
const events = await postgres.query(
'SELECT * FROM events WHERE date > $1',
[startDate],
{ schema: 'analytics' }
);
// Monitor cluster health
postgres.on('clusterDown', ({ clusterId, reason }) => {
console.error(`Cluster ${clusterId} is down: ${reason}`);
// Trigger alerts, logging, etc.
});
postgres.on('clusterRecovered', ({ clusterId, downtime }) => {
console.log(`Cluster ${clusterId} recovered after ${downtime}ms`);
});
// Monitor cache performance
postgres.on('cacheHit', ({ key, schema }) => {
console.log(`Cache hit for ${key} in schema ${schema}`);
});
// Monitor query performance
postgres.on('queryExecuted', ({ duration, clusterId }) => {
if (duration > 1000) {
console.warn(`Slow query detected: ${duration}ms on ${clusterId}`);
}
});
npm test
# With real PostgreSQL instances
npm run test:integration
npm run test:coverage
const metrics = postgres.getMetrics();
console.log(`Total Queries: ${metrics.totalQueries}`);
console.log(`Average Response Time: ${metrics.avgResponseTime}ms`);
console.log(`Error Rate: ${metrics.errorRate}%`);
// Per-cluster metrics
Object.entries(metrics.clusters).forEach(([clusterId, stats]) => {
console.log(`${clusterId}: ${stats.queries.total} queries, ${stats.connections.active} active connections`);
});
// Cache performance
if (metrics.cache) {
console.log(`Cache Hit Rate: ${(metrics.cache.hitRate * 100).toFixed(2)}%`);
console.log(`Cache Size: ${metrics.cache.itemCount} items`);
}
const health = await postgres.healthCheck();
Object.entries(health).forEach(([clusterId, status]) => {
console.log(`${clusterId}: ${status.healthy ? 'โ
' : 'โ'} (${status.responseTime}ms)`);
if (!status.healthy) {
console.error(`Error: ${status.error}`);
console.log(`Failure Count: ${status.failureCount}`);
}
});
| Feature | Performance |
|---|---|
| Connection Pooling | 10,000+ concurrent connections |
| Query Caching | 95%+ cache hit rates |
| Load Balancing | Sub-1ms routing overhead |
| Health Checks | 30s intervals, <5ms latency |
| Failover Time | <2s automatic recovery |
Connection Tuning
{
connectionPool: {
min: 5, // Always keep 5 connections warm
max: 50, // Scale up to 50 under load
warmupConnections: true
}
}
Cache Optimization
{
cache: {
maxSize: 10000, // Increase for high-traffic apps
enableCompression: true, // Save memory on large results
compressionThreshold: 1024
}
}
Health Check Tuning
{
cluster: {
healthCheckInterval: 15000, // More frequent checks
maxFailuresBeforeMarkDown: 5 // More resilient to transient issues
}
}
pg to pg-multiverse// Before: Direct pg usage
import { Pool } from 'pg';
const pool = new Pool({ /* config */ });
const result = await pool.query('SELECT * FROM users');
// After: PG Multiverse with type safety
import { PgMultiverse } from 'pg-multiverse';
const postgres = new PgMultiverse();
await postgres.initialize(config);
const result = await postgres.query<User>('SELECT * FROM users');
We welcome contributions! Please see our Contributing Guide.
# Clone the repository
git clone https://github.com/andeerc/pg-multiverse.git
cd pg-multiverse
# Install dependencies
npm install
# Run tests
npm test
# Build the project
npm run build
See CHANGELOG.md for version history.
MIT ยฉ PG Multiverse Team
๐ Star us on GitHub โข ๐ Documentation โข ๐ฌ Discord Community โข ๐ Report Bug
Made with โค๏ธ by the PG Multiverse Team
FAQs
๐ Multi-cluster PostgreSQL manager for Node.js with TypeScript support. Features read/write splitting, intelligent load balancing, distributed caching, database migrations, health monitoring, and seamless multi-schema operations.
We found that pg-multiverse demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago.ย It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.

Security News
Node.js has paused its bug bounty program after funding ended, removing payouts for vulnerability reports but keeping its security process unchanged.

Security News
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.