Comparing version 1.2.1 to 1.3.0
{ | ||
"name": "piscina", | ||
"version": "1.2.1", | ||
"version": "1.3.0", | ||
"description": "A fast, efficient Node.js Worker Thread Pool implementation", | ||
@@ -46,2 +46,3 @@ "main": "./dist/src/index.js", | ||
"dependencies": { | ||
"eventemitter-asyncresource": "^1.0.0", | ||
"hdr-histogram-js": "^1.2.0", | ||
@@ -48,0 +49,0 @@ "hdr-histogram-percentiles-obj": "^2.0.1" |
/// <reference types="node" /> | ||
import { Worker, MessagePort } from 'worker_threads'; | ||
import { EventEmitter } from 'events'; | ||
import EventEmitterAsyncResource from 'eventemitter-asyncresource'; | ||
interface AbortSignalEventTarget { | ||
@@ -36,3 +36,3 @@ addEventListener: (name: 'abort', listener: () => void) => void; | ||
} ? T : never; | ||
declare class Piscina extends EventEmitter { | ||
declare class Piscina extends EventEmitterAsyncResource { | ||
#private; | ||
@@ -39,0 +39,0 @@ constructor(options?: Options); |
@@ -21,2 +21,3 @@ "use strict"; | ||
const events_1 = require("events"); | ||
const eventemitter_asyncresource_1 = __importDefault(require("eventemitter-asyncresource")); | ||
const async_hooks_1 = require("async_hooks"); | ||
@@ -72,4 +73,4 @@ const os_1 = require("os"); | ||
class TaskInfo extends async_hooks_1.AsyncResource { | ||
constructor(task, transferList, filename, callback, abortSignal) { | ||
super('Piscina.Task', { requireManualDestroy: false }); | ||
constructor(task, transferList, filename, callback, abortSignal, triggerAsyncId) { | ||
super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId }); | ||
this.workerInfo = null; | ||
@@ -299,2 +300,3 @@ this.callback = callback; | ||
workerInfo.postTask(taskInfo); | ||
this._maybeDrain(); | ||
return; | ||
@@ -335,3 +337,3 @@ } | ||
} | ||
}, abortSignal); | ||
}, abortSignal, this.publicInterface.asyncResource.asyncId()); | ||
if (abortSignal !== null) { | ||
@@ -408,4 +410,10 @@ onabort(abortSignal, () => { | ||
workerInfo.postTask(taskInfo); | ||
this._maybeDrain(); | ||
return ret; | ||
} | ||
_maybeDrain() { | ||
if (this.taskQueue.length === 0) { | ||
this.publicInterface.emit('drain'); | ||
} | ||
} | ||
async destroy() { | ||
@@ -424,5 +432,5 @@ while (this.taskQueue.length > 0) { | ||
} | ||
class Piscina extends events_1.EventEmitter { | ||
class Piscina extends eventemitter_asyncresource_1.default { | ||
constructor(options = {}) { | ||
super(options); | ||
super({ ...options, name: 'Piscina' }); | ||
_pool.set(this, void 0); | ||
@@ -429,0 +437,0 @@ if (typeof options.filename !== 'string' && options.filename != null) { |
{ | ||
"name": "piscina", | ||
"version": "1.2.1", | ||
"version": "1.3.0", | ||
"description": "A fast, efficient Node.js Worker Thread Pool implementation", | ||
@@ -46,2 +46,3 @@ "main": "./dist/src/index.js", | ||
"dependencies": { | ||
"eventemitter-asyncresource": "^1.0.0", | ||
"hdr-histogram-js": "^1.2.0", | ||
@@ -48,0 +49,0 @@ "hdr-histogram-percentiles-obj": "^2.0.1" |
@@ -141,2 +141,51 @@ # piscina - the node.js worker pool | ||
### Backpressure | ||
When the `maxQueue` option is set, once the `Piscina` queue is full, no | ||
additional tasks may be submitted until the queue size falls below the | ||
limit. The `'drain'` event may be used to receive notification when the | ||
queue is empty and all tasks have been submitted to workers for processing. | ||
Example: Using a Node.js stream to feed a Piscina worker pool: | ||
```js | ||
'use strict'; | ||
const { resolve } = require('path'); | ||
const Pool = require('../..'); | ||
const pool = new Pool({ | ||
filename: resolve(__dirname, 'worker.js'), | ||
maxQueue: 8 | ||
}); | ||
const stream = getStreamSomehow(); | ||
stream.setEncoding('utf8'); | ||
pool.on('drain', () => { | ||
if (stream.isPaused()) { | ||
console.log('resuming...', counter, pool.queueSize); | ||
stream.resume(); | ||
} | ||
}); | ||
performance.mark('A'); | ||
stream | ||
.on('data', (data) => { | ||
pool.runTask(data); | ||
if (pool.queueSize === maxQueue) { | ||
console.log('pausing...', counter, pool.queueSize); | ||
stream.pause(); | ||
} | ||
}) | ||
.on('error', console.error) | ||
.on('end', () => { | ||
console.log('done'); | ||
}); | ||
``` | ||
### Additional Examples | ||
Additional examples can be found in the GitHub repo at | ||
https://github.com/jasnell/piscina/tree/master/examples | ||
## Class: `Piscina` | ||
@@ -244,2 +293,6 @@ | ||
### Event: `'drain'` | ||
A `'drain'` event is emitted whenever the `queueSize` reaches `0`. | ||
### Property: `completed` (readonly) | ||
@@ -246,0 +299,0 @@ |
import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads'; | ||
import { EventEmitter, once } from 'events'; | ||
import { once } from 'events'; | ||
import EventEmitterAsyncResource from 'eventemitter-asyncresource'; | ||
import { AsyncResource } from 'async_hooks'; | ||
@@ -63,3 +64,3 @@ import { cpus } from 'os'; | ||
env? : EnvSpecifier, | ||
workerData? : any, | ||
workerData? : any | ||
} | ||
@@ -118,4 +119,5 @@ | ||
callback : TaskCallback, | ||
abortSignal : AbortSignalAny | null) { | ||
super('Piscina.Task', { requireManualDestroy: false }); | ||
abortSignal : AbortSignalAny | null, | ||
triggerAsyncId : number) { | ||
super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId }); | ||
this.callback = callback; | ||
@@ -401,2 +403,3 @@ this.task = task; | ||
workerInfo.postTask(taskInfo); | ||
this._maybeDrain(); | ||
return; | ||
@@ -446,3 +449,4 @@ } | ||
}, | ||
abortSignal); | ||
abortSignal, | ||
this.publicInterface.asyncResource.asyncId()); | ||
@@ -526,5 +530,12 @@ if (abortSignal !== null) { | ||
workerInfo.postTask(taskInfo); | ||
this._maybeDrain(); | ||
return ret; | ||
} | ||
_maybeDrain () { | ||
if (this.taskQueue.length === 0) { | ||
this.publicInterface.emit('drain'); | ||
} | ||
} | ||
async destroy () { | ||
@@ -546,7 +557,7 @@ while (this.taskQueue.length > 0) { | ||
class Piscina extends EventEmitter { | ||
class Piscina extends EventEmitterAsyncResource { | ||
#pool : ThreadPool; | ||
constructor (options : Options = {}) { | ||
super(options as any); | ||
super({ ...options, name: 'Piscina' }); | ||
@@ -553,0 +564,0 @@ if (typeof options.filename !== 'string' && options.filename != null) { |
@@ -68,1 +68,16 @@ import { MessageChannel } from 'worker_threads'; | ||
}); | ||
test('Piscina emits drain', async ({ ok }) => { | ||
const pool = new Piscina({ | ||
filename: resolve(__dirname, 'fixtures/eval.js') | ||
}); | ||
let drained = false; | ||
pool.on('drain', () => { | ||
drained = true; | ||
}); | ||
await Promise.all([pool.runTask('123'), pool.runTask('123')]); | ||
ok(drained); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
0
458
3
1
134789
3
47
2420
+ Addedeventemitter-asyncresource@1.0.0(transitive)