Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

async-transforms

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

async-transforms - npm Package Compare versions

Comparing version 1.0.6 to 1.0.7

tsconfig.json

4

package.json
{
"name": "async-transforms",
"version": "1.0.6",
"version": "1.0.7",
"description": "Asynchronous stream transforms",

@@ -28,3 +28,3 @@ "type": "module",

"scripts": {
"prepare": "bash build.sh",
"prepublishOnly": "bash build.sh",
"test": "mocha"

@@ -31,0 +31,0 @@ },

@@ -5,5 +5,5 @@

export interface Options {
objectMode?: boolean;
order?: boolean;
tasks?: number;
objectMode: boolean;
order: boolean;
tasks: number;
}

@@ -25,3 +25,3 @@

*/
export function map(handler: (arg: any, index: number) => any, options?: Options): stream.Transform;
export function map(handler: (arg: any, index: number) => any, options?: Partial<Options>): stream.Transform;

@@ -32,3 +32,3 @@ /**

*/
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Options): stream.Transform;
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Partial<Options>): stream.Transform;

@@ -35,0 +35,0 @@ /**

@@ -22,7 +22,8 @@ import stream from 'stream';

* @param {function(?, number): ?} handler
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options
* @param {Partial<import('.').Options>=} options
* @return {!stream.Transform}
*/
export function map(handler, options={}) {
options = Object.assign({
export function map(handler, options) {
/** @type {import('.').Options} */
const o = Object.assign({
objectMode: true,

@@ -35,13 +36,19 @@ order: false,

let count = 0;
/** @type {stream.TransformCallback?} */
let flushCallback = null;
options.tasks = Math.ceil(options.tasks) || 0;
const hasTasks = options.tasks > 0;
o.tasks = Math.ceil(o.tasks) || 0;
const hasTasks = o.tasks > 0;
/** @type {{chunk: any, encoding: string}[]} */
const pending = [];
let orderPushCount = 0;
/** @type {any[]} */
const orderDone = [];
const s = new stream.Transform({
objectMode: options.objectMode,
objectMode: o.objectMode,
// nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually.

@@ -56,3 +63,3 @@

if (!hasTasks || count < options.tasks) {
if (!hasTasks || count < o.tasks) {
internalTransform(chunk, encoding);

@@ -77,2 +84,6 @@ } else {

/**
* @param {any} chunk
* @param {string} encoding
*/
function internalTransform(chunk, encoding) {

@@ -88,2 +99,7 @@ ++count;

/**
* @param {number} localIndex
* @param {any} chunk
* @param {any} result
*/
function internalResultHandler(localIndex, chunk, result) {

@@ -94,3 +110,3 @@ if (result == null) {

if (options.order) {
if (o.order) {
const doneIndex = localIndex - orderPushCount;

@@ -119,4 +135,4 @@ orderDone[doneIndex] = result;

if (pending.length && count < options.tasks) {
const {chunk, encoding} = pending.shift();
if (pending.length && count < o.tasks) {
const {chunk, encoding} = /** @type {typeof pending[0]} */ (pending.shift());
internalTransform(chunk, encoding);

@@ -136,8 +152,8 @@ } else if (count === 0 && flushCallback) {

* @param {function(?, number): ?} handler
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options
* @param {Partial<import('.').Options>=} options
* @return {!stream.Transform}
*/
export function filter(handler, options={}) {
return map(async (chunk) => {
const result = await handler(chunk);
export function filter(handler, options) {
return map(async (chunk, i) => {
const result = await handler(chunk, i);
return result ? chunk : filterSymbol;

@@ -154,6 +170,7 @@ }, options);

*
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>}
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler
* @return {!stream.Transform}
*/
export function gate(handler) {
/** @type {any[]} */
const chunks = [];

@@ -191,4 +208,8 @@

let s;
/** @type {Promise<any[]>} */
const promise = new Promise((resolve, reject) => {
s = gate((arr) => resolve(arr));
s = gate((arr) => {
resolve(arr);
return [];
});
s.on('error', reject);

@@ -195,0 +216,0 @@ });

export interface PoolOptions {
/**
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one.
*/
minTasks: number,
/**
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum
* of one.
*/
tasks?: number,
tasks: number,

@@ -13,4 +19,7 @@ /**

* there's immediately pending tasks), increase if your tasks have high setup costs.
*
* This can be `Infinity` to keep tasks around forever.
*/
expiry?: number,
expiry: number,
}

@@ -23,2 +32,2 @@

*/
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>;
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>;

@@ -31,7 +31,9 @@ import worker from 'worker_threads';

* @param {string} dep to run script from
* @param {{tasks?: number, expiry?: number}} options
* @param {Partial<import('.').PoolOptions>} options
* @return {function(...any): Promise<any>}
*/
export function pool(dep, options) {
options = Object.assign({
/** @type {import('.').PoolOptions} */
const o = Object.assign({
minTasks: 1,
tasks: cpuCount * 0.75,

@@ -41,8 +43,9 @@ expiry: 1000,

options.expiry = Math.max(options.expiry, 0) || 0;
o.expiry = Math.max(o.expiry, 0) || 0;
if (options.tasks > 0 && options.tasks < 1) {
options.tasks = cpuCount * options.tasks;
if (o.tasks > 0 && o.tasks < 1) {
o.tasks = cpuCount * o.tasks;
}
options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1;
o.tasks = Math.max(Math.ceil(o.tasks), 0) || 1;
o.minTasks = Math.max(0, Math.min(o.tasks, ~~o.minTasks));

@@ -55,38 +58,40 @@ if (!path.isAbsolute(dep)) {

/** @type {Map<worker.Worker, number>} */
/** @type {Map<worker.Worker, NodeJS.Timeout|undefined>} */
const availableWorkers = new Map();
/** @type {{args: any[], resolve: (any) => void}[]} */
const prepareWorker = () => {
++activeWorkers;
const w = createWorker(dep);
w.on('message', ({ok}) => {
if (ok !== true) {
throw new Error(`got non-ok: ${ok}`);
}
releaseWorker(w);
});
};
for (let i = 0; i < o.minTasks; ++i) {
prepareWorker();
}
/** @type {{args: any[], resolve: (arg: any) => void}[]} */
const pendingTasks = [];
return async (...args) => {
/** @type {worker.Worker} */
let w;
if (availableWorkers.size) {
for (w of availableWorkers.keys()) {
break; // get 1st worker from map
}
/** @type {worker.Worker} */
const w = availableWorkers.keys().next().value;
const timeout = availableWorkers.get(w);
availableWorkers.delete(w);
clearTimeout(timeout);
} else if (activeWorkers < options.tasks) {
if (isModule) {
w = new worker.Worker(workerTarget, {workerData: {dep}});
} else {
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or
// ".mjs" extensions (which is always a module).
// This will probably be fixed in a future Node. Sounds like a bug.
const code = `require(${JSON.stringify(workerTarget)});`;
w = new worker.Worker(code, {workerData: {dep}, eval: true});
}
++activeWorkers;
} else {
return new Promise((resolve) => {
pendingTasks.push({args, resolve});
});
timeout && clearTimeout(timeout);
return enact(w, args);
}
return enact(w, args);
// Start a new worker, but still push the work onto the queue for when it's ready.
if (activeWorkers < o.tasks) {
prepareWorker();
}
return new Promise((resolve) => {
pendingTasks.push({args, resolve});
});
};

@@ -105,2 +110,3 @@

return new Promise((resolve, reject) => {
/** @type {(arg: {result: any, error: Error}) => void} */
const handler = ({result, error}) => {

@@ -119,6 +125,10 @@ port1.off('message', handler); // important to allow GC

*/
function terimateWorker(w) {
function maybeTerimateWorker(w) {
if (activeWorkers > o.minTasks) {
w.terminate();
availableWorkers.delete(w);
} else {
availableWorkers.set(w, undefined);
}
--activeWorkers;
w.terminate();
availableWorkers.delete(w);
}

@@ -130,14 +140,33 @@

function releaseWorker(w) {
if (pendingTasks.length) {
const immediateTask = pendingTasks.shift();
if (immediateTask) {
// There's an immediate task, consume it and go.
const {args, resolve} = pendingTasks.shift();
const {args, resolve} = immediateTask;
resolve(enact(w, args));
} else if (options.expiry) {
} else if (isFinite(o.expiry)) {
// Otherwise, put it into our queue to be deleted soon.
const timeout = setTimeout(terimateWorker.bind(null, w), options.expiry);
const timeout = setTimeout(maybeTerimateWorker.bind(null, w), o.expiry);
availableWorkers.set(w, timeout);
} else {
terimateWorker(w);
availableWorkers.set(w, undefined);
}
}
}
/**
* @param {string} dep
*/
function createWorker(dep) {
let w;
if (isModule) {
return new worker.Worker(workerTarget, {workerData: {dep}});
}
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or
// ".mjs" extensions (which is always a module).
// This will probably be fixed in a future Node. Sounds like a bug.
const code = `require(${JSON.stringify(workerTarget)});`;
return new worker.Worker(code, {workerData: {dep}, eval: true});
}

@@ -7,3 +7,5 @@ /**

if (worker.isMainThread) {
const {parentPort} = worker;
if (worker.isMainThread || !parentPort) {
throw new TypeError('cannot run on main thread');

@@ -16,3 +18,3 @@ }

.then(({default: method}) => {
worker.parentPort.on('message', ({args, port}) => {
parentPort.on('message', ({args, port}) => {
Promise.resolve()

@@ -24,6 +26,7 @@ .then(() => method(...args))

});
parentPort.postMessage({ok: true});
})
.catch((error) => {
// Failure mode: the module couldn't be imported, complain loudly.
worker.parentPort.on('message', ({port}) => {
parentPort.on('message', ({port}) => {
port.postMessage({error});

@@ -30,0 +33,0 @@ port.close();

export interface PoolOptions {
/**
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one.
*/
minTasks: number,
/**
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum
* of one.
*/
tasks?: number,
tasks: number,

@@ -13,4 +19,7 @@ /**

* there's immediately pending tasks), increase if your tasks have high setup costs.
*
* This can be `Infinity` to keep tasks around forever.
*/
expiry?: number,
expiry: number,
}

@@ -23,2 +32,2 @@

*/
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>;
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc