Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

thread-stream

Package Overview
Dependencies
Maintainers
3
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version
4.0.0
to
4.1.0
+68
test/flush-worker.js
'use strict'
const { EventEmitter } = require('events')
const { parentPort } = require('worker_threads')
function createDestination (mode) {
const destination = new EventEmitter()
destination.writableEnded = false
destination.writableNeedDrain = false
destination.write = function () {
if (mode === 'drain') {
destination.writableNeedDrain = true
setTimeout(() => {
destination.writableNeedDrain = false
parentPort.postMessage({
code: 'EVENT',
name: 'destination-drain'
})
destination.emit('drain')
}, 50)
}
return true
}
destination.end = function () {
destination.writableEnded = true
destination.emit('close')
}
if (mode === 'flush') {
destination.flush = function (cb) {
setTimeout(() => {
parentPort.postMessage({
code: 'EVENT',
name: 'destination-flushed'
})
cb()
}, 50)
}
}
if (mode === 'flush-sync') {
destination.flushSync = function () {
parentPort.postMessage({
code: 'EVENT',
name: 'destination-flush-sync'
})
}
}
if (mode === 'exit-on-flush') {
destination.flush = function (_cb) {
setTimeout(() => {
process.exit(0)
}, 20)
}
}
return destination
}
async function run (opts) {
return createDestination(opts.mode)
}
module.exports = run
'use strict'
const { test } = require('node:test')
const assert = require('node:assert')
const { once } = require('node:events')
const { join } = require('node:path')
const ThreadStream = require('..')
function createStream (mode) {
return new ThreadStream({
filename: join(__dirname, 'flush-worker.js'),
workerData: { mode },
sync: false
})
}
test('flush waits for worker destination.flush(cb)', async function () {
const stream = createStream('flush')
let flushed = false
stream.on('destination-flushed', () => {
flushed = true
})
assert.ok(stream.write('hello'))
await new Promise((resolve, reject) => {
stream.flush((err) => {
if (err) {
reject(err)
return
}
resolve()
})
})
assert.strictEqual(flushed, true)
const close = once(stream, 'close')
stream.end()
await close
})
test('flush falls back to destination.flushSync()', async function () {
const stream = createStream('flush-sync')
let called = false
stream.on('destination-flush-sync', () => {
called = true
})
assert.ok(stream.write('hello'))
await new Promise((resolve, reject) => {
stream.flush((err) => {
if (err) {
reject(err)
return
}
resolve()
})
})
assert.strictEqual(called, true)
const close = once(stream, 'close')
stream.end()
await close
})
test('flush waits for drain when destination has no flush API', async function () {
const stream = createStream('drain')
let drained = false
stream.on('destination-drain', () => {
drained = true
})
assert.ok(stream.write('hello'))
await new Promise((resolve, reject) => {
stream.flush((err) => {
if (err) {
reject(err)
return
}
resolve()
})
})
assert.strictEqual(drained, true)
const close = once(stream, 'close')
stream.end()
await close
})
test('pending flush callbacks fail when worker exits', async function () {
const stream = createStream('exit-on-flush')
const close = once(stream, 'close')
assert.ok(stream.write('hello'))
const err = await new Promise((resolve) => {
stream.flush(resolve)
})
assert.ok(err)
assert.strictEqual(err.message, 'the worker has exited')
await close
})
'use strict'
const { Writable } = require('stream')
const { parentPort } = require('worker_threads')
async function run () {
parentPort.postMessage({
internal: 'watch-mode'
})
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
cb()
}
})
}
module.exports = run
'use strict'
const { Writable } = require('stream')
const { threadName, parentPort } = require('worker_threads')
module.exports = function () {
parentPort.once('message', function ({ port }) {
port.postMessage({ threadName })
})
return new Writable({
write (chunk, encoding, callback) {
callback()
}
})
}
'use strict'
const { test } = require('node:test')
const assert = require('node:assert')
const { once } = require('events')
const { join } = require('path')
const ThreadStream = require('..')
test('ignores worker messages without a protocol code', async function () {
const stream = new ThreadStream({
filename: join(__dirname, 'message-without-code.js'),
sync: false
})
const errors = []
stream.on('error', err => {
errors.push(err)
})
const ready = once(stream, 'ready')
const close = once(stream, 'close')
assert.ok(stream.write('hello world\n'))
stream.end()
await ready
await close
assert.deepStrictEqual(errors, [])
})
'use strict'
const { test } = require('node:test')
const assert = require('node:assert')
const { join } = require('path')
const { once } = require('events')
const { MessageChannel } = require('worker_threads')
const ThreadStream = require('..')
// threadName was added in Node.js v22.20.0 and v24.6.0
const [major, minor] = process.versions.node.split('.').map(Number)
const supportsThreadName = (major === 22 && minor >= 20) || major >= 24
test('worker has default name "thread-stream"', { skip: !supportsThreadName }, async function (t) {
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'report-thread-name.js'),
sync: true
})
t.after(() => stream.end())
stream.emit('message', { port: port1 }, [port1])
const [{ threadName }] = await once(port2, 'message')
assert.strictEqual(threadName, 'thread-stream')
})
test('worker name can be overridden via workerOpts', { skip: !supportsThreadName }, async function (t) {
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'report-thread-name.js'),
workerOpts: {
name: 'my-custom-worker'
},
sync: true
})
t.after(() => stream.end())
stream.emit('message', { port: port1 }, [port1])
const [{ threadName }] = await once(port2, 'message')
assert.strictEqual(threadName, 'my-custom-worker')
})
+4
-4

@@ -27,3 +27,3 @@ name: CI

- name: Check out repo
uses: actions/checkout@v4
uses: actions/checkout@v6
with:

@@ -42,3 +42,3 @@ persist-credentials: false

matrix:
node-version: [20, 22, 24]
node-version: [20, 22, 24, 26]
os: [macos-latest, ubuntu-latest, windows-latest]

@@ -48,3 +48,3 @@

- name: Check out repo
uses: actions/checkout@v4
uses: actions/checkout@v6
with:

@@ -54,3 +54,3 @@ persist-credentials: false

- name: Setup Node ${{ matrix.node-version }}
uses: actions/setup-node@v4
uses: actions/setup-node@v6
with:

@@ -57,0 +57,0 @@ node-version: ${{ matrix.node-version }}

@@ -63,2 +63,9 @@ import { EventEmitter } from 'events'

/**
* Flush the stream asynchronously.
*
* The callback is invoked once data has been consumed by the worker and the
* worker destination has acknowledged the flush.
*/
flush(cb?: (err?: Error) => void): void
/**
* Flush the stream synchronously.

@@ -65,0 +72,0 @@ * This method should be called in the shutdown phase to make sure that all data has been flushed.

+120
-21

@@ -21,2 +21,4 @@ 'use strict'

function noop () {}
class FakeWeakRef {

@@ -58,2 +60,3 @@ constructor (value) {

...opts.workerOpts,
name: opts.workerOpts?.name || 'thread-stream',
trackUnmanagedFds: false,

@@ -119,4 +122,4 @@ workerData: {

// multi-byte utf-8
stream.flush(() => {
// err is already handled in flush()
waitForRead(stream, () => {
// err is already handled in waitForRead()
if (stream.destroyed) {

@@ -147,3 +150,3 @@ return

}
stream.flush(() => {
waitForRead(stream, () => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)

@@ -169,2 +172,8 @@ Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

// Node.js watch mode may send internal worker messages that do not
// participate in thread-stream's worker protocol.
if (msg?.code == null) {
return
}
switch (msg.code) {

@@ -176,3 +185,3 @@ case 'READY':

stream.flush(() => {
waitForRead(stream, () => {
stream[kImpl].ready = true

@@ -192,2 +201,15 @@ stream.emit('ready')

break
case 'FLUSHED': {
if (msg.context !== 'thread-stream') {
destroy(stream, new Error('this should not happen: ' + msg.code))
break
}
const cb = stream[kImpl].flushCallbacks.get(msg.id)
if (cb) {
stream[kImpl].flushCallbacks.delete(msg.id)
process.nextTick(cb)
}
break
}
case 'WARNING':

@@ -237,2 +259,4 @@ process.emitWarning(msg.err)

this[kImpl].buf = ''
this[kImpl].flushCallbacks = new Map()
this[kImpl].nextFlushId = 0

@@ -298,24 +322,11 @@ // TODO (fix): Make private?

flush (cb) {
if (this[kImpl].destroyed) {
if (typeof cb === 'function') {
process.nextTick(cb, new Error('the worker has exited'))
}
return
}
cb = typeof cb === 'function' ? cb : noop
// TODO write all .buf
const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
// process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
flushBuffer(this, (err) => {
if (err) {
destroy(this, err)
process.nextTick(cb, err)
return
}
if (res === 'not-equal') {
// TODO handle deadlock
this.flush(cb)
return
}
process.nextTick(cb)
requestWorkerFlush(this, cb)
})

@@ -378,2 +389,89 @@ }

function flushBuffer (stream, cb) {
if (stream[kImpl].destroyed) {
process.nextTick(cb, new Error('the worker has exited'))
return
}
if (!stream[kImpl].sync && (stream[kImpl].flushing || stream[kImpl].buf.length > 0)) {
setImmediate(flushBuffer, stream, cb)
return
}
waitForRead(stream, cb)
}
function waitForRead (stream, cb) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
wait(stream[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
if (err) {
destroy(stream, err)
cb(err)
return
}
if (res !== 'ok') {
waitForRead(stream, cb)
return
}
cb()
})
}
function requestWorkerFlush (stream, cb) {
if (stream[kImpl].destroyed) {
process.nextTick(cb, new Error('the worker has exited'))
return
}
if (!stream[kImpl].ready) {
const onReady = () => {
cleanup()
requestWorkerFlush(stream, cb)
}
const onClose = () => {
cleanup()
process.nextTick(cb, new Error('the worker has exited'))
}
const cleanup = () => {
stream.off('ready', onReady)
stream.off('close', onClose)
}
stream.once('ready', onReady)
stream.once('close', onClose)
return
}
const id = ++stream[kImpl].nextFlushId
stream[kImpl].flushCallbacks.set(id, cb)
try {
stream.worker.postMessage({
code: 'FLUSH',
context: 'thread-stream',
id
})
} catch (err) {
stream[kImpl].flushCallbacks.delete(id)
destroy(stream, err)
process.nextTick(cb, err)
}
}
function failPendingFlushCallbacks (stream, err) {
const callbacks = stream[kImpl].flushCallbacks
if (callbacks.size === 0) {
return
}
const flushErr = err || new Error('the worker has exited')
for (const cb of callbacks.values()) {
process.nextTick(cb, flushErr)
}
callbacks.clear()
}
function error (stream, err) {

@@ -390,2 +488,3 @@ setImmediate(() => {

stream[kImpl].destroyed = true
failPendingFlushCallbacks(stream, err)

@@ -392,0 +491,0 @@ if (err) {

@@ -15,2 +15,4 @@ 'use strict'

let destination
const flushQueue = []
let flushing = false

@@ -23,2 +25,97 @@ const state = new Int32Array(stateBuf)

function onParentPortMessage (msg) {
if (!msg || msg.code !== 'FLUSH' || msg.context !== 'thread-stream') {
return
}
flushQueue.push(msg.id)
processFlushQueue()
}
function processFlushQueue () {
if (flushing || !destination) {
return
}
const id = flushQueue.shift()
if (id === undefined) {
return
}
flushing = true
flushDestination((err) => {
flushing = false
if (err) {
parentPort.postMessage({
code: 'ERROR',
err
})
return
}
parentPort.postMessage({
code: 'FLUSHED',
context: 'thread-stream',
id
})
processFlushQueue()
})
}
function flushDestination (cb) {
if (typeof destination?.flush === 'function') {
if (destination.flush.length === 0) {
try {
const result = destination.flush()
if (result && typeof result.then === 'function') {
result.then(() => cb(), cb)
} else {
cb()
}
} catch (err) {
cb(err)
}
return
}
let done = false
const onDone = (err) => {
if (done) {
return
}
done = true
cb(err)
}
try {
const result = destination.flush(onDone)
if (result && typeof result.then === 'function') {
result.then(() => onDone(), onDone)
}
} catch (err) {
onDone(err)
}
return
}
if (typeof destination?.flushSync === 'function') {
try {
destination.flushSync()
cb()
} catch (err) {
cb(err)
}
return
}
if (destination?.writableNeedDrain && !destination?.writableEnded) {
destination.once('drain', cb)
return
}
cb()
}
async function start () {

@@ -98,2 +195,4 @@ let worker

})
processFlushQueue()
}

@@ -105,2 +204,4 @@

start().then(function () {
parentPort.on('message', onParentPortMessage)
parentPort.postMessage({

@@ -107,0 +208,0 @@ code: 'READY'

{
"name": "thread-stream",
"version": "4.0.0",
"version": "4.1.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -11,15 +11,14 @@ "main": "index.js",

"dependencies": {
"real-require": "^0.2.0"
"real-require": "^1.0.0"
},
"devDependencies": {
"@types/node": "^22.0.0",
"@types/node": "^25.0.2",
"@yao-pkg/pkg": "^6.0.0",
"borp": "^0.21.0",
"borp": "^1.0.0",
"desm": "^1.3.0",
"eslint": "^9.39.1",
"fastbench": "^1.0.1",
"husky": "^9.0.6",
"neostandard": "^0.12.2",
"neostandard": "^0.13.0",
"pino-elasticsearch": "^8.0.0",
"sonic-boom": "^4.0.1",
"sonic-boom": "^5.0.0",
"ts-node": "^10.8.0",

@@ -31,7 +30,6 @@ "typescript": "~5.7.3"

"lint": "eslint",
"test": "npm run lint && npm run build && npm run transpile && borp --pattern 'test/*.test.{js,mjs}'",
"test:ci": "npm run lint && npm run transpile && borp --pattern 'test/*.test.{js,mjs}'",
"test:yarn": "npm run transpile && borp --pattern 'test/*.test.js'",
"transpile": "sh ./test/ts/transpile.sh",
"prepare": "husky install"
"test": "npm run lint && npm run build && npm run transpile && borp --pattern \"test/*.test.{js,mjs}\"",
"test:ci": "npm run lint && npm run transpile && borp --pattern \"test/*.test.{js,mjs}\"",
"test:yarn": "npm run transpile && borp --pattern \"test/*.test.js\"",
"transpile": "sh ./test/ts/transpile.sh"
},

@@ -38,0 +36,0 @@ "repository": {

@@ -42,2 +42,4 @@ # thread-stream

`flush(cb)` waits for the worker destination flush when supported (`flush`, `flushSync`, or pending `drain`).
In `worker.js`:

@@ -91,3 +93,4 @@

You can emit events on the ThreadStream from your worker using [`worker.parentPort.postMessage()`](https://nodejs.org/api/worker_threads.html#workerparentport).
The message (JSON object) must have the following data structure:
Messages that do not carry a thread-stream protocol `code` are ignored.
For custom events, the message (JSON object) must have the following data structure:

@@ -94,0 +97,0 @@ ```js

@@ -30,2 +30,5 @@ 'use strict'

const closed = once(stream, 'close').catch(() => {})
// Keep a persistent error listener to avoid unhandled late error events
// reported as asynchronous activity by stricter test runners.
stream.on('error', () => {})

@@ -57,2 +60,5 @@ stream.on('ready', () => {

const closed = once(stream, 'close').catch(() => {})
// Keep a persistent error listener to avoid unhandled late error events
// reported as asynchronous activity by stricter test runners.
stream.on('error', () => {})

@@ -84,2 +90,5 @@ stream.on('ready', () => {

const closed = once(stream, 'close').catch(() => {})
// Keep a persistent error listener to avoid unhandled late error events
// reported as asynchronous activity by stricter test runners.
stream.on('error', () => {})

@@ -111,2 +120,5 @@ stream.on('ready', () => {

const closed = once(stream, 'close').catch(() => {})
// Keep a persistent error listener to avoid unhandled late error events
// reported as asynchronous activity by stricter test runners.
stream.on('error', () => {})

@@ -113,0 +125,0 @@ stream.on('ready', () => {

Sorry, the diff of this file is not supported yet

# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Build & Test Commands
```sh
npm test # Run linter (standard), type check, transpile, and all tests
npm run build # Type check only (tsc --noEmit)
npm run transpile # Generate transpiled test files for various ES targets
# Run a single test file
npx tap test/base.test.js
# Run only JavaScript tests (faster)
npx tap "test/**/*.test.*js"
# Run only TypeScript tests
npx tap --ts test/*.test.*ts
```
## Code Style
This project uses [Standard](https://standardjs.com/) for linting. No semicolons, 2-space indentation.
## Architecture
thread-stream is a library for streaming data to a Node.js Worker Thread using SharedArrayBuffer for high-performance inter-thread communication.
### Core Components
- **index.js** - Main `ThreadStream` class extending EventEmitter. Manages the worker lifecycle, handles writes via SharedArrayBuffer, and coordinates synchronization using Atomics.
- **lib/worker.js** - Runs in the Worker Thread. Loads the user-provided destination module, reads data from SharedArrayBuffer, and writes to the destination stream. Handles both ESM and CommonJS modules, including yarn PnP compatibility.
- **lib/indexes.js** - Defines SharedArrayBuffer index constants (`WRITE_INDEX`, `READ_INDEX`) used for Atomics coordination.
- **lib/wait.js** - Async polling utilities (`wait`, `waitDiff`) for cross-thread synchronization without blocking the main thread.
### Data Flow
1. Main thread writes strings to an internal buffer, then copies to SharedArrayBuffer
2. Atomics.notify signals the worker that data is available
3. Worker reads from SharedArrayBuffer via Atomics.load and writes to destination stream
4. Worker updates READ_INDEX and notifies main thread when done
5. Special index values (-1 for end, -2 for error) signal stream termination
### Key Features
- **Sync/async modes** - `sync: true` blocks until data is written; async mode uses `setImmediate` batching
- **Backpressure** - Handles `drain` events from destination streams
- **GC cleanup** - Uses FinalizationRegistry to terminate orphaned workers
- **TypeScript support** - Workers can be `.ts` files (requires ts-node)