Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

thread-stream

Package Overview
Dependencies
Maintainers
3
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version
4.1.0
to
4.2.0
+15
.claude/settings.local.json
{
"permissions": {
"allow": [
"Bash(npx tap:*)",
"Bash(node:*)",
"Bash(for i in 1 2 3 4 5)",
"Bash(do)",
"Bash(echo:*)",
"Bash(done)",
"Bash(npm test:*)"
],
"deny": [],
"ask": []
}
}
import { test } from 'node:test'
import assert from 'node:assert/strict'
import { readFile } from 'node:fs/promises'
import ThreadStream from '../index.js'
import { join } from 'desm'
import { file } from './helper.js'
test('preserves multibyte records that cross the buffer boundary', async () => {
const dest = file()
const stream = new ThreadStream({
bufferSize: 128,
filename: join(import.meta.url, 'to-file.js'),
workerData: { dest },
sync: false
})
let expected = ''
for (let i = 0; i < 1000; i++) {
const line = `{"idx":${i},"alert":"🚨"}\n`
expected += line
stream.write(line)
}
await new Promise((resolve, reject) => {
stream.once('error', reject)
stream.once('close', resolve)
stream.end()
})
const data = await readFile(dest, 'utf8')
assert.strictEqual(data, expected)
})
+92
-87

@@ -11,3 +11,4 @@ 'use strict'

WRITE_INDEX,
READ_INDEX
READ_INDEX,
SEQ_INDEX
} = require('./lib/indexes')

@@ -19,3 +20,3 @@ const buffer = require('buffer')

// V8 limit for string size
// Maximum pending buffered data before forcing a synchronous drain
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH

@@ -25,2 +26,16 @@

function updateState (stream, fn) {
Atomics.add(stream[kImpl].state, SEQ_INDEX, 1)
fn()
Atomics.add(stream[kImpl].state, SEQ_INDEX, 1)
Atomics.notify(stream[kImpl].state, SEQ_INDEX)
}
function resetIndexes (stream) {
updateState(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
})
}
class FakeWeakRef {

@@ -99,28 +114,29 @@ constructor (value) {

function nextFlush (stream) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
while (true) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
const leftover = stream[kImpl].data.length - writeIndex
if (leftover > 0) {
if (stream[kImpl].buf.length === 0) {
stream[kImpl].flushing = false
if (leftover > 0) {
if (stream[kImpl].bufLen === 0) {
stream[kImpl].flushing = false
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
}
return
}
return
write(stream, leftover, noop)
continue
}
let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, nextFlush.bind(null, stream))
} else {
// multi-byte utf-8
if (leftover === 0) {
if (writeIndex === 0 && stream[kImpl].bufLen === 0) {
// we had a flushSync in the meanwhile
return
}
waitForRead(stream, () => {
// err is already handled in waitForRead()
if (stream.destroyed) {

@@ -130,32 +146,11 @@ return

Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
Atomics.notify(stream[kImpl].state, READ_INDEX)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].data.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, nextFlush.bind(null, stream))
resetIndexes(stream)
nextFlush(stream)
})
}
} else if (leftover === 0) {
if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
// we had a flushSync in the meanwhile
return
}
waitForRead(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
Atomics.notify(stream[kImpl].state, READ_INDEX)
nextFlush(stream)
})
} else {
// This should never happen
destroy(stream, new Error('overwritten'))
return
}

@@ -256,3 +251,5 @@ }

this[kImpl].closed = false
this[kImpl].buf = ''
this[kImpl].buf = []
this[kImpl].bufHead = 0
this[kImpl].bufLen = 0
this[kImpl].flushCallbacks = new Map()

@@ -269,2 +266,3 @@ this[kImpl].nextFlushId = 0

write (data) {
const dataBuf = Buffer.isBuffer(data) ? data : Buffer.from(data)
if (this[kImpl].destroyed) {

@@ -280,3 +278,3 @@ error(this, new Error('the worker has exited'))

if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
if (this[kImpl].flushing && this[kImpl].bufLen + dataBuf.length >= MAX_STRING) {
try {

@@ -291,3 +289,4 @@ writeSync(this)

this[kImpl].buf += data
this[kImpl].buf.push(dataBuf)
this[kImpl].bufLen += dataBuf.length

@@ -309,3 +308,3 @@ if (this[kImpl].sync) {

this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].bufLen - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
return !this[kImpl].needDrain

@@ -396,3 +395,3 @@ }

if (!stream[kImpl].sync && (stream[kImpl].flushing || stream[kImpl].buf.length > 0)) {
if (!stream[kImpl].sync && (stream[kImpl].flushing || stream[kImpl].bufLen > 0)) {
setImmediate(flushBuffer, stream, cb)

@@ -511,9 +510,39 @@ return

function write (stream, data, cb) {
function write (stream, maxBytes, cb) {
// data is smaller than the shared buffer length
const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
const length = Buffer.byteLength(data)
stream[kImpl].data.write(data, current)
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
let offset = current
let remaining = maxBytes
while (remaining > 0 && stream[kImpl].bufLen !== 0) {
const head = stream[kImpl].bufHead
const buf = stream[kImpl].buf[head]
if (buf.length <= remaining) {
buf.copy(stream[kImpl].data, offset)
offset += buf.length
remaining -= buf.length
stream[kImpl].bufLen -= buf.length
stream[kImpl].bufHead = head + 1
if (stream[kImpl].bufHead === stream[kImpl].buf.length) {
stream[kImpl].buf.length = 0
stream[kImpl].bufHead = 0
} else if (stream[kImpl].bufHead >= 1024 && stream[kImpl].bufHead * 2 >= stream[kImpl].buf.length) {
stream[kImpl].buf.splice(0, stream[kImpl].bufHead)
stream[kImpl].bufHead = 0
}
continue
}
buf.copy(stream[kImpl].data, offset, 0, remaining)
stream[kImpl].buf[head] = buf.subarray(remaining)
stream[kImpl].bufLen -= remaining
offset += remaining
remaining = 0
}
updateState(stream, () => {
Atomics.store(stream[kImpl].state, WRITE_INDEX, offset)
})
cb()

@@ -535,5 +564,6 @@ return true

// process._rawDebug('writing index')
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
})
// process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)

@@ -578,10 +608,8 @@ // Wait for the process to complete

while (stream[kImpl].buf.length !== 0) {
while (stream[kImpl].bufLen !== 0) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
const leftover = stream[kImpl].data.length - writeIndex
if (leftover === 0) {
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
Atomics.notify(stream[kImpl].state, READ_INDEX)
resetIndexes(stream)
continue

@@ -593,26 +621,3 @@ } else if (leftover < 0) {

let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, cb)
} else {
// multi-byte utf-8
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
Atomics.notify(stream[kImpl].state, READ_INDEX)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].buf.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, cb)
}
write(stream, leftover, cb)
}

@@ -619,0 +624,0 @@ }

'use strict'
const SEQ_INDEX = 2
const WRITE_INDEX = 4

@@ -8,3 +9,4 @@ const READ_INDEX = 8

WRITE_INDEX,
READ_INDEX
READ_INDEX,
SEQ_INDEX
}

@@ -53,3 +53,5 @@ 'use strict'

// Wait for value to change from expected
// Wait for value to change from expected.
// If we are notified, resume immediately even if the value cycled back
// to the same number before we could re-read it.
const remaining = max === Infinity ? WAIT_MS : Math.min(WAIT_MS, Math.max(1, max - Date.now()))

@@ -59,3 +61,10 @@ const result = Atomics.waitAsync(state, index, expected, remaining)

if (result.async) {
result.value.then(check)
result.value.then((res) => {
if (res === 'ok') {
done(null, 'ok')
return
}
check()
})
} else {

@@ -62,0 +71,0 @@ // Value already changed (not-equal) - recheck on next tick

@@ -5,3 +5,4 @@ 'use strict'

const { workerData, parentPort } = require('worker_threads')
const { WRITE_INDEX, READ_INDEX } = require('./indexes')
const { StringDecoder } = require('string_decoder')
const { WRITE_INDEX, READ_INDEX, SEQ_INDEX } = require('./indexes')
const { waitDiff } = require('./wait')

@@ -21,2 +22,3 @@

const data = Buffer.from(dataBuf)
const decoder = new StringDecoder('utf8')

@@ -212,5 +214,21 @@ // Keep the event loop alive - Atomics.waitAsync promises don't prevent worker exit

function readState () {
while (true) {
const seq = Atomics.load(state, SEQ_INDEX)
if ((seq & 1) !== 0) {
continue
}
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)
if (seq === Atomics.load(state, SEQ_INDEX)) {
return { current, end, seq }
}
}
}
function run () {
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)
const { current, end, seq } = readState()

@@ -220,7 +238,3 @@ // process._rawDebug(`pre state ${current} ${end}`)

if (end === current) {
if (end === data.length) {
waitDiff(state, READ_INDEX, end, Infinity, run)
} else {
waitDiff(state, WRITE_INDEX, end, Infinity, run)
}
waitDiff(state, SEQ_INDEX, seq, Infinity, run)
return

@@ -233,2 +247,6 @@ }

// process._rawDebug('end')
const remaining = decoder.end()
if (remaining.length > 0) {
destination.write(remaining)
}
destination.end()

@@ -238,3 +256,3 @@ return

const toWrite = data.toString('utf8', current, end)
const toWrite = decoder.write(data.subarray(current, end))
// process._rawDebug('worker writing: ' + toWrite)

@@ -241,0 +259,0 @@

{
"name": "thread-stream",
"version": "4.1.0",
"version": "4.2.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -21,3 +21,3 @@ "main": "index.js",

"neostandard": "^0.13.0",
"pino-elasticsearch": "^8.0.0",
"pino-elasticsearch": "^9.0.0",
"sonic-boom": "^5.0.0",

@@ -24,0 +24,0 @@ "ts-node": "^10.8.0",