Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

node-parallelizer

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-parallelizer - npm Package Compare versions

Comparing version 2.2.0 to 3.0.0

18

package.json
{
"name": "node-parallelizer",
"version": "2.2.0",
"version": "3.0.0",
"description": "A NodeJS package for running code in parallel. Initially created to provide multiprocessing in an AWS Lambda function, but it can be used in any NodeJS environment.",

@@ -13,4 +13,11 @@ "main": "src/index.js",

},
"keywords": ["parallelizer", "Lambda parallelizer", "child process", "parallelism"],
"author": "",
"keywords": [
"parallelizer",
"Lambda parallelizer",
"child process",
"parallelism",
"worker thread",
"nodejs parallel"
],
"author": "Eduardo Marcos <edujugon@gmail.com>",
"license": "MIT",

@@ -20,3 +27,6 @@ "bugs": {

},
"homepage": "https://github.com/Edujugon/node-parallelizer#readme"
"homepage": "https://github.com/Edujugon/node-parallelizer#readme",
"devDependencies": {
"benchmark": "^2.1.4"
}
}

@@ -9,5 +9,5 @@ # Node Parallelizer

### Child Process Parallelizer
This parallelizer is specifically designed for processing hundreds or thousands of records in a single invocation when your code performs both CPU-intensive and **I/O-intensive operations**.
This parallelizer is designed specifically for creating new Node.js processes. These processes include an extra communication channel that enables message exchange between the parent and child processes.
When you call the `runBatch(records)` method in this parallelizer, the package will split the list of records you provide into smaller subsets, and your code will be used to execute each subset in parallel.
Bear in mind that the created Node.js child processes are independent from the parent process, except for the IPC communication channel connecting them. Each child process has its own memory and V8 instance. Due to the extra resources required for these allocations, it is not advisable to create a large number of child Node.js processes.

@@ -17,27 +17,79 @@ It uses the NodeJS [child process module](https://nodejs.org/api/child_process.html) behind the scenes.

### Worker Threads Parallelizer
This parallelizer is specifically designed for processing hundreds or thousands of records in a single invocation when your code performs **CPU-intensive operations**.
This parallelizer enables the use of threads that execute JavaScript in parallel.
When you call the `runBatch(records)` method in this parallelizer, the package will split the list of records you provide into smaller subsets, and your code will be used to execute each subset in parallel.
These threads are beneficial for carrying out JavaScript tasks that demand significant CPU resources. However, they offer limited advantages for I/O-intensive tasks. Node.js's native asynchronous I/O operations or the the child process parallelizer are more effective than this parallelizer type in such cases.
It uses the NodeJS [worker threads module](https://nodejs.org/api/worker_threads.html) behind the scenes.
### Generic details
When you call the `run(records)` method in a parallelizer instance, this package will split the list of records you provide into smaller subsets, and your code will be used to execute each subset in parallel.
This package can detect the number of vCPU cores allocated to your execution environment and maximize their utilization. By default, it generates one child process/thread per vCPU core, but this setting can be customized to meet your specific requirements. Alternatively, you can manually specify the number of child processes/threads the library creates, regardless of the number of vCPU cores available.
## AWS Lambda & Node Parallelizer
This package can detect the number of vCPU cores allocated to your Lambda function and maximize their utilization. By default, it generates one child process/thread per vCPU core, but this setting can be customized to meet your specific requirements. Alternatively, you can manually specify the number of child processes/threads the library creates, regardless of the number of vCPU cores available.
By default, It uses the Lambda function environment `/tmp` folder to create the required module that runs in the child process/thread.
It uses the Lambda function environment `/tmp` folder to create the required module that runs in the child.
On the Child Process Parallelizer, when you create an instanciate if the Parallelizer class outside of the Lambda handler function, it will reuse the child processes across the different invocations within a Lambda instance, minimazing the impact of creating child process on every invocation. Furthermore, if the package detects a disconnection of any of the child processes, it will recreate it automatically without affecting the execution.
On the Child Process Parallelizer, when you call the `parallelizerFunction` method outside of the Lambda handler function, it will reuse the child processes across the different invocations within a Lambda instance, minimazing the impact of creating child process on every invocation. Furthermore, if the package detects a disconnection of any of the child processes, it will recreate it automatically without affecting the execution.
## Demostration
![Alt text describing the image](./images/node-parallelizer.png)
## Installation
To add this package to your dependency list, run:
## Benchmark
#### CPU & I/O operations (Parallelization per CPU = 1)
```bash
$ node test/benchmark.js
Child Parallelizer x 15.08 ops/sec
Thread Parallelizer x 31.90 ops/sec
Without Parallelizer x 2.79 ops/sec
Result:
Fastest is Thread Parallelizer
Slowest is Without Parallelizer
```
#### CPU & I/O operations (Parallelization per CPU = 3)
```bash
$ node test/benchmark.js
Child Parallelizer x 17.01 ops/sec
Thread Parallelizer x 7.72 ops/sec
Without Parallelizer x 2.93 ops/sec
Result:
Fastest is Child Parallelizer
Slowest is Without Parallelizer
```
#### Child + Thread Parallelizers VS JavaScript Promise.All (Parallelization of = 1)
```bash
$ node test/benchmark-2.js
Child + Thread Parallelizers x 8.15 ops/sec
JavaSCript Promise.All x 7.21 ops/sec
Result:
Fastest is Child and Thread Parallelizers
Slowest is JavaSCript Promise.All
```
#### Child + Thread Parallelizers VS JavaScript Promise.All (Parallelization of = 3)
```bash
$ node test/benchmark-2.js
Child + Thread Parallelizers x 16.42 ops/sec
JavaSCript Promise.All x 7.49 ops/sec
Result:
Fastest is Child + Thread Parallelizers
Slowest is JavaSCript Promise.All
```
## Installation
```bash
npm i node-parallelizer --save
```
## Usage
<details>
<summary>Parallelizer (<b>CPU-intensive operations && I/O-intensive operations</b>)</summary>
<summary>Parallelizer (Basic)</summary>
#### Class instantiation
`Parallelizer({ type = 'child-process', tmpPath = '/tmp', maxParallelization = false, parallelizationPerCPU = 1, debug = false })`
`Parallelizer({ type = 'child-process', tmpPath = '/tmp', filePath, processBatchFunctionName, parallelization = false, parallelizationPerCPU = 1, debug = false })`

@@ -47,19 +99,15 @@ **Parameters**

- `tmpPath` (String) (Default value: '/tmp'): The path where the module that runs in the thread will be created.
- `maxParallelization` (Number|false) (Default value: false): The maximum number of processes/threads that will be created. If false, it is based on the CPU cores available.
- `parallelizationPerCPU` (Number) (Default value: 1): If the `maxParallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU.
- `filePath` (String): The absolute path to the file that contains the function that will be executed in parallel.
- `processBatchFunctionName` (String): The name of the function that will be executed in parallel.
- `parallelization` (Number|false) (Default value: false): The exact number of processes/threads that will be created. If false, it is based on the CPU cores available.
- `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
#### Main methods
`parallelizerFunction({ filePath, processBatchFunctionName })`
`run(batch)`
**Parameters**
- `filePath` (String): The absolute path to the file that contains the function that will be executed in parallel.
- `processBatchFunctionName` (String): The name of the function that will be executed in parallel.
`runBatch(batch)`
**Parameters**
- `batch` (Array): The records you want to process in parallel.
**Returns** (Array): The thread's responses.
#### Using worker threads parallizer in AWS Lambda.
**Returns** (Array): The processes/threads' responses.
#### Using the Node Parallizer in AWS Lambda.
In this example, the repository structure looks like this

@@ -81,9 +129,7 @@ ```

// Creates a new parallelizer instance.
const parallelizer = new Parallelizer();
// Creates child processes based on your code.
parallelizer.parallelizerFunction({ type: PARALLELIZER_CHILD, filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessor' });
const parallelizer = new Parallelizer({ type: PARALLELIZER_CHILD, filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessor' });
module.exports.handler = async(event) => {
// Run batch in parallel
const responses = await parallelizer.runBatch(event.Records);
const responses = await parallelizer.run(event.Records);

@@ -116,27 +162,28 @@ console.log(responses);

</details>
<details>
<summary>Child Process Parallelizer (<b>I/O-intensive operations or CPU-intensive operations && I/O-intensive operations</b>)</summary>
<summary>Parallelizer (Advance)</summary>
#### Class instantiation
`ChildProcess({ tmpPath = '/tmp', maxParallelization = false, parallelizationPerCPU = 1, debug = false })`
`Parallelizer([{ id: "only-cpu", type = 'worker-threads', tmpPath = '/tmp', filePath, processBatchFunctionName, parallelization = false, parallelizationPerCPU = 1, debug = false }, { id: "only-io", type = 'child-process', tmpPath = '/tmp', filePath, processBatchFunctionName, parallelization = false, parallelizationPerCPU = 1, debug = false }])`
**Parameters**
- `tmpPath` (String) (Default value: '/tmp'): The path where the module that runs in the child will be created.
- `maxParallelization` (Number|false) (Default value: false): The maximum number of child processes that will be created. If false, it is based on the CPU cores available.
- `parallelizationPerCPU` (Number) (Default value: 1): If the `maxParallelization` is set to `false`, this parameter defines the amount of processes per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
- List of:
- `id` (String): The unique identifier for your Child/Thread internal instance.
- `type` (String) (Default value: 'child-process') (Options: 'child-process' | 'worker-threads'): The parallelizer type to be used.
- `tmpPath` (String) (Default value: '/tmp'): The path where the module that runs in the thread will be created.
- `filePath` (String): The absolute path to the file that contains the function that will be executed in parallel.
- `processBatchFunctionName` (String): The name of the function that will be executed in parallel.
- `parallelization` (Number|false) (Default value: false): The exact number of processes/threads that will be created. If false, it is based on the CPU cores available.
- `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
#### Main methods
`parallelizerFunction({ filePath, processBatchFunctionName })`
`run([{ id: "only-cpu", batch: batchOne },{ id: "only-io", batch: batchTwo }])`
**Parameters**
- `filePath` (String): The absolute path to the file that contains the function that will be executed in parallel.
- `processBatchFunctionName` (String): The name of the function that will be executed in parallel.
`runBatch(batch)`
**Parameters**
- `id` (String): The unique identifier for your Child/Thread internal instance.
- `batch` (Array): The records you want to process in parallel.
**Returns** (Array): The child processes' responses.
#### Using child process parallizer in AWS Lambda.
**Returns** (Array): A list with the processes/threads' responses.
#### Using the Node Parallizer in AWS Lambda.
In this example, the repository structure looks like this

@@ -155,12 +202,16 @@ ```

const { ChildProcess } = require("node-parallelizer");
const { Parallelizer, PARALLELIZER_CHILD, PARALLELIZER_THREADS } = require("node-parallelizer");
// Creates a new child process instance.
const childProcess = new ChildProcess();
// Creates child processes based on your code.
childProcess.parallelizerFunction({ filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessor' });
// Creates a new parallelizer instance with multiple different parallelizers.
const parallelizer = new Parallelizer([
{ id: "with-threads", type: PARALLELIZER_THREADS, parallelization: 2, filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessorOne' },
{ id: "with-processes", type: PARALLELIZER_CHILD, parallelization: 4, filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessorTwo' },
]);
module.exports.handler = async(event) => {
// Run batch in parallel
const responses = await childProcess.runBatch(event.Records);
const responses = await parallelizer.run([
{ id: "with-threads", batch: event.batchOne },
{ id: "with-processes", batch: event.batchTwo },
])

@@ -173,2 +224,4 @@ console.log(responses);

> Notice that we have added a new parameter called 'id'. This is used to distinguish between the various types of parallelizers and to pass the appropriate batch.
The below snippet represents the code you want to run in parallel

@@ -178,3 +231,3 @@ ```javascript

const batchProcessor = ({ batch }) => {
const batchProcessorOne = ({ batch }) => {

@@ -188,70 +241,4 @@ //

module.exports = { batchProcessor }
```
> Verify that the input signature of your function (in this case, batchProcessor) includes batch as a parameter, as it contains the subset of records that a child process will handle.
const batchProcessorTwo = ({ batch }) => {
</details>
<details>
<summary>Worker Threads Parallelizer (<b>CPU-intensive operations</b>)</summary>
#### Class instantiation
`WorkerThreads({ tmpPath = '/tmp', maxParallelization = false, parallelizationPerCPU = 1, debug = false })`
**Parameters**
- `tmpPath` (String) (Default value: '/tmp'): The path where the module that runs in the thread will be created.
- `maxParallelization` (Number|false) (Default value: false): The maximum number of threads that will be created. If false, it is based on the CPU cores available.
- `parallelizationPerCPU` (Number) (Default value: 1): If the `maxParallelization` is set to `false`, this parameter defines the amount of threads per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
#### Main methods
`parallelizerFunction({ filePath, processBatchFunctionName })`
**Parameters**
- `filePath` (String): The absolute path to the file that contains the function that will be executed in parallel.
- `processBatchFunctionName` (String): The name of the function that will be executed in parallel.
`runBatch(batch)`
**Parameters**
- `batch` (Array): The records you want to process in parallel.
**Returns** (Array): The thread's responses.
#### Using worker threads parallizer in AWS Lambda.
In this example, the repository structure looks like this
```
src/
handler.js
parallel.js
serverless.yml
package.json
```
The below snippet represents your Lambda handler
```javascript
// handler.js
const { WorkerThreads } = require("node-parallelizer");
// Creates a new child process instance.
const threads = new WorkerThreads();
// Creates child processes based on your code.
threads.parallelizerFunction({ filePath: "/var/task/src/parallel.js", processBatchFunctionName: 'batchProcessor' });
module.exports.handler = async(event) => {
// Run batch in parallel
const responses = await threads.runBatch(event.Records);
console.log(responses);
};
```
> Make sure to provide the filePath parameter as an absolute path. In this example, we've included '/var/task/' prefix in the path because Lambda deploys your code within that folder.
The below snippet represents the code you want to run in parallel
```javascript
// parallel.js
const batchProcessor = ({ batch }) => {
//

@@ -265,10 +252,16 @@ // HERE YOUR CODE

module.exports = { batchProcessor }
module.exports = { batchProcessorOne, batchProcessorTwo }
```
> Verify that the input signature of your function (in this case, batchProcessor) includes batch as a parameter, as it contains the subset of records that a child process will handle.
> Verify that the input signature of your function (in this case, batchProcessorOne and batchProcessorTwo) includes batch as a parameter, as it contains the subset of records that a child process will handle.
</details>
## Examples
1. [Basic](https://github.com/Edujugon/node-parallelizer/tree/main/examples/basic)
2. [With Bundler](https://github.com/Edujugon/node-parallelizer/tree/main/examples/with-bundler)
## Contribution
We welcome contributions to this project. If you are interested in contributing, please feel free to submit a pull request.

@@ -11,3 +11,3 @@ "use strict";

class ChildProcess {
constructor({ tmpPath = '/tmp', maxParallelization = false, parallelizationPerCPU = 1, debug = false, generateStats = false, generateChildStats = false } = {}) {
constructor({ tmpPath = '/tmp', parallelization = false, parallelizationPerCPU = 1, debug = false } = {}) {
const uniqueId = crypto.randomBytes(16).toString('hex');

@@ -18,3 +18,3 @@

this.childProcesses = [];
this.maxParallelization = maxParallelization;
this.parallelization = parallelization;
this.parallelizationPerCPU = parallelizationPerCPU;

@@ -24,4 +24,2 @@

this.debug = debug;
this.generateStats = generateStats; // TODO
this.generateChildStats = generateChildStats; // TODO
}

@@ -44,3 +42,3 @@

_createChildProcesses() {
this.processesCount = (typeof this.maxParallelization === 'number') ? this.maxParallelization : this._getProcessesCount();
this.processesCount = (typeof this.parallelization === 'number') ? this.parallelization : this._getProcessesCount();

@@ -72,2 +70,6 @@ for (let id = 0; id < this.processesCount; id++) {

}
removeChildThreads() {
this.removeChildProcesses();
}

@@ -74,0 +76,0 @@ _removeForkEvents() {

@@ -7,13 +7,68 @@ const ChildProcess = require("./child-process");

const SINGLE_CHILD_THREAD_ID = 'single-process';
class Parallelizer {
constructor(params) {
const parallelizer = params.type || PARALLELIZER_CHILD;
if(parallelizer === PARALLELIZER_CHILD) {
return new ChildProcess(params);
}else if(parallelizer === PARALLELIZER_THREADS) {
return new WorkerThreads(params);
this.childThreads = {};
if (!isArray(params)) {
params.id = SINGLE_CHILD_THREAD_ID;
params = [params];
}
this._init(params);
}
_init(list) {
list.forEach(({ id, type, tmpPath = '/tmp', parallelization = false, parallelizationPerCPU = 1, debug = false, filePath, processBatchFunctionName }) => {
if (!filePath || !processBatchFunctionName) {
throw new Error('filePath and processBatchFunctionName are required');
}
const parallelizer = [PARALLELIZER_CHILD, PARALLELIZER_THREADS].includes(type) ? type : PARALLELIZER_CHILD;
const childThreadParams = { tmpPath, parallelization, parallelizationPerCPU, debug };
this.childThreads[id] = (parallelizer === PARALLELIZER_CHILD) ?
new ChildProcess(childThreadParams) :
new WorkerThreads(childThreadParams);
this.childThreads[id].parallelizerFunction({ filePath, processBatchFunctionName });
});
}
async run(paramsList) {
if (Object.keys(this.childThreads).length == 1) {
return this.childThreads[SINGLE_CHILD_THREAD_ID].runBatch(paramsList);
}
if (!isArray(paramsList)) {
paramsList.id = SINGLE_CHILD_THREAD_ID;
paramsList = [paramsList];
}
return await Promise.all(paramsList.map(({ id, batch }) => {
return this.childThreads[id].runBatch(batch)
}));
}
removeChildThreads(ids = null) {
ids = (ids !== null && !isArray(ids)) ? [ids] : ids;
Object.keys(this.childThreads)
.filter(id => ids === null ? true : ids.includes(id))
.forEach((id) => {
this.childThreads[id].removeChildThreads();
});
}
}
const isArray = (value) => {
return Array.isArray(value);
}
module.exports = { ChildProcess, WorkerThreads, Parallelizer, PARALLELIZER_CHILD, PARALLELIZER_THREADS };

@@ -6,2 +6,3 @@ "use strict";

const fs = require('fs');
const crypto = require('crypto');

@@ -11,11 +12,11 @@ const workerFileName = "worker-thread-file.js";

class WorkerThreads {
constructor({ tmpPath = '/tmp', maxParallelization = false, parallelizationPerCPU = 1, debug = false, generateStats = false, generateThreadStats = false } = {}) {
this.tmpPath = `${tmpPath}/${workerFileName}`;
constructor({ tmpPath = '/tmp', parallelization = false, parallelizationPerCPU = 1, debug = false } = {}) {
const uniqueId = crypto.randomBytes(16).toString('hex');
this.tmpPath = `${tmpPath}/${workerFileName}-${uniqueId}.js`;
this.workerFile = null;
this.maxParallelization = maxParallelization;
this.parallelization = parallelization;
this.parallelizationPerCPU = parallelizationPerCPU;
this.threadsCount = 1;
this.debug = debug;
this.generateStats = generateStats; // TODO
this.generateThreadStats = generateThreadStats; // TODO
}

@@ -27,3 +28,3 @@

this.threadsCount = (typeof this.maxParallelization === 'number') ? this.maxParallelization : this._getThreadsCount();
this.threadsCount = (typeof this.parallelization === 'number') ? this.parallelization : this._getThreadsCount();
}

@@ -42,2 +43,9 @@

removeWorkerThreads() {
this._removeThreadFile();
}
removeChildThreads() {
this._removeThreadFile();
}
_processBatchesInThreads = async (batches) => {

@@ -106,2 +114,13 @@ const batchesCount = batches.length;

_removeThreadFile() {
if (!fs.existsSync(this.tmpPath))
return;
try {
fs.unlinkSync(this.tmpPath);
} catch (error) {
console.error(`Failed to remove temporary child process file: ${error.message}`);
}
}
_getThreadsCount = () => {

@@ -108,0 +127,0 @@ const cpuData = os.cpus();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc