Nanolith
(More intuitive and feature-rich than Piscina!)
❔ About
✨Nanolith✨ is a scalable, reliable, easy-to-use, and well-documented multithreading library that allows you to easily vertically scale your Node.js applications. It serves to not only build upon, but entirely replace the (deprecated) Threadz library.
There have always been a few main goals for Nanolith:
- Performance & scalability 🏃
- Ease-of-use 😇
- Seamless TypeScript support 😎
- Modern ESModules-only support 📈
- Steady updates with new features & fixes 🚀
So what can you do with it?
Here's a quick rundown of everything you can do in Nanolith:
- Offload expensive tasks to separate threads.
- Spawn up separate-threaded "nanoservices" that can run any tasks you want.
- Communicate back and forth between threads by sending messages.
- Stream data between threads with the already familiar node:stream API.
- Share memory between threads using the familiar-feeling SharedMap class.
💾 Installation
The latest version can be installed via any package manager of your choice.
npm install nanolith@latest
# or
yarn add nanolith@latest
Beta versions are released under the next tag and can be installed like this:
npm install nanolith@next
# or
yarn add nanolith@next
📝 Defining your tasks
A task is any function that you define which is accessible by Nanolith's APIs. Tasks can be defined using the define() function in a separate file dedicated to definitions.
import { define } from 'nanolith';
export const worker = await define({
add(x: number, y: number) {
return x + y;
},
async waitThenAdd(x: number, y: number) {
await new Promise((resolve) => setTimeout(resolve, 5e3))
return x + y;
},
subtract,
});
function subtract(x: number, y: number) {
return x - y;
};
By passing functions into define(), you immediately turn them into multithreadable tasks. No further configuration is required.
define() options
As seen above, the first argument to define() is an object containing your functions. The second parameter is an object accepting the following (optional) configurations:
Name | Type | About |
---|---|---|
file | string | If define()'s file location detection is not working correctly, the true file location for the set of definitions can be provided here. |
identifier | string | A unique identifier for the set of definitions. Overrides the auto-identifier generated by Nanolith. |
safeMode | boolean | Whether or not to prevent the usage of the returned Nanolith API from within the same file where their definitions were created. Defaults to true. |
👷 Running a task
After defining a set of tasks, you can import them and call them anywhere by directly using the Nanolith API resolved by the define() function. The only difference is that instead of being called on the main/parent thread, a new thread will be created for the task and it will be run there.
import { worker } from './worker.js';
const result = await worker({
name: 'add',
params: [2, 3],
});
console.log(result);
The new thread's process is shut down after the task finishes.
📝 Note: Even though add() is a synchronous function, calling it as a multithreaded task is asynchronous.
Task function options
name and params are just two of the options that can be passed in when running a task:
Name | Type | About |
---|---|---|
name | string | The name of the task to call. Must be present on the set of definitions. |
params | any[] | The arguments for the task in array form. |
priority | boolean | Whether or not to treat the task's worker as priority over others when being queued into the pool. |
reffed | boolean | When true, the underlying Worker instance is reffed. Defaults to false. |
messengers | Messenger[] | The Messenger instances that should be accessible to the task. |
options | object | An object containing most of the options available on the Worker constructor. |
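To make the name/params call shape concrete, here is a minimal sketch of a typed lookup that dispatches a call to a function in a set of definitions. It is not Nanolith's implementation: runLocally, TaskCall, and Definitions are hypothetical names, and everything runs on the calling thread rather than in a worker. It only illustrates why name must exist on the definitions, why params is passed in array form, and why the result is always a promise.

```typescript
// Hypothetical stand-in for the object of functions passed to define().
type Definitions = Record<string, (...args: any[]) => unknown>;

// Mirrors the { name, params } shape used when running a task.
interface TaskCall<D extends Definitions, K extends keyof D> {
    name: K;
    params: Parameters<D[K]>;
}

// Looks the task up by name and invokes it with the params array.
// Because the function is async, even a synchronous task yields a promise.
async function runLocally<D extends Definitions, K extends keyof D>(
    definitions: D,
    call: TaskCall<D, K>
): Promise<unknown> {
    if (!(call.name in definitions)) {
        throw new Error(`No task named "${String(call.name)}" in the definitions.`);
    }
    return definitions[call.name](...call.params);
}

const definitions = {
    add: (x: number, y: number) => x + y,
    subtract: (x: number, y: number) => x - y,
};

// `pending` is a promise even though add() itself is synchronous.
const pending = runLocally(definitions, { name: 'add', params: [2, 3] });
pending.then((result) => console.log(result));
```

In the real library the lookup happens on a separate thread, which is the reason the call has to be awaited.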
🎩 Understanding services
Services are Nanolith's flagship feature. Running a task on a service works similarly to running a task normally; however, the key difference is that the thread only shuts down when you tell it to. This means that you can run multiple tasks on the same thread rather than spawning up a new one for each call.
Using the definitions created earlier, here is how a service would be launched and a task called on it.
import { worker } from './worker.js';
const service = await worker.launchService();
const result = await service.call({
name: 'waitThenAdd',
params: [2, 3],
});
console.log(result);
await service.close();
launchService() options
The configurations for Nanolith.launchService() are nearly identical to the task function options, with the addition of exceptionHandler:
Name | Type | About |
---|---|---|
exceptionHandler | function | An optional but highly recommended option that allows you to catch uncaught exceptions within the service. |
priority | boolean | Whether or not to treat the service's worker as priority over others when being queued into the pool. |
reffed | boolean | When true, the underlying Worker instance is reffed. Defaults to false. |
messengers | Messenger[] | The Messenger instances that should be accessible to the service. |
options | object | An object containing most of the options available on the Worker constructor. |
Service properties & methods
Along with .call(), Service offers many other properties and methods:
Name | Type | About |
---|---|---|
activeCalls | Property | The current number of active calls running on the Service instance. |
closed | Property | Whether or not the underlying Worker instance has exited its process. |
threadID | Property | The thread ID of the underlying Worker. |
worker | Property | The raw Worker instance being used by the service. |
call() | Method | Call a task to be run within the service worker. Usage is similar to running a task normally. |
close() | Method | Terminates the worker, ending its process and marking the Service instance as closed. |
sendMessage() | Method | Send messages to the service. |
onMessage() | Method | Listen for and receive messages from the service. |
waitForMessage() | Method | Wait for a specific message coming from the service. |
createStream() | Method | Create a Writable instance that can be piped into in order to stream data to the service worker. |
onStream() | Method | Listen for and receive data streams from the service. |
sendMessenger() | Method | Dynamically send a Messenger to the service. |
🎬 Coordinating services
In a scalable application utilizing multiple identical services, it is possible to optimize them by treating the main/parent thread as an orchestrator and managing the workloads on each service. Nanolith's ServiceCluster automatically does this for you.
import { worker } from './worker.js';
const cluster = await worker.clusterize(6, {
exceptionHandler({ error, terminate }) {
console.error(error);
},
priority: true,
});
const service = cluster.use();
const result = await service.call({
name: 'subtract',
params: [10, 5],
});
console.log(result);
await cluster.closeAll();
To keep the above example simple, only a single task is run. However, ServiceCluster can be used to run a large number of heavy operations truly in parallel on multiple services.
Tip: To automatically re-launch services on a cluster when they exit with a non-zero code, look into the autoRenew option.
ServiceCluster properties & methods
Along with .use(), ServiceCluster offers many other properties and methods:
Name | Type | About |
---|---|---|
activeServices | Property | The number of currently running services on the cluster. |
currentServices | Property | An array of objects representing each active service on the cluster. Each object contains the service and its identifier. |
activeServiceCalls | Property | The number of currently active task calls on all services on the cluster. |
launch() | Method | Launch a new service and start automatically managing it with the cluster. |
addService() | Method | Add an already running service to the cluster. |
use() | Method | Find and return the currently least busy Service on the cluster. |
notifyAll() | Method | Send a message to all running services on the cluster using .sendMessage(). |
closeAll() | Method | Close all active services on the cluster. |
closeAllIdle() | Method | Close all service instances on the cluster that are currently doing nothing (not running any tasks). |
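The "least busy" selection described for use() can be pictured with a short sketch. This is an illustration under assumptions, not the library's code: ServiceLike and pickLeastBusy are hypothetical names, and "least busy" is assumed to mean "fewest active calls", based on the activeCalls property that Service exposes.

```typescript
// Minimal shape of what the selection needs from a service (hypothetical).
interface ServiceLike {
    identifier: string;
    activeCalls: number;
}

// Returns the service with the fewest active calls.
// When several services tie, the earliest one in the array wins.
function pickLeastBusy<S extends ServiceLike>(services: readonly S[]): S | undefined {
    let best: S | undefined;
    for (const service of services) {
        if (best === undefined || service.activeCalls < best.activeCalls) {
            best = service;
        }
    }
    return best;
}

const chosen = pickLeastBusy([
    { identifier: 'a', activeCalls: 3 },
    { identifier: 'b', activeCalls: 1 },
    { identifier: 'c', activeCalls: 1 },
]);
console.log(chosen?.identifier); // prints "b"
```

Routing each new call to the least loaded service keeps long-running tasks from piling up on one thread while others sit idle.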
🪝 Hooks
For a bit of finer control over your services and tasks, three hooks are available and can be provided directly to define().
import { define } from 'nanolith';
export const worker = await define({
__initializeService(threadId) {
console.log(`Initializing service on thread: ${threadId}`);
},
__beforeTask({ name, inService }) {
console.log(`Running task ${name}.`);
console.log(`${inService ? 'Is' : 'Is not'} in a service.`);
},
__afterTask({ name, inService }) {
console.log(`Finished task ${name}`);
},
});
These hooks run on the same thread as their corresponding service/task.
🚨 Managing concurrency
Nanolith automatically manages the concurrency of your services and task calls with the internal pool class. By default, the maximum concurrency is two threads per core on the machine. This is a safe value to go with; however, maxConcurrency can be modified using one of the available ConcurrencyOption values.
import { pool, ConcurrencyOption } from 'nanolith';
pool.setConcurrency(ConcurrencyOption.Quarter);
pool.setConcurrency(ConcurrencyOption.Half);
pool.setConcurrency(ConcurrencyOption.Default);
pool.setConcurrency(ConcurrencyOption.x1);
pool.setConcurrency(ConcurrencyOption.x2);
pool.setConcurrency(ConcurrencyOption.x4);
pool.setConcurrency(ConcurrencyOption.x6);
pool.setConcurrency(ConcurrencyOption.x8);
pool.setConcurrency(ConcurrencyOption.x10);
The pool's default concurrency for the current machine's resources can be accessed like so:
import { getDefaultPoolConcurrency } from 'nanolith';
console.log(getDefaultPoolConcurrency());
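As a rough mental model, each concurrency option can be read as a number of threads per core. The sketch below turns that reading into arithmetic. The multipliers are assumptions inferred from the option names and from the "two threads per core" default stated above; resolveConcurrency is a hypothetical helper, and the rounding rule (round down, minimum of one thread) is also an assumption.

```typescript
// Assumed multipliers: threads per core for each option name.
const multipliers = {
    Quarter: 0.25,
    Half: 0.5,
    Default: 2, // "two threads per core", per the description above
    x1: 1,
    x2: 2,
    x4: 4,
    x6: 6,
    x8: 8,
    x10: 10,
} as const;

type OptionName = keyof typeof multipliers;

// Converts an option into a thread count for a machine with `cores` cores.
// Rounds down, but never returns fewer than one thread.
function resolveConcurrency(option: OptionName, cores: number): number {
    return Math.max(1, Math.floor(cores * multipliers[option]));
}

console.log(resolveConcurrency('Default', 8)); // prints 16
console.log(resolveConcurrency('Quarter', 2)); // prints 1
```

On a real machine the core count would come from something like os.cpus().length in node:os.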
pool properties & methods
Along with .setConcurrency(), pool offers many other properties and methods:
Name | Type | About |
---|---|---|
option | Property | Easy access to the ConcurrencyOption enum. |
maxConcurrency | Property | The maximum concurrency of the pool. |
maxed | Property | Whether or not the pool has currently reached its max concurrency. |
queueLength | Property | The current number of items in the pool's queue. |
activeCount | Property | The current number of workers that are running under the pool. |
idle | Property | A boolean indicating whether or not the pool is currently doing nothing. |
next | Property | Returns the internal PoolItemOptions for the next worker in the queue to be run. |
setConcurrency() | Method | Modify the maxConcurrency of the pool. Use this wisely. |
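The interplay between maxConcurrency, the queue, and the priority option can be sketched with a tiny in-memory model. This is not the pool's real implementation: SketchPool and QueueItem are hypothetical, and the assumption that a priority item is placed at the front of the queue is a guess at what "priority over others when being queued" means.

```typescript
// Hypothetical queue entry; the real pool stores worker options instead.
interface QueueItem {
    id: string;
    priority: boolean;
}

class SketchPool {
    private readonly queue: QueueItem[] = [];
    private activeCount = 0;
    private readonly maxConcurrency: number;

    constructor(maxConcurrency: number) {
        this.maxConcurrency = maxConcurrency;
    }

    // True once the number of running items has reached the limit.
    get maxed(): boolean {
        return this.activeCount >= this.maxConcurrency;
    }

    get queueLength(): number {
        return this.queue.length;
    }

    // Assumed behavior: priority items jump to the front, others go to the back.
    enqueue(item: QueueItem): void {
        if (item.priority) this.queue.unshift(item);
        else this.queue.push(item);
    }

    // Starts the next queued item if there is capacity; otherwise returns undefined.
    startNext(): QueueItem | undefined {
        if (this.maxed) return undefined;
        const next = this.queue.shift();
        if (next) this.activeCount++;
        return next;
    }

    // Marks one running item as finished, freeing a slot.
    finish(): void {
        if (this.activeCount > 0) this.activeCount--;
    }
}

const sketch = new SketchPool(1);
sketch.enqueue({ id: 'normal', priority: false });
sketch.enqueue({ id: 'urgent', priority: true });
const first = sketch.startNext(); // the "urgent" item
const blocked = sketch.startNext(); // undefined, because the pool is maxed
sketch.finish();
const second = sketch.startNext(); // the "normal" item
console.log(first?.id, blocked, second?.id);
```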
📨 Communicating between threads
There are two ways of communicating between threads in Nanolith.
Between a service and the main (or a parent) thread
When using services, you are automatically able to communicate between the service and main (or a parent) thread with no extra work.
The __initializeService() hook can be used to register listeners on the ParentThread:
import { define, ParentThread } from 'nanolith';
export const worker = await define({
__initializeService() {
ParentThread.onMessage<string>((message) => {
console.log(message);
ParentThread.sendMessage('hello from the service!');
});
},
});
Then, the .onMessage() and .sendMessage() methods on the created service can be used to send messages to and receive messages from the service:
import { worker } from './worker.js';
const service = await worker.launchService();
service.sendMessage('hello from the parent thread');
service.onMessage(async (message) => {
console.log(message);
await service.close();
});
Between all threads
A bit of extra work is required when there is a need to communicate between all threads (including the parent thread). First, an instance of Messenger must be created. That instance can then be exposed to as many services and tasks as you want.
Within task functions, the .use() method on MessengerList can be used to grab hold of Messenger instances exposed to the thread:
import { define, MessengerList } from 'nanolith';
export const worker = await define({
async __initializeService() {
const fooMessenger = await MessengerList.use('foo');
fooMessenger.onMessage<string>((message) => {
console.log(message);
process.exit();
});
},
async sendSomeMessage() {
const fooMessenger = await MessengerList.use('foo');
fooMessenger.sendMessage('hello from other service!');
},
});
The messenger instance can be exposed to a task call or service by using the messengers option.
import { Messenger } from 'nanolith';
import { worker } from './worker.js';
const fooMessenger = new Messenger('foo');
const service1 = await worker.launchService({
messengers: [fooMessenger],
});
const service2 = await worker.launchService({
messengers: [fooMessenger],
});
await service1.call({ name: 'sendSomeMessage' });
await service1.close();
Note: The Messenger class can be used to communicate between all threads. That means between task calls, between services, between the parent thread and multiple services/task calls, etc.
📡 Streaming data between threads
It's possible to stream data from one thread to another using Service, Messenger, or ParentThread. All three have the .createStream() and .onStream() methods.
import { define, ParentThread } from 'nanolith';
import { createWriteStream } from 'fs';
export const worker = await define({
__initializeService() {
ParentThread.onStream((stream) => {
const writeStream = createWriteStream('./movie.mp4');
writeStream.on('finish', () => {
ParentThread.sendMessage('close please');
});
stream.pipe(writeStream);
});
},
});
import axios from 'axios';
import { worker } from './worker.js';
import type { Readable } from 'stream';
const service = await worker.launchService();
service.onMessage(async () => {
await service.close();
});
const { data: readStream } = await axios.get<Readable>(
'https://stream-1-1-ip4.loadshare.org/slice/3/VideoID-qbfnKjG4/CXNa4S/uSDJeP/BXvsDm/Jmrsew/360?name=edward-scissorhands_360&token=ip=85.160.33.237~st=1672263375~exp=1672277775~acl=/*~hmac=de82d742e7cda87859d519fdbf179416d67366497f2e65c103de830b379b1e8b',
{
responseType: 'stream',
}
);
readStream.pipe(await service.createStream());
When using Messenger, things work a bit differently. The .onStream() method takes a different type of callback that must first accept the stream before handling it. This is because with messengers, there are multiple possible recipients, and not all of them might want to accept the stream.
Again, we use the __initializeService() hook:
import { define, MessengerList, ParentThread } from 'nanolith';
import { createWriteStream } from 'fs';
export const worker = await define({
async __initializeService() {
const fooMessenger = await MessengerList.use('foo');
fooMessenger.onStream(({ metaData, accept }) => {
if (metaData.scissorhands !== true) return;
const stream = accept();
const writeStream = createWriteStream('./movie.mp4');
stream.on('end', () => {
ParentThread.sendMessage('close please');
});
stream.pipe(writeStream);
});
},
});
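The accept-before-handling idea can be modeled without any threads. The sketch below is an assumption-laden illustration, not Nanolith's internals: StreamOffer, StreamListener, and offerToAll are hypothetical names, and plain arrays stand in for real streams. It shows how one offer is presented to several listeners and how only those that call accept() end up with a stream.

```typescript
type MetaData = Record<string, unknown>;

// What each listener receives: the sender's metadata plus a way to opt in.
interface StreamOffer<S> {
    metaData: MetaData;
    accept: () => S;
}

type StreamListener<S> = (offer: StreamOffer<S>) => void;

// Offers a stream to every listener and returns the streams that were accepted.
// `createStream` is a hypothetical factory producing one stream per recipient.
function offerToAll<S>(
    listeners: readonly StreamListener<S>[],
    metaData: MetaData,
    createStream: () => S
): S[] {
    const accepted: S[] = [];
    for (const listener of listeners) {
        listener({
            metaData,
            accept: () => {
                const stream = createStream();
                accepted.push(stream);
                return stream;
            },
        });
    }
    return accepted;
}

// One recipient only wants streams tagged `scissorhands`; the other wants none.
const received: string[][] = [];
const picky: StreamListener<string[]> = ({ metaData, accept }) => {
    if (metaData.scissorhands !== true) return;
    received.push(accept());
};
const uninterested: StreamListener<string[]> = () => {};

const acceptedStreams = offerToAll([picky, uninterested], { scissorhands: true }, () => []);
console.log(acceptedStreams.length); // prints 1
```

Because a stream is only created when accept() is called, recipients that ignore the offer cost nothing.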
When it comes to actually sending the stream with Messenger, the workflow is nearly the same as messaging between a service and the main (or a parent) thread:
import { Messenger } from 'nanolith';
import axios from 'axios';
import { worker } from './worker.js';
import type { Readable } from 'stream';
const fooMessenger = new Messenger('foo');
const service = await worker.launchService({
messengers: [fooMessenger],
});
service.onMessage(async () => {
await service.close();
});
const { data: readStream } = await axios.get<Readable>(
'https://stream-1-1-ip4.loadshare.org/slice/3/VideoID-qbfnKjG4/CXNa4S/uSDJeP/BXvsDm/Jmrsew/360?name=edward-scissorhands_360&token=ip=85.160.33.237~st=1672263375~exp=1672277775~acl=/*~hmac=de82d742e7cda87859d519fdbf179416d67366497f2e65c103de830b379b1e8b',
{
responseType: 'stream',
}
);
readStream.pipe(await fooMessenger.createStream({ scissorhands: true }));
💾 Sharing memory between threads
In vanilla Node.js, memory can only be shared between threads using raw bytes with SharedArrayBuffer. That totally sucks, but luckily sharing memory is easy in Nanolith. If you're already familiar with the JavaScript Map object, you'll feel comfortable with SharedMap.
In a single-threaded sense, SharedMap works in quite a standard way:
import { SharedMap } from 'nanolith';
const myMap = new SharedMap({ foo: 'bar' });
await myMap.set('foo', 'hello world');
console.log(await myMap.get('foo'));
But the main point of SharedMap is that it can be used to share values between threads without making copies of the data. A mutex is also implemented under the hood, which means that a very large number of truly parallel operations can safely modify the same memory location at the same time.
import { define, SharedMap } from 'nanolith';
import type { SharedMapRawData } from 'nanolith';
export const worker = await define({
async handleMap(raw: SharedMapRawData<{ count: number }>) {
const countMap = new SharedMap(raw);
for (let i = 1; i <= 1000; i++) {
await countMap.set('count', (prev) => {
return +prev + 1;
});
}
},
});
import { SharedMap } from 'nanolith';
import { worker } from './worker.js';
const countMap = new SharedMap({ count: 0 });
await Promise.all([
worker({ name: 'handleMap', params: [countMap.raw] }),
worker({ name: 'handleMap', params: [countMap.raw] }),
worker({ name: 'handleMap', params: [countMap.raw] }),
worker({ name: 'handleMap', params: [countMap.raw] }),
worker({ name: 'handleMap', params: [countMap.raw] }),
]);
console.log(await countMap.get('count'));
Notice that the .get() method will always return a stringified version of the value.
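For readers curious about what a mutex over shared memory looks like in general, here is a minimal sketch built on SharedArrayBuffer and Atomics. It shows the general technique only and makes no claim about how SharedMap is implemented: SketchMutex and increment are hypothetical names, and the example runs on a single thread to stay short. With real workers, the same buffer would be handed to each thread so they all contend for the same lock slot.

```typescript
const UNLOCKED = 0;
const LOCKED = 1;

class SketchMutex {
    // One 32-bit slot in shared memory holds the lock state.
    readonly buffer: SharedArrayBuffer;
    private readonly state: Int32Array;

    constructor(buffer: SharedArrayBuffer = new SharedArrayBuffer(4)) {
        this.buffer = buffer;
        this.state = new Int32Array(buffer);
    }

    // Atomically flips UNLOCKED to LOCKED; returns false if already locked.
    tryLock(): boolean {
        return Atomics.compareExchange(this.state, 0, UNLOCKED, LOCKED) === UNLOCKED;
    }

    // Releases the lock and wakes one thread blocked in Atomics.wait, if any.
    unlock(): void {
        Atomics.store(this.state, 0, UNLOCKED);
        Atomics.notify(this.state, 0, 1);
    }
}

const mutex = new SketchMutex();
const counter = new Int32Array(new SharedArrayBuffer(4));

// Guarded read-modify-write: only the lock holder may touch the counter.
function increment(): boolean {
    if (!mutex.tryLock()) return false; // another holder owns the lock
    try {
        Atomics.store(counter, 0, Atomics.load(counter, 0) + 1);
        return true;
    } finally {
        mutex.unlock();
    }
}

increment();
increment();
console.log(Atomics.load(counter, 0)); // prints 2
```

The compare-and-exchange step is what prevents two threads from both believing they hold the lock, which is the property that makes concurrent updates to one location safe.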
🧑‍🏫 Examples
Examples coming soon!
📜 License
The MIT License (MIT)
Copyright (c) 2023 Matthias Stephens
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.