# Promise Pool 🏖️

A background task processor focused on performance, reliability, and durability.

> **TL;DR:** An upgraded Promise queue that's essentially a stateful `Promise.all()` wrapper.
## Smart Multi-Threaded Execution

*(Diagram of Promise Pool's 'Minimal Blocking' design)*
## Key Promise Pool Features
Promise Pool strives to excel at 4 key goals:

- **Durability** - won't fail in unpredictable ways, even under extreme load.
- **Extensible by Design** - supports Promise-based libraries (examples include retry handling, debounce/throttle).
- **Reliability** - control your pool with a total runtime limit (align to max HTTP/Lambda request limit), an idle timeout (catch orphan or zombie situations), plus a concurrent worker limit.
- **Performance** - the work pool queue uses a speedy Ring Buffer design!*

> \* Since this is JavaScript, our Ring Buffer is more like three JS Arrays in a trenchcoat.
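The "Extensible by Design" point can be illustrated with a small wrapper: since a task is just a function returning a Promise, any Promise-based decorator composes naturally with `pool.add()`. Below is a minimal retry sketch — note that `withRetry` and `flakyUpload` are hypothetical names for this example, not part of the library:

```javascript
// Hypothetical helper: wraps a task so it retries on rejection.
// Any Promise-returning decorator like this composes with pool.add().
const withRetry = (task, attempts = 3) => async () => {
  let lastError;
  for (let i = 0; i < attempts; i++) {
    try {
      return await task(); // success: return immediately
    } catch (err) {
      lastError = err; // remember the failure, then try again
    }
  }
  throw lastError; // all attempts failed
};

// Usage (flakyUpload is a placeholder for your own async work):
// pool.add(withRetry(() => flakyUpload(data)));
```

The same shape works for debounce, throttle, timeouts, or any other Promise-based wrapper.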
## Who needs a Promise Pool?

Any Node service (Lambda functions, etc.) that does background work, defined as:

- long-running async function(s),
- where the `return` value isn't used (in the current request),
- and where failures are handled by logging.
## API

`PromisePool` exposes 3 methods:

- `.add(...tasks)` - add one (or more) tasks for background processing. (A task is a function that wraps a Promise value, e.g. `() => Promise.resolve(1)`.)
- `.drain()` - returns a promise that resolves when all tasks have been processed, or when another thread takes over waiting by calling `.drain()` again.
- `.done()` - drains AND 'finalizes' the pool. No more tasks can be added after this. Can be called from multiple threads; only runs once.
See either the Usage section below, or check out the `/examples` folder for more complete examples.
## Install

```sh
npm install @elite-libs/promise-pool
# or
yarn add @elite-libs/promise-pool
```
## Usage

### Minimal Example

```typescript
import PromisePool from '@elite-libs/promise-pool';

const pool = new PromisePool();

(async () => {
  pool.add(() => saveToS3(data));
  pool.add(() => expensiveBackgroundWork(data));

  await pool.drain();
})();
```
### Singleton Pattern

Recommended for virtually all projects (API, CLI, Lambda, Frontend, etc.).

The singleton pattern creates exactly 1 Promise Pool as soon as the script is imported (typically on the first run). This ensures the `maxWorkers` value will act as a global limit on the number of tasks that can run at the same time.

**File `/services/taskPoolSingleton.ts`**

```typescript
import PromisePool from '@elite-libs/promise-pool';

export const taskPool = new PromisePool({
  maxWorkers: 6,
});
```
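Why does a single shared instance matter? The cap only holds if every module funnels work through the same pool. As a rough mental model — an illustrative sketch, not the library's actual implementation — a global worker cap behaves like this:

```javascript
// Toy illustration of a global worker cap (NOT the library's code).
// A shared counter ensures at most maxWorkers tasks run at once,
// no matter how many callers submit tasks.
function createLimiter(maxWorkers) {
  let active = 0;
  const queue = [];

  const startNext = () => {
    if (active >= maxWorkers || queue.length === 0) return;
    active++;
    const { task, resolve, reject } = queue.shift();
    task()
      .then(resolve, reject)
      .finally(() => {
        active--; // free a worker slot...
        startNext(); // ...and immediately start the next queued task
      });
  };

  return (task) =>
    new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject });
      startNext();
    });
}
```

Because `taskPoolSingleton` is evaluated once per process, every importer draws from the same `maxWorkers` budget.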
## Recipes

See examples below, or check out the `/examples` folder for more complete examples.
### AWS Lambda & Middy

Using Promise Pool in some `middy` middleware:

**File 1/2 `./services/taskPoolMiddleware.ts`**

> Note: The imported `taskPool` is a singleton instance defined in the `taskPoolSingleton` file.

```typescript
import { taskPool } from './services/taskPoolSingleton';

export const taskPoolMiddleware = () => ({
  before: (request) => {
    Object.assign(request.context, { taskPool });
  },
  after: async (request) => {
    await request.context.taskPool.drain();
  },
});
```
Now you can use `taskPool` in your Lambda function via the handler's `context.taskPool`:

**File 2/2 `./handlers/example.handler.ts`**

```typescript
import middy from '@middy/core';
import { taskPoolMiddleware } from './services/taskPoolMiddleware';

const handleEvent = (event, context) => {
  const { taskPool } = context;
  const data = getDataFromEvent(event);

  taskPool.add(() => saveToS3(data));
  taskPool.add(() => expensiveBackgroundWork(data));

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Success',
    }),
  };
};

export const handler = middy(handleEvent)
  .use(taskPoolMiddleware());
```
### Express Middleware

**File 1/3 `/services/taskPoolSingleton.mjs`**

```javascript
import PromisePool from '@elite-libs/promise-pool'

export const taskPool = new PromisePool({
  maxWorkers: 6
})
```

**File 2/3 `/middleware/taskPool.middleware.mjs`**

```javascript
import { taskPool } from '../services/taskPoolSingleton.mjs'

const taskPoolMiddleware = {
  setup: (request, response, next) => {
    request.taskPool = taskPool
    next()
  },
  cleanup: (request, response, next) => {
    if (request.taskPool && 'drain' in request.taskPool) {
      request.taskPool.drain()
    }
    next()
  }
}

export default taskPoolMiddleware
```

To use the `taskPoolMiddleware` in your Express app, include `taskPoolMiddleware.setup` before your routes and `taskPoolMiddleware.cleanup` after them.
**File 3/3 `/app.mjs`**

```javascript
import express from 'express'
import taskPoolMiddleware from './middleware/taskPool.middleware.mjs'

export const app = express()

app.use(taskPoolMiddleware.setup)
app.use(express.json())

app.post('/users/', function post(request, response, next) {
  const { taskPool } = request
  const data = getDataFromBody(request.body)

  // Add tasks one at a time...
  taskPool.add(() => logMetrics(data))
  taskPool.add(() => saveToS3(request))
  taskPool.add(() => expensiveBackgroundWork(data))

  // ...or add several tasks in a single call:
  taskPool.add(
    () => logMetrics(data),
    () => saveToS3(request),
    () => expensiveBackgroundWork(data)
  )

  next()
})

app.use(taskPoolMiddleware.cleanup)
```
## Changelog

### v1.3.1 - April 2023

- Upgraded dev dependencies (dependabot).
- Cleaned up README code & description.

### v1.3.0 - September 2022

- **What?** Adds Smart `.drain()` behavior.
- **Why?**
  - Speeds up concurrent server environments!
  - Prevents several types of difficult (ghost) bugs:
    - stack overflows/memory access violations,
    - exceeding rarely hit OS limits (e.g. max open handle/file/socket limits),
    - exceeding limits of network hardware (e.g. connections/sec, total open socket limit, etc.),
    - uneven observed wait times.
- **How?** Only awaits the latest `.drain()` caller.
- **Who?** Server + Singleton use cases will see a major benefit from this design.
- **Huh?** See diagram.
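The "only awaits the latest caller" idea can be sketched in a few lines. This toy model — illustrative only, not the library's real implementation — shows the hand-off: when a newer `drain()` arrives, the older waiter resolves early instead of blocking until the queue empties:

```javascript
// Toy model of "smart drain" (illustrative, NOT the library's code).
// Each drain() takes a fresh waiter id; when a newer caller appears,
// the older waiter stops waiting and resolves as 'superseded'.
class ToySmartPool {
  constructor() {
    this.pending = new Set();
    this.waiterId = 0;
  }
  add(task) {
    const p = task().finally(() => this.pending.delete(p));
    this.pending.add(p);
  }
  async drain() {
    const myId = ++this.waiterId;
    while (this.pending.size > 0) {
      await Promise.race(this.pending); // wake when any task settles
      if (this.waiterId !== myId) return 'superseded'; // newer drain() took over
    }
    return 'drained'; // queue is empty and we were the latest caller
  }
}
```

In a busy server, this means only the most recent request actually waits on the shared queue, instead of every in-flight request piling up on the same `await`.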