Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

thread-stream

Package Overview
Dependencies
Maintainers
4
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version 0.10.0 to 0.11.0

test/close-on-gc.js

127

index.js

@@ -13,2 +13,23 @@ 'use strict'

class FakeWeakRef {
constructor (value) {
this._value = value
}
deref () {
return this._value
}
}
const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry {
register () {}
unregister () {}
}
const WeakRef = global.WeakRef || FakeWeakRef
const registry = new FinalizationRegistry((worker) => {
worker.terminate()
})
function createWorker (stream, opts) {

@@ -31,2 +52,10 @@ const { filename, workerData } = opts

// We keep a strong reference for now,
// we need to start writing first
worker.stream = new FakeWeakRef(stream)
worker.on('message', onWorkerMessage)
worker.on('exit', onWorkerExit)
registry.register(stream, worker)
return worker

@@ -95,2 +124,59 @@ }

function onWorkerMessage (msg) {
const stream = this.stream.deref()
if (stream === undefined) {
// Terminate the worker.
this.terminate()
return
}
switch (msg.code) {
case 'READY':
// Replace the FakeWeakRef with a
// proper one.
this.stream = new WeakRef(stream)
if (stream._sync) {
stream.ready = true
stream.flushSync()
stream.emit('ready')
} else {
stream.once('drain', function () {
stream.flush(() => {
stream.ready = true
stream.emit('ready')
})
})
nextFlush(stream)
}
break
case 'ERROR':
stream.closed = true
// TODO only remove our own
stream.worker.removeAllListeners('exit')
stream.worker.terminate().then(null, () => {})
process.nextTick(() => {
stream.emit('error', msg.err)
})
break
default:
throw new Error('this should not happen: ' + msg.code)
}
}
function onWorkerExit (code) {
const stream = this.stream.deref()
if (stream === undefined) {
// Nothing to do, the worker already exit
return
}
registry.unregister(stream)
stream.closed = true
setImmediate(function () {
if (code !== 0) {
stream.emit('error', new Error('The worker thread exited'))
}
stream.emit('close')
})
}
class ThreadStream extends EventEmitter {

@@ -116,43 +202,2 @@ constructor (opts = {}) {

this.buf = ''
this.worker.on('message', (msg) => {
switch (msg.code) {
case 'READY':
if (this._sync) {
this.ready = true
this.flushSync()
this.emit('ready')
} else {
this.once('drain', function () {
this.flush(() => {
this.ready = true
this.emit('ready')
})
})
nextFlush(this)
}
break
case 'ERROR':
this.closed = true
// TODO only remove our own
this.worker.removeAllListeners('exit')
this.worker.terminate().then(null, () => {})
process.nextTick(() => {
this.emit('error', msg.err)
})
break
default:
throw new Error('this should not happen: ' + msg.code)
}
})
this.worker.on('exit', (code) => {
this.closed = true
setImmediate(() => {
if (code !== 0) {
this.emit('error', new Error('The worker thread exited'))
}
this.emit('close')
})
})
}

@@ -159,0 +204,0 @@

{
"name": "thread-stream",
"version": "0.10.0",
"version": "0.11.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -9,3 +9,3 @@ "main": "index.js",

"fastbench": "^1.0.1",
"husky": "^6.0.0",
"husky": "^7.0.0",
"sonic-boom": "^2.0.1",

@@ -12,0 +12,0 @@ "standard": "^16.0.3",

@@ -65,4 +65,6 @@ # thread-stream

The underlining worker is automatically closed if the stream is garbage collected.
## License
MIT

@@ -161,1 +161,14 @@ 'use strict'

})
test('close the work if out of scope on gc', { skip: !global.WeakRef }, async function (t) {
const dest = file()
const child = fork(join(__dirname, 'close-on-gc.js'), [dest], {
execArgv: ['--expose-gc']
})
const [code] = await once(child, 'exit')
t.equal(code, 0)
const data = await readFile(dest, 'utf8')
t.equal(data, 'hello world\n')
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc