Socket
Socket
Sign inDemoInstall

thread-stream

Package Overview
Dependencies
Maintainers
1
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version 0.2.1 to 0.3.0

test/bench.test.js

13

bench.js

@@ -16,6 +16,7 @@ 'use strict'

const dummyConsole = new Console(out)
const threadStream = new ThreadStream({
const threadStreamSync = new ThreadStream({
filename: join(__dirname, 'test', 'to-file'),
workerData: { dest: '/dev/null' },
bufferSize: 4 * 1024 * 1024
bufferSize: 4 * 1024 * 1024,
sync: true
})

@@ -40,5 +41,5 @@ const threadStreamAsync = new ThreadStream({

const run = bench([
function benchThreadStream (cb) {
function benchThreadStreamSync (cb) {
for (let i = 0; i < MAX; i++) {
threadStream.write(str)
threadStreamSync.write(str)
}

@@ -87,5 +88,1 @@ setImmediate(cb)

}
process.on('beforeExit', function () {
console.log(threadStream.flushes)
})

@@ -53,2 +53,6 @@ 'use strict'

} else if (leftover === 0) {
if (writeIndex === 0 && stream.buf.length === 0) {
// we had a flushSync in the meanwhile
return
}
stream.flush(() => {

@@ -72,3 +76,3 @@ Atomics.store(stream._state, READ_INDEX, 0)

this._data = Buffer.from(this._dataBuf)
this._sync = opts.sync === undefined ? true : opts.sync
this._sync = opts.sync || false
this.worker = createWorker(this, opts)

@@ -85,11 +89,3 @@ this.ready = false

this.ready = true
if (this.buf.length > 0) {
const toWrite = this.buf
this.buf = ''
this.write(toWrite)
}
this.emit('ready')
if (this.ending) {
this.end()
}
break

@@ -129,29 +125,6 @@ case 'FINISH':

const cb = () => {
if (!this.needDrain) {
// process._rawDebug('emitting drain')
this.needDrain = true
process.nextTick(drain, this)
}
}
if (this._sync) {
while (data.length !== 0) {
const writeIndex = Atomics.load(this._state, WRITE_INDEX)
const leftover = this._data.length - writeIndex
if (leftover === 0) {
this.flushSync()
Atomics.store(this._state, READ_INDEX, 0)
Atomics.store(this._state, WRITE_INDEX, 0)
continue
} else if (leftover < 0) {
throw new Error('overwritten')
}
this.buf += data
this._writeSync()
// TODO handle truncated utf-8 chunks
const toWrite = data.slice(0, leftover)
this._write(toWrite, cb)
data = data.slice(leftover)
}
return true

@@ -164,2 +137,3 @@ }

// TODO implement highWaterMark
return false

@@ -169,7 +143,12 @@ }

end () {
this.ending = true
if (!this.ready) {
this.once('ready', this.end.bind(this))
return
}
if (this.ending) {
return
}
this.ending = true
if (this.flushing) {

@@ -182,4 +161,2 @@ this.once('drain', this.end.bind(this))

// process._rawDebug('end...!')
// process._rawDebug('writing index')

@@ -189,2 +166,3 @@ Atomics.store(this._state, WRITE_INDEX, -1)

Atomics.notify(this._state, WRITE_INDEX)
// process._rawDebug('end finished...')
}

@@ -209,3 +187,38 @@

_writeSync () {
const cb = () => {
if (!this.needDrain) {
// process._rawDebug('emitting drain')
this.needDrain = true
process.nextTick(drain, this)
}
}
this.flushing = false
while (this.buf.length !== 0) {
const writeIndex = Atomics.load(this._state, WRITE_INDEX)
const leftover = this._data.length - writeIndex
if (leftover === 0) {
this._flushSync()
Atomics.store(this._state, READ_INDEX, 0)
Atomics.store(this._state, WRITE_INDEX, 0)
continue
} else if (leftover < 0) {
throw new Error('overwritten')
}
// TODO handle truncated utf-8 chunks
const toWrite = this.buf.slice(0, leftover)
this.buf = this.buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
this._write(toWrite, cb)
}
}
flushSync () {
this._writeSync()
this._flushSync()
}
_flushSync () {
if (this.flushing) {

@@ -215,2 +228,4 @@ throw new Error('unable to flush while flushing')

// process._rawDebug('flushSync started')
const writeIndex = Atomics.load(this._state, WRITE_INDEX)

@@ -229,2 +244,3 @@

}
// process._rawDebug('flushSync finished')
}

@@ -231,0 +247,0 @@ }

{
"name": "thread-stream",
"version": "0.2.1",
"version": "0.3.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -21,9 +21,14 @@ # thread-stream

filename: join(__dirname, 'worker.js'),
workerData: { dest }
workerData: { dest },
sync: false // default
})
stream.write('hello')
// Asynchronous flushing
stream.flush(function () {
stream.write(' ')
stream.write('world')
// Synchronous flushing
stream.flushSync()

@@ -30,0 +35,0 @@ stream.end()

@@ -29,3 +29,3 @@ 'use strict'

test('base', function (t) {
test('base sync=true', function (t) {
t.plan(7)

@@ -36,3 +36,4 @@

filename: join(__dirname, 'to-file'),
workerData: { dest }
workerData: { dest },
sync: true
})

@@ -72,3 +73,4 @@

filename: join(__dirname, 'to-file'),
workerData: { dest }
workerData: { dest },
sync: true
})

@@ -159,3 +161,4 @@

filename: join(__dirname, 'to-file'),
workerData: { dest }
workerData: { dest },
sync: true
})

@@ -223,1 +226,37 @@

})
test('flushSync sync=false', function (t) {
const dest = file()
const stream = new ThreadStream({
bufferSize: 128,
filename: join(__dirname, 'to-file'),
workerData: { dest },
sync: false
})
stream.on('ready', () => {
t.pass('ready emitted')
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
})
stream.on('drain', () => {
t.pass('drain')
stream.end()
})
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
t.end()
})
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc