thread-stream
Advanced tools
Comparing version 0.1.0 to 0.2.0
102
index.js
@@ -55,4 +55,5 @@ 'use strict' | ||
if (this.buf.length > 0) { | ||
this.write(this.buf) | ||
const toWrite = this.buf | ||
this.buf = '' | ||
this.write(toWrite) | ||
} | ||
@@ -79,49 +80,80 @@ this.emit('ready') | ||
_write (data, cb) { | ||
// data is smaller than the shared buffer length | ||
const current = Atomics.load(this._state, WRITE_INDEX) | ||
const length = Buffer.byteLength(data) | ||
this._data.write(data, current) | ||
Atomics.store(this._state, WRITE_INDEX, current + length) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
cb() | ||
return true | ||
} | ||
write (data) { | ||
if (!this.ready || this.flushing) { | ||
this.buf += data | ||
// TODO this should return false | ||
return true | ||
} | ||
if (data.length >= this._data.length) { | ||
// We are not splitting the string in two to avoid dealing | ||
// with truncated utf-8 chunks, therefore we cannot write | ||
// a string longer than the buffer. | ||
throw new Error('The SharedArrayBuffer is too small') | ||
const cb = () => { | ||
if (!this.needDrain) { | ||
// process._rawDebug('emitting drain') | ||
this.needDrain = true | ||
process.nextTick(drain, this) | ||
} | ||
} | ||
let current = Atomics.load(this._state, WRITE_INDEX) | ||
const length = Buffer.byteLength(data) | ||
if (current + length >= this._data.length) { | ||
// Handle overflow cases, we need to go back | ||
// at the beginning of the buffer to write the string. | ||
if (this._sync) { | ||
this.flushSync() | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
current = 0 | ||
} else { | ||
this.flushing = true | ||
this.buf = data | ||
if (this._sync) { | ||
while (data.length !== 0) { | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
const leftover = this._data.length - writeIndex | ||
if (leftover === 0) { | ||
this.flushSync() | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
Atomics.store(this._state, WRITE_INDEX, 0) | ||
continue | ||
} else if (leftover < 0) { | ||
throw new Error('overwritten') | ||
} | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = data.slice(0, leftover) | ||
this._write(toWrite, cb) | ||
data = data.slice(leftover) | ||
} | ||
return true | ||
} | ||
const next = () => { | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
const leftover = this._data.length - writeIndex | ||
if (leftover > 0) { | ||
if (this.buf.length === 0) { | ||
this.flushing = false | ||
cb() | ||
return | ||
} | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = this.buf.slice(0, leftover) | ||
this.buf = this.buf.slice(leftover) | ||
this._write(toWrite, next) | ||
} else if (leftover === 0) { | ||
this.flush(() => { | ||
this.flushing = false | ||
current = 0 | ||
// process._rawDebug('writing ' + Buffer.byteLength(this.buf)) | ||
this._data.write(this.buf, current) | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
Atomics.store(this._state, WRITE_INDEX, current + Buffer.byteLength(this.buf)) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
this.buf = '' | ||
this.emit('drain') | ||
Atomics.store(this._state, WRITE_INDEX, 0) | ||
next() | ||
}) | ||
return | ||
} else { | ||
throw new Error('overwritten') | ||
} | ||
} | ||
this._data.write(data, current) | ||
Atomics.store(this._state, WRITE_INDEX, current + length) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
if (!this.needDrain) { | ||
this.needDrain = true | ||
process.nextTick(drain, this) | ||
} | ||
return true | ||
this.buf = data | ||
this.flushing = true | ||
next() | ||
return false | ||
} | ||
@@ -128,0 +160,0 @@ |
@@ -62,2 +62,3 @@ 'use strict' | ||
const toWrite = data.toString('utf8', current, end) | ||
// process._rawDebug('worker writing: ' + toWrite) | ||
@@ -64,0 +65,0 @@ destination.write(toWrite, function () { |
{ | ||
"name": "thread-stream", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -25,5 +25,8 @@ # thread-stream | ||
stream.write('hello') | ||
stream.write(' ') | ||
stream.write('world') | ||
stream.end() | ||
stream.flush(function () { | ||
stream.write(' ') | ||
stream.write('world') | ||
stream.flushSync() | ||
stream.end() | ||
}) | ||
``` | ||
@@ -30,0 +33,0 @@ |
@@ -148,1 +148,72 @@ 'use strict' | ||
}) | ||
test('over the bufferSize at startup', function (t) { | ||
t.plan(7) | ||
const dest = file() | ||
const stream = new ThreadStream({ | ||
bufferSize: 10, | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest } | ||
}) | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('over the bufferSize at startup (async)', function (t) { | ||
t.plan(7) | ||
const dest = file() | ||
const stream = new ThreadStream({ | ||
bufferSize: 10, | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest }, | ||
sync: false | ||
}) | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
}) | ||
stream.on('close', () => { | ||
t.pass('close emitted') | ||
}) | ||
}) |
16957
533
53