thread-stream
Advanced tools
| { | ||
| "permissions": { | ||
| "allow": [ | ||
| "Bash(npx tap:*)", | ||
| "Bash(node:*)", | ||
| "Bash(for i in 1 2 3 4 5)", | ||
| "Bash(do)", | ||
| "Bash(echo:*)", | ||
| "Bash(done)", | ||
| "Bash(npm test:*)" | ||
| ], | ||
| "deny": [], | ||
| "ask": [] | ||
| } | ||
| } |
+64
| # CLAUDE.md | ||
| This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. | ||
| ## Project Overview | ||
| thread-stream is a library for streaming data to a Node.js Worker Thread. It uses SharedArrayBuffer and Atomics for efficient inter-thread communication, enabling high-performance data streaming to worker threads. | ||
| ## Build & Test Commands | ||
| ```bash | ||
| npm test # Run linting (standard), type checking, and all tests | ||
| npm run build # Type check only (tsc --noEmit) | ||
| npm run test:ci # CI-specific test run | ||
| # Run a single test file | ||
| node --test test/<filename>.test.js | ||
| node --test test/<filename>.test.ts # For TypeScript tests | ||
| # Lint | ||
| npx standard | ||
| ``` | ||
| ## Architecture | ||
| ### Core Components | ||
| - **index.js**: Main `ThreadStream` class extending EventEmitter. Manages shared memory buffers, worker lifecycle, and provides stream-like write/flush/end API. | ||
| - **lib/worker.js**: Runs inside the Worker Thread. Loads the user-provided destination module, reads from shared buffer, and writes to the destination stream. | ||
| - **lib/indexes.js**: Defines shared buffer index constants (`WRITE_INDEX`, `READ_INDEX`) used for Atomics-based synchronization. | ||
| - **lib/wait.js**: Provides `wait()` and `waitDiff()` utilities for async waiting on Atomics state changes with exponential backoff. | ||
| ### Shared Memory Communication | ||
| The main thread and worker communicate via two SharedArrayBuffers: | ||
| 1. **stateBuf**: Int32Array for READ_INDEX and WRITE_INDEX positions | ||
| 2. **dataBuf**: Buffer for actual string data (default 4MB) | ||
| Write flow: Main thread writes to dataBuf, updates WRITE_INDEX, worker reads data between READ_INDEX and WRITE_INDEX, updates READ_INDEX when consumed. | ||
| ### Worker Module Interface | ||
| User-provided worker modules must export an async function that receives `workerData` and returns a writable stream: | ||
| ```js | ||
| async function run(opts) { | ||
| const stream = fs.createWriteStream(opts.dest) | ||
| await once(stream, 'open') | ||
| return stream | ||
| } | ||
| module.exports = run | ||
| ``` | ||
| ### Sync vs Async Modes | ||
| - `sync: true`: Blocking writes using flushSync, waits for worker to consume | ||
| - `sync: false` (default): Non-blocking writes with drain events when buffer fills | ||
| ## Code Style | ||
| Uses [Standard](https://standardjs.com/) for linting. Test files in `test/ts/**/*` and `test/syntax-error.mjs` are excluded from linting. |
| 'use strict' | ||
| const { Writable } = require('stream') | ||
| const { parentPort } = require('worker_threads') | ||
| async function run () { | ||
| parentPort.postMessage({ | ||
| internal: 'watch-mode' | ||
| }) | ||
| return new Writable({ | ||
| autoDestroy: true, | ||
| write (chunk, enc, cb) { | ||
| cb() | ||
| } | ||
| }) | ||
| } | ||
| module.exports = run |
| 'use strict' | ||
| const { test } = require('tap') | ||
| const { join } = require('path') | ||
| const ThreadStream = require('..') | ||
| test('ignores worker messages without a protocol code', function (t) { | ||
| t.plan(2) | ||
| const stream = new ThreadStream({ | ||
| filename: join(__dirname, 'message-without-code.js'), | ||
| sync: false | ||
| }) | ||
| const errors = [] | ||
| stream.on('error', err => { | ||
| errors.push(err) | ||
| }) | ||
| stream.on('ready', () => { | ||
| t.ok(stream.write('hello world\n')) | ||
| stream.end() | ||
| }) | ||
| stream.on('finish', () => { | ||
| t.same(errors, []) | ||
| }) | ||
| }) |
+41
-13
@@ -11,3 +11,4 @@ 'use strict' | ||
| WRITE_INDEX, | ||
| READ_INDEX | ||
| READ_INDEX, | ||
| SEQ_INDEX | ||
| } = require('./lib/indexes') | ||
@@ -22,2 +23,9 @@ const buffer = require('buffer') | ||
| function updateState (stream, fn) { | ||
| Atomics.add(stream[kImpl].state, SEQ_INDEX, 1) | ||
| fn() | ||
| Atomics.add(stream[kImpl].state, SEQ_INDEX, 1) | ||
| Atomics.notify(stream[kImpl].state, SEQ_INDEX) | ||
| } | ||
| class FakeWeakRef { | ||
@@ -125,4 +133,7 @@ constructor (value) { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| }) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
@@ -147,4 +158,7 @@ // Find a toWrite length that fits the buffer | ||
| stream.flush(() => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| }) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| nextFlush(stream) | ||
@@ -167,2 +181,8 @@ }) | ||
| // Node.js watch mode may send internal worker messages that do not | ||
| // participate in thread-stream's worker protocol. | ||
| if (msg?.code == null) { | ||
| return | ||
| } | ||
| switch (msg.code) { | ||
@@ -409,4 +429,5 @@ case 'READY': | ||
| stream[kImpl].data.write(data, current) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length) | ||
| Atomics.notify(stream[kImpl].state, WRITE_INDEX) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length) | ||
| }) | ||
| cb() | ||
@@ -428,5 +449,6 @@ return true | ||
| // process._rawDebug('writing index') | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, -1) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, -1) | ||
| }) | ||
| // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`) | ||
| Atomics.notify(stream[kImpl].state, WRITE_INDEX) | ||
@@ -476,4 +498,7 @@ // Wait for the process to complete | ||
| flushSync(stream) | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| }) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| continue | ||
@@ -494,4 +519,7 @@ } else if (leftover < 0) { | ||
| flushSync(stream) | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| }) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
@@ -498,0 +526,0 @@ // Find a toWrite length that fits the buffer |
+3
-1
| 'use strict' | ||
| const SEQ_INDEX = 2 | ||
| const WRITE_INDEX = 4 | ||
@@ -8,3 +9,4 @@ const READ_INDEX = 8 | ||
| WRITE_INDEX, | ||
| READ_INDEX | ||
| READ_INDEX, | ||
| SEQ_INDEX | ||
| } |
+20
-8
@@ -5,3 +5,3 @@ 'use strict' | ||
| const { workerData, parentPort } = require('worker_threads') | ||
| const { WRITE_INDEX, READ_INDEX } = require('./indexes') | ||
| const { WRITE_INDEX, READ_INDEX, SEQ_INDEX } = require('./indexes') | ||
| const { waitDiff } = require('./wait') | ||
@@ -105,5 +105,21 @@ | ||
| function readState () { | ||
| while (true) { | ||
| const seq = Atomics.load(state, SEQ_INDEX) | ||
| if ((seq & 1) !== 0) { | ||
| continue | ||
| } | ||
| const current = Atomics.load(state, READ_INDEX) | ||
| const end = Atomics.load(state, WRITE_INDEX) | ||
| if (seq === Atomics.load(state, SEQ_INDEX)) { | ||
| return { current, end, seq } | ||
| } | ||
| } | ||
| } | ||
| function run () { | ||
| const current = Atomics.load(state, READ_INDEX) | ||
| const end = Atomics.load(state, WRITE_INDEX) | ||
| const { current, end, seq } = readState() | ||
@@ -113,7 +129,3 @@ // process._rawDebug(`pre state ${current} ${end}`) | ||
| if (end === current) { | ||
| if (end === data.length) { | ||
| waitDiff(state, READ_INDEX, end, Infinity, run) | ||
| } else { | ||
| waitDiff(state, WRITE_INDEX, end, Infinity, run) | ||
| } | ||
| waitDiff(state, SEQ_INDEX, seq, Infinity, run) | ||
| return | ||
@@ -120,0 +132,0 @@ } |
+1
-1
| { | ||
| "name": "thread-stream", | ||
| "version": "3.1.0", | ||
| "version": "3.2.0", | ||
| "description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
+2
-1
@@ -90,3 +90,4 @@ # thread-stream | ||
| You can emit events on the ThreadStream from your worker using [`worker.parentPort.postMessage()`](https://nodejs.org/api/worker_threads.html#workerparentport). | ||
| The message (JSON object) must have the following data structure: | ||
| Messages that do not carry a thread-stream protocol `code` are ignored. | ||
| For custom events, the message (JSON object) must have the following data structure: | ||
@@ -93,0 +94,0 @@ ```js |
+61
-88
@@ -11,5 +11,15 @@ 'use strict' | ||
| test('base sync=true', function (t) { | ||
| t.plan(15) | ||
| function readFileAsync (path) { | ||
| return new Promise((resolve, reject) => { | ||
| readFile(path, 'utf8', (err, data) => { | ||
| if (err) { | ||
| reject(err) | ||
| return | ||
| } | ||
| resolve(data) | ||
| }) | ||
| }) | ||
| } | ||
| test('base sync=true', async function (t) { | ||
| const dest = file() | ||
@@ -22,20 +32,8 @@ const stream = new ThreadStream({ | ||
| const finish = once(stream, 'finish') | ||
| const close = once(stream, 'close') | ||
| t.same(stream.writableObjectMode, false) | ||
| t.same(stream.writableFinished, false) | ||
| stream.on('finish', () => { | ||
| t.same(stream.writableFinished, true) | ||
| readFile(dest, 'utf8', (err, data) => { | ||
| t.error(err) | ||
| t.equal(data, 'hello world\nsomething else\n') | ||
| }) | ||
| }) | ||
| t.same(stream.closed, false) | ||
| stream.on('close', () => { | ||
| t.same(stream.closed, true) | ||
| t.notOk(stream.writable) | ||
| t.pass('close emitted') | ||
| }) | ||
| t.same(stream.writableNeedDrain, false) | ||
@@ -49,7 +47,15 @@ t.ok(stream.write('hello world\n')) | ||
| t.same(stream.writableEnded, true) | ||
| await finish | ||
| t.same(stream.writableFinished, true) | ||
| await close | ||
| t.same(stream.closed, true) | ||
| t.notOk(stream.writable) | ||
| const data = await readFileAsync(dest) | ||
| t.equal(data, 'hello world\nsomething else\n') | ||
| }) | ||
| test('overflow sync=true', function (t) { | ||
| t.plan(3) | ||
| test('overflow sync=true', async function (t) { | ||
| const dest = file() | ||
@@ -63,5 +69,5 @@ const stream = new ThreadStream({ | ||
| const close = once(stream, 'close') | ||
| let count = 0 | ||
| // Write 10 chars, 20 times | ||
| function write () { | ||
@@ -74,3 +80,2 @@ if (count++ === 20) { | ||
| stream.write('aaaaaaaaaa') | ||
| // do not wait for drain event | ||
| setImmediate(write) | ||
@@ -81,15 +86,8 @@ } | ||
| stream.on('finish', () => { | ||
| t.pass('finish emitted') | ||
| }) | ||
| stream.on('close', () => { | ||
| readFile(dest, 'utf8', (err, data) => { | ||
| t.error(err) | ||
| t.equal(data.length, 200) | ||
| }) | ||
| }) | ||
| await close | ||
| const data = await readFileAsync(dest) | ||
| t.equal(data.length, 200) | ||
| }) | ||
| test('overflow sync=false', function (t) { | ||
| test('overflow sync=false', async function (t) { | ||
| const dest = file() | ||
@@ -103,2 +101,3 @@ const stream = new ThreadStream({ | ||
| const close = once(stream, 'close') | ||
| let count = 0 | ||
@@ -108,6 +107,4 @@ | ||
| // Write 10 chars, 20 times | ||
| function write () { | ||
| if (count++ === 20) { | ||
| t.pass('end sent') | ||
| stream.end() | ||
@@ -120,3 +117,2 @@ return | ||
| } | ||
| // do not wait for drain event | ||
| setImmediate(write) | ||
@@ -129,20 +125,11 @@ } | ||
| t.same(stream.writableNeedDrain, false) | ||
| t.pass('drain') | ||
| }) | ||
| stream.on('finish', () => { | ||
| t.pass('finish emitted') | ||
| }) | ||
| stream.on('close', () => { | ||
| readFile(dest, 'utf8', (err, data) => { | ||
| t.error(err) | ||
| t.equal(data.length, 200) | ||
| t.end() | ||
| }) | ||
| }) | ||
| await close | ||
| const data = await readFileAsync(dest) | ||
| t.equal(data.length, 200) | ||
| }) | ||
| test('over the bufferSize at startup', function (t) { | ||
| t.plan(6) | ||
| t.plan(5) | ||
@@ -164,6 +151,2 @@ const dest = file() | ||
| stream.on('close', () => { | ||
| t.pass('close emitted') | ||
| }) | ||
| t.ok(stream.write('hello')) | ||
@@ -177,3 +160,3 @@ t.ok(stream.write(' world\n')) | ||
| test('over the bufferSize at startup (async)', function (t) { | ||
| t.plan(6) | ||
| t.plan(5) | ||
@@ -200,9 +183,7 @@ const dest = file() | ||
| }) | ||
| stream.on('close', () => { | ||
| t.pass('close emitted') | ||
| }) | ||
| }) | ||
| test('flushSync sync=false', function (t) { | ||
| t.plan(2) | ||
| const dest = file() | ||
@@ -216,28 +197,22 @@ const stream = new ThreadStream({ | ||
| stream.on('drain', () => { | ||
| t.pass('drain') | ||
| stream.end() | ||
| stream.on('ready', () => { | ||
| for (let count = 0; count < 20; count++) { | ||
| stream.write('aaaaaaaaaa') | ||
| } | ||
| stream.flushSync() | ||
| setImmediate(() => { | ||
| stream.end() | ||
| }) | ||
| }) | ||
| stream.on('finish', () => { | ||
| t.pass('finish emitted') | ||
| }) | ||
| stream.on('close', () => { | ||
| readFile(dest, 'utf8', (err, data) => { | ||
| t.error(err) | ||
| t.equal(data.length, 200) | ||
| t.end() | ||
| }) | ||
| }) | ||
| for (let count = 0; count < 20; count++) { | ||
| stream.write('aaaaaaaaaa') | ||
| } | ||
| stream.flushSync() | ||
| }) | ||
| test('pass down MessagePorts', async function (t) { | ||
| t.plan(3) | ||
| const { port1, port2 } = new MessageChannel() | ||
@@ -252,2 +227,3 @@ const stream = new ThreadStream({ | ||
| }) | ||
| t.teardown(() => { | ||
@@ -261,9 +237,6 @@ stream.end() | ||
| const [strings] = await once(port2, 'message') | ||
| t.equal(strings, 'hello world\nsomething else\n') | ||
| }) | ||
| test('destroy does not error', function (t) { | ||
| t.plan(5) | ||
| test('destroy does not error', async function (t) { | ||
| const dest = file() | ||
@@ -277,19 +250,20 @@ const stream = new ThreadStream({ | ||
| stream.on('ready', () => { | ||
| t.pass('ready emitted') | ||
| stream.worker.terminate() | ||
| }) | ||
| stream.on('error', (err) => { | ||
| t.equal(err.message, 'the worker thread exited') | ||
| const [err] = await once(stream, 'error') | ||
| t.equal(err.message, 'the worker thread exited') | ||
| await new Promise((resolve) => { | ||
| stream.flush((err) => { | ||
| t.equal(err.message, 'the worker has exited') | ||
| resolve() | ||
| }) | ||
| t.doesNotThrow(() => stream.flushSync()) | ||
| t.doesNotThrow(() => stream.end()) | ||
| }) | ||
| t.doesNotThrow(() => stream.flushSync()) | ||
| t.doesNotThrow(() => stream.end()) | ||
| }) | ||
| test('syntax error', function (t) { | ||
| t.plan(1) | ||
| test('syntax error', async function (t) { | ||
| const stream = new ThreadStream({ | ||
@@ -299,5 +273,4 @@ filename: join(__dirname, 'syntax-error.mjs') | ||
| stream.on('error', (err) => { | ||
| t.equal(err.message, 'Unexpected end of input') | ||
| }) | ||
| const [err] = await once(stream, 'error') | ||
| t.equal(err.message, 'Unexpected end of input') | ||
| }) |
74238
6.02%62
6.9%2100
3.24%137
0.74%