Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

@fortify-ts/bulkhead

Package Overview
Dependencies
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@fortify-ts/bulkhead - npm Package Compare versions

Comparing version
0.1.1
to
0.1.2
+196
-18
dist/index.cjs

@@ -54,2 +54,151 @@ "use strict";

// src/ring-buffer.ts
var RingBuffer = class {
/**
* Create a new ring buffer.
*
* @param capacity - Maximum number of elements the buffer can hold
*/
constructor(capacity) {
this.capacity = capacity;
__publicField(this, "buffer");
__publicField(this, "head", 0);
__publicField(this, "tail", 0);
__publicField(this, "count", 0);
this.buffer = new Array(capacity);
}
/**
* Add an element to the end of the buffer.
*
* @param item - The item to add
* @returns true if added, false if buffer is full
*/
push(item) {
if (this.count >= this.capacity) {
return false;
}
this.buffer[this.tail] = item;
this.tail = (this.tail + 1) % this.capacity;
this.count++;
return true;
}
/**
* Remove and return the element at the front of the buffer.
*
* @returns The item at the front, or undefined if empty
*/
shift() {
if (this.count === 0) {
return void 0;
}
const item = this.buffer[this.head];
this.buffer[this.head] = void 0;
this.head = (this.head + 1) % this.capacity;
this.count--;
return item;
}
/**
* Get the number of elements in the buffer.
*/
get length() {
return this.count;
}
/**
* Check if the buffer is empty.
*/
isEmpty() {
return this.count === 0;
}
/**
* Check if the buffer is full.
*/
isFull() {
return this.count >= this.capacity;
}
/**
* Find the index of an item in the buffer.
* Note: This is O(n) but should be rare (used for abort handling).
*
* @param item - The item to find
* @returns The index of the item, or -1 if not found
*/
indexOf(item) {
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
if (this.buffer[idx] === item) {
return i;
}
}
return -1;
}
/**
* Remove an item at the given logical index.
* Note: This is O(n) due to shifting elements.
*
* @param index - The logical index (0 = front of queue)
* @returns true if item was removed
*/
removeAt(index) {
if (index < 0 || index >= this.count) {
return false;
}
for (let i = index; i < this.count - 1; i++) {
const fromIdx = (this.head + i + 1) % this.capacity;
const toIdx = (this.head + i) % this.capacity;
this.buffer[toIdx] = this.buffer[fromIdx];
}
const lastIdx = (this.head + this.count - 1) % this.capacity;
this.buffer[lastIdx] = void 0;
this.tail = (this.tail - 1 + this.capacity) % this.capacity;
this.count--;
return true;
}
/**
* Remove an item from the buffer.
* Note: This is O(n) but should be rare (used for abort handling).
*
* @param item - The item to remove
* @returns true if item was found and removed
*/
remove(item) {
const index = this.indexOf(item);
if (index === -1) {
return false;
}
return this.removeAt(index);
}
/**
* Get all items and clear the buffer.
*
* @returns Array of all items in queue order
*/
drain() {
const items = [];
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
const item = this.buffer[idx];
if (item !== void 0) {
items.push(item);
}
this.buffer[idx] = void 0;
}
this.head = 0;
this.tail = 0;
this.count = 0;
return items;
}
/**
* Clear all elements from the buffer.
*/
clear() {
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
this.buffer[idx] = void 0;
}
this.head = 0;
this.tail = 0;
this.count = 0;
}
};
// src/semaphore.ts

@@ -61,9 +210,11 @@ var Semaphore = class {

* @param maxPermits - Maximum number of concurrent permits
* @param maxQueue - Maximum queue size (defaults to 1000)
*/
constructor(maxPermits) {
constructor(maxPermits, maxQueue = 1e3) {
__publicField(this, "permits");
__publicField(this, "maxPermits");
__publicField(this, "waitingQueue", []);
__publicField(this, "waitingQueue");
this.maxPermits = maxPermits;
this.permits = maxPermits;
this.waitingQueue = new RingBuffer(maxQueue);
}

@@ -101,13 +252,13 @@ /**

const waiter = { resolve, reject };
this.waitingQueue.push(waiter);
if (signal) {
const onAbort = () => {
const index = this.waitingQueue.indexOf(waiter);
if (index !== -1) {
this.waitingQueue.splice(index, 1);
if (this.waitingQueue.remove(waiter)) {
reject(signal.reason ?? new DOMException("Aborted", "AbortError"));
}
};
waiter.signal = signal;
waiter.onAbort = onAbort;
signal.addEventListener("abort", onAbort, { once: true });
}
this.waitingQueue.push(waiter);
});

@@ -119,5 +270,10 @@ }

release() {
if (this.waitingQueue.length > 0) {
if (!this.waitingQueue.isEmpty()) {
const waiter = this.waitingQueue.shift();
waiter?.resolve();
if (waiter) {
if (waiter.signal && waiter.onAbort) {
waiter.signal.removeEventListener("abort", waiter.onAbort);
}
waiter.resolve();
}
} else if (this.permits < this.maxPermits) {

@@ -143,4 +299,7 @@ this.permits++;

rejectAll(error) {
const waiters = this.waitingQueue.splice(0, this.waitingQueue.length);
const waiters = this.waitingQueue.drain();
for (const waiter of waiters) {
if (waiter.signal && waiter.onAbort) {
waiter.signal.removeEventListener("abort", waiter.onAbort);
}
waiter.reject(error);

@@ -166,5 +325,9 @@ }

this.logger = this.config.logger ?? import_core.noopLogger;
this.semaphore = new Semaphore(this.config.maxConcurrent);
this.semaphore = new Semaphore(
this.config.maxConcurrent,
Math.max(this.config.maxQueue, 1)
// At least 1 for edge cases
);
if (this.config.maxQueue > 0) {
this.queueSemaphore = new Semaphore(this.config.maxQueue);
this.queueSemaphore = new Semaphore(this.config.maxQueue, 1);
}

@@ -183,3 +346,3 @@ }

if (this.closed) {
throw new import_core.BulkheadFullError();
throw this.createFullError("Bulkhead is closed");
}

@@ -212,4 +375,5 @@ if (signal?.aborted) {

this.closed = true;
this.semaphore.rejectAll(new import_core.BulkheadFullError());
this.queueSemaphore?.rejectAll(new import_core.BulkheadFullError());
const closeError = this.createFullError("Bulkhead closed");
this.semaphore.rejectAll(closeError);
this.queueSemaphore?.rejectAll(closeError);
this.logger.info("Bulkhead closed");

@@ -240,8 +404,9 @@ }

this.onRejected();
throw new import_core.BulkheadFullError();
throw this.createFullError("Bulkhead is full - no queue configured");
}
if (!this.queueSemaphore?.tryAcquire()) {
this.onRejected();
throw new import_core.BulkheadFullError();
throw this.createFullError("Bulkhead queue is full");
}
let timeoutId;
try {

@@ -257,5 +422,5 @@ let timeoutController;

}
(0, import_core.sleep)(this.config.queueTimeout).then(() => {
timeoutId = setTimeout(() => {
timeoutController?.abort(new DOMException("Queue timeout", "TimeoutError"));
});
}, this.config.queueTimeout);
}

@@ -272,2 +437,5 @@ try {

} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
this.queueSemaphore?.release();

@@ -294,3 +462,13 @@ }

}
/**
* Create a BulkheadFullError with current state context.
*/
createFullError(message) {
return new import_core.BulkheadFullError(
message ?? "Bulkhead is full",
this.activeCount(),
this.queuedCount()
);
}
};
//# sourceMappingURL=index.cjs.map
+1
-1

@@ -1,1 +0,1 @@

{"version":3,"sources":["../src/index.ts","../src/bulkhead.ts","../src/config.ts","../src/semaphore.ts"],"sourcesContent":["export { Bulkhead } from './bulkhead.js';\nexport { Semaphore } from './semaphore.js';\nexport {\n bulkheadConfigSchema,\n type BulkheadConfig,\n type BulkheadConfigInput,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\n","import {\n type Operation,\n type Pattern,\n type Resettable,\n BulkheadFullError,\n type FortifyLogger,\n noopLogger,\n sleep,\n} from '@fortify-ts/core';\nimport {\n type BulkheadConfig,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\nimport { Semaphore } from './semaphore.js';\n\n/**\n * Bulkhead pattern implementation for limiting concurrent operations.\n *\n * Prevents resource exhaustion by limiting the number of concurrent executions,\n * with optional queueing for overflow requests.\n *\n * @template T - The return type of operations\n *\n * @example\n * ```typescript\n * const bulkhead = new Bulkhead<Response>({\n * maxConcurrent: 5,\n * maxQueue: 10,\n * queueTimeout: 5000,\n * onRejected: () => console.log('Request rejected'),\n * });\n *\n * const result = await bulkhead.execute(async (signal) => {\n * return fetch('/api/data', { signal });\n * });\n * ```\n */\nexport class Bulkhead<T> implements Pattern<T>, Resettable {\n private readonly config: BulkheadConfig;\n private readonly logger: FortifyLogger;\n private readonly semaphore: Semaphore;\n private readonly queueSemaphore: Semaphore | undefined;\n private closed = false;\n\n /**\n * Create a new Bulkhead instance.\n *\n * @param config - Bulkhead configuration\n */\n constructor(config?: BulkheadConfigInputFull) {\n this.config = parseBulkheadConfig(config);\n this.logger = this.config.logger ?? noopLogger;\n this.semaphore = new Semaphore(this.config.maxConcurrent);\n\n // Only create queue semaphore if maxQueue > 0\n if (this.config.maxQueue > 0) {\n this.queueSemaphore = new Semaphore(this.config.maxQueue);\n }\n }\n\n /**\n * Execute an operation within the bulkhead's concurrency limits.\n *\n * @param operation - The async operation to execute\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise resolving to the operation result\n * @throws {BulkheadFullError} When bulkhead is at capacity and queue is full\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n async execute(operation: Operation<T>, signal?: AbortSignal): Promise<T> {\n // Check if closed\n if (this.closed) {\n throw new BulkheadFullError();\n }\n\n // Check if cancelled\n if (signal?.aborted) {\n throw signal.reason ?? new DOMException('Aborted', 'AbortError');\n }\n\n // Try to acquire semaphore immediately\n if (this.semaphore.tryAcquire()) {\n return this.executeWithPermit(operation, signal);\n }\n\n // Bulkhead full, try to queue\n return this.enqueue(operation, signal);\n }\n\n /**\n * Get the number of currently active executions.\n */\n activeCount(): number {\n return this.config.maxConcurrent - this.semaphore.availablePermits();\n }\n\n /**\n * Get the number of requests currently waiting in the queue.\n */\n queuedCount(): number {\n return this.semaphore.queueLength();\n }\n\n /**\n * Close the bulkhead, rejecting all pending requests.\n */\n close(): void {\n if (this.closed) return;\n\n this.closed = true;\n this.semaphore.rejectAll(new BulkheadFullError());\n this.queueSemaphore?.rejectAll(new BulkheadFullError());\n this.logger.info('Bulkhead closed');\n }\n\n /**\n * Reset the bulkhead to accept new requests.\n */\n reset(): void {\n this.closed = false;\n this.logger.info('Bulkhead reset');\n }\n\n /**\n * Execute operation with semaphore permit held.\n */\n private async executeWithPermit(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n try {\n return await operation(signal ?? new AbortController().signal);\n } finally {\n this.semaphore.release();\n }\n }\n\n /**\n * Attempt to queue the request when bulkhead is full.\n */\n private async enqueue(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n // If no queue configured, reject immediately\n if (this.config.maxQueue === 0) {\n this.onRejected();\n throw new BulkheadFullError();\n }\n\n // Try to acquire queue slot\n if (!this.queueSemaphore?.tryAcquire()) {\n // Queue is full, reject\n this.onRejected();\n throw new BulkheadFullError();\n }\n\n try {\n // Create combined signal for queue timeout\n let timeoutController: AbortController | undefined;\n let combinedSignal = signal;\n\n if (this.config.queueTimeout > 0) {\n timeoutController = new AbortController();\n\n // Create combined signal\n if (signal) {\n combinedSignal = AbortSignal.any([signal, timeoutController.signal]);\n } else {\n combinedSignal = timeoutController.signal;\n }\n\n // Start timeout\n sleep(this.config.queueTimeout).then(() => {\n timeoutController?.abort(new DOMException('Queue timeout', 'TimeoutError'));\n });\n }\n\n // Wait for execution semaphore\n try {\n await this.semaphore.acquire(combinedSignal);\n } catch (error) {\n // If aborted due to queue timeout, call onRejected\n if (\n error instanceof DOMException &&\n error.name === 'TimeoutError'\n ) {\n this.onRejected();\n }\n throw error;\n }\n\n // Got permit, execute\n return await this.executeWithPermit(operation, signal);\n } finally {\n this.queueSemaphore?.release();\n }\n }\n\n /**\n * Handle rejection event.\n */\n private onRejected(): void {\n this.logger.warn('Bulkhead rejection', {\n maxConcurrent: this.config.maxConcurrent,\n maxQueue: this.config.maxQueue,\n });\n\n if (this.config.onRejected) {\n try {\n this.config.onRejected();\n } catch (error) {\n this.logger.error('onRejected callback threw an error', {\n error: error instanceof Error ? error.message : String(error),\n });\n }\n }\n }\n}\n","import { z } from 'zod';\nimport { type FortifyLogger } from '@fortify-ts/core';\n\n/**\n * Zod schema for Bulkhead configuration.\n */\nexport const bulkheadConfigSchema = z.object({\n /** Maximum number of concurrent executions allowed (default: 10) */\n maxConcurrent: z.number().int().positive().default(10),\n /** Maximum size of overflow queue, 0 means no queue (default: 0) */\n maxQueue: z.number().int().nonnegative().default(0),\n /** Maximum time a request can wait in queue in milliseconds, 0 means no timeout (default: 0) */\n queueTimeout: z.number().int().nonnegative().default(0),\n});\n\n/**\n * Raw config input type (before defaults are applied).\n */\nexport type BulkheadConfigInput = z.input<typeof bulkheadConfigSchema>;\n\n/**\n * Parsed config type (after defaults are applied).\n */\nexport type BulkheadConfigParsed = z.output<typeof bulkheadConfigSchema>;\n\n/**\n * Full configuration type including callbacks and logger.\n */\nexport interface BulkheadConfig extends BulkheadConfigParsed {\n /** Callback when a request is rejected */\n onRejected: (() => void) | undefined;\n /** Logger instance for structured logging */\n logger: FortifyLogger | undefined;\n}\n\n/**\n * Input config type for constructor.\n */\nexport interface BulkheadConfigInputFull extends BulkheadConfigInput {\n onRejected?: () => void;\n logger?: FortifyLogger;\n}\n\n/**\n * Parse and validate bulkhead configuration.\n *\n * @param config - Raw configuration input\n * @returns Validated configuration with defaults applied\n */\nexport function parseBulkheadConfig(config?: BulkheadConfigInputFull): BulkheadConfig {\n const parsed = bulkheadConfigSchema.parse(config ?? {});\n return {\n ...parsed,\n onRejected: config?.onRejected,\n logger: config?.logger,\n };\n}\n","/**\n * A Promise-based semaphore for limiting concurrent operations.\n */\nexport class Semaphore {\n private permits: number;\n private readonly maxPermits: number;\n private readonly waitingQueue: Array<{\n resolve: () => void;\n reject: (error: Error) => void;\n }> = [];\n\n /**\n * Create a new semaphore.\n *\n * @param maxPermits - Maximum number of concurrent permits\n */\n constructor(maxPermits: number) {\n this.maxPermits = maxPermits;\n this.permits = maxPermits;\n }\n\n /**\n * Try to acquire a permit without waiting.\n *\n * @returns true if a permit was acquired, false otherwise\n */\n tryAcquire(): boolean {\n if (this.permits > 0) {\n this.permits--;\n return true;\n }\n return false;\n }\n\n /**\n * Acquire a permit, waiting if necessary.\n *\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise that resolves when permit is acquired\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n acquire(signal?: AbortSignal): Promise<void> {\n // Check if cancelled\n if (signal?.aborted) {\n return Promise.reject(\n signal.reason ?? new DOMException('Aborted', 'AbortError')\n );\n }\n\n // Try to acquire immediately\n if (this.permits > 0) {\n this.permits--;\n return Promise.resolve();\n }\n\n // Add to wait queue\n return new Promise<void>((resolve, reject) => {\n const waiter = { resolve, reject };\n this.waitingQueue.push(waiter);\n\n // Set up abort handler\n if (signal) {\n const onAbort = () => {\n const index = this.waitingQueue.indexOf(waiter);\n if (index !== -1) {\n this.waitingQueue.splice(index, 1);\n reject(signal.reason ?? new DOMException('Aborted', 'AbortError'));\n }\n };\n\n signal.addEventListener('abort', onAbort, { once: true });\n }\n });\n }\n\n /**\n * Release a permit back to the semaphore.\n */\n release(): void {\n if (this.waitingQueue.length > 0) {\n // Give permit to next waiter\n const waiter = this.waitingQueue.shift();\n waiter?.resolve();\n } else if (this.permits < this.maxPermits) {\n // Return permit to pool\n this.permits++;\n }\n }\n\n /**\n * Get the number of available permits.\n */\n availablePermits(): number {\n return this.permits;\n }\n\n /**\n * Get the number of waiters in the queue.\n */\n queueLength(): number {\n return this.waitingQueue.length;\n }\n\n /**\n * Reject all waiters with the given error.\n */\n rejectAll(error: Error): void {\n const waiters = this.waitingQueue.splice(0, this.waitingQueue.length);\n for (const waiter of waiters) {\n waiter.reject(error);\n }\n }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;;;ACAA,kBAQO;;;ACRP,iBAAkB;AAMX,IAAM,uBAAuB,aAAE,OAAO;AAAA;AAAA,EAE3C,eAAe,aAAE,OAAO,EAAE,IAAI,EAAE,SAAS,EAAE,QAAQ,EAAE;AAAA;AAAA,EAErD,UAAU,aAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AAAA;AAAA,EAElD,cAAc,aAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AACxD,CAAC;AAoCM,SAAS,oBAAoB,QAAkD;AACpF,QAAM,SAAS,qBAAqB,MAAM,UAAU,CAAC,CAAC;AACtD,SAAO;AAAA,IACL,GAAG;AAAA,IACH,YAAY,QAAQ;AAAA,IACpB,QAAQ,QAAQ;AAAA,EAClB;AACF;;;ACrDO,IAAM,YAAN,MAAgB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAarB,YAAY,YAAoB;AAZhC,wBAAQ;AACR,wBAAiB;AACjB,wBAAiB,gBAGZ,CAAC;AAQJ,SAAK,aAAa;AAClB,SAAK,UAAU;AAAA,EACjB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,aAAsB;AACpB,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO;AAAA,IACT;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,QAAqC;AAE3C,QAAI,QAAQ,SAAS;AACnB,aAAO,QAAQ;AAAA,QACb,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,MAC3D;AAAA,IACF;AAGA,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO,QAAQ,QAAQ;AAAA,IACzB;AAGA,WAAO,IAAI,QAAc,CAAC,SAAS,WAAW;AAC5C,YAAM,SAAS,EAAE,SAAS,OAAO;AACjC,WAAK,aAAa,KAAK,MAAM;AAG7B,UAAI,QAAQ;AACV,cAAM,UAAU,MAAM;AACpB,gBAAM,QAAQ,KAAK,aAAa,QAAQ,MAAM;AAC9C,cAAI,UAAU,IAAI;AAChB,iBAAK,aAAa,OAAO,OAAO,CAAC;AACjC,mBAAO,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY,CAAC;AAAA,UACnE;AAAA,QACF;AAEA,eAAO,iBAAiB,SAAS,SAAS,EAAE,MAAM,KAAK,CAAC;AAAA,MAC1D;AAAA,IACF,CAAC;AAAA,EACH;AAAA;AAAA;AAAA;AAAA,EAKA,UAAgB;AACd,QAAI,KAAK,aAAa,SAAS,GAAG;AAEhC,YAAM,SAAS,KAAK,aAAa,MAAM;AACvC,cAAQ,QAAQ;AAAA,IAClB,WAAW,KAAK,UAAU,KAAK,YAAY;AAEzC,WAAK;AAAA,IACP;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,mBAA2B;AACzB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,aAAa;AAAA,EAC3B;AAAA;AAAA;AAAA;AAAA,EAKA,UAAU,OAAoB;AAC5B,UAAM,UAAU,KAAK,aAAa,OAAO,GAAG,KAAK,aAAa,MAAM;AACpE,eAAW,UAAU,SAAS;AAC5B,aAAO,OAAO,KAAK;AAAA,IACrB;AAAA,EACF;AACF;;;AF1EO,IAAM,WAAN,MAAoD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAYzD,YAAY,QAAkC;AAX9C,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAQ,UAAS;AAQf,SAAK,SAAS,oBAAoB,MAAM;AACxC,SAAK,SAAS,KAAK,OAAO,UAAU;AACpC,SAAK,YAAY,IAAI,UAAU,KAAK,OAAO,aAAa;AAGxD,QAAI,KAAK,OAAO,WAAW,GAAG;AAC5B,WAAK,iBAAiB,IAAI,UAAU,KAAK,OAAO,QAAQ;AAAA,IAC1D;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,MAAM,QAAQ,WAAyB,QAAkC;AAEvE,QAAI,KAAK,QAAQ;AACf,YAAM,IAAI,8BAAkB;AAAA,IAC9B;AAGA,QAAI,QAAQ,SAAS;AACnB,YAAM,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,IACjE;AAGA,QAAI,KAAK,UAAU,WAAW,GAAG;AAC/B,aAAO,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACjD;AAGA,WAAO,KAAK,QAAQ,WAAW,MAAM;AAAA,EACvC;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,OAAO,gBAAgB,KAAK,UAAU,iBAAiB;AAAA,EACrE;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,UAAU,YAAY;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,QAAI,KAAK,OAAQ;AAEjB,SAAK,SAAS;AACd,SAAK,UAAU,UAAU,IAAI,8BAAkB,CAAC;AAChD,SAAK,gBAAgB,UAAU,IAAI,8BAAkB,CAAC;AACtD,SAAK,OAAO,KAAK,iBAAiB;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,SAAK,SAAS;AACd,SAAK,OAAO,KAAK,gBAAgB;AAAA,EACnC;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,kBACZ,WACA,QACY;AACZ,QAAI;AACF,aAAO,MAAM,UAAU,UAAU,IAAI,gBAAgB,EAAE,MAAM;AAAA,IAC/D,UAAE;AACA,WAAK,UAAU,QAAQ;AAAA,IACzB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,QACZ,WACA,QACY;AAEZ,QAAI,KAAK,OAAO,aAAa,GAAG;AAC9B,WAAK,WAAW;AAChB,YAAM,IAAI,8BAAkB;AAAA,IAC9B;AAGA,QAAI,CAAC,KAAK,gBAAgB,WAAW,GAAG;AAEtC,WAAK,WAAW;AAChB,YAAM,IAAI,8BAAkB;AAAA,IAC9B;AAEA,QAAI;AAEF,UAAI;AACJ,UAAI,iBAAiB;AAErB,UAAI,KAAK,OAAO,eAAe,GAAG;AAChC,4BAAoB,IAAI,gBAAgB;AAGxC,YAAI,QAAQ;AACV,2BAAiB,YAAY,IAAI,CAAC,QAAQ,kBAAkB,MAAM,CAAC;AAAA,QACrE,OAAO;AACL,2BAAiB,kBAAkB;AAAA,QACrC;AAGA,+BAAM,KAAK,OAAO,YAAY,EAAE,KAAK,MAAM;AACzC,6BAAmB,MAAM,IAAI,aAAa,iBAAiB,cAAc,CAAC;AAAA,QAC5E,CAAC;AAAA,MACH;AAGA,UAAI;AACF,cAAM,KAAK,UAAU,QAAQ,cAAc;AAAA,MAC7C,SAAS,OAAO;AAEd,YACE,iBAAiB,gBACjB,MAAM,SAAS,gBACf;AACA,eAAK,WAAW;AAAA,QAClB;AACA,cAAM;AAAA,MACR;AAGA,aAAO,MAAM,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACvD,UAAE;AACA,WAAK,gBAAgB,QAAQ;AAAA,IAC/B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,aAAmB;AACzB,SAAK,OAAO,KAAK,sBAAsB;AAAA,MACrC,eAAe,KAAK,OAAO;AAAA,MAC3B,UAAU,KAAK,OAAO;AAAA,IACxB,CAAC;AAED,QAAI,KAAK,OAAO,YAAY;AAC1B,UAAI;AACF,aAAK,OAAO,WAAW;AAAA,MACzB,SAAS,OAAO;AACd,aAAK,OAAO,MAAM,sCAAsC;AAAA,UACtD,OAAO,iBAAiB,QAAQ,MAAM,UAAU,OAAO,KAAK;AAAA,QAC9D,CAAC;AAAA,MACH;AAAA,IACF;AAAA,EACF;AACF;","names":[]}
{"version":3,"sources":["../src/index.ts","../src/bulkhead.ts","../src/config.ts","../src/ring-buffer.ts","../src/semaphore.ts"],"sourcesContent":["export { Bulkhead } from './bulkhead.js';\nexport { Semaphore } from './semaphore.js';\nexport {\n bulkheadConfigSchema,\n type BulkheadConfig,\n type BulkheadConfigInput,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\n","import {\n type Operation,\n type Pattern,\n type Resettable,\n BulkheadFullError,\n type FortifyLogger,\n noopLogger,\n} from '@fortify-ts/core';\nimport {\n type BulkheadConfig,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\nimport { Semaphore } from './semaphore.js';\n\n/**\n * Bulkhead pattern implementation for limiting concurrent operations.\n *\n * Prevents resource exhaustion by limiting the number of concurrent executions,\n * with optional queueing for overflow requests.\n *\n * @template T - The return type of operations\n *\n * @example\n * ```typescript\n * const bulkhead = new Bulkhead<Response>({\n * maxConcurrent: 5,\n * maxQueue: 10,\n * queueTimeout: 5000,\n * onRejected: () => console.log('Request rejected'),\n * });\n *\n * const result = await bulkhead.execute(async (signal) => {\n * return fetch('/api/data', { signal });\n * });\n * ```\n */\nexport class Bulkhead<T> implements Pattern<T>, Resettable {\n private readonly config: BulkheadConfig;\n private readonly logger: FortifyLogger;\n private readonly semaphore: Semaphore;\n private readonly queueSemaphore: Semaphore | undefined;\n private closed = false;\n\n /**\n * Create a new Bulkhead instance.\n *\n * @param config - Bulkhead configuration\n */\n constructor(config?: BulkheadConfigInputFull) {\n this.config = parseBulkheadConfig(config);\n this.logger = this.config.logger ?? noopLogger;\n // Execution semaphore: queue capacity = maxQueue (bounded by queue semaphore)\n this.semaphore = new Semaphore(\n this.config.maxConcurrent,\n Math.max(this.config.maxQueue, 1) // At least 1 for edge cases\n );\n\n // Only create queue semaphore if maxQueue > 0\n if (this.config.maxQueue > 0) {\n // Queue semaphore: only used for tryAcquire, queue never used\n this.queueSemaphore = new Semaphore(this.config.maxQueue, 1);\n }\n }\n\n /**\n * Execute an operation within the bulkhead's concurrency limits.\n *\n * @param operation - The async operation to execute\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise resolving to the operation result\n * @throws {BulkheadFullError} When bulkhead is at capacity and queue is full\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n async execute(operation: Operation<T>, signal?: AbortSignal): Promise<T> {\n // Check if closed\n if (this.closed) {\n throw this.createFullError('Bulkhead is closed');\n }\n\n // Check if cancelled\n if (signal?.aborted) {\n throw signal.reason ?? new DOMException('Aborted', 'AbortError');\n }\n\n // Try to acquire semaphore immediately\n if (this.semaphore.tryAcquire()) {\n return this.executeWithPermit(operation, signal);\n }\n\n // Bulkhead full, try to queue\n return this.enqueue(operation, signal);\n }\n\n /**\n * Get the number of currently active executions.\n */\n activeCount(): number {\n return this.config.maxConcurrent - this.semaphore.availablePermits();\n }\n\n /**\n * Get the number of requests currently waiting in the queue.\n */\n queuedCount(): number {\n return this.semaphore.queueLength();\n }\n\n /**\n * Close the bulkhead, rejecting all pending requests.\n */\n close(): void {\n if (this.closed) return;\n\n this.closed = true;\n const closeError = this.createFullError('Bulkhead closed');\n this.semaphore.rejectAll(closeError);\n this.queueSemaphore?.rejectAll(closeError);\n this.logger.info('Bulkhead closed');\n }\n\n /**\n * Reset the bulkhead to accept new requests.\n */\n reset(): void {\n this.closed = false;\n this.logger.info('Bulkhead reset');\n }\n\n /**\n * Execute operation with semaphore permit held.\n */\n private async executeWithPermit(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n try {\n return await operation(signal ?? new AbortController().signal);\n } finally {\n this.semaphore.release();\n }\n }\n\n /**\n * Attempt to queue the request when bulkhead is full.\n */\n private async enqueue(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n // If no queue configured, reject immediately\n if (this.config.maxQueue === 0) {\n this.onRejected();\n throw this.createFullError('Bulkhead is full - no queue configured');\n }\n\n // Try to acquire queue slot\n if (!this.queueSemaphore?.tryAcquire()) {\n // Queue is full, reject\n this.onRejected();\n throw this.createFullError('Bulkhead queue is full');\n }\n\n // Track timeout for cleanup in finally block\n let timeoutId: ReturnType<typeof setTimeout> | undefined;\n\n try {\n // Create combined signal for queue timeout\n let timeoutController: AbortController | undefined;\n let combinedSignal = signal;\n\n if (this.config.queueTimeout > 0) {\n timeoutController = new AbortController();\n\n // Create combined signal\n if (signal) {\n combinedSignal = AbortSignal.any([signal, timeoutController.signal]);\n } else {\n combinedSignal = timeoutController.signal;\n }\n\n // Start timeout with proper cleanup\n timeoutId = setTimeout(() => {\n timeoutController?.abort(new DOMException('Queue timeout', 'TimeoutError'));\n }, this.config.queueTimeout);\n }\n\n // Wait for execution semaphore\n try {\n await this.semaphore.acquire(combinedSignal);\n } catch (error) {\n // If aborted due to queue timeout, call onRejected\n if (\n error instanceof DOMException &&\n error.name === 'TimeoutError'\n ) {\n this.onRejected();\n }\n throw error;\n }\n\n // Got permit, execute\n return await this.executeWithPermit(operation, signal);\n } finally {\n // Clear timeout to prevent memory leak\n if (timeoutId) {\n clearTimeout(timeoutId);\n }\n this.queueSemaphore?.release();\n }\n }\n\n /**\n * Handle rejection event.\n */\n private onRejected(): void {\n this.logger.warn('Bulkhead rejection', {\n maxConcurrent: this.config.maxConcurrent,\n maxQueue: this.config.maxQueue,\n });\n\n if (this.config.onRejected) {\n try {\n this.config.onRejected();\n } catch (error) {\n this.logger.error('onRejected callback threw an error', {\n error: error instanceof Error ? error.message : String(error),\n });\n }\n }\n }\n\n /**\n * Create a BulkheadFullError with current state context.\n */\n private createFullError(message?: string): BulkheadFullError {\n return new BulkheadFullError(\n message ?? 'Bulkhead is full',\n this.activeCount(),\n this.queuedCount()\n );\n }\n}\n","import { z } from 'zod';\nimport { type FortifyLogger } from '@fortify-ts/core';\n\n/**\n * Zod schema for Bulkhead configuration.\n */\nexport const bulkheadConfigSchema = z.object({\n /** Maximum number of concurrent executions allowed (default: 10) */\n maxConcurrent: z.number().int().positive().default(10),\n /** Maximum size of overflow queue, 0 means no queue (default: 0) */\n maxQueue: z.number().int().nonnegative().default(0),\n /** Maximum time a request can wait in queue in milliseconds, 0 means no timeout (default: 0) */\n queueTimeout: z.number().int().nonnegative().default(0),\n});\n\n/**\n * Raw config input type (before defaults are applied).\n */\nexport type BulkheadConfigInput = z.input<typeof bulkheadConfigSchema>;\n\n/**\n * Parsed config type (after defaults are applied).\n */\nexport type BulkheadConfigParsed = z.output<typeof bulkheadConfigSchema>;\n\n/**\n * Full configuration type including callbacks and logger.\n */\nexport interface BulkheadConfig extends BulkheadConfigParsed {\n /** Callback when a request is rejected */\n onRejected: (() => void) | undefined;\n /** Logger instance for structured logging */\n logger: FortifyLogger | undefined;\n}\n\n/**\n * Input config type for constructor.\n */\nexport interface BulkheadConfigInputFull extends BulkheadConfigInput {\n onRejected?: () => void;\n logger?: FortifyLogger;\n}\n\n/**\n * Parse and validate bulkhead configuration.\n *\n * @param config - Raw configuration input\n * @returns Validated configuration with defaults applied\n */\nexport function parseBulkheadConfig(config?: BulkheadConfigInputFull): BulkheadConfig {\n const parsed = bulkheadConfigSchema.parse(config ?? {});\n return {\n ...parsed,\n onRejected: config?.onRejected,\n logger: config?.logger,\n };\n}\n","/**\n * O(1) ring buffer queue implementation.\n *\n * Provides O(1) enqueue and dequeue operations by using a circular buffer\n * with head and tail pointers.\n *\n * @template T - The type of elements in the buffer\n */\nexport class RingBuffer<T> {\n private readonly buffer: (T | undefined)[];\n private head = 0;\n private tail = 0;\n private count = 0;\n\n /**\n * Create a new ring buffer.\n *\n * @param capacity - Maximum number of elements the buffer can hold\n */\n constructor(private readonly capacity: number) {\n this.buffer = new Array(capacity);\n }\n\n /**\n * Add an element to the end of the buffer.\n *\n * @param item - The item to add\n * @returns true if added, false if buffer is full\n */\n push(item: T): boolean {\n if (this.count >= this.capacity) {\n return false;\n }\n\n this.buffer[this.tail] = item;\n this.tail = (this.tail + 1) % this.capacity;\n this.count++;\n return true;\n }\n\n /**\n * Remove and return the element at the front of the buffer.\n *\n * @returns The item at the front, or undefined if empty\n */\n shift(): T | undefined {\n if (this.count === 0) {\n return undefined;\n }\n\n const item = this.buffer[this.head];\n this.buffer[this.head] = undefined; // Help GC\n this.head = (this.head + 1) % this.capacity;\n this.count--;\n return item;\n }\n\n /**\n * Get the number of elements in the buffer.\n */\n get length(): number {\n return this.count;\n }\n\n /**\n * Check if the buffer is empty.\n */\n isEmpty(): boolean {\n return this.count === 0;\n }\n\n /**\n * Check if the buffer is full.\n */\n isFull(): boolean {\n return this.count >= this.capacity;\n }\n\n /**\n * Find the index of an item in the buffer.\n * Note: This is O(n) but should be rare (used for abort handling).\n *\n * @param item - The item to find\n * @returns The index of the item, or -1 if not found\n */\n indexOf(item: T): number {\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n if (this.buffer[idx] === item) {\n return i;\n }\n }\n return -1;\n }\n\n /**\n * Remove an item at the given logical index.\n * Note: This is O(n) due to shifting elements.\n *\n * @param index - The logical index (0 = front of queue)\n * @returns true if item was removed\n */\n removeAt(index: number): boolean {\n if (index < 0 || index >= this.count) {\n return false;\n }\n\n // Shift elements to fill the gap\n for (let i = index; i < this.count - 1; i++) {\n const fromIdx = (this.head + i + 1) % this.capacity;\n const toIdx = (this.head + i) % this.capacity;\n this.buffer[toIdx] = this.buffer[fromIdx];\n }\n\n // Clear the last element\n const lastIdx = (this.head + this.count - 1) % this.capacity;\n this.buffer[lastIdx] = undefined;\n\n // Update tail\n this.tail = (this.tail - 1 + this.capacity) % this.capacity;\n this.count--;\n return true;\n }\n\n /**\n * Remove an item from the buffer.\n * Note: This is O(n) but should be rare (used for abort handling).\n *\n * @param item - The item to remove\n * @returns true if item was found and removed\n */\n remove(item: T): boolean {\n const index = this.indexOf(item);\n if (index === -1) {\n return false;\n }\n return this.removeAt(index);\n }\n\n /**\n * Get all items and clear the buffer.\n *\n * @returns Array of all items in queue order\n */\n drain(): T[] {\n const items: T[] = [];\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n const item = this.buffer[idx];\n if (item !== undefined) {\n items.push(item);\n }\n this.buffer[idx] = undefined; // Help GC\n }\n this.head = 0;\n this.tail = 0;\n this.count = 0;\n return items;\n }\n\n /**\n * Clear all elements from the buffer.\n */\n clear(): void {\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n this.buffer[idx] = undefined;\n }\n this.head = 0;\n this.tail = 0;\n this.count = 0;\n }\n}\n","import { RingBuffer } from './ring-buffer.js';\n\n/**\n * Waiter in the semaphore queue.\n */\ninterface Waiter {\n resolve: () => void;\n reject: (error: Error) => void;\n signal?: AbortSignal;\n onAbort?: () => void;\n}\n\n/**\n * A Promise-based semaphore for limiting concurrent operations.\n * Uses an O(1) ring buffer for the waiting queue.\n */\nexport class Semaphore {\n private permits: number;\n private readonly maxPermits: number;\n private readonly waitingQueue: RingBuffer<Waiter>;\n\n /**\n * Create a new semaphore.\n *\n * @param maxPermits - Maximum number of concurrent permits\n * @param maxQueue - Maximum queue size (defaults to 1000)\n */\n constructor(maxPermits: number, maxQueue = 1000) {\n this.maxPermits = maxPermits;\n this.permits = maxPermits;\n this.waitingQueue = new RingBuffer<Waiter>(maxQueue);\n }\n\n /**\n * Try to acquire a permit without waiting.\n *\n * @returns true if a permit was acquired, false otherwise\n */\n tryAcquire(): boolean {\n if (this.permits > 0) {\n this.permits--;\n return true;\n }\n return false;\n }\n\n /**\n * Acquire a permit, waiting if necessary.\n *\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise that resolves when permit is acquired\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n acquire(signal?: AbortSignal): Promise<void> {\n // Check if cancelled\n if (signal?.aborted) {\n return Promise.reject(\n signal.reason ?? new DOMException('Aborted', 'AbortError')\n );\n }\n\n // Try to acquire immediately\n if (this.permits > 0) {\n this.permits--;\n return Promise.resolve();\n }\n\n // Add to wait queue\n return new Promise<void>((resolve, reject) => {\n const waiter: Waiter = { resolve, reject };\n\n // Set up abort handler\n if (signal) {\n const onAbort = () => {\n if (this.waitingQueue.remove(waiter)) {\n reject(signal.reason ?? new DOMException('Aborted', 'AbortError'));\n }\n };\n\n waiter.signal = signal;\n waiter.onAbort = onAbort;\n signal.addEventListener('abort', onAbort, { once: true });\n }\n\n this.waitingQueue.push(waiter);\n });\n }\n\n /**\n * Release a permit back to the semaphore.\n */\n release(): void {\n if (!this.waitingQueue.isEmpty()) {\n // Give permit to next waiter (O(1) with ring buffer)\n const waiter = this.waitingQueue.shift();\n if (waiter) {\n // Clean up abort listener to prevent memory leak\n if (waiter.signal && waiter.onAbort) {\n waiter.signal.removeEventListener('abort', waiter.onAbort);\n }\n waiter.resolve();\n }\n } else if (this.permits < this.maxPermits) {\n // Return permit to pool\n this.permits++;\n }\n }\n\n /**\n * Get the number of available permits.\n */\n availablePermits(): number {\n return this.permits;\n }\n\n /**\n * Get the number of waiters in the queue.\n */\n queueLength(): number {\n return this.waitingQueue.length;\n }\n\n /**\n * Reject all waiters with the given error.\n */\n rejectAll(error: Error): void {\n const waiters = this.waitingQueue.drain();\n for (const waiter of waiters) {\n // Clean up abort listener to prevent memory leak\n if (waiter.signal && waiter.onAbort) {\n waiter.signal.removeEventListener('abort', waiter.onAbort);\n }\n waiter.reject(error);\n }\n }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;;;ACAA,kBAOO;;;ACPP,iBAAkB;AAMX,IAAM,uBAAuB,aAAE,OAAO;AAAA;AAAA,EAE3C,eAAe,aAAE,OAAO,EAAE,IAAI,EAAE,SAAS,EAAE,QAAQ,EAAE;AAAA;AAAA,EAErD,UAAU,aAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AAAA;AAAA,EAElD,cAAc,aAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AACxD,CAAC;AAoCM,SAAS,oBAAoB,QAAkD;AACpF,QAAM,SAAS,qBAAqB,MAAM,UAAU,CAAC,CAAC;AACtD,SAAO;AAAA,IACL,GAAG;AAAA,IACH,YAAY,QAAQ;AAAA,IACpB,QAAQ,QAAQ;AAAA,EAClB;AACF;;;AChDO,IAAM,aAAN,MAAoB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWzB,YAA6B,UAAkB;AAAlB;AAV7B,wBAAiB;AACjB,wBAAQ,QAAO;AACf,wBAAQ,QAAO;AACf,wBAAQ,SAAQ;AAQd,SAAK,SAAS,IAAI,MAAM,QAAQ;AAAA,EAClC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAQA,KAAK,MAAkB;AACrB,QAAI,KAAK,SAAS,KAAK,UAAU;AAC/B,aAAO;AAAA,IACT;AAEA,SAAK,OAAO,KAAK,IAAI,IAAI;AACzB,SAAK,QAAQ,KAAK,OAAO,KAAK,KAAK;AACnC,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,QAAuB;AACrB,QAAI,KAAK,UAAU,GAAG;AACpB,aAAO;AAAA,IACT;AAEA,UAAM,OAAO,KAAK,OAAO,KAAK,IAAI;AAClC,SAAK,OAAO,KAAK,IAAI,IAAI;AACzB,SAAK,QAAQ,KAAK,OAAO,KAAK,KAAK;AACnC,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,IAAI,SAAiB;AACnB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,UAAmB;AACjB,WAAO,KAAK,UAAU;AAAA,EACxB;AAAA;AAAA;AAAA;AAAA,EAKA,SAAkB;AAChB,WAAO,KAAK,SAAS,KAAK;AAAA,EAC5B;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,MAAiB;AACvB,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,UAAI,KAAK,OAAO,GAAG,MAAM,MAAM;AAC7B,eAAO;AAAA,MACT;AAAA,IACF;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,SAAS,OAAwB;AAC/B,QAAI,QAAQ,KAAK,SAAS,KAAK,OAAO;AACpC,aAAO;AAAA,IACT;AAGA,aAAS,IAAI,OAAO,IAAI,KAAK,QAAQ,GAAG,KAAK;AAC3C,YAAM,WAAW,KAAK,OAAO,IAAI,KAAK,KAAK;AAC3C,YAAM,SAAS,KAAK,OAAO,KAAK,KAAK;AACrC,WAAK,OAAO,KAAK,IAAI,KAAK,OAAO,OAAO;AAAA,IAC1C;AAGA,UAAM,WAAW,KAAK,OAAO,KAAK,QAAQ,KAAK,KAAK;AACpD,SAAK,OAAO,OAAO,IAAI;AAGvB,SAAK,QAAQ,KAAK,OAAO,IAAI,KAAK,YAAY,KAAK;AACnD,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,OAAO,MAAkB;AACvB,UAAM,QAAQ,KAAK,QAAQ,IAAI;AAC/B,QAAI,UAAU,IAAI;AAChB,aAAO;AAAA,IACT;AACA,WAAO,KAAK,SAAS,KAAK;AAAA,EAC5B;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,QAAa;AACX,UAAM,QAAa,CAAC;AACpB,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,GAAG;AAC5B,UAAI,SAAS,QAAW;AACtB,cAAM,KAAK,IAAI;AAAA,MACjB;AACA,WAAK,OAAO,GAAG,IAAI;AAAA,IACrB;AACA,SAAK,OAAO;AACZ,SAAK,OAAO;AACZ,SAAK,QAAQ;AACb,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,WAAK,OAAO,GAAG,IAAI;AAAA,IACrB;AACA,SAAK,OAAO;AACZ,SAAK,OAAO;AACZ,SAAK,QAAQ;AAAA,EACf;AACF;;;AC5JO,IAAM,YAAN,MAAgB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWrB,YAAY,YAAoB,WAAW,KAAM;AAVjD,wBAAQ;AACR,wBAAiB;AACjB,wBAAiB;AASf,SAAK,aAAa;AAClB,SAAK,UAAU;AACf,SAAK,eAAe,IAAI,WAAmB,QAAQ;AAAA,EACrD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,aAAsB;AACpB,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO;AAAA,IACT;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,QAAqC;AAE3C,QAAI,QAAQ,SAAS;AACnB,aAAO,QAAQ;AAAA,QACb,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,MAC3D;AAAA,IACF;AAGA,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO,QAAQ,QAAQ;AAAA,IACzB;AAGA,WAAO,IAAI,QAAc,CAAC,SAAS,WAAW;AAC5C,YAAM,SAAiB,EAAE,SAAS,OAAO;AAGzC,UAAI,QAAQ;AACV,cAAM,UAAU,MAAM;AACpB,cAAI,KAAK,aAAa,OAAO,MAAM,GAAG;AACpC,mBAAO,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY,CAAC;AAAA,UACnE;AAAA,QACF;AAEA,eAAO,SAAS;AAChB,eAAO,UAAU;AACjB,eAAO,iBAAiB,SAAS,SAAS,EAAE,MAAM,KAAK,CAAC;AAAA,MAC1D;AAEA,WAAK,aAAa,KAAK,MAAM;AAAA,IAC/B,CAAC;AAAA,EACH;AAAA;AAAA;AAAA;AAAA,EAKA,UAAgB;AACd,QAAI,CAAC,KAAK,aAAa,QAAQ,GAAG;AAEhC,YAAM,SAAS,KAAK,aAAa,MAAM;AACvC,UAAI,QAAQ;AAEV,YAAI,OAAO,UAAU,OAAO,SAAS;AACnC,iBAAO,OAAO,oBAAoB,SAAS,OAAO,OAAO;AAAA,QAC3D;AACA,eAAO,QAAQ;AAAA,MACjB;AAAA,IACF,WAAW,KAAK,UAAU,KAAK,YAAY;AAEzC,WAAK;AAAA,IACP;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,mBAA2B;AACzB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,aAAa;AAAA,EAC3B;AAAA;AAAA;AAAA;AAAA,EAKA,UAAU,OAAoB;AAC5B,UAAM,UAAU,KAAK,aAAa,MAAM;AACxC,eAAW,UAAU,SAAS;AAE5B,UAAI,OAAO,UAAU,OAAO,SAAS;AACnC,eAAO,OAAO,oBAAoB,SAAS,OAAO,OAAO;AAAA,MAC3D;AACA,aAAO,OAAO,KAAK;AAAA,IACrB;AAAA,EACF;AACF;;;AHlGO,IAAM,WAAN,MAAoD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAYzD,YAAY,QAAkC;AAX9C,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAQ,UAAS;AAQf,SAAK,SAAS,oBAAoB,MAAM;AACxC,SAAK,SAAS,KAAK,OAAO,UAAU;AAEpC,SAAK,YAAY,IAAI;AAAA,MACnB,KAAK,OAAO;AAAA,MACZ,KAAK,IAAI,KAAK,OAAO,UAAU,CAAC;AAAA;AAAA,IAClC;AAGA,QAAI,KAAK,OAAO,WAAW,GAAG;AAE5B,WAAK,iBAAiB,IAAI,UAAU,KAAK,OAAO,UAAU,CAAC;AAAA,IAC7D;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,MAAM,QAAQ,WAAyB,QAAkC;AAEvE,QAAI,KAAK,QAAQ;AACf,YAAM,KAAK,gBAAgB,oBAAoB;AAAA,IACjD;AAGA,QAAI,QAAQ,SAAS;AACnB,YAAM,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,IACjE;AAGA,QAAI,KAAK,UAAU,WAAW,GAAG;AAC/B,aAAO,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACjD;AAGA,WAAO,KAAK,QAAQ,WAAW,MAAM;AAAA,EACvC;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,OAAO,gBAAgB,KAAK,UAAU,iBAAiB;AAAA,EACrE;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,UAAU,YAAY;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,QAAI,KAAK,OAAQ;AAEjB,SAAK,SAAS;AACd,UAAM,aAAa,KAAK,gBAAgB,iBAAiB;AACzD,SAAK,UAAU,UAAU,UAAU;AACnC,SAAK,gBAAgB,UAAU,UAAU;AACzC,SAAK,OAAO,KAAK,iBAAiB;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,SAAK,SAAS;AACd,SAAK,OAAO,KAAK,gBAAgB;AAAA,EACnC;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,kBACZ,WACA,QACY;AACZ,QAAI;AACF,aAAO,MAAM,UAAU,UAAU,IAAI,gBAAgB,EAAE,MAAM;AAAA,IAC/D,UAAE;AACA,WAAK,UAAU,QAAQ;AAAA,IACzB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,QACZ,WACA,QACY;AAEZ,QAAI,KAAK,OAAO,aAAa,GAAG;AAC9B,WAAK,WAAW;AAChB,YAAM,KAAK,gBAAgB,wCAAwC;AAAA,IACrE;AAGA,QAAI,CAAC,KAAK,gBAAgB,WAAW,GAAG;AAEtC,WAAK,WAAW;AAChB,YAAM,KAAK,gBAAgB,wBAAwB;AAAA,IACrD;AAGA,QAAI;AAEJ,QAAI;AAEF,UAAI;AACJ,UAAI,iBAAiB;AAErB,UAAI,KAAK,OAAO,eAAe,GAAG;AAChC,4BAAoB,IAAI,gBAAgB;AAGxC,YAAI,QAAQ;AACV,2BAAiB,YAAY,IAAI,CAAC,QAAQ,kBAAkB,MAAM,CAAC;AAAA,QACrE,OAAO;AACL,2BAAiB,kBAAkB;AAAA,QACrC;AAGA,oBAAY,WAAW,MAAM;AAC3B,6BAAmB,MAAM,IAAI,aAAa,iBAAiB,cAAc,CAAC;AAAA,QAC5E,GAAG,KAAK,OAAO,YAAY;AAAA,MAC7B;AAGA,UAAI;AACF,cAAM,KAAK,UAAU,QAAQ,cAAc;AAAA,MAC7C,SAAS,OAAO;AAEd,YACE,iBAAiB,gBACjB,MAAM,SAAS,gBACf;AACA,eAAK,WAAW;AAAA,QAClB;AACA,cAAM;AAAA,MACR;AAGA,aAAO,MAAM,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACvD,UAAE;AAEA,UAAI,WAAW;AACb,qBAAa,SAAS;AAAA,MACxB;AACA,WAAK,gBAAgB,QAAQ;AAAA,IAC/B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,aAAmB;AACzB,SAAK,OAAO,KAAK,sBAAsB;AAAA,MACrC,eAAe,KAAK,OAAO;AAAA,MAC3B,UAAU,KAAK,OAAO;AAAA,IACxB,CAAC;AAED,QAAI,KAAK,OAAO,YAAY;AAC1B,UAAI;AACF,aAAK,OAAO,WAAW;AAAA,MACzB,SAAS,OAAO;AACd,aAAK,OAAO,MAAM,sCAAsC;AAAA,UACtD,OAAO,iBAAiB,QAAQ,MAAM,UAAU,OAAO,KAAK;AAAA,QAC9D,CAAC;AAAA,MACH;AAAA,IACF;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,gBAAgB,SAAqC;AAC3D,WAAO,IAAI;AAAA,MACT,WAAW;AAAA,MACX,KAAK,YAAY;AAAA,MACjB,KAAK,YAAY;AAAA,IACnB;AAAA,EACF;AACF;","names":[]}

@@ -116,2 +116,6 @@ import { FortifyLogger, Pattern, Resettable, Operation } from '@fortify-ts/core';

private onRejected;
/**
* Create a BulkheadFullError with current state context.
*/
private createFullError;
}

@@ -121,2 +125,3 @@

* A Promise-based semaphore for limiting concurrent operations.
* Uses an O(1) ring buffer for the waiting queue.
*/

@@ -131,4 +136,5 @@ declare class Semaphore {

* @param maxPermits - Maximum number of concurrent permits
* @param maxQueue - Maximum queue size (defaults to 1000)
*/
constructor(maxPermits: number);
constructor(maxPermits: number, maxQueue?: number);
/**

@@ -135,0 +141,0 @@ * Try to acquire a permit without waiting.

@@ -116,2 +116,6 @@ import { FortifyLogger, Pattern, Resettable, Operation } from '@fortify-ts/core';

private onRejected;
/**
* Create a BulkheadFullError with current state context.
*/
private createFullError;
}

@@ -121,2 +125,3 @@

* A Promise-based semaphore for limiting concurrent operations.
* Uses an O(1) ring buffer for the waiting queue.
*/

@@ -131,4 +136,5 @@ declare class Semaphore {

* @param maxPermits - Maximum number of concurrent permits
* @param maxQueue - Maximum queue size (defaults to 1000)
*/
constructor(maxPermits: number);
constructor(maxPermits: number, maxQueue?: number);
/**

@@ -135,0 +141,0 @@ * Try to acquire a permit without waiting.

@@ -8,4 +8,3 @@ var __defProp = Object.defineProperty;

BulkheadFullError,
noopLogger,
sleep
noopLogger
} from "@fortify-ts/core";

@@ -32,2 +31,151 @@

// src/ring-buffer.ts
var RingBuffer = class {
/**
* Create a new ring buffer.
*
* @param capacity - Maximum number of elements the buffer can hold
*/
constructor(capacity) {
this.capacity = capacity;
__publicField(this, "buffer");
__publicField(this, "head", 0);
__publicField(this, "tail", 0);
__publicField(this, "count", 0);
this.buffer = new Array(capacity);
}
/**
* Add an element to the end of the buffer.
*
* @param item - The item to add
* @returns true if added, false if buffer is full
*/
push(item) {
if (this.count >= this.capacity) {
return false;
}
this.buffer[this.tail] = item;
this.tail = (this.tail + 1) % this.capacity;
this.count++;
return true;
}
/**
* Remove and return the element at the front of the buffer.
*
* @returns The item at the front, or undefined if empty
*/
shift() {
if (this.count === 0) {
return void 0;
}
const item = this.buffer[this.head];
this.buffer[this.head] = void 0;
this.head = (this.head + 1) % this.capacity;
this.count--;
return item;
}
/**
* Get the number of elements in the buffer.
*/
get length() {
return this.count;
}
/**
* Check if the buffer is empty.
*/
isEmpty() {
return this.count === 0;
}
/**
* Check if the buffer is full.
*/
isFull() {
return this.count >= this.capacity;
}
/**
* Find the index of an item in the buffer.
* Note: This is O(n) but should be rare (used for abort handling).
*
* @param item - The item to find
* @returns The index of the item, or -1 if not found
*/
indexOf(item) {
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
if (this.buffer[idx] === item) {
return i;
}
}
return -1;
}
/**
* Remove an item at the given logical index.
* Note: This is O(n) due to shifting elements.
*
* @param index - The logical index (0 = front of queue)
* @returns true if item was removed
*/
removeAt(index) {
if (index < 0 || index >= this.count) {
return false;
}
for (let i = index; i < this.count - 1; i++) {
const fromIdx = (this.head + i + 1) % this.capacity;
const toIdx = (this.head + i) % this.capacity;
this.buffer[toIdx] = this.buffer[fromIdx];
}
const lastIdx = (this.head + this.count - 1) % this.capacity;
this.buffer[lastIdx] = void 0;
this.tail = (this.tail - 1 + this.capacity) % this.capacity;
this.count--;
return true;
}
/**
* Remove an item from the buffer.
* Note: This is O(n) but should be rare (used for abort handling).
*
* @param item - The item to remove
* @returns true if item was found and removed
*/
remove(item) {
const index = this.indexOf(item);
if (index === -1) {
return false;
}
return this.removeAt(index);
}
/**
* Get all items and clear the buffer.
*
* @returns Array of all items in queue order
*/
drain() {
const items = [];
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
const item = this.buffer[idx];
if (item !== void 0) {
items.push(item);
}
this.buffer[idx] = void 0;
}
this.head = 0;
this.tail = 0;
this.count = 0;
return items;
}
/**
* Clear all elements from the buffer.
*/
clear() {
for (let i = 0; i < this.count; i++) {
const idx = (this.head + i) % this.capacity;
this.buffer[idx] = void 0;
}
this.head = 0;
this.tail = 0;
this.count = 0;
}
};
// src/semaphore.ts

@@ -39,9 +187,11 @@ var Semaphore = class {

* @param maxPermits - Maximum number of concurrent permits
* @param maxQueue - Maximum queue size (defaults to 1000)
*/
constructor(maxPermits) {
constructor(maxPermits, maxQueue = 1e3) {
__publicField(this, "permits");
__publicField(this, "maxPermits");
__publicField(this, "waitingQueue", []);
__publicField(this, "waitingQueue");
this.maxPermits = maxPermits;
this.permits = maxPermits;
this.waitingQueue = new RingBuffer(maxQueue);
}

@@ -79,13 +229,13 @@ /**

const waiter = { resolve, reject };
this.waitingQueue.push(waiter);
if (signal) {
const onAbort = () => {
const index = this.waitingQueue.indexOf(waiter);
if (index !== -1) {
this.waitingQueue.splice(index, 1);
if (this.waitingQueue.remove(waiter)) {
reject(signal.reason ?? new DOMException("Aborted", "AbortError"));
}
};
waiter.signal = signal;
waiter.onAbort = onAbort;
signal.addEventListener("abort", onAbort, { once: true });
}
this.waitingQueue.push(waiter);
});

@@ -97,5 +247,10 @@ }

release() {
if (this.waitingQueue.length > 0) {
if (!this.waitingQueue.isEmpty()) {
const waiter = this.waitingQueue.shift();
waiter?.resolve();
if (waiter) {
if (waiter.signal && waiter.onAbort) {
waiter.signal.removeEventListener("abort", waiter.onAbort);
}
waiter.resolve();
}
} else if (this.permits < this.maxPermits) {

@@ -121,4 +276,7 @@ this.permits++;

rejectAll(error) {
const waiters = this.waitingQueue.splice(0, this.waitingQueue.length);
const waiters = this.waitingQueue.drain();
for (const waiter of waiters) {
if (waiter.signal && waiter.onAbort) {
waiter.signal.removeEventListener("abort", waiter.onAbort);
}
waiter.reject(error);

@@ -144,5 +302,9 @@ }

this.logger = this.config.logger ?? noopLogger;
this.semaphore = new Semaphore(this.config.maxConcurrent);
this.semaphore = new Semaphore(
this.config.maxConcurrent,
Math.max(this.config.maxQueue, 1)
// At least 1 for edge cases
);
if (this.config.maxQueue > 0) {
this.queueSemaphore = new Semaphore(this.config.maxQueue);
this.queueSemaphore = new Semaphore(this.config.maxQueue, 1);
}

@@ -161,3 +323,3 @@ }

if (this.closed) {
throw new BulkheadFullError();
throw this.createFullError("Bulkhead is closed");
}

@@ -190,4 +352,5 @@ if (signal?.aborted) {

this.closed = true;
this.semaphore.rejectAll(new BulkheadFullError());
this.queueSemaphore?.rejectAll(new BulkheadFullError());
const closeError = this.createFullError("Bulkhead closed");
this.semaphore.rejectAll(closeError);
this.queueSemaphore?.rejectAll(closeError);
this.logger.info("Bulkhead closed");

@@ -218,8 +381,9 @@ }

this.onRejected();
throw new BulkheadFullError();
throw this.createFullError("Bulkhead is full - no queue configured");
}
if (!this.queueSemaphore?.tryAcquire()) {
this.onRejected();
throw new BulkheadFullError();
throw this.createFullError("Bulkhead queue is full");
}
let timeoutId;
try {

@@ -235,5 +399,5 @@ let timeoutController;

}
sleep(this.config.queueTimeout).then(() => {
timeoutId = setTimeout(() => {
timeoutController?.abort(new DOMException("Queue timeout", "TimeoutError"));
});
}, this.config.queueTimeout);
}

@@ -250,2 +414,5 @@ try {

} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
this.queueSemaphore?.release();

@@ -272,2 +439,12 @@ }

}
/**
* Create a BulkheadFullError with current state context.
*/
createFullError(message) {
return new BulkheadFullError(
message ?? "Bulkhead is full",
this.activeCount(),
this.queuedCount()
);
}
};

@@ -274,0 +451,0 @@ export {

@@ -1,1 +0,1 @@

{"version":3,"sources":["../src/bulkhead.ts","../src/config.ts","../src/semaphore.ts"],"sourcesContent":["import {\n type Operation,\n type Pattern,\n type Resettable,\n BulkheadFullError,\n type FortifyLogger,\n noopLogger,\n sleep,\n} from '@fortify-ts/core';\nimport {\n type BulkheadConfig,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\nimport { Semaphore } from './semaphore.js';\n\n/**\n * Bulkhead pattern implementation for limiting concurrent operations.\n *\n * Prevents resource exhaustion by limiting the number of concurrent executions,\n * with optional queueing for overflow requests.\n *\n * @template T - The return type of operations\n *\n * @example\n * ```typescript\n * const bulkhead = new Bulkhead<Response>({\n * maxConcurrent: 5,\n * maxQueue: 10,\n * queueTimeout: 5000,\n * onRejected: () => console.log('Request rejected'),\n * });\n *\n * const result = await bulkhead.execute(async (signal) => {\n * return fetch('/api/data', { signal });\n * });\n * ```\n */\nexport class Bulkhead<T> implements Pattern<T>, Resettable {\n private readonly config: BulkheadConfig;\n private readonly logger: FortifyLogger;\n private readonly semaphore: Semaphore;\n private readonly queueSemaphore: Semaphore | undefined;\n private closed = false;\n\n /**\n * Create a new Bulkhead instance.\n *\n * @param config - Bulkhead configuration\n */\n constructor(config?: BulkheadConfigInputFull) {\n this.config = parseBulkheadConfig(config);\n this.logger = this.config.logger ?? noopLogger;\n this.semaphore = new Semaphore(this.config.maxConcurrent);\n\n // Only create queue semaphore if maxQueue > 0\n if (this.config.maxQueue > 0) {\n this.queueSemaphore = new Semaphore(this.config.maxQueue);\n }\n }\n\n /**\n * Execute an operation within the bulkhead's concurrency limits.\n *\n * @param operation - The async operation to execute\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise resolving to the operation result\n * @throws {BulkheadFullError} When bulkhead is at capacity and queue is full\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n async execute(operation: Operation<T>, signal?: AbortSignal): Promise<T> {\n // Check if closed\n if (this.closed) {\n throw new BulkheadFullError();\n }\n\n // Check if cancelled\n if (signal?.aborted) {\n throw signal.reason ?? new DOMException('Aborted', 'AbortError');\n }\n\n // Try to acquire semaphore immediately\n if (this.semaphore.tryAcquire()) {\n return this.executeWithPermit(operation, signal);\n }\n\n // Bulkhead full, try to queue\n return this.enqueue(operation, signal);\n }\n\n /**\n * Get the number of currently active executions.\n */\n activeCount(): number {\n return this.config.maxConcurrent - this.semaphore.availablePermits();\n }\n\n /**\n * Get the number of requests currently waiting in the queue.\n */\n queuedCount(): number {\n return this.semaphore.queueLength();\n }\n\n /**\n * Close the bulkhead, rejecting all pending requests.\n */\n close(): void {\n if (this.closed) return;\n\n this.closed = true;\n this.semaphore.rejectAll(new BulkheadFullError());\n this.queueSemaphore?.rejectAll(new BulkheadFullError());\n this.logger.info('Bulkhead closed');\n }\n\n /**\n * Reset the bulkhead to accept new requests.\n */\n reset(): void {\n this.closed = false;\n this.logger.info('Bulkhead reset');\n }\n\n /**\n * Execute operation with semaphore permit held.\n */\n private async executeWithPermit(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n try {\n return await operation(signal ?? new AbortController().signal);\n } finally {\n this.semaphore.release();\n }\n }\n\n /**\n * Attempt to queue the request when bulkhead is full.\n */\n private async enqueue(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n // If no queue configured, reject immediately\n if (this.config.maxQueue === 0) {\n this.onRejected();\n throw new BulkheadFullError();\n }\n\n // Try to acquire queue slot\n if (!this.queueSemaphore?.tryAcquire()) {\n // Queue is full, reject\n this.onRejected();\n throw new BulkheadFullError();\n }\n\n try {\n // Create combined signal for queue timeout\n let timeoutController: AbortController | undefined;\n let combinedSignal = signal;\n\n if (this.config.queueTimeout > 0) {\n timeoutController = new AbortController();\n\n // Create combined signal\n if (signal) {\n combinedSignal = AbortSignal.any([signal, timeoutController.signal]);\n } else {\n combinedSignal = timeoutController.signal;\n }\n\n // Start timeout\n sleep(this.config.queueTimeout).then(() => {\n timeoutController?.abort(new DOMException('Queue timeout', 'TimeoutError'));\n });\n }\n\n // Wait for execution semaphore\n try {\n await this.semaphore.acquire(combinedSignal);\n } catch (error) {\n // If aborted due to queue timeout, call onRejected\n if (\n error instanceof DOMException &&\n error.name === 'TimeoutError'\n ) {\n this.onRejected();\n }\n throw error;\n }\n\n // Got permit, execute\n return await this.executeWithPermit(operation, signal);\n } finally {\n this.queueSemaphore?.release();\n }\n }\n\n /**\n * Handle rejection event.\n */\n private onRejected(): void {\n this.logger.warn('Bulkhead rejection', {\n maxConcurrent: this.config.maxConcurrent,\n maxQueue: this.config.maxQueue,\n });\n\n if (this.config.onRejected) {\n try {\n this.config.onRejected();\n } catch (error) {\n this.logger.error('onRejected callback threw an error', {\n error: error instanceof Error ? error.message : String(error),\n });\n }\n }\n }\n}\n","import { z } from 'zod';\nimport { type FortifyLogger } from '@fortify-ts/core';\n\n/**\n * Zod schema for Bulkhead configuration.\n */\nexport const bulkheadConfigSchema = z.object({\n /** Maximum number of concurrent executions allowed (default: 10) */\n maxConcurrent: z.number().int().positive().default(10),\n /** Maximum size of overflow queue, 0 means no queue (default: 0) */\n maxQueue: z.number().int().nonnegative().default(0),\n /** Maximum time a request can wait in queue in milliseconds, 0 means no timeout (default: 0) */\n queueTimeout: z.number().int().nonnegative().default(0),\n});\n\n/**\n * Raw config input type (before defaults are applied).\n */\nexport type BulkheadConfigInput = z.input<typeof bulkheadConfigSchema>;\n\n/**\n * Parsed config type (after defaults are applied).\n */\nexport type BulkheadConfigParsed = z.output<typeof bulkheadConfigSchema>;\n\n/**\n * Full configuration type including callbacks and logger.\n */\nexport interface BulkheadConfig extends BulkheadConfigParsed {\n /** Callback when a request is rejected */\n onRejected: (() => void) | undefined;\n /** Logger instance for structured logging */\n logger: FortifyLogger | undefined;\n}\n\n/**\n * Input config type for constructor.\n */\nexport interface BulkheadConfigInputFull extends BulkheadConfigInput {\n onRejected?: () => void;\n logger?: FortifyLogger;\n}\n\n/**\n * Parse and validate bulkhead configuration.\n *\n * @param config - Raw configuration input\n * @returns Validated configuration with defaults applied\n */\nexport function parseBulkheadConfig(config?: BulkheadConfigInputFull): BulkheadConfig {\n const parsed = bulkheadConfigSchema.parse(config ?? {});\n return {\n ...parsed,\n onRejected: config?.onRejected,\n logger: config?.logger,\n };\n}\n","/**\n * A Promise-based semaphore for limiting concurrent operations.\n */\nexport class Semaphore {\n private permits: number;\n private readonly maxPermits: number;\n private readonly waitingQueue: Array<{\n resolve: () => void;\n reject: (error: Error) => void;\n }> = [];\n\n /**\n * Create a new semaphore.\n *\n * @param maxPermits - Maximum number of concurrent permits\n */\n constructor(maxPermits: number) {\n this.maxPermits = maxPermits;\n this.permits = maxPermits;\n }\n\n /**\n * Try to acquire a permit without waiting.\n *\n * @returns true if a permit was acquired, false otherwise\n */\n tryAcquire(): boolean {\n if (this.permits > 0) {\n this.permits--;\n return true;\n }\n return false;\n }\n\n /**\n * Acquire a permit, waiting if necessary.\n *\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise that resolves when permit is acquired\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n acquire(signal?: AbortSignal): Promise<void> {\n // Check if cancelled\n if (signal?.aborted) {\n return Promise.reject(\n signal.reason ?? new DOMException('Aborted', 'AbortError')\n );\n }\n\n // Try to acquire immediately\n if (this.permits > 0) {\n this.permits--;\n return Promise.resolve();\n }\n\n // Add to wait queue\n return new Promise<void>((resolve, reject) => {\n const waiter = { resolve, reject };\n this.waitingQueue.push(waiter);\n\n // Set up abort handler\n if (signal) {\n const onAbort = () => {\n const index = this.waitingQueue.indexOf(waiter);\n if (index !== -1) {\n this.waitingQueue.splice(index, 1);\n reject(signal.reason ?? new DOMException('Aborted', 'AbortError'));\n }\n };\n\n signal.addEventListener('abort', onAbort, { once: true });\n }\n });\n }\n\n /**\n * Release a permit back to the semaphore.\n */\n release(): void {\n if (this.waitingQueue.length > 0) {\n // Give permit to next waiter\n const waiter = this.waitingQueue.shift();\n waiter?.resolve();\n } else if (this.permits < this.maxPermits) {\n // Return permit to pool\n this.permits++;\n }\n }\n\n /**\n * Get the number of available permits.\n */\n availablePermits(): number {\n return this.permits;\n }\n\n /**\n * Get the number of waiters in the queue.\n */\n queueLength(): number {\n return this.waitingQueue.length;\n }\n\n /**\n * Reject all waiters with the given error.\n */\n rejectAll(error: Error): void {\n const waiters = this.waitingQueue.splice(0, this.waitingQueue.length);\n for (const waiter of waiters) {\n waiter.reject(error);\n }\n }\n}\n"],"mappings":";;;;;AAAA;AAAA,EAIE;AAAA,EAEA;AAAA,EACA;AAAA,OACK;;;ACRP,SAAS,SAAS;AAMX,IAAM,uBAAuB,EAAE,OAAO;AAAA;AAAA,EAE3C,eAAe,EAAE,OAAO,EAAE,IAAI,EAAE,SAAS,EAAE,QAAQ,EAAE;AAAA;AAAA,EAErD,UAAU,EAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AAAA;AAAA,EAElD,cAAc,EAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AACxD,CAAC;AAoCM,SAAS,oBAAoB,QAAkD;AACpF,QAAM,SAAS,qBAAqB,MAAM,UAAU,CAAC,CAAC;AACtD,SAAO;AAAA,IACL,GAAG;AAAA,IACH,YAAY,QAAQ;AAAA,IACpB,QAAQ,QAAQ;AAAA,EAClB;AACF;;;ACrDO,IAAM,YAAN,MAAgB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAarB,YAAY,YAAoB;AAZhC,wBAAQ;AACR,wBAAiB;AACjB,wBAAiB,gBAGZ,CAAC;AAQJ,SAAK,aAAa;AAClB,SAAK,UAAU;AAAA,EACjB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,aAAsB;AACpB,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO;AAAA,IACT;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,QAAqC;AAE3C,QAAI,QAAQ,SAAS;AACnB,aAAO,QAAQ;AAAA,QACb,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,MAC3D;AAAA,IACF;AAGA,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO,QAAQ,QAAQ;AAAA,IACzB;AAGA,WAAO,IAAI,QAAc,CAAC,SAAS,WAAW;AAC5C,YAAM,SAAS,EAAE,SAAS,OAAO;AACjC,WAAK,aAAa,KAAK,MAAM;AAG7B,UAAI,QAAQ;AACV,cAAM,UAAU,MAAM;AACpB,gBAAM,QAAQ,KAAK,aAAa,QAAQ,MAAM;AAC9C,cAAI,UAAU,IAAI;AAChB,iBAAK,aAAa,OAAO,OAAO,CAAC;AACjC,mBAAO,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY,CAAC;AAAA,UACnE;AAAA,QACF;AAEA,eAAO,iBAAiB,SAAS,SAAS,EAAE,MAAM,KAAK,CAAC;AAAA,MAC1D;AAAA,IACF,CAAC;AAAA,EACH;AAAA;AAAA;AAAA;AAAA,EAKA,UAAgB;AACd,QAAI,KAAK,aAAa,SAAS,GAAG;AAEhC,YAAM,SAAS,KAAK,aAAa,MAAM;AACvC,cAAQ,QAAQ;AAAA,IAClB,WAAW,KAAK,UAAU,KAAK,YAAY;AAEzC,WAAK;AAAA,IACP;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,mBAA2B;AACzB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,aAAa;AAAA,EAC3B;AAAA;AAAA;AAAA;AAAA,EAKA,UAAU,OAAoB;AAC5B,UAAM,UAAU,KAAK,aAAa,OAAO,GAAG,KAAK,aAAa,MAAM;AACpE,eAAW,UAAU,SAAS;AAC5B,aAAO,OAAO,KAAK;AAAA,IACrB;AAAA,EACF;AACF;;;AF1EO,IAAM,WAAN,MAAoD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAYzD,YAAY,QAAkC;AAX9C,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAQ,UAAS;AAQf,SAAK,SAAS,oBAAoB,MAAM;AACxC,SAAK,SAAS,KAAK,OAAO,UAAU;AACpC,SAAK,YAAY,IAAI,UAAU,KAAK,OAAO,aAAa;AAGxD,QAAI,KAAK,OAAO,WAAW,GAAG;AAC5B,WAAK,iBAAiB,IAAI,UAAU,KAAK,OAAO,QAAQ;AAAA,IAC1D;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,MAAM,QAAQ,WAAyB,QAAkC;AAEvE,QAAI,KAAK,QAAQ;AACf,YAAM,IAAI,kBAAkB;AAAA,IAC9B;AAGA,QAAI,QAAQ,SAAS;AACnB,YAAM,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,IACjE;AAGA,QAAI,KAAK,UAAU,WAAW,GAAG;AAC/B,aAAO,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACjD;AAGA,WAAO,KAAK,QAAQ,WAAW,MAAM;AAAA,EACvC;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,OAAO,gBAAgB,KAAK,UAAU,iBAAiB;AAAA,EACrE;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,UAAU,YAAY;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,QAAI,KAAK,OAAQ;AAEjB,SAAK,SAAS;AACd,SAAK,UAAU,UAAU,IAAI,kBAAkB,CAAC;AAChD,SAAK,gBAAgB,UAAU,IAAI,kBAAkB,CAAC;AACtD,SAAK,OAAO,KAAK,iBAAiB;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,SAAK,SAAS;AACd,SAAK,OAAO,KAAK,gBAAgB;AAAA,EACnC;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,kBACZ,WACA,QACY;AACZ,QAAI;AACF,aAAO,MAAM,UAAU,UAAU,IAAI,gBAAgB,EAAE,MAAM;AAAA,IAC/D,UAAE;AACA,WAAK,UAAU,QAAQ;AAAA,IACzB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,QACZ,WACA,QACY;AAEZ,QAAI,KAAK,OAAO,aAAa,GAAG;AAC9B,WAAK,WAAW;AAChB,YAAM,IAAI,kBAAkB;AAAA,IAC9B;AAGA,QAAI,CAAC,KAAK,gBAAgB,WAAW,GAAG;AAEtC,WAAK,WAAW;AAChB,YAAM,IAAI,kBAAkB;AAAA,IAC9B;AAEA,QAAI;AAEF,UAAI;AACJ,UAAI,iBAAiB;AAErB,UAAI,KAAK,OAAO,eAAe,GAAG;AAChC,4BAAoB,IAAI,gBAAgB;AAGxC,YAAI,QAAQ;AACV,2BAAiB,YAAY,IAAI,CAAC,QAAQ,kBAAkB,MAAM,CAAC;AAAA,QACrE,OAAO;AACL,2BAAiB,kBAAkB;AAAA,QACrC;AAGA,cAAM,KAAK,OAAO,YAAY,EAAE,KAAK,MAAM;AACzC,6BAAmB,MAAM,IAAI,aAAa,iBAAiB,cAAc,CAAC;AAAA,QAC5E,CAAC;AAAA,MACH;AAGA,UAAI;AACF,cAAM,KAAK,UAAU,QAAQ,cAAc;AAAA,MAC7C,SAAS,OAAO;AAEd,YACE,iBAAiB,gBACjB,MAAM,SAAS,gBACf;AACA,eAAK,WAAW;AAAA,QAClB;AACA,cAAM;AAAA,MACR;AAGA,aAAO,MAAM,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACvD,UAAE;AACA,WAAK,gBAAgB,QAAQ;AAAA,IAC/B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,aAAmB;AACzB,SAAK,OAAO,KAAK,sBAAsB;AAAA,MACrC,eAAe,KAAK,OAAO;AAAA,MAC3B,UAAU,KAAK,OAAO;AAAA,IACxB,CAAC;AAED,QAAI,KAAK,OAAO,YAAY;AAC1B,UAAI;AACF,aAAK,OAAO,WAAW;AAAA,MACzB,SAAS,OAAO;AACd,aAAK,OAAO,MAAM,sCAAsC;AAAA,UACtD,OAAO,iBAAiB,QAAQ,MAAM,UAAU,OAAO,KAAK;AAAA,QAC9D,CAAC;AAAA,MACH;AAAA,IACF;AAAA,EACF;AACF;","names":[]}
{"version":3,"sources":["../src/bulkhead.ts","../src/config.ts","../src/ring-buffer.ts","../src/semaphore.ts"],"sourcesContent":["import {\n type Operation,\n type Pattern,\n type Resettable,\n BulkheadFullError,\n type FortifyLogger,\n noopLogger,\n} from '@fortify-ts/core';\nimport {\n type BulkheadConfig,\n type BulkheadConfigInputFull,\n parseBulkheadConfig,\n} from './config.js';\nimport { Semaphore } from './semaphore.js';\n\n/**\n * Bulkhead pattern implementation for limiting concurrent operations.\n *\n * Prevents resource exhaustion by limiting the number of concurrent executions,\n * with optional queueing for overflow requests.\n *\n * @template T - The return type of operations\n *\n * @example\n * ```typescript\n * const bulkhead = new Bulkhead<Response>({\n * maxConcurrent: 5,\n * maxQueue: 10,\n * queueTimeout: 5000,\n * onRejected: () => console.log('Request rejected'),\n * });\n *\n * const result = await bulkhead.execute(async (signal) => {\n * return fetch('/api/data', { signal });\n * });\n * ```\n */\nexport class Bulkhead<T> implements Pattern<T>, Resettable {\n private readonly config: BulkheadConfig;\n private readonly logger: FortifyLogger;\n private readonly semaphore: Semaphore;\n private readonly queueSemaphore: Semaphore | undefined;\n private closed = false;\n\n /**\n * Create a new Bulkhead instance.\n *\n * @param config - Bulkhead configuration\n */\n constructor(config?: BulkheadConfigInputFull) {\n this.config = parseBulkheadConfig(config);\n this.logger = this.config.logger ?? noopLogger;\n // Execution semaphore: queue capacity = maxQueue (bounded by queue semaphore)\n this.semaphore = new Semaphore(\n this.config.maxConcurrent,\n Math.max(this.config.maxQueue, 1) // At least 1 for edge cases\n );\n\n // Only create queue semaphore if maxQueue > 0\n if (this.config.maxQueue > 0) {\n // Queue semaphore: only used for tryAcquire, queue never used\n this.queueSemaphore = new Semaphore(this.config.maxQueue, 1);\n }\n }\n\n /**\n * Execute an operation within the bulkhead's concurrency limits.\n *\n * @param operation - The async operation to execute\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise resolving to the operation result\n * @throws {BulkheadFullError} When bulkhead is at capacity and queue is full\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n async execute(operation: Operation<T>, signal?: AbortSignal): Promise<T> {\n // Check if closed\n if (this.closed) {\n throw this.createFullError('Bulkhead is closed');\n }\n\n // Check if cancelled\n if (signal?.aborted) {\n throw signal.reason ?? new DOMException('Aborted', 'AbortError');\n }\n\n // Try to acquire semaphore immediately\n if (this.semaphore.tryAcquire()) {\n return this.executeWithPermit(operation, signal);\n }\n\n // Bulkhead full, try to queue\n return this.enqueue(operation, signal);\n }\n\n /**\n * Get the number of currently active executions.\n */\n activeCount(): number {\n return this.config.maxConcurrent - this.semaphore.availablePermits();\n }\n\n /**\n * Get the number of requests currently waiting in the queue.\n */\n queuedCount(): number {\n return this.semaphore.queueLength();\n }\n\n /**\n * Close the bulkhead, rejecting all pending requests.\n */\n close(): void {\n if (this.closed) return;\n\n this.closed = true;\n const closeError = this.createFullError('Bulkhead closed');\n this.semaphore.rejectAll(closeError);\n this.queueSemaphore?.rejectAll(closeError);\n this.logger.info('Bulkhead closed');\n }\n\n /**\n * Reset the bulkhead to accept new requests.\n */\n reset(): void {\n this.closed = false;\n this.logger.info('Bulkhead reset');\n }\n\n /**\n * Execute operation with semaphore permit held.\n */\n private async executeWithPermit(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n try {\n return await operation(signal ?? new AbortController().signal);\n } finally {\n this.semaphore.release();\n }\n }\n\n /**\n * Attempt to queue the request when bulkhead is full.\n */\n private async enqueue(\n operation: Operation<T>,\n signal?: AbortSignal\n ): Promise<T> {\n // If no queue configured, reject immediately\n if (this.config.maxQueue === 0) {\n this.onRejected();\n throw this.createFullError('Bulkhead is full - no queue configured');\n }\n\n // Try to acquire queue slot\n if (!this.queueSemaphore?.tryAcquire()) {\n // Queue is full, reject\n this.onRejected();\n throw this.createFullError('Bulkhead queue is full');\n }\n\n // Track timeout for cleanup in finally block\n let timeoutId: ReturnType<typeof setTimeout> | undefined;\n\n try {\n // Create combined signal for queue timeout\n let timeoutController: AbortController | undefined;\n let combinedSignal = signal;\n\n if (this.config.queueTimeout > 0) {\n timeoutController = new AbortController();\n\n // Create combined signal\n if (signal) {\n combinedSignal = AbortSignal.any([signal, timeoutController.signal]);\n } else {\n combinedSignal = timeoutController.signal;\n }\n\n // Start timeout with proper cleanup\n timeoutId = setTimeout(() => {\n timeoutController?.abort(new DOMException('Queue timeout', 'TimeoutError'));\n }, this.config.queueTimeout);\n }\n\n // Wait for execution semaphore\n try {\n await this.semaphore.acquire(combinedSignal);\n } catch (error) {\n // If aborted due to queue timeout, call onRejected\n if (\n error instanceof DOMException &&\n error.name === 'TimeoutError'\n ) {\n this.onRejected();\n }\n throw error;\n }\n\n // Got permit, execute\n return await this.executeWithPermit(operation, signal);\n } finally {\n // Clear timeout to prevent memory leak\n if (timeoutId) {\n clearTimeout(timeoutId);\n }\n this.queueSemaphore?.release();\n }\n }\n\n /**\n * Handle rejection event.\n */\n private onRejected(): void {\n this.logger.warn('Bulkhead rejection', {\n maxConcurrent: this.config.maxConcurrent,\n maxQueue: this.config.maxQueue,\n });\n\n if (this.config.onRejected) {\n try {\n this.config.onRejected();\n } catch (error) {\n this.logger.error('onRejected callback threw an error', {\n error: error instanceof Error ? error.message : String(error),\n });\n }\n }\n }\n\n /**\n * Create a BulkheadFullError with current state context.\n */\n private createFullError(message?: string): BulkheadFullError {\n return new BulkheadFullError(\n message ?? 'Bulkhead is full',\n this.activeCount(),\n this.queuedCount()\n );\n }\n}\n","import { z } from 'zod';\nimport { type FortifyLogger } from '@fortify-ts/core';\n\n/**\n * Zod schema for Bulkhead configuration.\n */\nexport const bulkheadConfigSchema = z.object({\n /** Maximum number of concurrent executions allowed (default: 10) */\n maxConcurrent: z.number().int().positive().default(10),\n /** Maximum size of overflow queue, 0 means no queue (default: 0) */\n maxQueue: z.number().int().nonnegative().default(0),\n /** Maximum time a request can wait in queue in milliseconds, 0 means no timeout (default: 0) */\n queueTimeout: z.number().int().nonnegative().default(0),\n});\n\n/**\n * Raw config input type (before defaults are applied).\n */\nexport type BulkheadConfigInput = z.input<typeof bulkheadConfigSchema>;\n\n/**\n * Parsed config type (after defaults are applied).\n */\nexport type BulkheadConfigParsed = z.output<typeof bulkheadConfigSchema>;\n\n/**\n * Full configuration type including callbacks and logger.\n */\nexport interface BulkheadConfig extends BulkheadConfigParsed {\n /** Callback when a request is rejected */\n onRejected: (() => void) | undefined;\n /** Logger instance for structured logging */\n logger: FortifyLogger | undefined;\n}\n\n/**\n * Input config type for constructor.\n */\nexport interface BulkheadConfigInputFull extends BulkheadConfigInput {\n onRejected?: () => void;\n logger?: FortifyLogger;\n}\n\n/**\n * Parse and validate bulkhead configuration.\n *\n * @param config - Raw configuration input\n * @returns Validated configuration with defaults applied\n */\nexport function parseBulkheadConfig(config?: BulkheadConfigInputFull): BulkheadConfig {\n const parsed = bulkheadConfigSchema.parse(config ?? {});\n return {\n ...parsed,\n onRejected: config?.onRejected,\n logger: config?.logger,\n };\n}\n","/**\n * O(1) ring buffer queue implementation.\n *\n * Provides O(1) enqueue and dequeue operations by using a circular buffer\n * with head and tail pointers.\n *\n * @template T - The type of elements in the buffer\n */\nexport class RingBuffer<T> {\n private readonly buffer: (T | undefined)[];\n private head = 0;\n private tail = 0;\n private count = 0;\n\n /**\n * Create a new ring buffer.\n *\n * @param capacity - Maximum number of elements the buffer can hold\n */\n constructor(private readonly capacity: number) {\n this.buffer = new Array(capacity);\n }\n\n /**\n * Add an element to the end of the buffer.\n *\n * @param item - The item to add\n * @returns true if added, false if buffer is full\n */\n push(item: T): boolean {\n if (this.count >= this.capacity) {\n return false;\n }\n\n this.buffer[this.tail] = item;\n this.tail = (this.tail + 1) % this.capacity;\n this.count++;\n return true;\n }\n\n /**\n * Remove and return the element at the front of the buffer.\n *\n * @returns The item at the front, or undefined if empty\n */\n shift(): T | undefined {\n if (this.count === 0) {\n return undefined;\n }\n\n const item = this.buffer[this.head];\n this.buffer[this.head] = undefined; // Help GC\n this.head = (this.head + 1) % this.capacity;\n this.count--;\n return item;\n }\n\n /**\n * Get the number of elements in the buffer.\n */\n get length(): number {\n return this.count;\n }\n\n /**\n * Check if the buffer is empty.\n */\n isEmpty(): boolean {\n return this.count === 0;\n }\n\n /**\n * Check if the buffer is full.\n */\n isFull(): boolean {\n return this.count >= this.capacity;\n }\n\n /**\n * Find the index of an item in the buffer.\n * Note: This is O(n) but should be rare (used for abort handling).\n *\n * @param item - The item to find\n * @returns The index of the item, or -1 if not found\n */\n indexOf(item: T): number {\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n if (this.buffer[idx] === item) {\n return i;\n }\n }\n return -1;\n }\n\n /**\n * Remove an item at the given logical index.\n * Note: This is O(n) due to shifting elements.\n *\n * @param index - The logical index (0 = front of queue)\n * @returns true if item was removed\n */\n removeAt(index: number): boolean {\n if (index < 0 || index >= this.count) {\n return false;\n }\n\n // Shift elements to fill the gap\n for (let i = index; i < this.count - 1; i++) {\n const fromIdx = (this.head + i + 1) % this.capacity;\n const toIdx = (this.head + i) % this.capacity;\n this.buffer[toIdx] = this.buffer[fromIdx];\n }\n\n // Clear the last element\n const lastIdx = (this.head + this.count - 1) % this.capacity;\n this.buffer[lastIdx] = undefined;\n\n // Update tail\n this.tail = (this.tail - 1 + this.capacity) % this.capacity;\n this.count--;\n return true;\n }\n\n /**\n * Remove an item from the buffer.\n * Note: This is O(n) but should be rare (used for abort handling).\n *\n * @param item - The item to remove\n * @returns true if item was found and removed\n */\n remove(item: T): boolean {\n const index = this.indexOf(item);\n if (index === -1) {\n return false;\n }\n return this.removeAt(index);\n }\n\n /**\n * Get all items and clear the buffer.\n *\n * @returns Array of all items in queue order\n */\n drain(): T[] {\n const items: T[] = [];\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n const item = this.buffer[idx];\n if (item !== undefined) {\n items.push(item);\n }\n this.buffer[idx] = undefined; // Help GC\n }\n this.head = 0;\n this.tail = 0;\n this.count = 0;\n return items;\n }\n\n /**\n * Clear all elements from the buffer.\n */\n clear(): void {\n for (let i = 0; i < this.count; i++) {\n const idx = (this.head + i) % this.capacity;\n this.buffer[idx] = undefined;\n }\n this.head = 0;\n this.tail = 0;\n this.count = 0;\n }\n}\n","import { RingBuffer } from './ring-buffer.js';\n\n/**\n * Waiter in the semaphore queue.\n */\ninterface Waiter {\n resolve: () => void;\n reject: (error: Error) => void;\n signal?: AbortSignal;\n onAbort?: () => void;\n}\n\n/**\n * A Promise-based semaphore for limiting concurrent operations.\n * Uses an O(1) ring buffer for the waiting queue.\n */\nexport class Semaphore {\n private permits: number;\n private readonly maxPermits: number;\n private readonly waitingQueue: RingBuffer<Waiter>;\n\n /**\n * Create a new semaphore.\n *\n * @param maxPermits - Maximum number of concurrent permits\n * @param maxQueue - Maximum queue size (defaults to 1000)\n */\n constructor(maxPermits: number, maxQueue = 1000) {\n this.maxPermits = maxPermits;\n this.permits = maxPermits;\n this.waitingQueue = new RingBuffer<Waiter>(maxQueue);\n }\n\n /**\n * Try to acquire a permit without waiting.\n *\n * @returns true if a permit was acquired, false otherwise\n */\n tryAcquire(): boolean {\n if (this.permits > 0) {\n this.permits--;\n return true;\n }\n return false;\n }\n\n /**\n * Acquire a permit, waiting if necessary.\n *\n * @param signal - Optional AbortSignal for cancellation\n * @returns Promise that resolves when permit is acquired\n * @throws {DOMException} When cancelled via signal (AbortError)\n */\n acquire(signal?: AbortSignal): Promise<void> {\n // Check if cancelled\n if (signal?.aborted) {\n return Promise.reject(\n signal.reason ?? new DOMException('Aborted', 'AbortError')\n );\n }\n\n // Try to acquire immediately\n if (this.permits > 0) {\n this.permits--;\n return Promise.resolve();\n }\n\n // Add to wait queue\n return new Promise<void>((resolve, reject) => {\n const waiter: Waiter = { resolve, reject };\n\n // Set up abort handler\n if (signal) {\n const onAbort = () => {\n if (this.waitingQueue.remove(waiter)) {\n reject(signal.reason ?? new DOMException('Aborted', 'AbortError'));\n }\n };\n\n waiter.signal = signal;\n waiter.onAbort = onAbort;\n signal.addEventListener('abort', onAbort, { once: true });\n }\n\n this.waitingQueue.push(waiter);\n });\n }\n\n /**\n * Release a permit back to the semaphore.\n */\n release(): void {\n if (!this.waitingQueue.isEmpty()) {\n // Give permit to next waiter (O(1) with ring buffer)\n const waiter = this.waitingQueue.shift();\n if (waiter) {\n // Clean up abort listener to prevent memory leak\n if (waiter.signal && waiter.onAbort) {\n waiter.signal.removeEventListener('abort', waiter.onAbort);\n }\n waiter.resolve();\n }\n } else if (this.permits < this.maxPermits) {\n // Return permit to pool\n this.permits++;\n }\n }\n\n /**\n * Get the number of available permits.\n */\n availablePermits(): number {\n return this.permits;\n }\n\n /**\n * Get the number of waiters in the queue.\n */\n queueLength(): number {\n return this.waitingQueue.length;\n }\n\n /**\n * Reject all waiters with the given error.\n */\n rejectAll(error: Error): void {\n const waiters = this.waitingQueue.drain();\n for (const waiter of waiters) {\n // Clean up abort listener to prevent memory leak\n if (waiter.signal && waiter.onAbort) {\n waiter.signal.removeEventListener('abort', waiter.onAbort);\n }\n waiter.reject(error);\n }\n }\n}\n"],"mappings":";;;;;AAAA;AAAA,EAIE;AAAA,EAEA;AAAA,OACK;;;ACPP,SAAS,SAAS;AAMX,IAAM,uBAAuB,EAAE,OAAO;AAAA;AAAA,EAE3C,eAAe,EAAE,OAAO,EAAE,IAAI,EAAE,SAAS,EAAE,QAAQ,EAAE;AAAA;AAAA,EAErD,UAAU,EAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AAAA;AAAA,EAElD,cAAc,EAAE,OAAO,EAAE,IAAI,EAAE,YAAY,EAAE,QAAQ,CAAC;AACxD,CAAC;AAoCM,SAAS,oBAAoB,QAAkD;AACpF,QAAM,SAAS,qBAAqB,MAAM,UAAU,CAAC,CAAC;AACtD,SAAO;AAAA,IACL,GAAG;AAAA,IACH,YAAY,QAAQ;AAAA,IACpB,QAAQ,QAAQ;AAAA,EAClB;AACF;;;AChDO,IAAM,aAAN,MAAoB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWzB,YAA6B,UAAkB;AAAlB;AAV7B,wBAAiB;AACjB,wBAAQ,QAAO;AACf,wBAAQ,QAAO;AACf,wBAAQ,SAAQ;AAQd,SAAK,SAAS,IAAI,MAAM,QAAQ;AAAA,EAClC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAQA,KAAK,MAAkB;AACrB,QAAI,KAAK,SAAS,KAAK,UAAU;AAC/B,aAAO;AAAA,IACT;AAEA,SAAK,OAAO,KAAK,IAAI,IAAI;AACzB,SAAK,QAAQ,KAAK,OAAO,KAAK,KAAK;AACnC,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,QAAuB;AACrB,QAAI,KAAK,UAAU,GAAG;AACpB,aAAO;AAAA,IACT;AAEA,UAAM,OAAO,KAAK,OAAO,KAAK,IAAI;AAClC,SAAK,OAAO,KAAK,IAAI,IAAI;AACzB,SAAK,QAAQ,KAAK,OAAO,KAAK,KAAK;AACnC,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,IAAI,SAAiB;AACnB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,UAAmB;AACjB,WAAO,KAAK,UAAU;AAAA,EACxB;AAAA;AAAA;AAAA;AAAA,EAKA,SAAkB;AAChB,WAAO,KAAK,SAAS,KAAK;AAAA,EAC5B;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,MAAiB;AACvB,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,UAAI,KAAK,OAAO,GAAG,MAAM,MAAM;AAC7B,eAAO;AAAA,MACT;AAAA,IACF;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,SAAS,OAAwB;AAC/B,QAAI,QAAQ,KAAK,SAAS,KAAK,OAAO;AACpC,aAAO;AAAA,IACT;AAGA,aAAS,IAAI,OAAO,IAAI,KAAK,QAAQ,GAAG,KAAK;AAC3C,YAAM,WAAW,KAAK,OAAO,IAAI,KAAK,KAAK;AAC3C,YAAM,SAAS,KAAK,OAAO,KAAK,KAAK;AACrC,WAAK,OAAO,KAAK,IAAI,KAAK,OAAO,OAAO;AAAA,IAC1C;AAGA,UAAM,WAAW,KAAK,OAAO,KAAK,QAAQ,KAAK,KAAK;AACpD,SAAK,OAAO,OAAO,IAAI;AAGvB,SAAK,QAAQ,KAAK,OAAO,IAAI,KAAK,YAAY,KAAK;AACnD,SAAK;AACL,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,OAAO,MAAkB;AACvB,UAAM,QAAQ,KAAK,QAAQ,IAAI;AAC/B,QAAI,UAAU,IAAI;AAChB,aAAO;AAAA,IACT;AACA,WAAO,KAAK,SAAS,KAAK;AAAA,EAC5B;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,QAAa;AACX,UAAM,QAAa,CAAC;AACpB,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,GAAG;AAC5B,UAAI,SAAS,QAAW;AACtB,cAAM,KAAK,IAAI;AAAA,MACjB;AACA,WAAK,OAAO,GAAG,IAAI;AAAA,IACrB;AACA,SAAK,OAAO;AACZ,SAAK,OAAO;AACZ,SAAK,QAAQ;AACb,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,aAAS,IAAI,GAAG,IAAI,KAAK,OAAO,KAAK;AACnC,YAAM,OAAO,KAAK,OAAO,KAAK,KAAK;AACnC,WAAK,OAAO,GAAG,IAAI;AAAA,IACrB;AACA,SAAK,OAAO;AACZ,SAAK,OAAO;AACZ,SAAK,QAAQ;AAAA,EACf;AACF;;;AC5JO,IAAM,YAAN,MAAgB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWrB,YAAY,YAAoB,WAAW,KAAM;AAVjD,wBAAQ;AACR,wBAAiB;AACjB,wBAAiB;AASf,SAAK,aAAa;AAClB,SAAK,UAAU;AACf,SAAK,eAAe,IAAI,WAAmB,QAAQ;AAAA,EACrD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,aAAsB;AACpB,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO;AAAA,IACT;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,QAAQ,QAAqC;AAE3C,QAAI,QAAQ,SAAS;AACnB,aAAO,QAAQ;AAAA,QACb,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,MAC3D;AAAA,IACF;AAGA,QAAI,KAAK,UAAU,GAAG;AACpB,WAAK;AACL,aAAO,QAAQ,QAAQ;AAAA,IACzB;AAGA,WAAO,IAAI,QAAc,CAAC,SAAS,WAAW;AAC5C,YAAM,SAAiB,EAAE,SAAS,OAAO;AAGzC,UAAI,QAAQ;AACV,cAAM,UAAU,MAAM;AACpB,cAAI,KAAK,aAAa,OAAO,MAAM,GAAG;AACpC,mBAAO,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY,CAAC;AAAA,UACnE;AAAA,QACF;AAEA,eAAO,SAAS;AAChB,eAAO,UAAU;AACjB,eAAO,iBAAiB,SAAS,SAAS,EAAE,MAAM,KAAK,CAAC;AAAA,MAC1D;AAEA,WAAK,aAAa,KAAK,MAAM;AAAA,IAC/B,CAAC;AAAA,EACH;AAAA;AAAA;AAAA;AAAA,EAKA,UAAgB;AACd,QAAI,CAAC,KAAK,aAAa,QAAQ,GAAG;AAEhC,YAAM,SAAS,KAAK,aAAa,MAAM;AACvC,UAAI,QAAQ;AAEV,YAAI,OAAO,UAAU,OAAO,SAAS;AACnC,iBAAO,OAAO,oBAAoB,SAAS,OAAO,OAAO;AAAA,QAC3D;AACA,eAAO,QAAQ;AAAA,MACjB;AAAA,IACF,WAAW,KAAK,UAAU,KAAK,YAAY;AAEzC,WAAK;AAAA,IACP;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,mBAA2B;AACzB,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,aAAa;AAAA,EAC3B;AAAA;AAAA;AAAA;AAAA,EAKA,UAAU,OAAoB;AAC5B,UAAM,UAAU,KAAK,aAAa,MAAM;AACxC,eAAW,UAAU,SAAS;AAE5B,UAAI,OAAO,UAAU,OAAO,SAAS;AACnC,eAAO,OAAO,oBAAoB,SAAS,OAAO,OAAO;AAAA,MAC3D;AACA,aAAO,OAAO,KAAK;AAAA,IACrB;AAAA,EACF;AACF;;;AHlGO,IAAM,WAAN,MAAoD;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAYzD,YAAY,QAAkC;AAX9C,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAiB;AACjB,wBAAQ,UAAS;AAQf,SAAK,SAAS,oBAAoB,MAAM;AACxC,SAAK,SAAS,KAAK,OAAO,UAAU;AAEpC,SAAK,YAAY,IAAI;AAAA,MACnB,KAAK,OAAO;AAAA,MACZ,KAAK,IAAI,KAAK,OAAO,UAAU,CAAC;AAAA;AAAA,IAClC;AAGA,QAAI,KAAK,OAAO,WAAW,GAAG;AAE5B,WAAK,iBAAiB,IAAI,UAAU,KAAK,OAAO,UAAU,CAAC;AAAA,IAC7D;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,MAAM,QAAQ,WAAyB,QAAkC;AAEvE,QAAI,KAAK,QAAQ;AACf,YAAM,KAAK,gBAAgB,oBAAoB;AAAA,IACjD;AAGA,QAAI,QAAQ,SAAS;AACnB,YAAM,OAAO,UAAU,IAAI,aAAa,WAAW,YAAY;AAAA,IACjE;AAGA,QAAI,KAAK,UAAU,WAAW,GAAG;AAC/B,aAAO,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACjD;AAGA,WAAO,KAAK,QAAQ,WAAW,MAAM;AAAA,EACvC;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,OAAO,gBAAgB,KAAK,UAAU,iBAAiB;AAAA,EACrE;AAAA;AAAA;AAAA;AAAA,EAKA,cAAsB;AACpB,WAAO,KAAK,UAAU,YAAY;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,QAAI,KAAK,OAAQ;AAEjB,SAAK,SAAS;AACd,UAAM,aAAa,KAAK,gBAAgB,iBAAiB;AACzD,SAAK,UAAU,UAAU,UAAU;AACnC,SAAK,gBAAgB,UAAU,UAAU;AACzC,SAAK,OAAO,KAAK,iBAAiB;AAAA,EACpC;AAAA;AAAA;AAAA;AAAA,EAKA,QAAc;AACZ,SAAK,SAAS;AACd,SAAK,OAAO,KAAK,gBAAgB;AAAA,EACnC;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,kBACZ,WACA,QACY;AACZ,QAAI;AACF,aAAO,MAAM,UAAU,UAAU,IAAI,gBAAgB,EAAE,MAAM;AAAA,IAC/D,UAAE;AACA,WAAK,UAAU,QAAQ;AAAA,IACzB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,MAAc,QACZ,WACA,QACY;AAEZ,QAAI,KAAK,OAAO,aAAa,GAAG;AAC9B,WAAK,WAAW;AAChB,YAAM,KAAK,gBAAgB,wCAAwC;AAAA,IACrE;AAGA,QAAI,CAAC,KAAK,gBAAgB,WAAW,GAAG;AAEtC,WAAK,WAAW;AAChB,YAAM,KAAK,gBAAgB,wBAAwB;AAAA,IACrD;AAGA,QAAI;AAEJ,QAAI;AAEF,UAAI;AACJ,UAAI,iBAAiB;AAErB,UAAI,KAAK,OAAO,eAAe,GAAG;AAChC,4BAAoB,IAAI,gBAAgB;AAGxC,YAAI,QAAQ;AACV,2BAAiB,YAAY,IAAI,CAAC,QAAQ,kBAAkB,MAAM,CAAC;AAAA,QACrE,OAAO;AACL,2BAAiB,kBAAkB;AAAA,QACrC;AAGA,oBAAY,WAAW,MAAM;AAC3B,6BAAmB,MAAM,IAAI,aAAa,iBAAiB,cAAc,CAAC;AAAA,QAC5E,GAAG,KAAK,OAAO,YAAY;AAAA,MAC7B;AAGA,UAAI;AACF,cAAM,KAAK,UAAU,QAAQ,cAAc;AAAA,MAC7C,SAAS,OAAO;AAEd,YACE,iBAAiB,gBACjB,MAAM,SAAS,gBACf;AACA,eAAK,WAAW;AAAA,QAClB;AACA,cAAM;AAAA,MACR;AAGA,aAAO,MAAM,KAAK,kBAAkB,WAAW,MAAM;AAAA,IACvD,UAAE;AAEA,UAAI,WAAW;AACb,qBAAa,SAAS;AAAA,MACxB;AACA,WAAK,gBAAgB,QAAQ;AAAA,IAC/B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,aAAmB;AACzB,SAAK,OAAO,KAAK,sBAAsB;AAAA,MACrC,eAAe,KAAK,OAAO;AAAA,MAC3B,UAAU,KAAK,OAAO;AAAA,IACxB,CAAC;AAED,QAAI,KAAK,OAAO,YAAY;AAC1B,UAAI;AACF,aAAK,OAAO,WAAW;AAAA,MACzB,SAAS,OAAO;AACd,aAAK,OAAO,MAAM,sCAAsC;AAAA,UACtD,OAAO,iBAAiB,QAAQ,MAAM,UAAU,OAAO,KAAK;AAAA,QAC9D,CAAC;AAAA,MACH;AAAA,IACF;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,gBAAgB,SAAqC;AAC3D,WAAO,IAAI;AAAA,MACT,WAAW;AAAA,MACX,KAAK,YAAY;AAAA,MACjB,KAAK,YAAY;AAAA,IACnB;AAAA,EACF;AACF;","names":[]}
{
"name": "@fortify-ts/bulkhead",
"version": "0.1.1",
"version": "0.1.2",
"description": "Concurrency limiter bulkhead pattern for @fortify-ts",

@@ -26,3 +26,3 @@ "type": "module",

"zod": "^4.1.13",
"@fortify-ts/core": "0.1.1"
"@fortify-ts/core": "0.1.2"
},

@@ -33,4 +33,5 @@ "devDependencies": {

"peerDependencies": {
"zod": "^3.0.0"
"zod": "^4.0.0"
},
"sideEffects": false,
"repository": {

@@ -37,0 +38,0 @@ "type": "git",