@enterprise_search/kleislis
Comparing version 0.5.0 to 0.5.1
@@ -0,1 +1,2 @@
export * from "./src/async";
export * from "./src/concurrency.limiter";
@@ -7,2 +8,3 @@ export * from "./src/debug";
export * from "./src/metrics";
export * from "./src/nonfunctionals";
export * from "./src/retry";
@@ -12,2 +14,3 @@ export * from "./src/replay";
export * from "./src/sideeffect";
export * from "./src/simple.log";
export * from "./src/throttling";

@@ -17,2 +17,3 @@ "use strict";
Object.defineProperty(exports, "__esModule", { value: true });
__exportStar(require("./src/async"), exports);
__exportStar(require("./src/concurrency.limiter"), exports);
@@ -24,2 +25,3 @@ __exportStar(require("./src/debug"), exports);
__exportStar(require("./src/metrics"), exports);
__exportStar(require("./src/nonfunctionals"), exports);
__exportStar(require("./src/retry"), exports);
@@ -29,2 +31,3 @@ __exportStar(require("./src/replay"), exports);
__exportStar(require("./src/sideeffect"), exports);
__exportStar(require("./src/simple.log"), exports);
__exportStar(require("./src/throttling"), exports);

import { K0, K1, K2, K3, K4, K5 } from "./kleisli";
export type Task<TOutput> = {
    fn: K0<TOutput>;
    name: string;
    resolve: (output: TOutput) => void;
    reject: (err: any) => void;
};
export declare function withConcurrencyLimit<T>(limit: number, queue: Task<any>[], fn: K0<T>): K0<T>;
export declare function withConcurrencyLimit<P1, T>(limit: number, queue: Task<any>[], fn: K1<P1, T>): K1<P1, T>;
export declare function withConcurrencyLimit<P1, P2, T>(limit: number, queue: Task<any>[], fn: K2<P1, P2, T>): K2<P1, P2, T>;
export declare function withConcurrencyLimit<P1, P2, P3, T>(limit: number, queue: Task<any>[], fn: K3<P1, P2, P3, T>): K3<P1, P2, P3, T>;
export declare function withConcurrencyLimit<P1, P2, P3, P4, T>(limit: number, queue: Task<any>[], fn: K4<P1, P2, P3, P4, T>): K4<P1, P2, P3, P4, T>;
export declare function withConcurrencyLimit<P1, P2, P3, P4, P5, T>(limit: number, queue: Task<any>[], fn: K5<P1, P2, P3, P4, P5, T>): K5<P1, P2, P3, P4, P5, T>;
export type ConcurrencyLimits = {
    limit: number;
    queue: Task<any>[];
    active: number;
    onTasks: (() => void)[];
};
export declare function withOnTasks<T>(limits: ConcurrencyLimits, onTask: () => void, block: () => Promise<T>): Promise<T>;
export declare function defaultConcurrencyLimits(limit?: number): ConcurrencyLimits;
export declare function withConcurrencyLimits<T>(limits: ConcurrencyLimits, name: string, fn: K0<T>): K0<T>;
export declare function withConcurrencyLimits<P1, T>(limits: ConcurrencyLimits, name: string, fn: K1<P1, T>): K1<P1, T>;
export declare function withConcurrencyLimits<P1, P2, T>(limits: ConcurrencyLimits, name: string, fn: K2<P1, P2, T>): K2<P1, P2, T>;
export declare function withConcurrencyLimits<P1, P2, P3, T>(limits: ConcurrencyLimits, name: string, fn: K3<P1, P2, P3, T>): K3<P1, P2, P3, T>;
export declare function withConcurrencyLimits<P1, P2, P3, P4, T>(limits: ConcurrencyLimits, name: string, fn: K4<P1, P2, P3, P4, T>): K4<P1, P2, P3, P4, T>;
export declare function withConcurrencyLimits<P1, P2, P3, P4, P5, T>(limits: ConcurrencyLimits, name: string, fn: K5<P1, P2, P3, P4, P5, T>): K5<P1, P2, P3, P4, P5, T>;
export declare function withConcurrencyLimit<T>(limit: number, queue: Task<any>[], name: string, fn: K0<T>): K0<T>;
export declare function withConcurrencyLimit<P1, T>(limit: number, queue: Task<any>[], name: string, fn: K1<P1, T>): K1<P1, T>;
export declare function withConcurrencyLimit<P1, P2, T>(limit: number, queue: Task<any>[], name: string, fn: K2<P1, P2, T>): K2<P1, P2, T>;
export declare function withConcurrencyLimit<P1, P2, P3, T>(limit: number, queue: Task<any>[], name: string, fn: K3<P1, P2, P3, T>): K3<P1, P2, P3, T>;
export declare function withConcurrencyLimit<P1, P2, P3, P4, T>(limit: number, queue: Task<any>[], name: string, fn: K4<P1, P2, P3, P4, T>): K4<P1, P2, P3, P4, T>;
export declare function withConcurrencyLimit<P1, P2, P3, P4, P5, T>(limit: number, queue: Task<any>[], name: string, fn: K5<P1, P2, P3, P4, P5, T>): K5<P1, P2, P3, P4, P5, T>;

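The declarations above expose two entry points: withConcurrencyLimit, which builds its own limiter state from a numeric limit and a shared queue, and withConcurrencyLimits, which wraps a function around an explicit ConcurrencyLimits record so several wrapped functions can share the same queue, active count and onTasks callbacks. A small sketch of the second form, assuming K1<P1, T> is simply (p1: P1) => Promise<T>:

// Sketch, not package documentation. Assumes K1<P1, T> = (p1: P1) => Promise<T>.
import { defaultConcurrencyLimits, withConcurrencyLimits } from "@enterprise_search/kleislis";

const limits = defaultConcurrencyLimits(3);   // at most 3 tasks in flight, fresh queue
const slowDouble = withConcurrencyLimits(limits, "slowDouble",
  async (n: number) => { await new Promise(r => setTimeout(r, 100)); return n * 2; });

// All ten calls are queued immediately; only three run at any one time.
Promise.all(Array.from({ length: 10 }, (_, i) => slowDouble(i))).then(console.log);
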
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.withConcurrencyLimit = void 0; | ||
function withConcurrencyLimit(limit, queue, fn) { | ||
let active = 0; | ||
exports.withConcurrencyLimit = exports.withConcurrencyLimits = exports.defaultConcurrencyLimits = exports.withOnTasks = void 0; | ||
async function withOnTasks(limits, onTask, block) { | ||
if (limits.onTasks.includes(onTask)) | ||
throw new Error("onTask callback is already registered."); | ||
// Add the onTask callback to the onTasks list | ||
limits.onTasks.push(onTask); | ||
try { | ||
// Execute the block and wait for it to finish | ||
const result = await block(); | ||
return result; | ||
} | ||
finally { | ||
// Remove the onTask callback from the onTasks list after execution | ||
const index = limits.onTasks.indexOf(onTask); | ||
if (index > -1) { | ||
limits.onTasks.splice(index, 1); | ||
} | ||
} | ||
} | ||
exports.withOnTasks = withOnTasks; | ||
function defaultConcurrencyLimits(limit = 10) { | ||
return { | ||
active: 0, | ||
limit, | ||
queue: [], | ||
onTasks: [] | ||
}; | ||
} | ||
exports.defaultConcurrencyLimits = defaultConcurrencyLimits; | ||
function withConcurrencyLimits(limits, name, fn) { | ||
const { queue, limit } = limits; | ||
const next = () => { | ||
if (queue.length > 0 && active < limit) { | ||
active++; | ||
if (queue.length > 0 && limits.active < limit) { | ||
limits.active++; | ||
const { fn, resolve, reject } = queue.shift(); | ||
fn().then(resolve).catch(reject).finally(() => { | ||
active--; | ||
next(); | ||
limits.active--; | ||
for (const onTask of limits.onTasks) | ||
onTask(); | ||
setImmediate(next); // Schedule the next iteration after the current task completes | ||
//note that this helps a lot with stack size... | ||
}); | ||
@@ -17,3 +48,6 @@ } | ||
return async (...args) => new Promise((resolve, reject) => { | ||
const task = { fn: () => fn(...args), resolve, reject }; | ||
const task = { | ||
name, | ||
fn: () => fn(...args), resolve, reject | ||
}; | ||
queue.push(task); | ||
@@ -23,2 +57,6 @@ next(); | ||
} | ||
exports.withConcurrencyLimits = withConcurrencyLimits; | ||
function withConcurrencyLimit(limit, queue, name, fn) { | ||
return withConcurrencyLimits({ limit, queue, active: 0, onTasks: [] }, name, fn); | ||
} | ||
exports.withConcurrencyLimit = withConcurrencyLimit; |
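Note that after each task settles the implementation decrements limits.active and then fires every callback registered in limits.onTasks, and withOnTasks registers such a callback only for the lifetime of a block. An illustrative sketch (the progress logging is illustrative, not part of the package):

// Illustrative only: observe queue progress while a block of limited work runs.
import { defaultConcurrencyLimits, withConcurrencyLimits, withOnTasks } from "@enterprise_search/kleislis";

const limits = defaultConcurrencyLimits(2);
const work = withConcurrencyLimits(limits, "work", async (n: number) => n + 1);

async function runWithProgress() {
  return withOnTasks(limits,
    () => console.log("task finished; still queued:", limits.queue.length),  // fired after each task settles
    () => Promise.all([work(1), work(2), work(3)]));
}
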
@@ -12,4 +12,4 @@ "use strict";
};
const limiter1 = (0, concurrency_limiter_1.withConcurrencyLimit)(limit, sharedQueue, asyncOp);
const limiter2 = (0, concurrency_limiter_1.withConcurrencyLimit)(limit, sharedQueue, asyncOp);
const limiter1 = (0, concurrency_limiter_1.withConcurrencyLimit)(limit, sharedQueue, 'l1', asyncOp);
const limiter2 = (0, concurrency_limiter_1.withConcurrencyLimit)(limit, sharedQueue, 'l2', asyncOp);
const tasks1 = Array.from({ length: 10 }, (_, i) => limiter1(i));
@@ -16,0 +16,0 @@ const tasks2 = Array.from({ length: 10 }, (_, i) => limiter2(i + 10));

import { NameAnd } from "@laoban/utils";
import { Timeservice } from "@itsmworkbench/utils";
export type SimpleLogFn = (msg: string) => Promise<void>;
export declare function timeAsHHMMSS(timeService: Timeservice): string;
export type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' | 'NONE';
@@ -3,0 +6,0 @@ export declare const LogLevelValue: {

"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.makeObjectFromParamsAndOutput = exports.makeObjectFromParams = exports.consoleLog = exports.LogLevelValue = void 0; | ||
exports.makeObjectFromParamsAndOutput = exports.makeObjectFromParams = exports.consoleLog = exports.LogLevelValue = exports.timeAsHHMMSS = void 0; | ||
const utils_1 = require("@itsmworkbench/utils"); | ||
let startTime = undefined; | ||
function timeAsHHMMSS(timeService) { | ||
if (startTime === undefined) | ||
startTime = timeService(); | ||
const time = timeService(); | ||
const elapsed = time - startTime; | ||
const hours = Math.floor(elapsed / 3600000).toString().padStart(2, '0'); | ||
const minutes = Math.floor((elapsed % 3600000) / 60000).toString().padStart(2, '0'); | ||
const seconds = Math.floor((elapsed % 60000) / 1000).toString().padStart(2, '0'); | ||
return `${hours}:${minutes}:${seconds}`; | ||
} | ||
exports.timeAsHHMMSS = timeAsHHMMSS; | ||
exports.LogLevelValue = { | ||
@@ -6,0 +18,0 @@ TRACE: 0, |
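The new timeAsHHMMSS latches the first time it is asked (the module-level startTime) and thereafter reports elapsed time as HH:MM:SS. A sketch, assuming Timeservice is a zero-argument function returning epoch milliseconds, which is what the division by 3600000 above implies:

// Sketch: Timeservice assumed to be () => number (milliseconds).
import { timeAsHHMMSS } from "@enterprise_search/kleislis";

const clock = () => Date.now();
console.log(timeAsHHMMSS(clock));                              // "00:00:00" on the first call
setTimeout(() => console.log(timeAsHHMMSS(clock)), 65_000);    // roughly "00:01:05" a minute later
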
@@ -9,2 +9,3 @@ import { K0, K1, K2, K3, K4, K5 } from "./kleisli";
nonRecoverableErrors?: string[];
logName?: string;
};
@@ -11,0 +12,0 @@ export declare const defaultRetryPolicy: RetryPolicyConfig;

@@ -5,2 +5,3 @@ "use strict";
const metrics_1 = require("./metrics");
const simple_log_1 = require("./simple.log");
exports.defaultRetryPolicy = {
@@ -25,2 +26,3 @@ initialInterval: 1000,
retryPolicy = exports.defaultRetryPolicy;
const log = retryPolicy.logName ? (0, simple_log_1.nameLogFn)(retryPolicy.logName) : async (s) => console.log(s);
const multiplier = retryPolicy.multiplier || 2;
@@ -41,7 +43,9 @@ const nonRecoverableErrors = retryPolicy.nonRecoverableErrors || [];
catch (error) {
console.log('error', error.message, 'attempts', attempts, retryPolicy.maximumAttempts, 'retryPolicy.maximumAttempts', retryPolicy.maximumAttempts, 'delay', delay, 'multiplier', multiplier, 'retryPolicy.maximumInterval', retryPolicy.maximumInterval, 'nonRecoverableErrors', nonRecoverableErrors);
if (nonRecoverableErrors.includes(error.message)) {
const nonRecoverable = nonRecoverableErrors.includes(error.message);
if (nonRecoverable) {
log(`${error.message}, attempts: ${attempts}/nonrecoverable`);
incMetric('activity.non_recoverable_error');
return Promise.reject(error);
} // Rethrow if non-recoverable
log(`${error.message}, attempts: ${attempts}/${retryPolicy.maximumAttempts} delay: ${delay}/${retryPolicy.maximumInterval}, multiplier, ${multiplier}`);
if (++attempts < retryPolicy.maximumAttempts) {
@@ -48,0 +52,0 @@ // Calculate next delay

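The retry hunks add an optional logName to RetryPolicyConfig and route per-attempt messages through simple.log's nameLogFn instead of console.log, while keeping the nonRecoverableErrors short-circuit that rejects immediately. A hedged sketch of a policy using only the fields visible in this diff; the wrapper that consumes the policy is not shown in these hunks, so it is omitted here:

// Sketch: field names come from the hunks above; values are illustrative.
// Assumes RetryPolicyConfig itself is exported alongside defaultRetryPolicy.
import { RetryPolicyConfig } from "@enterprise_search/kleislis";

const policy: RetryPolicyConfig = {
  initialInterval: 1000,                     // first backoff delay, in ms
  maximumInterval: 30_000,                   // cap on the growing delay
  maximumAttempts: 5,
  multiplier: 2,                             // geometric backoff factor
  nonRecoverableErrors: ["Not authorised"],  // matching error messages reject immediately
  logName: "search.retry",                   // new in 0.5.1: per-attempt messages go to this named log
};
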
/// <reference types="node" />
import { K0, K1, K2, K3, K4, K5 } from "./kleisli";
import { Timeservice } from "@itsmworkbench/utils/dist/src/timeservice";
export type Throttling = {
@@ -18,3 +19,3 @@ current?: number;
export declare function withThrottle<P1, P2, P3, P4, P5, T>(throttle: Throttling, fn: K5<P1, P2, P3, P4, P5, T>): K5<P1, P2, P3, P4, P5, T>;
export declare function startThrottling(throttle: Throttling): void;
export declare function startThrottling(throttle: Throttling, timeService?: Timeservice): void;
export declare function stopThrottling(throttle: Throttling): void;

"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.stopThrottling = exports.startThrottling = exports.withThrottle = void 0; | ||
const timeservice_1 = require("@itsmworkbench/utils/dist/src/timeservice"); | ||
function withThrottle(throttle, fn) { | ||
// Start the throttling interval only once when `withThrottle` is called | ||
startThrottling(throttle); | ||
return async (...args) => { | ||
const attemptInvoke = async () => { | ||
const { current = 0, max, throttlingDelay = 50 } = throttle; | ||
if (current > 0) { | ||
throttle.current--; | ||
try { | ||
// console.log('executing in throttle', throttle.current, throttle.tokensPer100ms) | ||
return await fn(...args); | ||
// Create a single promise to handle the resolution/rejection | ||
return new Promise((resolve, reject) => { | ||
const attemptInvoke = async () => { | ||
const { current = 0, throttlingDelay = 50 } = throttle; | ||
if (current > 0) { | ||
// If tokens are available, execute the function and resolve/reject the promise | ||
executeFunction(throttle, fn, args) | ||
.then(resolve) | ||
.catch(reject); | ||
} | ||
catch (e) { | ||
if (e.message === 'Too many requests') { | ||
if (throttle.current > -10) { | ||
throttle.tokensPer100ms = throttle.tokensPer100ms * .9; | ||
console.log('Too many requests - throttling back', throttle.tokensPer100ms); | ||
} | ||
throttle.current = (throttle.countOnTooManyErrors || -10); | ||
} | ||
throw e; // hopefully the retry logic will kick in | ||
else { | ||
if (throttle.intervalId === undefined) | ||
return; // Exit if interval is not running | ||
setTimeout(attemptInvoke, throttlingDelay); | ||
} | ||
} | ||
else { | ||
if (throttle.intervalId === undefined) | ||
return; | ||
const delay = Math.random() * throttlingDelay; | ||
await new Promise(resolve => setTimeout(resolve, delay)); | ||
return attemptInvoke(); // Retry the invocation | ||
} | ||
}; | ||
startThrottling(throttle); | ||
return attemptInvoke(); | ||
}; | ||
attemptInvoke(); // Start the invocation attempts | ||
}); | ||
}; | ||
} | ||
exports.withThrottle = withThrottle; | ||
function startThrottling(throttle) { | ||
async function executeFunction(throttle, fn, args) { | ||
try { | ||
throttle.current--; // Decrement the token count | ||
return await fn(...args); // Execute the original function | ||
} | ||
catch (e) { | ||
if (e.message === 'Too many requests') { | ||
if (throttle.current > -10) { | ||
throttle.tokensPer100ms = throttle.tokensPer100ms * 0.95; // Adjust the token replenishment rate | ||
console.log('Too many requests - throttling back', throttle.tokensPer100ms); | ||
} | ||
throttle.current = throttle.countOnTooManyErrors || -10; // Reset token count on too many errors | ||
} | ||
throw e; // Propagate the error to allow retry logic | ||
} | ||
} | ||
function startThrottling(throttle, timeService = timeservice_1.DateTimeService) { | ||
if (throttle.intervalId || throttle.kill) | ||
return; // Idempotently handle already running intervals | ||
// Record the start time when throttling begins using the provided timeService | ||
let lastTime = timeService(); | ||
throttle.intervalId = setInterval(() => { | ||
const { current = 0, max, tokensPer100ms = 0.1 } = throttle; | ||
throttle.current = Math.min(max, current + tokensPer100ms); | ||
const { max, tokensPer100ms = 0.1 } = throttle; | ||
// Get the current time using the timeService | ||
const currentTime = timeService(); | ||
const elapsedTime = currentTime - lastTime; // Calculate elapsed time since last execution | ||
lastTime = currentTime; // Update the last execution time | ||
// Calculate how many tokens to add based on elapsed time | ||
const tokensToAdd = (elapsedTime / 100) * tokensPer100ms; // Convert milliseconds to the token rate | ||
// Update the current token count, ensuring it does not exceed the maximum | ||
throttle.current = Math.min(max, throttle.current + tokensToAdd); | ||
}, 100); | ||
@@ -45,0 +62,0 @@ } |
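In 0.5.1 the refill inside startThrottling is computed from elapsed wall-clock time supplied by an injectable Timeservice rather than a fixed per-tick increment, and the execute/back-off path moves into executeFunction, which scales tokensPer100ms down when it sees a 'Too many requests' error. A wiring sketch using only the fields that appear in these hunks; current, intervalId and kill are managed by the library once the throttle is in use:

// Sketch assembled from the fields visible in this diff; values are illustrative.
import { Throttling, withThrottle, stopThrottling } from "@enterprise_search/kleislis";

const throttle: Throttling = {
  max: 5,                     // ceiling of the token bucket
  current: 5,                 // start with a full bucket
  tokensPer100ms: 0.5,        // ~5 tokens per second, refilled from elapsed time
  throttlingDelay: 50,        // how long a caller waits before re-checking for a token
  countOnTooManyErrors: -10,  // bucket value forced after a 'Too many requests' error
};

const limitedFetch = withThrottle(throttle, async (url: string) => fetch(url).then(r => r.text()));

// limitedFetch behaves like the original async function but waits for tokens;
// stopThrottling(throttle) clears the refill interval when the throttle is retired.
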
{
"name": "@enterprise_search/kleislis",
"description": "functions that have inputs, returns promises, and non functionals around them",
"version": "0.5.0",
"version": "0.5.1",
"main": "dist/index",
@@ -6,0 +6,0 @@ "types": "dist/index",