Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

thread-stream

Package Overview
Dependencies
Maintainers
3
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version
3.1.0
to
3.2.0
+15
.claude/settings.local.json
{
"permissions": {
"allow": [
"Bash(npx tap:*)",
"Bash(node:*)",
"Bash(for i in 1 2 3 4 5)",
"Bash(do)",
"Bash(echo:*)",
"Bash(done)",
"Bash(npm test:*)"
],
"deny": [],
"ask": []
}
}
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
thread-stream is a library for streaming data to a Node.js Worker Thread. It uses SharedArrayBuffer and Atomics for efficient inter-thread communication, enabling high-performance data streaming to worker threads.
## Build & Test Commands
```bash
npm test # Run linting (standard), type checking, and all tests
npm run build # Type check only (tsc --noEmit)
npm run test:ci # CI-specific test run
# Run a single test file
node --test test/<filename>.test.js
node --test test/<filename>.test.ts # For TypeScript tests
# Lint
npx standard
```
## Architecture
### Core Components
- **index.js**: Main `ThreadStream` class extending EventEmitter. Manages shared memory buffers, worker lifecycle, and provides stream-like write/flush/end API.
- **lib/worker.js**: Runs inside the Worker Thread. Loads the user-provided destination module, reads from shared buffer, and writes to the destination stream.
- **lib/indexes.js**: Defines shared buffer index constants (`WRITE_INDEX`, `READ_INDEX`) used for Atomics-based synchronization.
- **lib/wait.js**: Provides `wait()` and `waitDiff()` utilities for async waiting on Atomics state changes with exponential backoff.
### Shared Memory Communication
The main thread and worker communicate via two SharedArrayBuffers:
1. **stateBuf**: Int32Array for READ_INDEX and WRITE_INDEX positions
2. **dataBuf**: Buffer for actual string data (default 4MB)
Write flow: Main thread writes to dataBuf, updates WRITE_INDEX, worker reads data between READ_INDEX and WRITE_INDEX, updates READ_INDEX when consumed.
### Worker Module Interface
User-provided worker modules must export an async function that receives `workerData` and returns a writable stream:
```js
async function run(opts) {
const stream = fs.createWriteStream(opts.dest)
await once(stream, 'open')
return stream
}
module.exports = run
```
### Sync vs Async Modes
- `sync: true`: Blocking writes using flushSync, waits for worker to consume
- `sync: false` (default): Non-blocking writes with drain events when buffer fills
## Code Style
Uses [Standard](https://standardjs.com/) for linting. Test files in `test/ts/**/*` and `test/syntax-error.mjs` are excluded from linting.
'use strict'
const { Writable } = require('stream')
const { parentPort } = require('worker_threads')
async function run () {
parentPort.postMessage({
internal: 'watch-mode'
})
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
cb()
}
})
}
module.exports = run
'use strict'
const { test } = require('tap')
const { join } = require('path')
const ThreadStream = require('..')
test('ignores worker messages without a protocol code', function (t) {
t.plan(2)
const stream = new ThreadStream({
filename: join(__dirname, 'message-without-code.js'),
sync: false
})
const errors = []
stream.on('error', err => {
errors.push(err)
})
stream.on('ready', () => {
t.ok(stream.write('hello world\n'))
stream.end()
})
stream.on('finish', () => {
t.same(errors, [])
})
})
+41
-13

@@ -11,3 +11,4 @@ 'use strict'

WRITE_INDEX,
READ_INDEX
READ_INDEX,
SEQ_INDEX
} = require('./lib/indexes')

@@ -22,2 +23,9 @@ const buffer = require('buffer')

function updateState (stream, fn) {
Atomics.add(stream[kImpl].state, SEQ_INDEX, 1)
fn()
Atomics.add(stream[kImpl].state, SEQ_INDEX, 1)
Atomics.notify(stream[kImpl].state, SEQ_INDEX)
}
class FakeWeakRef {

@@ -125,4 +133,7 @@ constructor (value) {

Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
})
Atomics.notify(stream[kImpl].state, READ_INDEX)

@@ -147,4 +158,7 @@ // Find a toWrite length that fits the buffer

stream.flush(() => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
})
Atomics.notify(stream[kImpl].state, READ_INDEX)
nextFlush(stream)

@@ -167,2 +181,8 @@ })

// Node.js watch mode may send internal worker messages that do not
// participate in thread-stream's worker protocol.
if (msg?.code == null) {
return
}
switch (msg.code) {

@@ -409,4 +429,5 @@ case 'READY':

stream[kImpl].data.write(data, current)
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
})
cb()

@@ -428,5 +449,6 @@ return true

// process._rawDebug('writing index')
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
})
// process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)

@@ -476,4 +498,7 @@ // Wait for the process to complete

flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
})
Atomics.notify(stream[kImpl].state, READ_INDEX)
continue

@@ -494,4 +519,7 @@ } else if (leftover < 0) {

flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
updateState(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
})
Atomics.notify(stream[kImpl].state, READ_INDEX)

@@ -498,0 +526,0 @@ // Find a toWrite length that fits the buffer

'use strict'
const SEQ_INDEX = 2
const WRITE_INDEX = 4

@@ -8,3 +9,4 @@ const READ_INDEX = 8

WRITE_INDEX,
READ_INDEX
READ_INDEX,
SEQ_INDEX
}

@@ -5,3 +5,3 @@ 'use strict'

const { workerData, parentPort } = require('worker_threads')
const { WRITE_INDEX, READ_INDEX } = require('./indexes')
const { WRITE_INDEX, READ_INDEX, SEQ_INDEX } = require('./indexes')
const { waitDiff } = require('./wait')

@@ -105,5 +105,21 @@

function readState () {
while (true) {
const seq = Atomics.load(state, SEQ_INDEX)
if ((seq & 1) !== 0) {
continue
}
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)
if (seq === Atomics.load(state, SEQ_INDEX)) {
return { current, end, seq }
}
}
}
function run () {
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)
const { current, end, seq } = readState()

@@ -113,7 +129,3 @@ // process._rawDebug(`pre state ${current} ${end}`)

if (end === current) {
if (end === data.length) {
waitDiff(state, READ_INDEX, end, Infinity, run)
} else {
waitDiff(state, WRITE_INDEX, end, Infinity, run)
}
waitDiff(state, SEQ_INDEX, seq, Infinity, run)
return

@@ -120,0 +132,0 @@ }

{
"name": "thread-stream",
"version": "3.1.0",
"version": "3.2.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -90,3 +90,4 @@ # thread-stream

You can emit events on the ThreadStream from your worker using [`worker.parentPort.postMessage()`](https://nodejs.org/api/worker_threads.html#workerparentport).
The message (JSON object) must have the following data structure:
Messages that do not carry a thread-stream protocol `code` are ignored.
For custom events, the message (JSON object) must have the following data structure:

@@ -93,0 +94,0 @@ ```js

@@ -11,5 +11,15 @@ 'use strict'

test('base sync=true', function (t) {
t.plan(15)
function readFileAsync (path) {
return new Promise((resolve, reject) => {
readFile(path, 'utf8', (err, data) => {
if (err) {
reject(err)
return
}
resolve(data)
})
})
}
test('base sync=true', async function (t) {
const dest = file()

@@ -22,20 +32,8 @@ const stream = new ThreadStream({

const finish = once(stream, 'finish')
const close = once(stream, 'close')
t.same(stream.writableObjectMode, false)
t.same(stream.writableFinished, false)
stream.on('finish', () => {
t.same(stream.writableFinished, true)
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
t.same(stream.closed, false)
stream.on('close', () => {
t.same(stream.closed, true)
t.notOk(stream.writable)
t.pass('close emitted')
})
t.same(stream.writableNeedDrain, false)

@@ -49,7 +47,15 @@ t.ok(stream.write('hello world\n'))

t.same(stream.writableEnded, true)
await finish
t.same(stream.writableFinished, true)
await close
t.same(stream.closed, true)
t.notOk(stream.writable)
const data = await readFileAsync(dest)
t.equal(data, 'hello world\nsomething else\n')
})
test('overflow sync=true', function (t) {
t.plan(3)
test('overflow sync=true', async function (t) {
const dest = file()

@@ -63,5 +69,5 @@ const stream = new ThreadStream({

const close = once(stream, 'close')
let count = 0
// Write 10 chars, 20 times
function write () {

@@ -74,3 +80,2 @@ if (count++ === 20) {

stream.write('aaaaaaaaaa')
// do not wait for drain event
setImmediate(write)

@@ -81,15 +86,8 @@ }

stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
})
})
await close
const data = await readFileAsync(dest)
t.equal(data.length, 200)
})
test('overflow sync=false', function (t) {
test('overflow sync=false', async function (t) {
const dest = file()

@@ -103,2 +101,3 @@ const stream = new ThreadStream({

const close = once(stream, 'close')
let count = 0

@@ -108,6 +107,4 @@

// Write 10 chars, 20 times
function write () {
if (count++ === 20) {
t.pass('end sent')
stream.end()

@@ -120,3 +117,2 @@ return

}
// do not wait for drain event
setImmediate(write)

@@ -129,20 +125,11 @@ }

t.same(stream.writableNeedDrain, false)
t.pass('drain')
})
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
t.end()
})
})
await close
const data = await readFileAsync(dest)
t.equal(data.length, 200)
})
test('over the bufferSize at startup', function (t) {
t.plan(6)
t.plan(5)

@@ -164,6 +151,2 @@ const dest = file()

stream.on('close', () => {
t.pass('close emitted')
})
t.ok(stream.write('hello'))

@@ -177,3 +160,3 @@ t.ok(stream.write(' world\n'))

test('over the bufferSize at startup (async)', function (t) {
t.plan(6)
t.plan(5)

@@ -200,9 +183,7 @@ const dest = file()

})
stream.on('close', () => {
t.pass('close emitted')
})
})
test('flushSync sync=false', function (t) {
t.plan(2)
const dest = file()

@@ -216,28 +197,22 @@ const stream = new ThreadStream({

stream.on('drain', () => {
t.pass('drain')
stream.end()
stream.on('ready', () => {
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
setImmediate(() => {
stream.end()
})
})
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
t.end()
})
})
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
})
test('pass down MessagePorts', async function (t) {
t.plan(3)
const { port1, port2 } = new MessageChannel()

@@ -252,2 +227,3 @@ const stream = new ThreadStream({

})
t.teardown(() => {

@@ -261,9 +237,6 @@ stream.end()

const [strings] = await once(port2, 'message')
t.equal(strings, 'hello world\nsomething else\n')
})
test('destroy does not error', function (t) {
t.plan(5)
test('destroy does not error', async function (t) {
const dest = file()

@@ -277,19 +250,20 @@ const stream = new ThreadStream({

stream.on('ready', () => {
t.pass('ready emitted')
stream.worker.terminate()
})
stream.on('error', (err) => {
t.equal(err.message, 'the worker thread exited')
const [err] = await once(stream, 'error')
t.equal(err.message, 'the worker thread exited')
await new Promise((resolve) => {
stream.flush((err) => {
t.equal(err.message, 'the worker has exited')
resolve()
})
t.doesNotThrow(() => stream.flushSync())
t.doesNotThrow(() => stream.end())
})
t.doesNotThrow(() => stream.flushSync())
t.doesNotThrow(() => stream.end())
})
test('syntax error', function (t) {
t.plan(1)
test('syntax error', async function (t) {
const stream = new ThreadStream({

@@ -299,5 +273,4 @@ filename: join(__dirname, 'syntax-error.mjs')

stream.on('error', (err) => {
t.equal(err.message, 'Unexpected end of input')
})
const [err] = await once(stream, 'error')
t.equal(err.message, 'Unexpected end of input')
})