Socket
Socket
Sign inDemoInstall

@hisorange/resistor

Package Overview
Dependencies
1
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.1.6 to 2.0.0

4

build/interfaces/worker.interface.d.ts

@@ -1,1 +0,3 @@

export declare type IWorker<I> = (records: I[]) => Promise<void>;
export declare type ISingleRecordWorker<I> = (record: I, threadId: number) => Promise<void>;
export declare type IBufferedWorker<I> = (records: I[], threadId: number) => Promise<void>;
export declare type IWorker<I> = IBufferedWorker<I> | ISingleRecordWorker<I>;

@@ -12,3 +12,2 @@ /// <reference types="node" />

export declare class Resistor<I> implements Pick<EventEmitter, 'on' | 'once' | 'off'> {
protected worker: IWorker<I>;
/**

@@ -45,2 +44,6 @@ * Temporary buffer to store the records until the worker flushes them.

/**
* Store the reference to the worker function.
*/
protected worker: IWorker<I>;
/**
* Initialize a configured resistor.

@@ -89,3 +92,3 @@ */

*/
protected schedule(work: () => Promise<void>, waitForWorker: boolean): Promise<void>;
protected schedule(job: (threadId: number) => Promise<void>, waitForWorker: boolean): Promise<void>;
/**

@@ -92,0 +95,0 @@ * Push a record to the buffer, returns a promise which should be awaited so

@@ -16,3 +16,2 @@ "use strict";

constructor(worker, config) {
this.worker = worker;
/**

@@ -78,2 +77,9 @@ * Temporary buffer to store the records until the worker flushes them.

}
// Help the type system.
if (this.config.buffer.size === 1) {
this.worker = worker;
}
else {
this.worker = worker;
}
// Start the auto flush timer if it's configured.

@@ -131,3 +137,3 @@ this.register();

while (this.waitQueue.length) {
await Promise.all(this.vThreads);
await Promise.all(this.waitQueue);
}

@@ -161,9 +167,16 @@ // Queue maybe empty but the vThreads could be loaded.

this.emitter.emit(events_2.EVENTS.FLUSH_SCHEDULED, ++this._analytics.worker.scheduled);
let records;
// We cut the maximum record and leave an empty array behind,
// this is needed in case an async .push has been called while an other call started the flush.
const records = this.buffer.splice(0, this.config.buffer.size);
this._analytics.record.buffered -= records.length;
if (this.config.buffer.size === 1) {
records = this.buffer.pop();
this._analytics.record.buffered--;
}
else {
records = this.buffer.splice(0, this.config.buffer.size);
this._analytics.record.buffered -= records.length;
}
// Retry counter.
let retries = 0;
const job = () => this.worker(records).catch(async (rejection) => {
const job = (threadId) => this.worker(records, threadId).catch(async (rejection) => {
this.emitter.emit(events_2.EVENTS.WORKER_REJECTED, {

@@ -183,3 +196,3 @@ rejection,

});
await job();
await job(threadId);
}

@@ -198,3 +211,3 @@ }

*/
async schedule(work, waitForWorker) {
async schedule(job, waitForWorker) {
// Limit the maximum "virtual threads" to the configured threshold.

@@ -211,3 +224,3 @@ if (this._analytics.thread.active >= this.config.threads) {

// Push the execution to a free thread.
const threadId = this.vThreads.push(work()) - 1;
const threadId = this.vThreads.push(job(this.vThreads.length)) - 1;
// Hook to handle thread removal.

@@ -214,0 +227,0 @@ const worker = this.vThreads[threadId].then(() => {

{
"name": "@hisorange/resistor",
"version": "1.1.6",
"version": "2.0.0",
"description": "Versatily resource load throttler with extensible strategies, configuration and virtual thread management.",

@@ -5,0 +5,0 @@ "keywords": [

@@ -29,7 +29,7 @@ ![Resistor](https://user-images.githubusercontent.com/3441017/119745067-ab632600-be8d-11eb-93e1-24d34ffe2a92.png)

const worker = async (urls: string[]) => fetch(urls[0]);
const buffer = new Resistor<string>(worker, {
const worker = async (urls: string[]) => urls.forEach(fetch));
const buffer = new Resistor(worker, {
threads: 2,
buffer: {
size: 1,
size: 10,
},

@@ -46,5 +46,19 @@ limiter: {

await buffer.push('https://hisorange.me');
await buffer.push('https://google.com');
await buffer.push('https://artgen.io');
// Will wait 5 second until the promise resolves.
await buffer.push('https://github.com');
// New feature (2.x.x): set the buffer size to 1, and the worker will only receive
// a single record and not an array of records.
// This syntax makes it easier to wrap functions
const resistor = new Resistor(fetch, {
thread: 16,
buffer: {
size: 1
}
});
// Will call the fetch directly on a maximum of 16 concurrent thread.
resistor.push('https://enalin.co');
resistor.push('https://prerender.io');
```

@@ -213,2 +227,7 @@

##### 2.0.0
- Fixed an edge case on shutdown where the buffer may keep 1 record in progress but says it's stopped
- Added the new single record option behavior when the buffer size is 1
##### 1.1.6

@@ -215,0 +234,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc