
pg-parallel
Non-blocking PostgreSQL for Node.js with worker thread support
A specialized wrapper around node-postgres that prevents event-loop blocking by offloading heavy CPU tasks and complex transactions to worker threads.
Highlights:
- No manual client.release() needed
- PgParallelError with a standardized ErrorCategory
- Pluggable logger with key events (retries, breaker transitions, worker failures)

Installation:
npm install pg-parallel pg
Note: pg is a peer dependency and must be installed alongside
pg-parallel.
This library is built on top of node-postgres (pg), a non-blocking
PostgreSQL client for Node.js.
Requirements:
- pg v8.11.3+ (peer dependency)

import { PgParallel } from 'pg-parallel';
const db = new PgParallel({
connectionString: 'postgresql://user:pass@localhost/db',
maxWorkers: 4, // Optional: defaults to CPU core count
});
// Standard I/O query (main thread)
const { rows } = await db.query('SELECT * FROM users WHERE id = $1', [1]);
// CPU-intensive task (worker thread)
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
const result = await db.task(fibonacci, [42]);
// Mixed workload with database access (worker thread)
const processed = await db.worker(async (client) => {
const { rows } = await client.query('SELECT data FROM table');
return rows.map((row) => row.data.toUpperCase());
});
await db.shutdown(); // Clean shutdown
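Under the hood, this kind of offloading works by serializing a function and running it in a Node.js worker thread. The following standalone sketch illustrates that mechanism with plain worker_threads, no pg-parallel involved; the runInWorker helper is hypothetical and for illustration only, not the library's internal code.

```typescript
import { Worker } from 'worker_threads';

// Hypothetical helper: ship a function's source text to a worker thread,
// run it there with the given arguments, and resolve with the result.
function runInWorker<T>(fn: (...args: any[]) => T, args: unknown[]): Promise<T> {
  const src = `
    const { parentPort, workerData } = require('worker_threads');
    const fn = (${fn.toString()});
    parentPort.postMessage(fn(...workerData));
  `;
  return new Promise((resolve, reject) => {
    const worker = new Worker(src, { eval: true, workerData: args });
    worker.once('message', (value: T) => {
      resolve(value);
      void worker.terminate();
    });
    worker.once('error', reject);
  });
}

function fibonacci(n: number): number {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

// The main thread's event loop stays free while the worker computes.
runInWorker(fibonacci, [20]).then((value) => console.log(value)); // 6765
```

Note that the function crosses the thread boundary as source text, which is why (as discussed later) it must be self-contained.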
new PgParallel(config: PgParallelConfig)
The config object extends pg.PoolConfig with additional properties:
- maxWorkers?: number - Number of worker threads (defaults to os.cpus().length)
- retry?: RetryConfig - Automatic retry for transient failures
- circuitBreaker?: CircuitBreakerConfig - Circuit breaker for database operations
- logger?: Logger - Optional logger for observability

db.query(config, values?)
Execute standard I/O queries on the main thread pool.
// Simple query
const { rows } = await db.query('SELECT * FROM users WHERE id = $1', [1]);
// With query config
const result = await db.query({
text: 'SELECT * FROM users WHERE active = $1',
values: [true],
});
db.warmup()
Pre-initializes the worker thread pool to avoid "cold start" latency on the
first call to .task() or .worker(). This is useful in performance-sensitive
applications where the initial startup time of workers should be minimized.
// It's a good practice to warmup the workers during application startup
await db.warmup();
db.task(fn, args)
Execute CPU-intensive functions in worker threads.
// For recursive functions, use a named function declaration
function fibonacci(n: number): number {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
const result = await db.task(fibonacci, [40]);
db.worker(task)
Execute database operations and CPU-intensive logic in worker threads with a dedicated client connection.
Function-based workers:
// Simple example
const result = await db.worker(async (client) => {
const { rows } = await client.query("SELECT 'value' as data");
return rows[0].data.toUpperCase();
});
// Transaction example
await db.worker(async (client) => {
await client.query('BEGIN');
const { rows } = await client.query(
'UPDATE accounts SET balance = balance - 100 WHERE id = 1 RETURNING balance',
);
if (rows[0].balance < 0) {
throw new Error('Insufficient funds');
}
await client.query(
'UPDATE accounts SET balance = balance + 100 WHERE id = 2',
);
await client.query('COMMIT');
});
File-based workers:
// Using WorkerFileTask interface
const result = await db.worker({
taskPath: path.resolve(__dirname, 'tasks/my-worker.js'),
taskName: 'processData', // Optional: defaults to 'handler'
args: ['arg1', 'arg2'], // Optional: arguments passed to the function
});
WorkerFileTask Interface:
interface WorkerFileTask {
taskPath: string; // Absolute path to the module file
taskName?: string; // Function name to execute (defaults to 'handler')
args?: any[]; // Arguments to pass to the function
}
Note: No manual client.release() needed - lifecycle is managed
automatically.
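To make the taskName/'handler' convention concrete, here is a standalone sketch that writes a throwaway worker module and resolves an exported function by name. The resolveTask helper is illustrative only and assumes nothing about pg-parallel's internals.

```typescript
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { createRequire } from 'module';

interface WorkerFileTask {
  taskPath: string;
  taskName?: string;
  args?: unknown[];
}

// createRequire lets this sketch load CommonJS modules from any context.
const localRequire = createRequire(path.join(process.cwd(), 'index.js'));

// Illustrative resolution logic: load the module at taskPath and pick the
// named export, falling back to 'handler' when no taskName is given.
function resolveTask(task: WorkerFileTask): (...args: any[]) => unknown {
  const mod = localRequire(task.taskPath);
  const name = task.taskName ?? 'handler';
  const fn = mod[name];
  if (typeof fn !== 'function') {
    throw new Error(`No exported function '${name}' in ${task.taskPath}`);
  }
  return fn;
}

// Write a throwaway worker module to demonstrate with.
const taskPath = path.join(os.tmpdir(), `demo-worker-${process.pid}.js`);
fs.writeFileSync(
  taskPath,
  `module.exports = {
    handler: (x) => x + 1,
    double: (x) => x * 2,
  };`,
);

console.log(resolveTask({ taskPath, taskName: 'double' })(21)); // 42
console.log(resolveTask({ taskPath })(41)); // 42 (falls back to 'handler')
fs.unlinkSync(taskPath);
```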
db.shutdown()
Gracefully shut down all connections and terminate workers.
await db.shutdown();
pg-parallel includes optional resilience features and a pluggable logger.
import {
PgParallel,
type RetryConfig,
type CircuitBreakerConfig,
} from 'pg-parallel';
const retry: RetryConfig = {
maxAttempts: 4,
initialDelayMs: 100,
maxDelayMs: 1500,
backoffFactor: 2,
jitter: true,
};
const circuitBreaker: CircuitBreakerConfig = {
failureThreshold: 5,
cooldownMs: 10_000,
halfOpenMaxCalls: 2,
halfOpenSuccessesToClose: 2,
};
const db = new PgParallel({
connectionString: process.env.DATABASE_URL!,
retry,
circuitBreaker,
logger: {
info: (m, meta) => console.log(m, meta),
warn: (m, meta) => console.warn(m, meta),
error: (m, meta) => console.error(m, meta),
},
});
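As a rough mental model, the RetryConfig above implies an exponential delay schedule capped at maxDelayMs, with optional jitter. The sketch below is a hypothetical illustration of that schedule, not the library's internal code; the interface is re-declared locally so the snippet is self-contained.

```typescript
interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
  jitter: boolean;
}

// Delay before retry N (1-based): initialDelayMs * backoffFactor^(N-1),
// capped at maxDelayMs; full jitter picks uniformly in [0, capped].
function delayBeforeRetry(attempt: number, cfg: RetryConfig): number {
  const base = cfg.initialDelayMs * Math.pow(cfg.backoffFactor, attempt - 1);
  const capped = Math.min(base, cfg.maxDelayMs);
  return cfg.jitter ? Math.random() * capped : capped;
}

const cfg: RetryConfig = {
  maxAttempts: 4,
  initialDelayMs: 100,
  maxDelayMs: 1500,
  backoffFactor: 2,
  jitter: false, // disabled here so the schedule is deterministic
};

// With maxAttempts: 4 there are at most 3 retries after the first attempt.
console.log([1, 2, 3].map((n) => delayBeforeRetry(n, cfg))); // [ 100, 200, 400 ]
```

Jitter de-synchronizes retries from many clients so a recovering database is not hit by a thundering herd.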
Notes:
- Errors are wrapped in PgParallelError with an ErrorCategory for easier routing/metrics.

For production code, organize worker logic in separate files using the
WorkerFileTask interface:
// tasks/report-worker.js
const { randomUUID } = require('crypto');
module.exports = {
generateReport: async (client, reportType = 'summary') => {
const { rows } = await client.query(
"SELECT * FROM (SELECT 1 as id, 'Sample Data' as name) as sales_data",
);
// Generate unique report ID using crypto.randomUUID
const reportId = randomUUID();
// Simulate report generation
const reportContent = `${reportType} Report for ${rows.length} records`;
return {
id: reportId,
type: reportType,
recordCount: rows.length,
generatedAt: new Date().toISOString(),
content: reportContent,
};
},
// Default handler (called when no taskName is specified)
handler: async (client, message = 'Default task') => {
const { rows } = await client.query('SELECT NOW() as timestamp');
const taskId = randomUUID();
return { id: taskId, message, timestamp: rows[0].timestamp };
},
};
// main.ts
import * as path from 'path';
// Execute specific named function
const report = await db.worker({
taskPath: path.resolve(process.cwd(), 'tasks/report-worker.js'),
taskName: 'generateReport',
args: ['detailed'],
});
// Execute default handler
const result = await db.worker({
taskPath: path.resolve(process.cwd(), 'tasks/report-worker.js'),
args: ['Hello from main thread'],
});
Key benefits of file-based workers:
Functions passed to db.task() and db.worker() must be self-contained and not
rely on any variables from their parent scope. This is because the function is
serialized, sent to a different thread, and deserialized, losing its original
closure.
Example: Accessing External Variables
// Wrong - references parent scope
const TAX_RATE = 0.07;
await db.task((price) => price * (1 + TAX_RATE), [100]);
// Correct - self-contained
await db.task(
(price) => {
const TAX_RATE = 0.07;
return price * (1 + TAX_RATE);
},
[100],
);
Example: Recursive Functions
For a function to call itself recursively inside a worker, it must be a named
function declaration. An arrow function assigned to a const will not work
because its name is part of the closure that gets lost.
// Wrong - recursive call will fail inside the worker
const fibonacciArrow = (n: number): number => {
if (n <= 1) return n;
// This call will fail as 'fibonacciArrow' is not in the function's own scope
return fibonacciArrow(n - 1) + fibonacciArrow(n - 2);
};
await db.task(fibonacciArrow, [40]);
// Correct - named function is self-contained
function fibonacci(n: number): number {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
await db.task(fibonacci, [40]);
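The closure loss can be reproduced without any workers: rebuild a function from its source text, as a worker effectively does, and names from the parent scope disappear. This is a self-contained simulation and assumes nothing about pg-parallel's internals (it relies on the snippet running in module scope, where top-level constants are not visible to dynamically rebuilt functions).

```typescript
const TAX_RATE = 0.07;
const withClosure = (price: number) => price * (1 + TAX_RATE);

// Simulate crossing the thread boundary: only the source text survives.
const rebuilt = new Function(`return (${withClosure.toString()});`)() as
  (price: number) => number;

let closureLost = false;
try {
  rebuilt(100);
} catch {
  closureLost = true; // ReferenceError: TAX_RATE is not defined
}

const selfContained = (price: number) => {
  const TAX_RATE = 0.07; // the constant lives inside the function
  return price * (1 + TAX_RATE);
};
const rebuiltOk = new Function(`return (${selfContained.toString()});`)() as
  (price: number) => number;

console.log(closureLost, Math.round(rebuiltOk(100))); // true 107
```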
For advanced usage, pg-parallel exports utility classes that power the
internal resilience features. These can be used independently for custom
implementations:
import {
ErrorUtils,
RetryUtils,
CircuitBreakerUtils,
type CircuitBreakerState,
type RetryConfig,
} from 'pg-parallel';
// Error classification and handling
const isRetryable = ErrorUtils.isTransient(error);
const category = ErrorUtils.categorizeError(error);
const wrappedError = ErrorUtils.wrapError(error);
// Custom retry logic
const retryConfig: RetryConfig = {
maxAttempts: 3,
initialDelayMs: 100,
maxDelayMs: 1000,
backoffFactor: 2,
jitter: true,
};
const result = await RetryUtils.executeWithRetry(
() => someAsyncOperation(),
retryConfig,
'my-operation',
);
// Circuit breaker management
const breakerState = CircuitBreakerUtils.createInitialState();
const breakerConfig = CircuitBreakerUtils.getDefaultConfig();
CircuitBreakerUtils.onBreakerFailure(breakerState, breakerConfig);
await CircuitBreakerUtils.ensureBreakerState(breakerState, breakerConfig);
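For intuition, a circuit breaker cycles closed → open (after failureThreshold consecutive failures) → half-open (after cooldownMs elapses) → closed (after enough trial successes). The standalone state machine below is a hypothetical sketch of that cycle, not the library's internal implementation; the injectable clock exists only to make the example deterministic.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class Breaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private halfOpenSuccesses = 0;

  constructor(
    private failureThreshold: number,
    private cooldownMs: number,
    private halfOpenSuccessesToClose: number,
    private now: () => number = Date.now,
  ) {}

  current(): BreakerState {
    if (this.state === 'open' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'half-open'; // cooldown elapsed: allow trial calls
      this.halfOpenSuccesses = 0;
    }
    return this.state;
  }

  onSuccess(): void {
    if (this.current() === 'half-open' &&
        ++this.halfOpenSuccesses >= this.halfOpenSuccessesToClose) {
      this.state = 'closed';
      this.failures = 0;
    }
  }

  onFailure(): void {
    const s = this.current();
    if (s === 'half-open' || (s === 'closed' && ++this.failures >= this.failureThreshold)) {
      this.state = 'open'; // trip (or re-trip) the breaker
      this.openedAt = this.now();
      this.failures = 0;
    }
  }
}

let t = 0; // fake clock, in ms
const b = new Breaker(2, 1000, 1, () => t);
b.onFailure();
b.onFailure();            // threshold reached
console.log(b.current()); // 'open'
t = 1000;                 // cooldown elapses
console.log(b.current()); // 'half-open'
b.onSuccess();
console.log(b.current()); // 'closed'
```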
Available Utility Classes:
- ErrorUtils: Error categorization, transient detection, and wrapping
- RetryUtils: Exponential backoff retry logic with jitter
- CircuitBreakerUtils: Circuit breaker state management and transitions

Performance benchmarks demonstrate when pg-parallel provides benefits over
standard pg.Pool. These tests were conducted on Apple M1 (8 cores) with
PostgreSQL 15.
| Scenario | pg-parallel | Baseline | Improvement |
|---|---|---|---|
| Pure I/O (10,000 queries, maxWorkers: 1) | 0.410s avg | 0.446s avg | +8.07% faster ✅ |
| Pure CPU (8 fibonacci tasks) | 7.298s avg | 19.904s avg | 2.73x faster |
| Mixed I/O + CPU (8 tasks) | 7.710s avg | 22.878s avg | 2.97x faster |
You can run comprehensive benchmarks yourself to validate these results. Each benchmark runs 10 iterations for statistical accuracy:
# Pure I/O Benchmark (10,000 queries per run)
ts-node src/benchmarks/benchmark-io-10-runs.ts
# Pure CPU Benchmark (8 Fibonacci tasks per run)
ts-node src/benchmarks/benchmark-cpu-10-runs.ts
# Mixed I/O + CPU Benchmark (8 mixed tasks per run)
ts-node src/benchmarks/benchmark-mixed-10-runs.ts
Requirements for running the benchmarks:
- A DATABASE_URL environment variable configured

Connection pool notes:
- The total connection pool (max) is split between the main pool and the worker pools. By default, the number of workers is os.cpus().length.
- For pure I/O workloads, set maxWorkers: 1 to maintain optimal connection allocation while still benefiting from the performance optimizations.
- With maxWorkers: 1, pg-parallel now exceeds pg.Pool performance (see table above).

Note: With recent optimizations, pg-parallel now outperforms pg.Pool even in pure I/O scenarios while providing additional resilience features.
Problem: Using require() inside function-based workers fails.
Solution: Use file-based workers instead:
// Wrong - this will fail
await db.worker(async (client) => {
const { randomUUID } = require('crypto'); // Error: require is not defined
// ...
});
// Correct - use file-based workers
await db.worker({
taskPath: path.resolve(__dirname, 'tasks/my-worker.js'),
taskName: 'processData',
});
Problem: Workers are not being released properly.
Solution: Ensure your worker functions complete without hanging:
// Wrong - infinite loop or hanging operation
await db.worker(async (client) => {
while (true) {
// This will hang the worker
}
});
// Correct - ensure function completes
await db.worker(async (client) => {
const result = await client.query('SELECT NOW()');
return result.rows[0];
});
Problem: TypeScript files don't work well with worker threads.
Solution: Keep worker files as JavaScript (.js) and main files as
TypeScript:
// main.ts (TypeScript)
import { PgParallel } from 'pg-parallel';
const result = await db.worker({
taskPath: path.resolve(process.cwd(), 'workers/processor.js'), // .js file
taskName: 'process',
});
// workers/processor.js (JavaScript)
module.exports = {
process: async (client, data) => {
// Worker logic here
return processedData;
},
};
Problem: Overhead from worker threads negates benefits.
Solution: Use workers only for CPU-intensive tasks:
// Wrong - simple query doesn't need worker
await db.worker(async (client) => {
return await client.query('SELECT 1');
});
// Correct - use main thread for simple queries
const result = await db.query('SELECT 1');
// Correct - use worker for heavy computation
await db.worker(async (client) => {
const { rows } = await client.query('SELECT * FROM large_table');
return rows.map((row) => heavyProcessing(row)); // CPU-intensive
});
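A quick way to decide whether work belongs in a worker is to measure how long its synchronous part actually blocks the event loop: work that blocks for more than a few milliseconds is a candidate for db.task()/db.worker(), sub-millisecond work is not. A minimal measurement sketch (the thresholds implied here are illustrative, not from pg-parallel):

```typescript
// Measure how long a synchronous function blocks the event loop, in ms.
function blockingMs(fn: () => void): number {
  const start = process.hrtime.bigint();
  fn();
  return Number(process.hrtime.bigint() - start) / 1e6;
}

function fib(n: number): number {
  return n <= 1 ? n : fib(n - 1) + fib(n - 2);
}

const cheap = blockingMs(() => fib(10)); // trivial: keep on the main thread
const heavy = blockingMs(() => fib(30)); // noticeable: offload via db.task()
console.log(heavy > cheap); // true
```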
Executive Summary: Comprehensive benchmark analysis reveals that pg-parallel
outperforms pg.Pool in light I/O workloads (5.4% faster) while providing
superior reliability features. For mixed I/O+CPU workloads, pg-parallel delivers
2.97x faster performance. The key is proper configuration: use
maxWorkers: 1 for pure I/O and always call warmup() for optimal performance.
For detailed performance analysis and scenario-specific recommendations:
| Load Scenario | pg-parallel | pg.Pool | Difference | Winner | Key Factor |
|---|---|---|---|---|---|
| Light Load | 6,289 ops/sec | 5,952 ops/sec | +5.4% faster | pg-parallel | Circuit breaker + I/O optimization |
| Medium Load | 13,477 ops/sec | 15,152 ops/sec | -12.4% slower | pg.Pool | Raw connection efficiency |
| Heavy Load | 15,432 ops/sec | 17,153 ops/sec | -11.1% slower | pg.Pool | High-throughput optimization |
| Workload Type | pg-parallel | pg.Pool/Sequential | Performance Gain | Use Case |
|---|---|---|---|---|
| Pure CPU | 7.298s avg | 19.904s | 2.73x faster | Data processing tasks |
| Mixed I/O+CPU | 7.710s avg | 22.878s | 2.97x faster | ETL operations |
Key Insights:
- maxWorkers: 1 is essential for optimal I/O performance

1. Light I/O Applications (Recommended: pg-parallel)
// WINNER: pg-parallel (5.4% faster + resilience)
const db = new PgParallel({
...pgConfig,
maxWorkers: 1, // Critical: Use 1 worker for pure I/O
  retry: {
    maxAttempts: 3,
    initialDelayMs: 100,
  },
  circuitBreaker: {
    failureThreshold: 5,
    cooldownMs: 30000,
  },
});
await db.warmup(); // Mandatory for optimal performance!
// Perfect for: REST APIs, CRUD operations, light queries
await db.query('SELECT * FROM users WHERE id = $1', [userId]);
2. Heavy I/O Applications (Consider: pg.Pool)
// WINNER: pg.Pool (10-12% faster in pure throughput)
import { Pool } from 'pg';
const pool = new Pool({
...pgConfig,
max: 50, // Higher connection pool for throughput
idleTimeoutMillis: 30000,
});
// Best for: High-frequency queries, bulk operations, reporting
await pool.query('SELECT COUNT(*) FROM large_table');
// Alternative: pg-parallel with trade-off awareness
const db = new PgParallel({ ...pgConfig, maxWorkers: 1 });
await db.warmup();
// 11% slower but with circuit breaker + retry + zero errors
await db.query('SELECT * FROM large_dataset WHERE complex_condition');
3. Mixed I/O + CPU Workloads (Strongly Recommended: pg-parallel)
// CLEAR WINNER: pg-parallel (2.97x faster than pg.Pool)
const db = new PgParallel({
...pgConfig,
maxWorkers: 4, // More workers for CPU tasks
workerIdleTimeout: 60000,
});
await db.warmup();
// ETL operations, data processing, complex transformations
await db.worker(async (client) => {
const data = await client.query('SELECT * FROM raw_data LIMIT 10000');
const processed = await processLargeDataset(data.rows); // CPU-intensive
await client.query('INSERT INTO processed_data VALUES ...', processed);
return processed.length;
});
4. Pure CPU Tasks (Strongly Recommended: pg-parallel)
// CLEAR WINNER: pg-parallel (2.73x faster than sequential)
await db.task(() => {
// CPU-intensive operations in isolated worker thread
return calculateComplexReport(largeDataset);
}); // No database blocking!
| Use Case | Library | maxWorkers | Performance | Reliability | Best For |
|---|---|---|---|---|---|
| Light I/O | pg-parallel | 1 | +5.4% faster | Circuit breaker + Retry | Production APIs |
| Heavy I/O | pg.Pool | N/A | +10-12% faster | Basic | High-throughput reports |
| Mixed Workloads | pg-parallel | 2-4 | +197% faster | Full resilience | ETL, data processing |
| CPU Tasks | pg-parallel | 2-8 | +173% faster | Worker isolation | Analytics, calculations |
// ✅ DO: For pure I/O workloads
const db = new PgParallel({ maxWorkers: 1 }); // Optimal for I/O
// ❌ DON'T: Multiple workers for simple queries
const db = new PgParallel({ maxWorkers: 4 }); // Overhead for pure I/O
// ✅ DO: Always warmup in production
await db.warmup(); // 1,135% performance improvement
// ✅ DO: Use appropriate methods
await db.query(sql); // Pure I/O - now faster than pg.Pool
await db.task(fn); // Pure CPU - 2.73x faster
await db.worker(fn); // Mixed I/O+CPU - 2.97x faster
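The decision matrix above can be condensed into a small helper. The function and its thresholds below are hypothetical, encoding only this document's recommendations; they are not part of the pg-parallel API.

```typescript
type Workload = 'light-io' | 'heavy-io' | 'mixed' | 'cpu';

// Hypothetical helper encoding the recommendations above: a single worker
// for pure I/O, a few workers for mixed loads, more for CPU-bound work,
// never exceeding the available cores.
function suggestedMaxWorkers(workload: Workload, cpuCores: number): number {
  switch (workload) {
    case 'light-io':
    case 'heavy-io':
      return 1; // pure I/O: one worker keeps connection allocation optimal
    case 'mixed':
      return Math.max(2, Math.min(4, cpuCores));
    case 'cpu':
      return Math.max(2, Math.min(8, cpuCores));
  }
}

console.log(suggestedMaxWorkers('light-io', 8)); // 1
console.log(suggestedMaxWorkers('mixed', 8)); // 4
```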
If you encounter issues not covered here:
This project relies on node-postgres (pg) as a peer dependency and has no
other runtime dependencies.
Note: Previous versions used the uuid library, but since v1.4.0,
pg-parallel uses Node.js built-in crypto.randomUUID() for zero external
dependencies.
MIT © Jonathan Givisiez
FAQs
The npm package pg-parallel receives a total of 0 weekly downloads. As such, pg-parallel popularity was classified as not popular.
We found that pg-parallel demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.