@hisorange/resistor
Advanced tools
Comparing version 1.1.6 to 2.0.0
@@ -1,1 +0,3 @@ | ||
export declare type IWorker<I> = (records: I[]) => Promise<void>; | ||
export declare type ISingleRecordWorker<I> = (record: I, threadId: number) => Promise<void>; | ||
export declare type IBufferedWorker<I> = (records: I[], threadId: number) => Promise<void>; | ||
export declare type IWorker<I> = IBufferedWorker<I> | ISingleRecordWorker<I>; |
@@ -12,3 +12,2 @@ /// <reference types="node" /> | ||
export declare class Resistor<I> implements Pick<EventEmitter, 'on' | 'once' | 'off'> { | ||
protected worker: IWorker<I>; | ||
/** | ||
@@ -45,2 +44,6 @@ * Temporary buffer to store the records until the worker flushes them. | ||
/** | ||
* Store the reference to the worker function. | ||
*/ | ||
protected worker: IWorker<I>; | ||
/** | ||
* Initialize a configured resistor. | ||
@@ -89,3 +92,3 @@ */ | ||
*/ | ||
protected schedule(work: () => Promise<void>, waitForWorker: boolean): Promise<void>; | ||
protected schedule(job: (threadId: number) => Promise<void>, waitForWorker: boolean): Promise<void>; | ||
/** | ||
@@ -92,0 +95,0 @@ * Push a record to the buffer, returns a promise which should be awaited so |
@@ -16,3 +16,2 @@ "use strict"; | ||
constructor(worker, config) { | ||
this.worker = worker; | ||
/** | ||
@@ -78,2 +77,9 @@ * Temporary buffer to store the records until the worker flushes them. | ||
} | ||
// Help the type system. | ||
if (this.config.buffer.size === 1) { | ||
this.worker = worker; | ||
} | ||
else { | ||
this.worker = worker; | ||
} | ||
// Start the auto flush timer if it's configured. | ||
@@ -131,3 +137,3 @@ this.register(); | ||
while (this.waitQueue.length) { | ||
await Promise.all(this.vThreads); | ||
await Promise.all(this.waitQueue); | ||
} | ||
@@ -161,9 +167,16 @@ // Queue maybe empty but the vThreads could be loaded. | ||
this.emitter.emit(events_2.EVENTS.FLUSH_SCHEDULED, ++this._analytics.worker.scheduled); | ||
let records; | ||
// We cut the maximum record and leave an empty array behind, | ||
// this is needed in case an async .push has been called while an other call started the flush. | ||
const records = this.buffer.splice(0, this.config.buffer.size); | ||
this._analytics.record.buffered -= records.length; | ||
if (this.config.buffer.size === 1) { | ||
records = this.buffer.pop(); | ||
this._analytics.record.buffered--; | ||
} | ||
else { | ||
records = this.buffer.splice(0, this.config.buffer.size); | ||
this._analytics.record.buffered -= records.length; | ||
} | ||
// Retry counter. | ||
let retries = 0; | ||
const job = () => this.worker(records).catch(async (rejection) => { | ||
const job = (threadId) => this.worker(records, threadId).catch(async (rejection) => { | ||
this.emitter.emit(events_2.EVENTS.WORKER_REJECTED, { | ||
@@ -183,3 +196,3 @@ rejection, | ||
}); | ||
await job(); | ||
await job(threadId); | ||
} | ||
@@ -198,3 +211,3 @@ } | ||
*/ | ||
async schedule(work, waitForWorker) { | ||
async schedule(job, waitForWorker) { | ||
// Limit the maximum "virtual threads" to the configured threshold. | ||
@@ -211,3 +224,3 @@ if (this._analytics.thread.active >= this.config.threads) { | ||
// Push the execution to a free thread. | ||
const threadId = this.vThreads.push(work()) - 1; | ||
const threadId = this.vThreads.push(job(this.vThreads.length)) - 1; | ||
// Hook to handle thread removal. | ||
@@ -214,0 +227,0 @@ const worker = this.vThreads[threadId].then(() => { |
{ | ||
"name": "@hisorange/resistor", | ||
"version": "1.1.6", | ||
"version": "2.0.0", | ||
"description": "Versatily resource load throttler with extensible strategies, configuration and virtual thread management.", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -29,7 +29,7 @@ ![Resistor](https://user-images.githubusercontent.com/3441017/119745067-ab632600-be8d-11eb-93e1-24d34ffe2a92.png) | ||
const worker = async (urls: string[]) => fetch(urls[0]); | ||
const buffer = new Resistor<string>(worker, { | ||
const worker = async (urls: string[]) => urls.forEach(fetch)); | ||
const buffer = new Resistor(worker, { | ||
threads: 2, | ||
buffer: { | ||
size: 1, | ||
size: 10, | ||
}, | ||
@@ -46,5 +46,19 @@ limiter: { | ||
await buffer.push('https://hisorange.me'); | ||
await buffer.push('https://google.com'); | ||
await buffer.push('https://artgen.io'); | ||
// Will wait 5 second until the promise resolves. | ||
await buffer.push('https://github.com'); | ||
// New feature (2.x.x): set the buffer size to 1, and the worker will only receive | ||
// a single record and not an array of records. | ||
// This syntax makes it easier to wrap functions | ||
const resistor = new Resistor(fetch, { | ||
thread: 16, | ||
buffer: { | ||
size: 1 | ||
} | ||
}); | ||
// Will call the fetch directly on a maximum of 16 concurrent thread. | ||
resistor.push('https://enalin.co'); | ||
resistor.push('https://prerender.io'); | ||
``` | ||
@@ -213,2 +227,7 @@ | ||
##### 2.0.0 | ||
- Fixed an edge case on shutdown where the buffer may keep 1 record in progress but says it's stopped | ||
- Added the new single record option behavior when the buffer size is 1 | ||
##### 1.1.6 | ||
@@ -215,0 +234,0 @@ |
Sorry, the diff of this file is not supported yet
47741
665
256