thread-stream
Advanced tools
Comparing version 0.10.0 to 0.11.0
127
index.js
@@ -13,2 +13,23 @@ 'use strict' | ||
class FakeWeakRef { | ||
constructor (value) { | ||
this._value = value | ||
} | ||
deref () { | ||
return this._value | ||
} | ||
} | ||
const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry { | ||
register () {} | ||
unregister () {} | ||
} | ||
const WeakRef = global.WeakRef || FakeWeakRef | ||
const registry = new FinalizationRegistry((worker) => { | ||
worker.terminate() | ||
}) | ||
function createWorker (stream, opts) { | ||
@@ -31,2 +52,10 @@ const { filename, workerData } = opts | ||
// We keep a strong reference for now, | ||
// we need to start writing first | ||
worker.stream = new FakeWeakRef(stream) | ||
worker.on('message', onWorkerMessage) | ||
worker.on('exit', onWorkerExit) | ||
registry.register(stream, worker) | ||
return worker | ||
@@ -95,2 +124,59 @@ } | ||
function onWorkerMessage (msg) { | ||
const stream = this.stream.deref() | ||
if (stream === undefined) { | ||
// Terminate the worker. | ||
this.terminate() | ||
return | ||
} | ||
switch (msg.code) { | ||
case 'READY': | ||
// Replace the FakeWeakRef with a | ||
// proper one. | ||
this.stream = new WeakRef(stream) | ||
if (stream._sync) { | ||
stream.ready = true | ||
stream.flushSync() | ||
stream.emit('ready') | ||
} else { | ||
stream.once('drain', function () { | ||
stream.flush(() => { | ||
stream.ready = true | ||
stream.emit('ready') | ||
}) | ||
}) | ||
nextFlush(stream) | ||
} | ||
break | ||
case 'ERROR': | ||
stream.closed = true | ||
// TODO only remove our own | ||
stream.worker.removeAllListeners('exit') | ||
stream.worker.terminate().then(null, () => {}) | ||
process.nextTick(() => { | ||
stream.emit('error', msg.err) | ||
}) | ||
break | ||
default: | ||
throw new Error('this should not happen: ' + msg.code) | ||
} | ||
} | ||
function onWorkerExit (code) { | ||
const stream = this.stream.deref() | ||
if (stream === undefined) { | ||
// Nothing to do, the worker already exit | ||
return | ||
} | ||
registry.unregister(stream) | ||
stream.closed = true | ||
setImmediate(function () { | ||
if (code !== 0) { | ||
stream.emit('error', new Error('The worker thread exited')) | ||
} | ||
stream.emit('close') | ||
}) | ||
} | ||
class ThreadStream extends EventEmitter { | ||
@@ -116,43 +202,2 @@ constructor (opts = {}) { | ||
this.buf = '' | ||
this.worker.on('message', (msg) => { | ||
switch (msg.code) { | ||
case 'READY': | ||
if (this._sync) { | ||
this.ready = true | ||
this.flushSync() | ||
this.emit('ready') | ||
} else { | ||
this.once('drain', function () { | ||
this.flush(() => { | ||
this.ready = true | ||
this.emit('ready') | ||
}) | ||
}) | ||
nextFlush(this) | ||
} | ||
break | ||
case 'ERROR': | ||
this.closed = true | ||
// TODO only remove our own | ||
this.worker.removeAllListeners('exit') | ||
this.worker.terminate().then(null, () => {}) | ||
process.nextTick(() => { | ||
this.emit('error', msg.err) | ||
}) | ||
break | ||
default: | ||
throw new Error('this should not happen: ' + msg.code) | ||
} | ||
}) | ||
this.worker.on('exit', (code) => { | ||
this.closed = true | ||
setImmediate(() => { | ||
if (code !== 0) { | ||
this.emit('error', new Error('The worker thread exited')) | ||
} | ||
this.emit('close') | ||
}) | ||
}) | ||
} | ||
@@ -159,0 +204,0 @@ |
{ | ||
"name": "thread-stream", | ||
"version": "0.10.0", | ||
"version": "0.11.0", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -9,3 +9,3 @@ "main": "index.js", | ||
"fastbench": "^1.0.1", | ||
"husky": "^6.0.0", | ||
"husky": "^7.0.0", | ||
"sonic-boom": "^2.0.1", | ||
@@ -12,0 +12,0 @@ "standard": "^16.0.3", |
@@ -65,4 +65,6 @@ # thread-stream | ||
The underlining worker is automatically closed if the stream is garbage collected. | ||
## License | ||
MIT |
@@ -161,1 +161,14 @@ 'use strict' | ||
}) | ||
test('close the work if out of scope on gc', { skip: !global.WeakRef }, async function (t) { | ||
const dest = file() | ||
const child = fork(join(__dirname, 'close-on-gc.js'), [dest], { | ||
execArgv: ['--expose-gc'] | ||
}) | ||
const [code] = await once(child, 'exit') | ||
t.equal(code, 0) | ||
const data = await readFile(dest, 'utf8') | ||
t.equal(data, 'hello world\n') | ||
}) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41202
30
1363
70