🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Book a DemoInstallSign in
Socket

async-transforms

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

async-transforms - npm Package Compare versions

Comparing version

to
1.0.7

tsconfig.json

4

package.json
{
"name": "async-transforms",
"version": "1.0.6",
"version": "1.0.7",
"description": "Asynchronous stream transforms",

@@ -28,3 +28,3 @@ "type": "module",

"scripts": {
"prepare": "bash build.sh",
"prepublishOnly": "bash build.sh",
"test": "mocha"

@@ -31,0 +31,0 @@ },

@@ -5,5 +5,5 @@

export interface Options {
objectMode?: boolean;
order?: boolean;
tasks?: number;
objectMode: boolean;
order: boolean;
tasks: number;
}

@@ -25,3 +25,3 @@

*/
export function map(handler: (arg: any, index: number) => any, options?: Options): stream.Transform;
export function map(handler: (arg: any, index: number) => any, options?: Partial<Options>): stream.Transform;

@@ -32,3 +32,3 @@ /**

*/
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Options): stream.Transform;
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Partial<Options>): stream.Transform;

@@ -35,0 +35,0 @@ /**

@@ -22,7 +22,8 @@ import stream from 'stream';

* @param {function(?, number): ?} handler
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options
* @param {Partial<import('.').Options>=} options
* @return {!stream.Transform}
*/
export function map(handler, options={}) {
options = Object.assign({
export function map(handler, options) {
/** @type {import('.').Options} */
const o = Object.assign({
objectMode: true,

@@ -35,13 +36,19 @@ order: false,

let count = 0;
/** @type {stream.TransformCallback?} */
let flushCallback = null;
options.tasks = Math.ceil(options.tasks) || 0;
const hasTasks = options.tasks > 0;
o.tasks = Math.ceil(o.tasks) || 0;
const hasTasks = o.tasks > 0;
/** @type {{chunk: any, encoding: string}[]} */
const pending = [];
let orderPushCount = 0;
/** @type {any[]} */
const orderDone = [];
const s = new stream.Transform({
objectMode: options.objectMode,
objectMode: o.objectMode,
// nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually.

@@ -56,3 +63,3 @@

if (!hasTasks || count < options.tasks) {
if (!hasTasks || count < o.tasks) {
internalTransform(chunk, encoding);

@@ -77,2 +84,6 @@ } else {

/**
* @param {any} chunk
* @param {string} encoding
*/
function internalTransform(chunk, encoding) {

@@ -88,2 +99,7 @@ ++count;

/**
* @param {number} localIndex
* @param {any} chunk
* @param {any} result
*/
function internalResultHandler(localIndex, chunk, result) {

@@ -94,3 +110,3 @@ if (result == null) {

if (options.order) {
if (o.order) {
const doneIndex = localIndex - orderPushCount;

@@ -119,4 +135,4 @@ orderDone[doneIndex] = result;

if (pending.length && count < options.tasks) {
const {chunk, encoding} = pending.shift();
if (pending.length && count < o.tasks) {
const {chunk, encoding} = /** @type {typeof pending[0]} */ (pending.shift());
internalTransform(chunk, encoding);

@@ -136,8 +152,8 @@ } else if (count === 0 && flushCallback) {

* @param {function(?, number): ?} handler
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options
* @param {Partial<import('.').Options>=} options
* @return {!stream.Transform}
*/
export function filter(handler, options={}) {
return map(async (chunk) => {
const result = await handler(chunk);
export function filter(handler, options) {
return map(async (chunk, i) => {
const result = await handler(chunk, i);
return result ? chunk : filterSymbol;

@@ -154,6 +170,7 @@ }, options);

*
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>}
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler
* @return {!stream.Transform}
*/
export function gate(handler) {
/** @type {any[]} */
const chunks = [];

@@ -191,4 +208,8 @@

let s;
/** @type {Promise<any[]>} */
const promise = new Promise((resolve, reject) => {
s = gate((arr) => resolve(arr));
s = gate((arr) => {
resolve(arr);
return [];
});
s.on('error', reject);

@@ -195,0 +216,0 @@ });

export interface PoolOptions {
/**
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one.
*/
minTasks: number,
/**
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum
* of one.
*/
tasks?: number,
tasks: number,

@@ -13,4 +19,7 @@ /**

* there's immediately pending tasks), increase if your tasks have high setup costs.
*
* This can be `Infinity` to keep tasks around forever.
*/
expiry?: number,
expiry: number,
}

@@ -23,2 +32,2 @@

*/
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>;
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>;

@@ -31,7 +31,9 @@ import worker from 'worker_threads';

* @param {string} dep to run script from
* @param {{tasks?: number, expiry?: number}} options
* @param {Partial<import('.').PoolOptions>} options
* @return {function(...any): Promise<any>}
*/
export function pool(dep, options) {
options = Object.assign({
/** @type {import('.').PoolOptions} */
const o = Object.assign({
minTasks: 1,
tasks: cpuCount * 0.75,

@@ -41,8 +43,9 @@ expiry: 1000,

options.expiry = Math.max(options.expiry, 0) || 0;
o.expiry = Math.max(o.expiry, 0) || 0;
if (options.tasks > 0 && options.tasks < 1) {
options.tasks = cpuCount * options.tasks;
if (o.tasks > 0 && o.tasks < 1) {
o.tasks = cpuCount * o.tasks;
}
options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1;
o.tasks = Math.max(Math.ceil(o.tasks), 0) || 1;
o.minTasks = Math.max(0, Math.min(o.tasks, ~~o.minTasks));

@@ -55,38 +58,40 @@ if (!path.isAbsolute(dep)) {

/** @type {Map<worker.Worker, number>} */
/** @type {Map<worker.Worker, NodeJS.Timeout|undefined>} */
const availableWorkers = new Map();
/** @type {{args: any[], resolve: (any) => void}[]} */
const prepareWorker = () => {
++activeWorkers;
const w = createWorker(dep);
w.on('message', ({ok}) => {
if (ok !== true) {
throw new Error(`got non-ok: ${ok}`);
}
releaseWorker(w);
});
};
for (let i = 0; i < o.minTasks; ++i) {
prepareWorker();
}
/** @type {{args: any[], resolve: (arg: any) => void}[]} */
const pendingTasks = [];
return async (...args) => {
/** @type {worker.Worker} */
let w;
if (availableWorkers.size) {
for (w of availableWorkers.keys()) {
break; // get 1st worker from map
}
/** @type {worker.Worker} */
const w = availableWorkers.keys().next().value;
const timeout = availableWorkers.get(w);
availableWorkers.delete(w);
clearTimeout(timeout);
} else if (activeWorkers < options.tasks) {
if (isModule) {
w = new worker.Worker(workerTarget, {workerData: {dep}});
} else {
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or
// ".mjs" extensions (which is always a module).
// This will probably be fixed in a future Node. Sounds like a bug.
const code = `require(${JSON.stringify(workerTarget)});`;
w = new worker.Worker(code, {workerData: {dep}, eval: true});
}
++activeWorkers;
} else {
return new Promise((resolve) => {
pendingTasks.push({args, resolve});
});
timeout && clearTimeout(timeout);
return enact(w, args);
}
return enact(w, args);
// Start a new worker, but still push the work onto the queue for when it's ready.
if (activeWorkers < o.tasks) {
prepareWorker();
}
return new Promise((resolve) => {
pendingTasks.push({args, resolve});
});
};

@@ -105,2 +110,3 @@

return new Promise((resolve, reject) => {
/** @type {(arg: {result: any, error: Error}) => void} */
const handler = ({result, error}) => {

@@ -119,6 +125,10 @@ port1.off('message', handler); // important to allow GC

*/
function terimateWorker(w) {
function maybeTerimateWorker(w) {
if (activeWorkers > o.minTasks) {
w.terminate();
availableWorkers.delete(w);
} else {
availableWorkers.set(w, undefined);
}
--activeWorkers;
w.terminate();
availableWorkers.delete(w);
}

@@ -130,14 +140,33 @@

function releaseWorker(w) {
if (pendingTasks.length) {
const immediateTask = pendingTasks.shift();
if (immediateTask) {
// There's an immediate task, consume it and go.
const {args, resolve} = pendingTasks.shift();
const {args, resolve} = immediateTask;
resolve(enact(w, args));
} else if (options.expiry) {
} else if (isFinite(o.expiry)) {
// Otherwise, put it into our queue to be deleted soon.
const timeout = setTimeout(terimateWorker.bind(null, w), options.expiry);
const timeout = setTimeout(maybeTerimateWorker.bind(null, w), o.expiry);
availableWorkers.set(w, timeout);
} else {
terimateWorker(w);
availableWorkers.set(w, undefined);
}
}
}
/**
* @param {string} dep
*/
function createWorker(dep) {
let w;
if (isModule) {
return new worker.Worker(workerTarget, {workerData: {dep}});
}
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or
// ".mjs" extensions (which is always a module).
// This will probably be fixed in a future Node. Sounds like a bug.
const code = `require(${JSON.stringify(workerTarget)});`;
return new worker.Worker(code, {workerData: {dep}, eval: true});
}

@@ -7,3 +7,5 @@ /**

if (worker.isMainThread) {
const {parentPort} = worker;
if (worker.isMainThread || !parentPort) {
throw new TypeError('cannot run on main thread');

@@ -16,3 +18,3 @@ }

.then(({default: method}) => {
worker.parentPort.on('message', ({args, port}) => {
parentPort.on('message', ({args, port}) => {
Promise.resolve()

@@ -24,6 +26,7 @@ .then(() => method(...args))

});
parentPort.postMessage({ok: true});
})
.catch((error) => {
// Failure mode: the module couldn't be imported, complain loudly.
worker.parentPort.on('message', ({port}) => {
parentPort.on('message', ({port}) => {
port.postMessage({error});

@@ -30,0 +33,0 @@ port.close();

export interface PoolOptions {
/**
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one.
*/
minTasks: number,
/**
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum
* of one.
*/
tasks?: number,
tasks: number,

@@ -13,4 +19,7 @@ /**

* there's immediately pending tasks), increase if your tasks have high setup costs.
*
* This can be `Infinity` to keep tasks around forever.
*/
expiry?: number,
expiry: number,
}

@@ -23,2 +32,2 @@

*/
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>;
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet