Socket
Socket
Sign inDemoInstall

thread-stream

Package Overview
Dependencies
0
Maintainers
4
Versions
42
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.4.0 to 0.6.0

test/create-and-exit.js

86

index.js

@@ -39,3 +39,3 @@ 'use strict'

const writeIndex = Atomics.load(stream._state, WRITE_INDEX)
const leftover = stream._data.length - writeIndex
let leftover = stream._data.length - writeIndex

@@ -52,6 +52,27 @@ if (leftover > 0) {

}
// TODO handle truncated utf-8 chunks
const toWrite = stream.buf.slice(0, leftover)
stream.buf = stream.buf.slice(leftover)
stream._write(toWrite, nextFlush.bind(null, stream))
let toWrite = stream.buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream.buf = stream.buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
stream._write(toWrite, nextFlush.bind(null, stream))
} else {
// multi-byte utf-8
stream.flush(() => {
Atomics.store(stream._state, READ_INDEX, 0)
Atomics.store(stream._state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream.buf.length) {
leftover = leftover / 2
toWrite = stream.buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream.buf = stream.buf.slice(leftover)
stream._write(toWrite, nextFlush.bind(null, stream))
})
}
} else if (leftover === 0) {

@@ -68,2 +89,3 @@ if (writeIndex === 0 && stream.buf.length === 0) {

} else {
// This should never happen
throw new Error('overwritten')

@@ -77,2 +99,6 @@ }

if (opts.bufferSize < 4) {
throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
}
this._stateBuf = new SharedArrayBuffer(128)

@@ -122,7 +148,11 @@ this._state = new Int32Array(this._stateBuf)

_hasSpace () {
const current = Atomics.load(this._state, WRITE_INDEX)
return this._data.length - this.buf.length - current > 0
}
write (data) {
if (!this.ready || this.flushing) {
this.buf += data
// TODO this should return false
return true
return this._hasSpace()
}

@@ -141,4 +171,3 @@

// TODO implement highWaterMark
return false
return this._hasSpace()
}

@@ -202,3 +231,3 @@

const writeIndex = Atomics.load(this._state, WRITE_INDEX)
const leftover = this._data.length - writeIndex
let leftover = this._data.length - writeIndex
if (leftover === 0) {

@@ -210,10 +239,29 @@ this._flushSync()

} else if (leftover < 0) {
// This should never happen
throw new Error('overwritten')
}
// TODO handle truncated utf-8 chunks
const toWrite = this.buf.slice(0, leftover)
this.buf = this.buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
this._write(toWrite, cb)
let toWrite = this.buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
this.buf = this.buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
this._write(toWrite, cb)
} else {
// multi-byte utf-8
this._flushSync()
Atomics.store(this._state, READ_INDEX, 0)
Atomics.store(this._state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > this.buf.length) {
leftover = leftover / 2
toWrite = this.buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
this.buf = this.buf.slice(leftover)
this._write(toWrite, cb)
}
}

@@ -255,4 +303,12 @@ }

}
unref () {
this.worker.unref()
}
ref () {
this.worker.ref()
}
}
module.exports = ThreadStream

2

package.json
{
"name": "thread-stream",
"version": "0.4.0",
"version": "0.6.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -5,26 +5,6 @@ 'use strict'

const { join } = require('path')
const { tmpdir } = require('os')
const { unlinkSync } = require('fs')
const { readFile } = require('fs')
const { file } = require('./helper')
const ThreadStream = require('..')
const files = []
let count = 0
function file () {
const file = join(tmpdir(), `thread-stream-${process.pid}-${process.hrtime().toString()}-${count++}`)
files.push(file)
return file
}
process.on('beforeExit', () => {
for (const file of files) {
try {
unlinkSync(file)
} catch (e) {
console.log(e)
}
}
})
test('base sync=true', function (t) {

@@ -154,3 +134,3 @@ t.plan(7)

test('over the bufferSize at startup', function (t) {
t.plan(7)
t.plan(8)

@@ -173,4 +153,5 @@ const dest = file()

t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.write('hello'))
t.notOk(stream.write(' world\n'))
t.notOk(stream.write('something else\n'))

@@ -192,3 +173,3 @@ stream.end()

test('over the bufferSize at startup (async)', function (t) {
t.plan(7)
t.plan(8)

@@ -211,4 +192,5 @@ const dest = file()

t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.write('hello'))
t.notOk(stream.write(' world\n'))
t.notOk(stream.write('something else\n'))

@@ -215,0 +197,0 @@ stream.end()

@@ -5,25 +5,5 @@ 'use strict'

const { join } = require('path')
const { tmpdir } = require('os')
const { unlinkSync } = require('fs')
const ThreadStream = require('..')
const { file } = require('./helper')
const files = []
let count = 0
function file () {
const file = join(tmpdir(), `thread-stream-${process.pid}-${process.hrtime().toString()}-${count++}`)
files.push(file)
return file
}
process.on('beforeExit', () => {
for (const file of files) {
try {
unlinkSync(file)
} catch (e) {
console.log(e)
}
}
})
const MAX = 1000

@@ -30,0 +10,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc