
Security News
MCP Community Begins Work on Official MCP Metaregistry
The MCP community is launching an official registry to standardize AI tool discovery and let agents dynamically find and install MCP servers.
Each is a single elegant function to iterate over values both in sequential
, parallel
, and concurrent
mode. It is a powerful and mature library.
Main functionalities include:
Use your favorite package manager to install the each
package:
npm install each
With ESM:
import each from "each";
With CommonJS:
const each = require("each");
In its simplest form, Each is used as a single function, a bit like Promise.all
or Promise.allSettled
but, arguably, with more flexibility and easier to read.
This example defines list 3 items to process along the concurrency level and a function hander as it last argument:
const stack = [];
const result = await each(
[
{ message: "Is", timeout: 30 },
{ message: "Gollum", timeout: 20 },
{ message: "Around", timeout: 10 },
],
{ concurrency: true },
({ message, timeout }) =>
new Promise((resolve) =>
setTimeout(() => stack.push(message) && resolve(message), timeout),
),
);
assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");
It is equivalent to passing items as functions without a function handler and with the concurrency level defined as true
.
const stack = [];
const result = await each(
[
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Is") && resolve("Is"), 30);
}),
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Gollum") && resolve("Gollum"), 20);
}),
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Around") && resolve("Around"), 10);
}),
],
true,
);
assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");
In its advanced form, Each is a scheduler with advanced functionalities to control the execution process.
const scheduler = each({ concurrency: true });
const result = await Promise.all([
scheduler.call([
() => new Promise((resolve) => resolve(1)),
() => new Promise((resolve) => resolve(2)),
]),
scheduler.call([
() => new Promise((resolve) => resolve(3)),
() => new Promise((resolve) => resolve(4)),
]),
]);
assert.deepStrictEqual(result, [
[1, 2],
[3, 4],
]);
Signature is each(...[items|options|concurrency|handler])
.
All arguments are optional and can be defined in any order.
Multiple items (arrays) are merged. Muliple options (objects) are merged as well.
items: array
handler
function.option: object
concurrency: boolean | integer
concurrency
option property. Jump to the concurrency section below.handler: function
concurrency
(default 1
)false
is converted to 1
where functions are executed sequentially. The value true
is converted to -1
where all functions run simultaneously.fluent
(default true
)
Expose a fluent API where the functions may be chained.pause
(default false
)resume
is called.relax
(default false
)call
is further executed.call(handler)
fluent
option is false
, it is also possible to chain additional functions.concurrency([level])
end([error|options])
error(error|null)
null
to set the scheduler to a normal state.options
pause
resume
Output order is consistent with input order. The value returned by a function or resolved by a promise is always returned in the same position as it was originally defined.
Another type is returned as is unless a handler function is defined.
Each iterates over any type of item. If no handler is defined, functions and and promises get a special treatment. Functions are executed and may return a promise and promises are resolved.
Here is a quick example:
const result = await each([
// Item is a value
"a",
// Item is a function
() => new Promise((resolve) => resolve("b")),
// Item is a promise
new Promise((resolve) => resolve("c")),
]);
assert.deepStrictEqual(result, ["a", "b", "c"]);
Note, in the majority of cases, items (arrays) which do not contain functions and promises are handled with a handler function.
Functions are executed. Each handles both synchronous and asynchronous functions. In the latter case, functions return a Promise and Each wait for their resolution.
Here are various ways to declare functions:
const result = await each([
// Synchronous function
function () {
return "a";
},
// Synchronous function with the fat arrow syntax
() => "b",
// Asynchronous function
() => new Promise((resolve) => resolve("c")),
// Asynchronous function which resolves after some delay
() => new Promise((resolve) => setTimeout(() => resolve("d")), 100),
]);
assert.deepStrictEqual(result, ["a", "b", "c", "d"]);
Each wait for all promises to be resolved before returning their result. Just like with Promise.all
, result orders respect registration orders.
const result = await each([
// Instant resolution
new Promise((resolve) => resolve("a")),
// Delayed resolution
new Promise((resolve) => setTimeout(() => resolve("b")), 100),
// Instant resolution
new Promise((resolve) => resolve("c")),
]);
assert.deepStrictEqual(result, ["a", "b", "c"]);
A function can be an item to iterate or defined with the handler
option. In both cases, the behavior is the same.
A function defined as an item:
console.info(await each([() => 1]));
A function handling an item:
console.info(await each([1], (item) => item));
Handlers are called with the item as the first argument and the index number as the second argument.
Synchronous functions return a value. Asynchronous functions return a Promise.
Here is a synchronous handler function:
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
(item, index) => `${item.id}@${index}`,
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
Here is an asynchronous handler function:
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
(item, index) =>
new Promise((resolve) => setTimeout(resolve(`${item.id}@${index}`), 100)),
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
sequential
false
or 1
. It is the default concurrency mode.parallel
true
or -1
. In asynchronous mode, all the items are executed in parallel.concurrent
When the concurrent
option is undefined
, false
, or 1
, items are executed in order one after the other.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
function (item, index) {
running++;
if (running !== 1) {
throw Error("Invalid execution");
}
return new Promise((resolve) =>
setTimeout(() => {
if (running !== 1) {
throw Error("Invalid execution");
}
running--;
resolve(`${item.id}@${index}`);
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
When the concurrent
option is true
or -1
, items are all scheduled at the same time and run in parallel.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
true,
function (item, index) {
if (running !== index) {
throw Error("Invalid execution");
}
running++;
return new Promise((resolve) =>
setTimeout(() => {
if (running !== 4 - index) {
throw Error("Invalid execution");
}
running--;
resolve(`${item.id}@${index}`);
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
When the concurrent
mode is a value above 1
, the number of items running simultaneously is bounded to the concurrent
value.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
2,
function (item, index) {
running++;
if (running > 2) {
throw Error("At most 2 running tasks");
}
return new Promise((resolve, reject) =>
setTimeout(() => {
running--;
if (running > 2) {
reject(Error("At most 2 running tasks"));
} else {
resolve(`${item.id}@${index}`);
}
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
Use pause
and resume
functions to throttle the iteration.
The pause
option defines the initial status. Its value defaults to false
.
On pause, executed functions pursue their execution and no further function is scheduled for execution.
When the iteration's state is paused, new scheduled items will not resolve the returned promise until the iteration is resumed.
let state = "paused";
const scheduler = each({ pause: true });
scheduler.then(() => assert.deepStrictEqual(state, "resumed"));
setTimeout(() => {
state = "resumed";
scheduler.resume();
}, 100);
The resume
and end
methods return a promise that resolves once all the element's executions are complete. This is an example using the resume
function.
const stack = [];
const scheduler = each({ pause: true });
scheduler.call(
() =>
new Promise((resolve) => {
stack.push(1);
resolve();
}),
);
scheduler.call(
() =>
new Promise((resolve) => {
stack.push(2);
resolve();
}),
);
setTimeout(async () => {
// Before resume, not processing occurs
assert.deepStrictEqual(stack, []);
// Resume and wait for execution
await scheduler.resume();
// After resume, every element was processed
assert.deepStrictEqual(stack, [1, 2]);
}, 100);
Iterations are stopped on error.
With synchronous functions or when the concurrency mode is sequential, it behaves like Promise.all
. On error, no additionnal function is scheduled for execution and the returned promise is rejected.
With asynchronous functions executed concurrently, no additional functions are scheduled. Already executed functions resolves or rejects their promise but the result is discarded.
Whether the items array is provided at initialization or with the call
function, the behavior is the same:
try {
await each(2).call([
() => new Promise((resolve) => setImmediate(() => resolve("ok"))),
() =>
new Promise((resolve, reject) =>
setImmediate(() => reject(Error("Catchme"))),
),
() => new Promise((resolve) => setImmediate(() => resolve("ok"))),
]);
} catch (error) {
assert.equal(error.message, "Catchme");
}
concurrency
concurrency([level])
level
<integer|boolean>It defines the number of items to be executed in parallel. The new level takes effect for all new scheduled items. Previously scheduled items are unaffected.
Calling the concurrency
function change the number of items executed in parrallel. Previously scheduled items are not affected. Only the items scheduled after calling the concurrency
function will honor the new value.
This example change the concurrency
level. The first 3 items are executed in parallel and the next 3 items are executed sequentially.
import assert from "assert";
import each from "each";
const history = [];
const handler = (id) => {
history.push(`${id}:start`);
return new Promise((resolve) =>
setTimeout(() => {
history.push(`${id}:end`);
resolve();
}, 20),
);
};
const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Change the concurrency level
scheduler.concurrency(1);
// Schedule sequential execution
scheduler.call(() => handler(4));
scheduler.call(() => handler(5));
// Wait for completion
await scheduler.end();
assert.deepStrictEqual(history, [
// Parallel execution
"1:start",
"2:start",
"1:end",
"2:end",
// Sequential execution
"4:start",
"4:end",
"5:start",
"5:end",
]);
end
end([error|options])
error
relax
mode, only the promise returned by end
is rejected with an error.force
Close the scheduler. The returned promise waits for all previously scheduled items to resolve.
No further items are allowed to register with call
. In such case, the returned promise is rejected. When end
is called and the scheduler is in paused state, all paused items are resolved with undefined
or an error if any.
This example wait for the completion of two scheduled items before completion.
import assert from "assert";
import each from "each";
const history = [];
const handler = (id) => {
return new Promise((resolve) =>
setTimeout(() => {
history.push(`${id}:end`);
resolve();
}, 20),
);
};
const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Wait for completion
await scheduler.end();
assert.deepStrictEqual(history, ["1:end", "2:end"]);
fluent
The fluent
option applies when using the each().call
function. By default, it is enabled. The API is designed to allow multiple calls to be chained where the value of the last call is returned:
const result = await each()
.call(() => new Promise((resolve) => resolve(1)))
.call(() => new Promise((resolve) => resolve(2)))
.call(() => new Promise((resolve) => resolve(3)));
assert.strictEqual(result, 3);
The returned promise is enriched with the same functions as the promise returned by each()
, thus exposing the each
API.
Set the fluent
option to false
to not overload the returned promise with the each API:
const promise = each({ fluent: false }).call(
() => new Promise((resolve) => resolve(1)),
);
assert.strictEqual(promise.call, undefined);
assert.strictEqual(promise.options, undefined);
pause
The pause
set the initial mode of the scheduler. It is false
by default. Setting the scheduler in pause mode implies calling resume
to start the execution.
relax
When the relax
option is active, the internal scheduler permits the registration of new items with the call
function even after an error.
It doesn't affect the processing of an items
list. An error while handling one of the items prevents additionnal execution and rejects the items' promise. However, it provides the ability to register and execute new items with call
.
This is an example with the default behavior:
const scheduler = each();
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));
const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
{ status: "fulfilled", value: 1 },
{ status: "rejected", reason: 2 },
{ status: "rejected", reason: 2 },
]);
This is an example with the relax
option in action:
const scheduler = each({ relax: true });
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));
const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
{ status: "fulfilled", value: 1 },
{ status: "rejected", reason: 2 },
{ status: "fulfilled", value: 3 },
]);
Tests are executed with Mocha. To install the mocha
package and its dependencies, run npm install
.
npm run test
# Or
yarn run test
To automatically generate a new version and publish it:
yarn run release
Package publication is handled by the CI/CD with GitHub action.
FAQs
Chained and parallel async iterator in one elegant function.
The npm package each receives a total of 7,704 weekly downloads. As such, each popularity was classified as popular.
We found that each demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
The MCP community is launching an official registry to standardize AI tool discovery and let agents dynamically find and install MCP servers.
Research
Security News
Socket uncovers an npm Trojan stealing crypto wallets and BullX credentials via obfuscated code and Telegram exfiltration.
Research
Security News
Malicious npm packages posing as developer tools target macOS Cursor IDE users, stealing credentials and modifying files to gain persistent backdoor access.