thread-stream
Advanced tools
Comparing version 0.4.0 to 0.6.0
86
index.js
@@ -39,3 +39,3 @@ 'use strict' | ||
const writeIndex = Atomics.load(stream._state, WRITE_INDEX) | ||
const leftover = stream._data.length - writeIndex | ||
let leftover = stream._data.length - writeIndex | ||
@@ -52,6 +52,27 @@ if (leftover > 0) { | ||
} | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = stream.buf.slice(0, leftover) | ||
stream.buf = stream.buf.slice(leftover) | ||
stream._write(toWrite, nextFlush.bind(null, stream)) | ||
let toWrite = stream.buf.slice(0, leftover) | ||
let toWriteBytes = Buffer.byteLength(toWrite) | ||
if (toWriteBytes <= leftover) { | ||
stream.buf = stream.buf.slice(leftover) | ||
// process._rawDebug('writing ' + toWrite.length) | ||
stream._write(toWrite, nextFlush.bind(null, stream)) | ||
} else { | ||
// multi-byte utf-8 | ||
stream.flush(() => { | ||
Atomics.store(stream._state, READ_INDEX, 0) | ||
Atomics.store(stream._state, WRITE_INDEX, 0) | ||
// Find a toWrite length that fits the buffer | ||
// it must exists as the buffer is at least 4 bytes length | ||
// and the max utf-8 length for a char is 4 bytes. | ||
while (toWriteBytes > stream.buf.length) { | ||
leftover = leftover / 2 | ||
toWrite = stream.buf.slice(0, leftover) | ||
toWriteBytes = Buffer.byteLength(toWrite) | ||
} | ||
stream.buf = stream.buf.slice(leftover) | ||
stream._write(toWrite, nextFlush.bind(null, stream)) | ||
}) | ||
} | ||
} else if (leftover === 0) { | ||
@@ -68,2 +89,3 @@ if (writeIndex === 0 && stream.buf.length === 0) { | ||
} else { | ||
// This should never happen | ||
throw new Error('overwritten') | ||
@@ -77,2 +99,6 @@ } | ||
if (opts.bufferSize < 4) { | ||
throw new Error('bufferSize must at least fit a 4-byte utf-8 char') | ||
} | ||
this._stateBuf = new SharedArrayBuffer(128) | ||
@@ -122,7 +148,11 @@ this._state = new Int32Array(this._stateBuf) | ||
_hasSpace () { | ||
const current = Atomics.load(this._state, WRITE_INDEX) | ||
return this._data.length - this.buf.length - current > 0 | ||
} | ||
write (data) { | ||
if (!this.ready || this.flushing) { | ||
this.buf += data | ||
// TODO this should return false | ||
return true | ||
return this._hasSpace() | ||
} | ||
@@ -141,4 +171,3 @@ | ||
// TODO implement highWaterMark | ||
return false | ||
return this._hasSpace() | ||
} | ||
@@ -202,3 +231,3 @@ | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
const leftover = this._data.length - writeIndex | ||
let leftover = this._data.length - writeIndex | ||
if (leftover === 0) { | ||
@@ -210,10 +239,29 @@ this._flushSync() | ||
} else if (leftover < 0) { | ||
// This should never happen | ||
throw new Error('overwritten') | ||
} | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = this.buf.slice(0, leftover) | ||
this.buf = this.buf.slice(leftover) | ||
// process._rawDebug('writing ' + toWrite.length) | ||
this._write(toWrite, cb) | ||
let toWrite = this.buf.slice(0, leftover) | ||
let toWriteBytes = Buffer.byteLength(toWrite) | ||
if (toWriteBytes <= leftover) { | ||
this.buf = this.buf.slice(leftover) | ||
// process._rawDebug('writing ' + toWrite.length) | ||
this._write(toWrite, cb) | ||
} else { | ||
// multi-byte utf-8 | ||
this._flushSync() | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
Atomics.store(this._state, WRITE_INDEX, 0) | ||
// Find a toWrite length that fits the buffer | ||
// it must exists as the buffer is at least 4 bytes length | ||
// and the max utf-8 length for a char is 4 bytes. | ||
while (toWriteBytes > this.buf.length) { | ||
leftover = leftover / 2 | ||
toWrite = this.buf.slice(0, leftover) | ||
toWriteBytes = Buffer.byteLength(toWrite) | ||
} | ||
this.buf = this.buf.slice(leftover) | ||
this._write(toWrite, cb) | ||
} | ||
} | ||
@@ -255,4 +303,12 @@ } | ||
} | ||
unref () { | ||
this.worker.unref() | ||
} | ||
ref () { | ||
this.worker.ref() | ||
} | ||
} | ||
module.exports = ThreadStream |
{ | ||
"name": "thread-stream", | ||
"version": "0.4.0", | ||
"version": "0.6.0", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -5,26 +5,6 @@ 'use strict' | ||
const { join } = require('path') | ||
const { tmpdir } = require('os') | ||
const { unlinkSync } = require('fs') | ||
const { readFile } = require('fs') | ||
const { file } = require('./helper') | ||
const ThreadStream = require('..') | ||
const files = [] | ||
let count = 0 | ||
function file () { | ||
const file = join(tmpdir(), `thread-stream-${process.pid}-${process.hrtime().toString()}-${count++}`) | ||
files.push(file) | ||
return file | ||
} | ||
process.on('beforeExit', () => { | ||
for (const file of files) { | ||
try { | ||
unlinkSync(file) | ||
} catch (e) { | ||
console.log(e) | ||
} | ||
} | ||
}) | ||
test('base sync=true', function (t) { | ||
@@ -154,3 +134,3 @@ t.plan(7) | ||
test('over the bufferSize at startup', function (t) { | ||
t.plan(7) | ||
t.plan(8) | ||
@@ -173,4 +153,5 @@ const dest = file() | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.write('hello')) | ||
t.notOk(stream.write(' world\n')) | ||
t.notOk(stream.write('something else\n')) | ||
@@ -192,3 +173,3 @@ stream.end() | ||
test('over the bufferSize at startup (async)', function (t) { | ||
t.plan(7) | ||
t.plan(8) | ||
@@ -211,4 +192,5 @@ const dest = file() | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.write('hello')) | ||
t.notOk(stream.write(' world\n')) | ||
t.notOk(stream.write('something else\n')) | ||
@@ -215,0 +197,0 @@ stream.end() |
@@ -5,25 +5,5 @@ 'use strict' | ||
const { join } = require('path') | ||
const { tmpdir } = require('os') | ||
const { unlinkSync } = require('fs') | ||
const ThreadStream = require('..') | ||
const { file } = require('./helper') | ||
const files = [] | ||
let count = 0 | ||
function file () { | ||
const file = join(tmpdir(), `thread-stream-${process.pid}-${process.hrtime().toString()}-${count++}`) | ||
files.push(file) | ||
return file | ||
} | ||
process.on('beforeExit', () => { | ||
for (const file of files) { | ||
try { | ||
unlinkSync(file) | ||
} catch (e) { | ||
console.log(e) | ||
} | ||
} | ||
}) | ||
const MAX = 1000 | ||
@@ -30,0 +10,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
26711
20
875
8
1