Socket
Socket
Sign inDemoInstall

thread-stream

Package Overview
Dependencies
0
Maintainers
1
Versions
42
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.0 to 0.2.0

102

index.js

@@ -55,4 +55,5 @@ 'use strict'

if (this.buf.length > 0) {
this.write(this.buf)
const toWrite = this.buf
this.buf = ''
this.write(toWrite)
}

@@ -79,49 +80,80 @@ this.emit('ready')

_write (data, cb) {
// data is smaller than the shared buffer length
const current = Atomics.load(this._state, WRITE_INDEX)
const length = Buffer.byteLength(data)
this._data.write(data, current)
Atomics.store(this._state, WRITE_INDEX, current + length)
Atomics.notify(this._state, WRITE_INDEX)
cb()
return true
}
write (data) {
if (!this.ready || this.flushing) {
this.buf += data
// TODO this should return false
return true
}
if (data.length >= this._data.length) {
// We are not splitting the string in two to avoid dealing
// with truncated utf-8 chunks, therefore we cannot write
// a string longer than the buffer.
throw new Error('The SharedArrayBuffer is too small')
const cb = () => {
if (!this.needDrain) {
// process._rawDebug('emitting drain')
this.needDrain = true
process.nextTick(drain, this)
}
}
let current = Atomics.load(this._state, WRITE_INDEX)
const length = Buffer.byteLength(data)
if (current + length >= this._data.length) {
// Handle overflow cases, we need to go back
// at the beginning of the buffer to write the string.
if (this._sync) {
this.flushSync()
Atomics.store(this._state, READ_INDEX, 0)
current = 0
} else {
this.flushing = true
this.buf = data
if (this._sync) {
while (data.length !== 0) {
const writeIndex = Atomics.load(this._state, WRITE_INDEX)
const leftover = this._data.length - writeIndex
if (leftover === 0) {
this.flushSync()
Atomics.store(this._state, READ_INDEX, 0)
Atomics.store(this._state, WRITE_INDEX, 0)
continue
} else if (leftover < 0) {
throw new Error('overwritten')
}
// TODO handle truncated utf-8 chunks
const toWrite = data.slice(0, leftover)
this._write(toWrite, cb)
data = data.slice(leftover)
}
return true
}
const next = () => {
const writeIndex = Atomics.load(this._state, WRITE_INDEX)
const leftover = this._data.length - writeIndex
if (leftover > 0) {
if (this.buf.length === 0) {
this.flushing = false
cb()
return
}
// TODO handle truncated utf-8 chunks
const toWrite = this.buf.slice(0, leftover)
this.buf = this.buf.slice(leftover)
this._write(toWrite, next)
} else if (leftover === 0) {
this.flush(() => {
this.flushing = false
current = 0
// process._rawDebug('writing ' + Buffer.byteLength(this.buf))
this._data.write(this.buf, current)
Atomics.store(this._state, READ_INDEX, 0)
Atomics.store(this._state, WRITE_INDEX, current + Buffer.byteLength(this.buf))
Atomics.notify(this._state, WRITE_INDEX)
this.buf = ''
this.emit('drain')
Atomics.store(this._state, WRITE_INDEX, 0)
next()
})
return
} else {
throw new Error('overwritten')
}
}
this._data.write(data, current)
Atomics.store(this._state, WRITE_INDEX, current + length)
Atomics.notify(this._state, WRITE_INDEX)
if (!this.needDrain) {
this.needDrain = true
process.nextTick(drain, this)
}
return true
this.buf = data
this.flushing = true
next()
return false
}

@@ -128,0 +160,0 @@

@@ -62,2 +62,3 @@ 'use strict'

const toWrite = data.toString('utf8', current, end)
// process._rawDebug('worker writing: ' + toWrite)

@@ -64,0 +65,0 @@ destination.write(toWrite, function () {

{
"name": "thread-stream",
"version": "0.1.0",
"version": "0.2.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -25,5 +25,8 @@ # thread-stream

stream.write('hello')
stream.write(' ')
stream.write('world')
stream.end()
stream.flush(function () {
stream.write(' ')
stream.write('world')
stream.flushSync()
stream.end()
})
```

@@ -30,0 +33,0 @@

@@ -148,1 +148,72 @@ 'use strict'

})
test('over the bufferSize at startup', function (t) {
t.plan(7)
const dest = file()
const stream = new ThreadStream({
bufferSize: 10,
filename: join(__dirname, 'to-file'),
workerData: { dest }
})
stream.on('drain', () => {
t.pass('drain')
})
stream.on('ready', () => {
t.pass('ready emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
stream.end()
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
})
test('over the bufferSize at startup (async)', function (t) {
t.plan(7)
const dest = file()
const stream = new ThreadStream({
bufferSize: 10,
filename: join(__dirname, 'to-file'),
workerData: { dest },
sync: false
})
stream.on('drain', () => {
t.pass('drain')
})
stream.on('ready', () => {
t.pass('ready emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
stream.end()
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc