
@hisorange/resistor

Comparing version 1.1.4 to 1.1.5


build/events.d.ts

@@ -5,7 +5,8 @@ export declare enum EVENTS {

 FLUSH_EXECUTED = "flush.executed",
-FLUSH_REJECTED = "flush.rejected",
-FLUSH_RETRYING = "flush.retrying",
+WORKER_REJECTED = "worker.rejected",
+WORKER_RETRYING = "worker.retrying",
 THREAD_OPENED = "thread.opened",
 THREAD_CLOSED = "thread.closed",
-QUEUE_EMPTY = "queue.empty"
+QUEUE_EMPTY = "queue.empty",
+EMPTY = "empty"
 }

build/events.js

@@ -9,8 +9,8 @@ "use strict";

 EVENTS["FLUSH_EXECUTED"] = "flush.executed";
-EVENTS["FLUSH_REJECTED"] = "flush.rejected";
-EVENTS["FLUSH_RETRYING"] = "flush.retrying";
+EVENTS["WORKER_REJECTED"] = "worker.rejected";
+EVENTS["WORKER_RETRYING"] = "worker.retrying";
 EVENTS["THREAD_OPENED"] = "thread.opened";
 EVENTS["THREAD_CLOSED"] = "thread.closed";
 EVENTS["QUEUE_EMPTY"] = "queue.empty";
+EVENTS["EMPTY"] = "empty";
 })(EVENTS = exports.EVENTS || (exports.EVENTS = {}));
 //# sourceMappingURL=events.js.map
build/index.d.ts

 export * from './events';
 export * from './interfaces/analytics.interface';
 export * from './interfaces/config.interface';
-export * from './interfaces/handler.interface';
 export * from './interfaces/strategy.interface';
+export * from './interfaces/worker.interface';
 export * from './resistor';
 export * from './strategies';

build/index.js

@@ -16,6 +16,6 @@ "use strict";

 __exportStar(require("./interfaces/config.interface"), exports);
-__exportStar(require("./interfaces/handler.interface"), exports);
 __exportStar(require("./interfaces/strategy.interface"), exports);
+__exportStar(require("./interfaces/worker.interface"), exports);
 __exportStar(require("./resistor"), exports);
 __exportStar(require("./strategies"), exports);
 //# sourceMappingURL=index.js.map
build/interfaces/analytics.interface.d.ts

 export interface IAnalytics {
-flush: {
+worker: {
 invoked: number;
@@ -4,0 +4,0 @@ scheduled: number;

build/interfaces/config.interface.d.ts

@@ -29,3 +29,3 @@ import { IStrategy } from './strategy.interface';

 /**
-* Configured limiting strategy handler.
+* Configured limiting strategy worker.
 */

@@ -35,3 +35,3 @@ strategy: IStrategy;

 /**
-* Retries the handler if the handler throws an unhandled promise rejection.
+* Retries the worker if the worker throws an unhandled promise rejection.
 */

@@ -38,0 +38,0 @@ retrier: {

build/interfaces/flush-config.interface.d.ts

 export interface IFlushConfig {
-waitForHandler: boolean;
+waitForWorker: boolean;
 }

build/resistor.d.ts

@@ -9,8 +9,8 @@ /// <reference types="node" />

 import { IFlushConfig } from './interfaces/flush-config.interface';
-import { IHandler } from './interfaces/handler.interface';
 import { WaitPass } from './interfaces/wait-pass.interface';
+import { IWorker } from './interfaces/worker.interface';
 export declare class Resistor<I> implements Pick<EventEmitter, 'on' | 'once' | 'off'> {
-protected handler: IHandler<I>;
+protected worker: IWorker<I>;
 /**
-* Temporary buffer to store the records until the handler flushes them.
+* Temporary buffer to store the records until the worker flushes them.
 */

@@ -24,7 +24,7 @@ protected buffer: I[];

 /**
-* Stores the active flush handlers, this is how the script tracks the active "threads".
+* Stores the active flush workers, this is how the script tracks the active "threads".
 */
 protected vThreads: Promise<void>[];
 /**
-* When the maximum thread reached, the script will enqueue the flush handlers in a FIFO logic,
+* When the maximum thread reached, the script will enqueue the flush workers in a FIFO logic,
 * after a thread finished, it will shift the first waiting execution and allows its execution.

@@ -48,3 +48,3 @@ */

 */
-constructor(handler: IHandler<I>, config?: DeepPartial<IResistorConfig>);
+constructor(worker: IWorker<I>, config?: DeepPartial<IResistorConfig>);
 /**

@@ -80,3 +80,3 @@ * Register an event listener for every occasion when the event emitted.

 *
-* Important! By default the flush will not wait for the handler to execute so the caller
+* Important! By default the flush will not wait for the worker to execute so the caller
 * can push the records until the active threads are populated without waiting.

@@ -91,3 +91,3 @@ *

 */
-protected schedule(job: () => Promise<void>, waitForHandler: boolean): Promise<void>;
+protected schedule(work: () => Promise<void>, waitForWorker: boolean): Promise<void>;
 /**

@@ -94,0 +94,0 @@ * Push a record to the buffer, returns a promise which should be awaited so

build/resistor.js

@@ -15,14 +15,14 @@ "use strict";

 */
-constructor(handler, config) {
-this.handler = handler;
+constructor(worker, config) {
+this.worker = worker;
 /**
-* Temporary buffer to store the records until the handler flushes them.
+* Temporary buffer to store the records until the worker flushes them.
 */
 this.buffer = [];
 /**
-* Stores the active flush handlers, this is how the script tracks the active "threads".
+* Stores the active flush workers, this is how the script tracks the active "threads".
 */
 this.vThreads = [];
 /**
-* When the maximum thread reached, the script will enqueue the flush handlers in a FIFO logic,
+* When the maximum thread reached, the script will enqueue the flush workers in a FIFO logic,
 * after a thread finished, it will shift the first waiting execution and allows its execution.

@@ -52,3 +52,3 @@ */

 this._analytics = {
-flush: {
+worker: {
 invoked: 0,

@@ -126,3 +126,3 @@ scheduled: 0,

 await this.flush({
-waitForHandler: true,
+waitForWorker: true,
 });

@@ -148,3 +148,3 @@ }

 *
-* Important! By default the flush will not wait for the handler to execute so the caller
+* Important! By default the flush will not wait for the worker to execute so the caller
 * can push the records until the active threads are populated without waiting.

@@ -154,4 +154,4 @@ *

 */
-async flush(config = { waitForHandler: false }) {
-this.emitter.emit(events_2.EVENTS.FLUSH_INVOKED, ++this._analytics.flush.invoked);
+async flush(config = { waitForWorker: false }) {
+this.emitter.emit(events_2.EVENTS.FLUSH_INVOKED, ++this._analytics.worker.invoked);
 // Release the auto flush until the buffer is freed.

@@ -162,3 +162,3 @@ if (this.flushTimer) {

 if (this.buffer.length > 0) {
-this.emitter.emit(events_2.EVENTS.FLUSH_SCHEDULED, ++this._analytics.flush.scheduled);
+this.emitter.emit(events_2.EVENTS.FLUSH_SCHEDULED, ++this._analytics.worker.scheduled);
 // We cut the maximum record and leave an empty array behind,

@@ -170,7 +170,7 @@ // this is needed in case an async .push has been called while an other call started the flush.

 let retries = 0;
-const job = () => this.handler(records).catch(async (rejection) => {
-this.emitter.emit(events_2.EVENTS.FLUSH_REJECTED, {
+const job = () => this.worker(records).catch(async (rejection) => {
+this.emitter.emit(events_2.EVENTS.WORKER_REJECTED, {
 rejection,
 records,
-errors: ++this._analytics.flush.errors,
+errors: ++this._analytics.worker.errors,
 });

@@ -181,3 +181,3 @@ // Retrying is enabled

 if (++retries <= this.config.retrier.times) {
-this.emitter.emit(events_2.EVENTS.FLUSH_RETRYING, {
+this.emitter.emit(events_2.EVENTS.WORKER_RETRYING, {
 rejection,

@@ -191,4 +191,4 @@ records,

 });
-// Schedule the handler for execution, the strategy will handle the timings.
-await this.schedule(job, config.waitForHandler);
+// Schedule the worker for execution, the strategy will handle the timings.
+await this.schedule(job, config.waitForWorker);
 }

@@ -202,3 +202,3 @@ // Always push the auto flush even if the next flush will do the same.

 */
-async schedule(job, waitForHandler) {
+async schedule(work, waitForWorker) {
 // Limit the maximum "virtual threads" to the configured threshold.

@@ -209,3 +209,2 @@ if (this.vThreads.length >= this.config.threads) {

 await new Promise(waitPass => this.waitQueue.push(waitPass));
-this._analytics.queue.waiting--;
 }

@@ -217,9 +216,9 @@ // Track maximum thread count.

 // Push the execution to a free thread.
-const threadId = this.vThreads.push(job()) - 1;
+const threadId = this.vThreads.push(work()) - 1;
 // Hook to handle thread removal.
-const handler = this.vThreads[threadId].then(() => {
+const worker = this.vThreads[threadId].then(() => {
 const finishedAt = Date.now();
 // Track the execution time.
-this._analytics.flush.processTime = finishedAt - startedAt;
-this.emitter.emit(events_2.EVENTS.FLUSH_EXECUTED, ++this._analytics.flush.executed);
+this._analytics.worker.processTime = finishedAt - startedAt;
+this.emitter.emit(events_2.EVENTS.FLUSH_EXECUTED, ++this._analytics.worker.executed);
 // Remove the process after finish.

@@ -238,2 +237,3 @@ this.vThreads.splice(threadId, 1);

 const waitPass = this.waitQueue.shift();
+this._analytics.queue.waiting--;
 if (typeof waitPass === 'function') {

@@ -247,6 +247,12 @@ this.config.limiter.strategy.handleWaitPass(vThreadId, waitPass);

 }
+// Resistor is totaly empty.
+if (!this.buffer.length &&
+!this._analytics.thread.active &&
+!this._analytics.queue.waiting) {
+this.emitter.emit(events_2.EVENTS.EMPTY);
+}
 });
-// The flush wants to wait for the handler as well.
-if (waitForHandler) {
-await handler;
+// The flush wants to wait for the worker as well.
+if (waitForWorker) {
+await worker;
 }

@@ -253,0 +259,0 @@ }

package.json

 {
 "name": "@hisorange/resistor",
-"version": "1.1.4",
-"description": "Virtual threaded resource processing handler.",
+"version": "1.1.5",
+"description": "Versatily resource load throttler with extensible strategies, configuration and virtual thread management.",
+"keywords": [
+"virtual-thread",
+"throttle",
+"resource",
+"worker",
+"parallel",
+"rate-limiter"
+],
 "main": "build/index.js",

@@ -6,0 +14,0 @@ "repository": "https://github.com/hisorange/resistor",

README.md

![Resistor](https://user-images.githubusercontent.com/3441017/119745067-ab632600-be8d-11eb-93e1-24d34ffe2a92.png)

### Quick Start

```sh
npm i @hisorange/resistor
# or
yarn add @hisorange/resistor
```

---

A versatile resource load throttler with extensible strategies, configuration, and virtual thread management. This package provides a way to limit and control your worker's invocation, so you can easily implement any resource usage limiter in your flow.

I wrote this package because I know how tedious it is to rewrite bulk saving and API limiting for every high-throughput flow; this is why Resistor accepts a generic async worker, so you can implement any kind of work that requires control over resource loading.

### Strategy - Interval
---
Enforces a minimum invocation time for cases when the worker should wait a given number of milliseconds between calls.

In this example we start a new HTTP call every 5 seconds, on 2 parallel virtual threads.
```ts
import { Resistor, IntervalStrategy } from '@hisorange/resistor';

const worker = async (urls: string[]) => fetch(urls[0]);

const buffer = new Resistor<string>(worker, {
  threads: 2,
  buffer: {
    size: 1,
  },
  limiter: {
    level: 'thread', // Applied to each thread individually
    strategy: new IntervalStrategy({
      interval: 5000,
    }),
  },
});

// Non-blocking, just starts the work.
await buffer.push('https://hisorange.me');
await buffer.push('https://google.com');
// Waits 5 seconds until the promise resolves.
await buffer.push('https://github.com');
```
### Strategy - Rate Limiter
---
Monitors the invocation frequency and regulates it over a given interval; most commonly needed for external APIs with rules like 100 calls / minute.

In this example we send users in batches of 5 on 10 threads, while still respecting a global limit of 10 calls / second.
```ts
import axios from 'axios';
import { Resistor, RateLimiterStrategy } from '@hisorange/resistor';

const worker = async (users: string[]) =>
  axios.post('https://mockapi.io/test', users);

const buffer = new Resistor<string>(worker, {
  threads: 10,
  buffer: {
    size: 5,
  },
  limiter: {
    level: 'global', // Applied to every thread in aggregate
    strategy: new RateLimiterStrategy({
      interval: 1000,
      occurrence: 10,
    }),
  },
});

// Non-blocking, just starts the work.
await buffer.push('admin');
await buffer.push('user1');
await buffer.push('user2');
// ... push() returns a blocking promise once the rate limiter strategy has reached its limit.
```
### Strategy - Unbound
---
A basic active-thread limiter that simply lets each thread execute one job at a time; mostly useful for database-related actions where the worker should not open too many connections.

In this example we send users in batches of 10 on 5 threads without waiting between calls; the scheduler dispatches the next buffer as soon as any thread is free.
```ts
import { Resistor, UnboundStrategy } from '@hisorange/resistor';

const worker = async (users: string[]) =>
  sequelize.insertMany('myUsers', users);

const buffer = new Resistor<string>(worker, {
  threads: 5,
  buffer: {
    size: 10,
  },
  limiter: {
    strategy: new UnboundStrategy(),
  },
});

// Non-blocking, just starts the work.
await buffer.push('admin');
await buffer.push('user1');
await buffer.push('user2');
// ... push() returns a blocking promise once every thread is occupied.
```
### Auto Flush
---
Often your buffer will not fill to the exact maximum you set, and some records would hang around without ever being flushed. This is where auto flush comes in: you can provide an interval in the config, which is continuously pushed back from the last flush; when it fires, it triggers a flush even if the buffer is not at its maximum.
```ts
const buffer = new Resistor<string>(() => Promise.resolve(), {
  autoFlush: {
    interval: 5_000, // Wait at most 5 seconds before flushing the buffer.
  },
});
```
### Events

---

| Event                    | Description                                                                             |
| ------------------------ | --------------------------------------------------------------------------------------- |
| `.on('flush.invoked')`   | Emitted when the flush handler is invoked, either by auto flush or by the buffer.       |
| `.on('flush.scheduled')` | Emitted when the flush handler has an active buffer to pass to the scheduler.           |
| `.on('flush.executed')`  | Emitted when the scheduler has executed the worker.                                     |
| `.on('worker.rejected')` | Emitted when the provided worker throws an unhandled promise rejection.                 |
| `.on('worker.retrying')` | Emitted when the provided worker is rescheduled after an unhandled promise rejection.   |
| `.on('thread.opened')`   | Emitted after a new thread has been opened by the scheduler.                            |
| `.on('thread.closed')`   | Emitted after a thread has been closed by the scheduler.                                |
| `.on('queue.empty')`     | Emitted when the scheduler's waiting queue becomes empty.                               |
| `.on('empty')`           | Emitted when the resistor is completely empty: no buffered records, active threads, or waiting executions. |
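
For instance, a drain-and-exit routine can key off the new `empty` event. The sketch below is illustrative: the event names come from the table above, the `flush.executed` payload is the executed counter, and the `worker.rejected` payload fields (`rejection`, `records`, `errors`) follow the emits visible in the `build/resistor.js` diff, so treat the exact payload shapes as assumptions.

```ts
import { Resistor } from '@hisorange/resistor';

const resistor = new Resistor<string>(async (records: string[]) => {
  // ... process the records ...
});

// Counter of successful worker executions.
resistor.on('flush.executed', (executed: number) =>
  console.log(`Flushes executed so far: ${executed}`),
);

// Carries the failing records, so nothing is silently lost.
resistor.on('worker.rejected', ({ rejection, records }) =>
  console.error('Worker failed for', records, rejection),
);

// Buffer, threads, and wait queue are all empty; safe to stop or exit.
resistor.once('empty', () => console.log('All records processed.'));
```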

### Retry Handler
---
When the worker throws an unhandled rejection, the resistor can schedule the job for one more execution. By default this is turned off, because you can subscribe to the `worker.rejected` event and handle the error yourself; but if your workload allows a simple requeue without side effects, simply set the retry count to your desired value. Important to note: each retry blocks the same thread until the job either succeeds or runs out of retry chances, and every time the resistor emits the `worker.rejected` and `worker.retrying` events with the respective information.
```ts
const buffer = new Resistor<string>(() => Promise.resolve(), {
  retrier: {
    times: 50, // Reschedule the job at most 50 times before giving up on it.
  },
});
```
### Error Handling
---
It's painful to lose records in an asynchronous workflow; this is why the resistor emits a `worker.rejected` event with the failing records, so you can apply your own custom logic. Ideally the error is handled inside the given worker function, but this event is a failsafe in case anything slips through the worker.
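
A minimal failsafe sketch, assuming the payload shape shown in the `build/resistor.js` diff; the `deadLetters` array and the decision to park records are illustrative, not part of the library:

```ts
const deadLetters: string[] = [];

buffer.on('worker.rejected', ({ rejection, records, errors }) => {
  console.error(`Worker rejection #${errors}:`, rejection);
  // Park the failing records so they can be inspected or replayed later.
  deadLetters.push(...records);
});
```
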
### Analytics / Health Check
---
Measure what matters! To implement a health check or any monitoring, the package provides a full-fledged analytics system to help you understand and optimize your workload. :)
```ts
const usage = resistor.analytics;

// Shape of the analytics object (the former `flush` group is named
// `worker` as of 1.1.5, matching the build diff above):
{
  worker: {
    invoked: 0,
    scheduled: 0,
    executed: 0,
    errors: 0,
    processTime: 0,
  },
  thread: {
    active: 0,
    opened: 0,
    closed: 0,
    maximum: 0,
  },
  queue: {
    waiting: 0,
    maximum: 0,
  },
  record: {
    received: 0,
    buffered: 0,
  },
};
```
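
A health check can then compare these counters against thresholds of your own choosing. A minimal sketch, assuming the shape above; the limit values are arbitrary:

```ts
import { Resistor } from '@hisorange/resistor';

const isHealthy = (resistor: Resistor<string>): boolean => {
  const { worker, queue } = resistor.analytics;
  // Arbitrary example thresholds: persistent worker errors or a long
  // wait queue indicate the workload is falling behind.
  return worker.errors < 10 && queue.waiting < 100;
};
```
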
### What's with the name?

##### 1.1.5

- Usage examples
- Changed event keys to match the worker naming

##### 1.1.4


