Socket
Socket
Sign inDemoInstall

thread-stream

Package Overview
Dependencies
Maintainers
4
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version 0.11.2 to 0.12.0

210

index.js

@@ -13,2 +13,3 @@ 'use strict'

const buffer = require('buffer')
const assert = require('assert')

@@ -71,4 +72,7 @@ // V8 limit for string size

function drain (stream) {
stream.needDrain = false
stream.emit('drain')
assert(!stream._sync)
if (stream.needDrain) {
stream.needDrain = false
stream.emit('drain')
}
}

@@ -83,7 +87,9 @@

stream.flushing = false
if (!stream.needDrain) {
// process._rawDebug('emitting drain')
stream.needDrain = true
if (stream.ending) {
stream._end()
} else if (stream.needDrain) {
process.nextTick(drain, stream)
}
return

@@ -146,25 +152,10 @@ }

this.stream = new WeakRef(stream)
if (stream._sync) {
stream.ready = true
stream.flushSync()
stream.flush(() => {
this.ready = true
stream.emit('ready')
} else {
stream.once('drain', function () {
stream.flush(() => {
stream.ready = true
stream.emit('ready')
})
})
nextFlush(stream)
}
})
break
case 'ERROR':
stream.closed = true
stream.worker.exited = true
// TODO only remove our own
stream.worker.removeAllListeners('exit')
stream.worker.terminate().then(null, () => {})
process.nextTick(() => {
stream.emit('error', msg.err)
})
stream._destroy(msg.err)
break

@@ -183,10 +174,5 @@ default:

registry.unregister(stream)
stream.closed = true
stream.worker.exited = true
setImmediate(function () {
if (code !== 0) {
stream.emit('error', new Error('The worker thread exited'))
}
stream.emit('close')
})
stream.worker.off('exit', onWorkerExit)
stream._destroy(code !== 0 ? new Error('The worker thread exited') : null)
}

@@ -208,6 +194,8 @@

this.worker = createWorker(this, opts)
this.ready = false
this.ending = false
this.ended = false
this.needDrain = false
this.closed = false
this.destroyed = false
this.flushing = false
this.ready = false

@@ -217,2 +205,25 @@ this.buf = ''

_destroy (err) {
if (this.destroyed) {
return
}
this.destroyed = true
if (err) {
this.emit('error', err)
}
if (!this.worker.exited) {
this.worker.terminate()
.catch(() => {})
.then(() => {
this.emit('close')
})
} else {
setImmediate(() => {
this.emit('close')
})
}
}
_write (data, cb) {

@@ -229,81 +240,89 @@ // data is smaller than the shared buffer length

_hasSpace () {
const current = Atomics.load(this._state, WRITE_INDEX)
return this._data.length - this.buf.length - current > 0
}
write (data) {
if (this.closed) {
if (this.destroyed) {
throw new Error('the worker has exited')
}
if (this.flushing && this.buf.length + data.length >= MAX_STRING) {
// process._rawDebug('write: flushing')
this._writeSync()
this.flushing = true // we are still flushing
if (this.ending) {
throw new Error('the worker is ending')
}
if (!this.ready || this.flushing) {
this.buf += data
return this._hasSpace()
if (this.flushing && this.buf.length + data.length >= MAX_STRING) {
try {
this._writeSync()
this.flushing = true
} catch (err) {
this._destroy(err)
return false
}
}
this.buf += data
if (this._sync) {
this.buf += data
this._writeSync()
try {
this._writeSync()
return true
} catch (err) {
this._destroy(err)
return false
}
}
return true
if (!this.flushing) {
this.flushing = true
setImmediate(nextFlush, this)
}
this.buf = data
this.flushing = true
setImmediate(nextFlush, this)
return this._hasSpace()
this.needDrain = this._data.length - this.buf.length - Atomics.load(this._state, WRITE_INDEX) <= 0
return !this.needDrain
}
end () {
if (this.closed) {
if (this.destroyed) {
throw new Error('the worker has exited')
}
if (!this.ready) {
this.once('ready', this.end.bind(this))
return
}
this.ending = true
this._end()
}
if (this.flushing) {
this.once('drain', this.end.bind(this))
_end () {
if (this.ended || !this.ending || this.flushing) {
return
}
this.ended = true
if (this.ending) {
return
}
this.ending = true
try {
this.flushSync()
this.flushSync()
let readIndex = Atomics.load(this._state, READ_INDEX)
let read = Atomics.load(this._state, READ_INDEX)
// process._rawDebug('writing index')
Atomics.store(this._state, WRITE_INDEX, -1)
// process._rawDebug(`(end) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`)
Atomics.notify(this._state, WRITE_INDEX)
// process._rawDebug('writing index')
Atomics.store(this._state, WRITE_INDEX, -1)
// process._rawDebug(`(end) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`)
Atomics.notify(this._state, WRITE_INDEX)
// Wait for the process to complete
let spins = 0
while (readIndex !== -1) {
// process._rawDebug(`read = ${read}`)
Atomics.wait(this._state, READ_INDEX, readIndex, 1000)
readIndex = Atomics.load(this._state, READ_INDEX)
// Wait for the process to complete
let spins = 0
while (read !== -1) {
// process._rawDebug(`read = ${read}`)
Atomics.wait(this._state, READ_INDEX, read, 1000)
read = Atomics.load(this._state, READ_INDEX)
if (readIndex === -2) {
throw new Error('end() failed')
}
if (++spins === 10) {
throw new Error('end() took too long (10s)')
if (++spins === 10) {
throw new Error('end() took too long (10s)')
}
}
process.nextTick(() => {
this.emit('finish')
})
} catch (err) {
this._destroy(err)
}
process.nextTick(() => {
this.emit('finish')
})
// process._rawDebug('end finished...')

@@ -313,3 +332,3 @@ }

flush (cb) {
if (this.closed) {
if (this.destroyed) {
throw new Error('the worker has exited')

@@ -323,4 +342,4 @@ }

if (err) {
this.emit('error', err)
cb(err)
this._destroy(err)
process.nextTick(cb, err)
return

@@ -333,3 +352,3 @@ }

}
cb()
process.nextTick(cb)
})

@@ -340,5 +359,5 @@ }

const cb = () => {
if (!this.needDrain) {
// process._rawDebug('emitting drain')
this.needDrain = true
if (this.ending) {
this._end()
} else if (this.needDrain) {
process.nextTick(drain, this)

@@ -389,3 +408,3 @@ }

flushSync () {
if (this.closed) {
if (this.destroyed) {
throw new Error('the worker has exited')

@@ -412,2 +431,7 @@ }

const readIndex = Atomics.load(this._state, READ_INDEX)
if (readIndex === -2) {
throw new Error('_flushSync failed')
}
// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)

@@ -437,3 +461,3 @@ if (readIndex !== writeIndex) {

get writable () {
return !this.closed
return !this.destroyed && !this.ending
}

@@ -440,0 +464,0 @@ }

@@ -40,2 +40,8 @@ 'use strict'

destination.on('error', function (err) {
Atomics.store(state, WRITE_INDEX, -2)
Atomics.notify(state, WRITE_INDEX)
Atomics.store(state, READ_INDEX, -2)
Atomics.notify(state, READ_INDEX)
parentPort.postMessage({

@@ -42,0 +48,0 @@ code: 'ERROR',

{
"name": "thread-stream",
"version": "0.11.2",
"version": "0.12.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -12,3 +12,3 @@ 'use strict'

test('base sync=true', function (t) {
t.plan(9)
t.plan(7)

@@ -22,16 +22,2 @@ const dest = file()

stream.on('drain', () => {
t.pass('drain')
})
stream.on('ready', () => {
t.pass('ready emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
stream.on('finish', () => {

@@ -48,6 +34,12 @@ readFile(dest, 'utf8', (err, data) => {

})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
})
test('overflow sync=true', function (t) {
t.plan(4)
t.plan(3)

@@ -62,7 +54,2 @@ const dest = file()

stream.on('ready', () => {
t.pass('ready emitted')
write()
})
let count = 0

@@ -82,2 +69,4 @@

write()
stream.on('finish', () => {

@@ -104,7 +93,2 @@ t.pass('finish emitted')

stream.on('ready', () => {
t.pass('ready emitted')
write()
})
let count = 0

@@ -125,2 +109,4 @@

write()
stream.on('drain', () => {

@@ -144,3 +130,3 @@ t.pass('drain')

test('over the bufferSize at startup', function (t) {
t.plan(8)
t.plan(6)

@@ -155,16 +141,2 @@ const dest = file()

stream.on('drain', () => {
t.pass('drain')
})
stream.on('ready', () => {
t.pass('ready emitted')
})
t.ok(stream.write('hello'))
t.notOk(stream.write(' world\n'))
t.notOk(stream.write('something else\n'))
stream.end()
stream.on('finish', () => {

@@ -180,6 +152,12 @@ readFile(dest, 'utf8', (err, data) => {

})
t.ok(stream.write('hello'))
t.ok(stream.write(' world\n'))
t.ok(stream.write('something else\n'))
stream.end()
})
test('over the bufferSize at startup (async)', function (t) {
t.plan(8)
t.plan(6)

@@ -194,10 +172,2 @@ const dest = file()

stream.on('drain', () => {
t.pass('drain')
})
stream.on('ready', () => {
t.pass('ready emitted')
})
t.ok(stream.write('hello'))

@@ -230,11 +200,2 @@ t.notOk(stream.write(' world\n'))

stream.on('ready', () => {
t.pass('ready emitted')
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
})
stream.on('drain', () => {

@@ -256,48 +217,9 @@ t.pass('drain')

})
})
test('flushSync on ready if sync=true', function (t) {
t.plan(4)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: true
})
stream.on('ready', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
stream.end()
})
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
})
test('flush on ready if sync=false', function (t) {
t.plan(4)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('ready', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
stream.end()
})
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
})
test('pass down MessagePorts', async function (t) {

@@ -304,0 +226,0 @@ t.plan(3)

@@ -18,12 +18,10 @@ 'use strict'

stream.on('ready', function () {
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()
// the stream object goes out of scope here
setImmediate(gc) // eslint-disable-line
})
// the stream object goes out of scope here
setImmediate(gc) // eslint-disable-line
}

@@ -30,0 +28,0 @@

@@ -9,3 +9,3 @@ 'use strict'

test('yarn module resolution', { skip: !isYarnPnp }, t => {
t.plan(5)
t.plan(4)

@@ -21,5 +21,4 @@ const modulePath = require.resolve('pino-elasticsearch')

stream.on('error', (err) => {
stream.on('error', () => {
t.pass('error emitted')
t.equal(err.message, 'Missing node(s) option', 'module custom error')
})

@@ -29,4 +28,3 @@

t.ok(stream.writable)
stream.end()
})

@@ -12,8 +12,6 @@ 'use strict'

stream.on('ready', function () {
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()
})
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()

@@ -10,3 +10,3 @@ 'use strict'

test('destroy support', function (t) {
t.plan(9)
t.plan(7)

@@ -20,29 +20,21 @@ const dest = file()

stream.on('drain', () => {
t.pass('drain')
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
stream.on('ready', () => {
t.pass('ready emitted')
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
stream.end()
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
})
test('synchronous _final support', function (t) {
t.plan(9)
t.plan(7)

@@ -56,25 +48,17 @@ const dest = file()

stream.on('drain', () => {
t.pass('drain')
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
stream.on('ready', () => {
t.pass('ready emitted')
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
stream.end()
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
})

@@ -20,16 +20,2 @@ 'use strict'

stream.on('ready', () => {
t.pass('ready emitted')
const buf = Buffer.alloc(1024).fill('x').toString() // 1 KB
// This writes 1 GB of data
for (let i = 0; i < 1024 * 1024; i++) {
length += buf.length
stream.write(buf)
}
stream.end()
})
stream.on('close', () => {

@@ -42,1 +28,11 @@ stat(dest, (err, f) => {

})
const buf = Buffer.alloc(1024).fill('x').toString() // 1 KB
// This writes 1 GB of data
for (let i = 0; i < 1024 * 1024; i++) {
length += buf.length
stream.write(buf)
}
stream.end()

@@ -28,3 +28,3 @@ 'use strict'

stream.on('ready', function () {
stream.on('ready', () => {
stream.write('hello world\n')

@@ -57,3 +57,3 @@ })

stream.on('ready', function () {
stream.on('ready', () => {
stream.write('hello world\n')

@@ -86,3 +86,3 @@ })

stream.on('ready', function () {
stream.on('ready', () => {
stream.write('hello world\n')

@@ -115,3 +115,3 @@ })

stream.on('ready', function () {
stream.on('ready', () => {
stream.write('hello world\n')

@@ -144,3 +144,3 @@ })

stream.on('ready', function () {
stream.on('ready', () => {
stream.write('hello world\n')

@@ -147,0 +147,0 @@ })

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc