thread-stream
Advanced tools
| { | ||
| "permissions": { | ||
| "allow": [ | ||
| "Bash(npx tap:*)", | ||
| "Bash(node:*)", | ||
| "Bash(for i in 1 2 3 4 5)", | ||
| "Bash(do)", | ||
| "Bash(echo:*)", | ||
| "Bash(done)", | ||
| "Bash(npm test:*)" | ||
| ], | ||
| "deny": [], | ||
| "ask": [] | ||
| } | ||
| } |
| import { test } from 'node:test' | ||
| import assert from 'node:assert/strict' | ||
| import { readFile } from 'node:fs/promises' | ||
| import ThreadStream from '../index.js' | ||
| import { join } from 'desm' | ||
| import { file } from './helper.js' | ||
| test('preserves multibyte records that cross the buffer boundary', async () => { | ||
| const dest = file() | ||
| const stream = new ThreadStream({ | ||
| bufferSize: 128, | ||
| filename: join(import.meta.url, 'to-file.js'), | ||
| workerData: { dest }, | ||
| sync: false | ||
| }) | ||
| let expected = '' | ||
| for (let i = 0; i < 1000; i++) { | ||
| const line = `{"idx":${i},"alert":"🚨"}\n` | ||
| expected += line | ||
| stream.write(line) | ||
| } | ||
| await new Promise((resolve, reject) => { | ||
| stream.once('error', reject) | ||
| stream.once('close', resolve) | ||
| stream.end() | ||
| }) | ||
| const data = await readFile(dest, 'utf8') | ||
| assert.strictEqual(data, expected) | ||
| }) |
+92
-87
@@ -11,3 +11,4 @@ 'use strict' | ||
| WRITE_INDEX, | ||
| READ_INDEX | ||
| READ_INDEX, | ||
| SEQ_INDEX | ||
| } = require('./lib/indexes') | ||
@@ -19,3 +20,3 @@ const buffer = require('buffer') | ||
| // V8 limit for string size | ||
| // Maximum pending buffered data before forcing a synchronous drain | ||
| const MAX_STRING = buffer.constants.MAX_STRING_LENGTH | ||
@@ -25,2 +26,16 @@ | ||
| function updateState (stream, fn) { | ||
| Atomics.add(stream[kImpl].state, SEQ_INDEX, 1) | ||
| fn() | ||
| Atomics.add(stream[kImpl].state, SEQ_INDEX, 1) | ||
| Atomics.notify(stream[kImpl].state, SEQ_INDEX) | ||
| } | ||
| function resetIndexes (stream) { | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| }) | ||
| } | ||
| class FakeWeakRef { | ||
@@ -99,28 +114,29 @@ constructor (value) { | ||
| function nextFlush (stream) { | ||
| const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) | ||
| let leftover = stream[kImpl].data.length - writeIndex | ||
| while (true) { | ||
| const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) | ||
| const leftover = stream[kImpl].data.length - writeIndex | ||
| if (leftover > 0) { | ||
| if (stream[kImpl].buf.length === 0) { | ||
| stream[kImpl].flushing = false | ||
| if (leftover > 0) { | ||
| if (stream[kImpl].bufLen === 0) { | ||
| stream[kImpl].flushing = false | ||
| if (stream[kImpl].ending) { | ||
| end(stream) | ||
| } else if (stream[kImpl].needDrain) { | ||
| process.nextTick(drain, stream) | ||
| if (stream[kImpl].ending) { | ||
| end(stream) | ||
| } else if (stream[kImpl].needDrain) { | ||
| process.nextTick(drain, stream) | ||
| } | ||
| return | ||
| } | ||
| return | ||
| write(stream, leftover, noop) | ||
| continue | ||
| } | ||
| let toWrite = stream[kImpl].buf.slice(0, leftover) | ||
| let toWriteBytes = Buffer.byteLength(toWrite) | ||
| if (toWriteBytes <= leftover) { | ||
| stream[kImpl].buf = stream[kImpl].buf.slice(leftover) | ||
| // process._rawDebug('writing ' + toWrite.length) | ||
| write(stream, toWrite, nextFlush.bind(null, stream)) | ||
| } else { | ||
| // multi-byte utf-8 | ||
| if (leftover === 0) { | ||
| if (writeIndex === 0 && stream[kImpl].bufLen === 0) { | ||
| // we had a flushSync in the meanwhile | ||
| return | ||
| } | ||
| waitForRead(stream, () => { | ||
| // err is already handled in waitForRead() | ||
| if (stream.destroyed) { | ||
@@ -130,32 +146,11 @@ return | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| // Find a toWrite length that fits the buffer | ||
| // it must exists as the buffer is at least 4 bytes length | ||
| // and the max utf-8 length for a char is 4 bytes. | ||
| while (toWriteBytes > stream[kImpl].data.length) { | ||
| leftover = leftover / 2 | ||
| toWrite = stream[kImpl].buf.slice(0, leftover) | ||
| toWriteBytes = Buffer.byteLength(toWrite) | ||
| } | ||
| stream[kImpl].buf = stream[kImpl].buf.slice(leftover) | ||
| write(stream, toWrite, nextFlush.bind(null, stream)) | ||
| resetIndexes(stream) | ||
| nextFlush(stream) | ||
| }) | ||
| } | ||
| } else if (leftover === 0) { | ||
| if (writeIndex === 0 && stream[kImpl].buf.length === 0) { | ||
| // we had a flushSync in the meanwhile | ||
| return | ||
| } | ||
| waitForRead(stream, () => { | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| nextFlush(stream) | ||
| }) | ||
| } else { | ||
| // This should never happen | ||
| destroy(stream, new Error('overwritten')) | ||
| return | ||
| } | ||
@@ -256,3 +251,5 @@ } | ||
| this[kImpl].closed = false | ||
| this[kImpl].buf = '' | ||
| this[kImpl].buf = [] | ||
| this[kImpl].bufHead = 0 | ||
| this[kImpl].bufLen = 0 | ||
| this[kImpl].flushCallbacks = new Map() | ||
@@ -269,2 +266,3 @@ this[kImpl].nextFlushId = 0 | ||
| write (data) { | ||
| const dataBuf = Buffer.isBuffer(data) ? data : Buffer.from(data) | ||
| if (this[kImpl].destroyed) { | ||
@@ -280,3 +278,3 @@ error(this, new Error('the worker has exited')) | ||
| if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) { | ||
| if (this[kImpl].flushing && this[kImpl].bufLen + dataBuf.length >= MAX_STRING) { | ||
| try { | ||
@@ -291,3 +289,4 @@ writeSync(this) | ||
| this[kImpl].buf += data | ||
| this[kImpl].buf.push(dataBuf) | ||
| this[kImpl].bufLen += dataBuf.length | ||
@@ -309,3 +308,3 @@ if (this[kImpl].sync) { | ||
| this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0 | ||
| this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].bufLen - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0 | ||
| return !this[kImpl].needDrain | ||
@@ -396,3 +395,3 @@ } | ||
| if (!stream[kImpl].sync && (stream[kImpl].flushing || stream[kImpl].buf.length > 0)) { | ||
| if (!stream[kImpl].sync && (stream[kImpl].flushing || stream[kImpl].bufLen > 0)) { | ||
| setImmediate(flushBuffer, stream, cb) | ||
@@ -511,9 +510,39 @@ return | ||
| function write (stream, data, cb) { | ||
| function write (stream, maxBytes, cb) { | ||
| // data is smaller than the shared buffer length | ||
| const current = Atomics.load(stream[kImpl].state, WRITE_INDEX) | ||
| const length = Buffer.byteLength(data) | ||
| stream[kImpl].data.write(data, current) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length) | ||
| Atomics.notify(stream[kImpl].state, WRITE_INDEX) | ||
| let offset = current | ||
| let remaining = maxBytes | ||
| while (remaining > 0 && stream[kImpl].bufLen !== 0) { | ||
| const head = stream[kImpl].bufHead | ||
| const buf = stream[kImpl].buf[head] | ||
| if (buf.length <= remaining) { | ||
| buf.copy(stream[kImpl].data, offset) | ||
| offset += buf.length | ||
| remaining -= buf.length | ||
| stream[kImpl].bufLen -= buf.length | ||
| stream[kImpl].bufHead = head + 1 | ||
| if (stream[kImpl].bufHead === stream[kImpl].buf.length) { | ||
| stream[kImpl].buf.length = 0 | ||
| stream[kImpl].bufHead = 0 | ||
| } else if (stream[kImpl].bufHead >= 1024 && stream[kImpl].bufHead * 2 >= stream[kImpl].buf.length) { | ||
| stream[kImpl].buf.splice(0, stream[kImpl].bufHead) | ||
| stream[kImpl].bufHead = 0 | ||
| } | ||
| continue | ||
| } | ||
| buf.copy(stream[kImpl].data, offset, 0, remaining) | ||
| stream[kImpl].buf[head] = buf.subarray(remaining) | ||
| stream[kImpl].bufLen -= remaining | ||
| offset += remaining | ||
| remaining = 0 | ||
| } | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, offset) | ||
| }) | ||
| cb() | ||
@@ -535,5 +564,6 @@ return true | ||
| // process._rawDebug('writing index') | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, -1) | ||
| updateState(stream, () => { | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, -1) | ||
| }) | ||
| // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`) | ||
| Atomics.notify(stream[kImpl].state, WRITE_INDEX) | ||
@@ -578,10 +608,8 @@ // Wait for the process to complete | ||
| while (stream[kImpl].buf.length !== 0) { | ||
| while (stream[kImpl].bufLen !== 0) { | ||
| const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) | ||
| let leftover = stream[kImpl].data.length - writeIndex | ||
| const leftover = stream[kImpl].data.length - writeIndex | ||
| if (leftover === 0) { | ||
| flushSync(stream) | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| resetIndexes(stream) | ||
| continue | ||
@@ -593,26 +621,3 @@ } else if (leftover < 0) { | ||
| let toWrite = stream[kImpl].buf.slice(0, leftover) | ||
| let toWriteBytes = Buffer.byteLength(toWrite) | ||
| if (toWriteBytes <= leftover) { | ||
| stream[kImpl].buf = stream[kImpl].buf.slice(leftover) | ||
| // process._rawDebug('writing ' + toWrite.length) | ||
| write(stream, toWrite, cb) | ||
| } else { | ||
| // multi-byte utf-8 | ||
| flushSync(stream) | ||
| Atomics.store(stream[kImpl].state, READ_INDEX, 0) | ||
| Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) | ||
| Atomics.notify(stream[kImpl].state, READ_INDEX) | ||
| // Find a toWrite length that fits the buffer | ||
| // it must exists as the buffer is at least 4 bytes length | ||
| // and the max utf-8 length for a char is 4 bytes. | ||
| while (toWriteBytes > stream[kImpl].buf.length) { | ||
| leftover = leftover / 2 | ||
| toWrite = stream[kImpl].buf.slice(0, leftover) | ||
| toWriteBytes = Buffer.byteLength(toWrite) | ||
| } | ||
| stream[kImpl].buf = stream[kImpl].buf.slice(leftover) | ||
| write(stream, toWrite, cb) | ||
| } | ||
| write(stream, leftover, cb) | ||
| } | ||
@@ -619,0 +624,0 @@ } |
+3
-1
| 'use strict' | ||
| const SEQ_INDEX = 2 | ||
| const WRITE_INDEX = 4 | ||
@@ -8,3 +9,4 @@ const READ_INDEX = 8 | ||
| WRITE_INDEX, | ||
| READ_INDEX | ||
| READ_INDEX, | ||
| SEQ_INDEX | ||
| } |
+11
-2
@@ -53,3 +53,5 @@ 'use strict' | ||
| // Wait for value to change from expected | ||
| // Wait for value to change from expected. | ||
| // If we are notified, resume immediately even if the value cycled back | ||
| // to the same number before we could re-read it. | ||
| const remaining = max === Infinity ? WAIT_MS : Math.min(WAIT_MS, Math.max(1, max - Date.now())) | ||
@@ -59,3 +61,10 @@ const result = Atomics.waitAsync(state, index, expected, remaining) | ||
| if (result.async) { | ||
| result.value.then(check) | ||
| result.value.then((res) => { | ||
| if (res === 'ok') { | ||
| done(null, 'ok') | ||
| return | ||
| } | ||
| check() | ||
| }) | ||
| } else { | ||
@@ -62,0 +71,0 @@ // Value already changed (not-equal) - recheck on next tick |
+27
-9
@@ -5,3 +5,4 @@ 'use strict' | ||
| const { workerData, parentPort } = require('worker_threads') | ||
| const { WRITE_INDEX, READ_INDEX } = require('./indexes') | ||
| const { StringDecoder } = require('string_decoder') | ||
| const { WRITE_INDEX, READ_INDEX, SEQ_INDEX } = require('./indexes') | ||
| const { waitDiff } = require('./wait') | ||
@@ -21,2 +22,3 @@ | ||
| const data = Buffer.from(dataBuf) | ||
| const decoder = new StringDecoder('utf8') | ||
@@ -212,5 +214,21 @@ // Keep the event loop alive - Atomics.waitAsync promises don't prevent worker exit | ||
| function readState () { | ||
| while (true) { | ||
| const seq = Atomics.load(state, SEQ_INDEX) | ||
| if ((seq & 1) !== 0) { | ||
| continue | ||
| } | ||
| const current = Atomics.load(state, READ_INDEX) | ||
| const end = Atomics.load(state, WRITE_INDEX) | ||
| if (seq === Atomics.load(state, SEQ_INDEX)) { | ||
| return { current, end, seq } | ||
| } | ||
| } | ||
| } | ||
| function run () { | ||
| const current = Atomics.load(state, READ_INDEX) | ||
| const end = Atomics.load(state, WRITE_INDEX) | ||
| const { current, end, seq } = readState() | ||
@@ -220,7 +238,3 @@ // process._rawDebug(`pre state ${current} ${end}`) | ||
| if (end === current) { | ||
| if (end === data.length) { | ||
| waitDiff(state, READ_INDEX, end, Infinity, run) | ||
| } else { | ||
| waitDiff(state, WRITE_INDEX, end, Infinity, run) | ||
| } | ||
| waitDiff(state, SEQ_INDEX, seq, Infinity, run) | ||
| return | ||
@@ -233,2 +247,6 @@ } | ||
| // process._rawDebug('end') | ||
| const remaining = decoder.end() | ||
| if (remaining.length > 0) { | ||
| destination.write(remaining) | ||
| } | ||
| destination.end() | ||
@@ -238,3 +256,3 @@ return | ||
| const toWrite = data.toString('utf8', current, end) | ||
| const toWrite = decoder.write(data.subarray(current, end)) | ||
| // process._rawDebug('worker writing: ' + toWrite) | ||
@@ -241,0 +259,0 @@ |
+2
-2
| { | ||
| "name": "thread-stream", | ||
| "version": "4.1.0", | ||
| "version": "4.2.0", | ||
| "description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -21,3 +21,3 @@ "main": "index.js", | ||
| "neostandard": "^0.13.0", | ||
| "pino-elasticsearch": "^8.0.0", | ||
| "pino-elasticsearch": "^9.0.0", | ||
| "sonic-boom": "^5.0.0", | ||
@@ -24,0 +24,0 @@ "ts-node": "^10.8.0", |
83313
1.14%65
3.17%2489
2.68%