Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

piscina

Package Overview
Dependencies
Maintainers
3
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

piscina - npm Package Compare versions

Comparing version 1.2.1 to 1.3.0

3

dist/package.json
{
"name": "piscina",
"version": "1.2.1",
"version": "1.3.0",
"description": "A fast, efficient Node.js Worker Thread Pool implementation",

@@ -46,2 +46,3 @@ "main": "./dist/src/index.js",

"dependencies": {
"eventemitter-asyncresource": "^1.0.0",
"hdr-histogram-js": "^1.2.0",

@@ -48,0 +49,0 @@ "hdr-histogram-percentiles-obj": "^2.0.1"

/// <reference types="node" />
import { Worker, MessagePort } from 'worker_threads';
import { EventEmitter } from 'events';
import EventEmitterAsyncResource from 'eventemitter-asyncresource';
interface AbortSignalEventTarget {

@@ -36,3 +36,3 @@ addEventListener: (name: 'abort', listener: () => void) => void;

} ? T : never;
declare class Piscina extends EventEmitter {
declare class Piscina extends EventEmitterAsyncResource {
#private;

@@ -39,0 +39,0 @@ constructor(options?: Options);

@@ -21,2 +21,3 @@ "use strict";

const events_1 = require("events");
const eventemitter_asyncresource_1 = __importDefault(require("eventemitter-asyncresource"));
const async_hooks_1 = require("async_hooks");

@@ -72,4 +73,4 @@ const os_1 = require("os");

class TaskInfo extends async_hooks_1.AsyncResource {
constructor(task, transferList, filename, callback, abortSignal) {
super('Piscina.Task', { requireManualDestroy: false });
constructor(task, transferList, filename, callback, abortSignal, triggerAsyncId) {
super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
this.workerInfo = null;

@@ -299,2 +300,3 @@ this.callback = callback;

workerInfo.postTask(taskInfo);
this._maybeDrain();
return;

@@ -335,3 +337,3 @@ }

}
}, abortSignal);
}, abortSignal, this.publicInterface.asyncResource.asyncId());
if (abortSignal !== null) {

@@ -408,4 +410,10 @@ onabort(abortSignal, () => {

workerInfo.postTask(taskInfo);
this._maybeDrain();
return ret;
}
_maybeDrain() {
if (this.taskQueue.length === 0) {
this.publicInterface.emit('drain');
}
}
async destroy() {

@@ -424,5 +432,5 @@ while (this.taskQueue.length > 0) {

}
class Piscina extends events_1.EventEmitter {
class Piscina extends eventemitter_asyncresource_1.default {
constructor(options = {}) {
super(options);
super({ ...options, name: 'Piscina' });
_pool.set(this, void 0);

@@ -429,0 +437,0 @@ if (typeof options.filename !== 'string' && options.filename != null) {

{
"name": "piscina",
"version": "1.2.1",
"version": "1.3.0",
"description": "A fast, efficient Node.js Worker Thread Pool implementation",

@@ -46,2 +46,3 @@ "main": "./dist/src/index.js",

"dependencies": {
"eventemitter-asyncresource": "^1.0.0",
"hdr-histogram-js": "^1.2.0",

@@ -48,0 +49,0 @@ "hdr-histogram-percentiles-obj": "^2.0.1"

@@ -141,2 +141,51 @@ # piscina - the node.js worker pool

### Backpressure
When the `maxQueue` option is set, once the `Piscina` queue is full, no
additional tasks may be submitted until the queue size falls below the
limit. The `'drain'` event may be used to receive notification when the
queue is empty and all tasks have been submitted to workers for processing.
Example: Using a Node.js stream to feed a Piscina worker pool:
```js
'use strict';
const { resolve } = require('path');
const Pool = require('../..');
const pool = new Pool({
filename: resolve(__dirname, 'worker.js'),
maxQueue: 8
});
const stream = getStreamSomehow();
stream.setEncoding('utf8');
pool.on('drain', () => {
if (stream.isPaused()) {
console.log('resuming...', counter, pool.queueSize);
stream.resume();
}
});
performance.mark('A');
stream
.on('data', (data) => {
pool.runTask(data);
if (pool.queueSize === maxQueue) {
console.log('pausing...', counter, pool.queueSize);
stream.pause();
}
})
.on('error', console.error)
.on('end', () => {
console.log('done');
});
```
### Additional Examples
Additional examples can be found in the GitHub repo at
https://github.com/jasnell/piscina/tree/master/examples
## Class: `Piscina`

@@ -244,2 +293,6 @@

### Event: `'drain'`
A `'drain'` event is emitted whenever the `queueSize` reaches `0`.
### Property: `completed` (readonly)

@@ -246,0 +299,0 @@

import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads';
import { EventEmitter, once } from 'events';
import { once } from 'events';
import EventEmitterAsyncResource from 'eventemitter-asyncresource';
import { AsyncResource } from 'async_hooks';

@@ -63,3 +64,3 @@ import { cpus } from 'os';

env? : EnvSpecifier,
workerData? : any,
workerData? : any
}

@@ -118,4 +119,5 @@

callback : TaskCallback,
abortSignal : AbortSignalAny | null) {
super('Piscina.Task', { requireManualDestroy: false });
abortSignal : AbortSignalAny | null,
triggerAsyncId : number) {
super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
this.callback = callback;

@@ -401,2 +403,3 @@ this.task = task;

workerInfo.postTask(taskInfo);
this._maybeDrain();
return;

@@ -446,3 +449,4 @@ }

},
abortSignal);
abortSignal,
this.publicInterface.asyncResource.asyncId());

@@ -526,5 +530,12 @@ if (abortSignal !== null) {

workerInfo.postTask(taskInfo);
this._maybeDrain();
return ret;
}
_maybeDrain () {
if (this.taskQueue.length === 0) {
this.publicInterface.emit('drain');
}
}
async destroy () {

@@ -546,7 +557,7 @@ while (this.taskQueue.length > 0) {

class Piscina extends EventEmitter {
class Piscina extends EventEmitterAsyncResource {
#pool : ThreadPool;
constructor (options : Options = {}) {
super(options as any);
super({ ...options, name: 'Piscina' });

@@ -553,0 +564,0 @@ if (typeof options.filename !== 'string' && options.filename != null) {

@@ -68,1 +68,16 @@ import { MessageChannel } from 'worker_threads';

});
test('Piscina emits drain', async ({ ok }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
});
let drained = false;
pool.on('drain', () => {
drained = true;
});
await Promise.all([pool.runTask('123'), pool.runTask('123')]);
ok(drained);
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc