thread-stream
Advanced tools
Comparing version 0.2.1 to 0.3.0
13
bench.js
@@ -16,6 +16,7 @@ 'use strict' | ||
const dummyConsole = new Console(out) | ||
const threadStream = new ThreadStream({ | ||
const threadStreamSync = new ThreadStream({ | ||
filename: join(__dirname, 'test', 'to-file'), | ||
workerData: { dest: '/dev/null' }, | ||
bufferSize: 4 * 1024 * 1024 | ||
bufferSize: 4 * 1024 * 1024, | ||
sync: true | ||
}) | ||
@@ -40,5 +41,5 @@ const threadStreamAsync = new ThreadStream({ | ||
const run = bench([ | ||
function benchThreadStream (cb) { | ||
function benchThreadStreamSync (cb) { | ||
for (let i = 0; i < MAX; i++) { | ||
threadStream.write(str) | ||
threadStreamSync.write(str) | ||
} | ||
@@ -87,5 +88,1 @@ setImmediate(cb) | ||
} | ||
process.on('beforeExit', function () { | ||
console.log(threadStream.flushes) | ||
}) |
90
index.js
@@ -53,2 +53,6 @@ 'use strict' | ||
} else if (leftover === 0) { | ||
if (writeIndex === 0 && stream.buf.length === 0) { | ||
// we had a flushSync in the meanwhile | ||
return | ||
} | ||
stream.flush(() => { | ||
@@ -72,3 +76,3 @@ Atomics.store(stream._state, READ_INDEX, 0) | ||
this._data = Buffer.from(this._dataBuf) | ||
this._sync = opts.sync === undefined ? true : opts.sync | ||
this._sync = opts.sync || false | ||
this.worker = createWorker(this, opts) | ||
@@ -85,11 +89,3 @@ this.ready = false | ||
this.ready = true | ||
if (this.buf.length > 0) { | ||
const toWrite = this.buf | ||
this.buf = '' | ||
this.write(toWrite) | ||
} | ||
this.emit('ready') | ||
if (this.ending) { | ||
this.end() | ||
} | ||
break | ||
@@ -129,29 +125,6 @@ case 'FINISH': | ||
const cb = () => { | ||
if (!this.needDrain) { | ||
// process._rawDebug('emitting drain') | ||
this.needDrain = true | ||
process.nextTick(drain, this) | ||
} | ||
} | ||
if (this._sync) { | ||
while (data.length !== 0) { | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
const leftover = this._data.length - writeIndex | ||
if (leftover === 0) { | ||
this.flushSync() | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
Atomics.store(this._state, WRITE_INDEX, 0) | ||
continue | ||
} else if (leftover < 0) { | ||
throw new Error('overwritten') | ||
} | ||
this.buf += data | ||
this._writeSync() | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = data.slice(0, leftover) | ||
this._write(toWrite, cb) | ||
data = data.slice(leftover) | ||
} | ||
return true | ||
@@ -164,2 +137,3 @@ } | ||
// TODO implement highWaterMark | ||
return false | ||
@@ -169,7 +143,12 @@ } | ||
end () { | ||
this.ending = true | ||
if (!this.ready) { | ||
this.once('ready', this.end.bind(this)) | ||
return | ||
} | ||
if (this.ending) { | ||
return | ||
} | ||
this.ending = true | ||
if (this.flushing) { | ||
@@ -182,4 +161,2 @@ this.once('drain', this.end.bind(this)) | ||
// process._rawDebug('end...!') | ||
// process._rawDebug('writing index') | ||
@@ -189,2 +166,3 @@ Atomics.store(this._state, WRITE_INDEX, -1) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
// process._rawDebug('end finished...') | ||
} | ||
@@ -209,3 +187,38 @@ | ||
_writeSync () { | ||
const cb = () => { | ||
if (!this.needDrain) { | ||
// process._rawDebug('emitting drain') | ||
this.needDrain = true | ||
process.nextTick(drain, this) | ||
} | ||
} | ||
this.flushing = false | ||
while (this.buf.length !== 0) { | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
const leftover = this._data.length - writeIndex | ||
if (leftover === 0) { | ||
this._flushSync() | ||
Atomics.store(this._state, READ_INDEX, 0) | ||
Atomics.store(this._state, WRITE_INDEX, 0) | ||
continue | ||
} else if (leftover < 0) { | ||
throw new Error('overwritten') | ||
} | ||
// TODO handle truncated utf-8 chunks | ||
const toWrite = this.buf.slice(0, leftover) | ||
this.buf = this.buf.slice(leftover) | ||
// process._rawDebug('writing ' + toWrite.length) | ||
this._write(toWrite, cb) | ||
} | ||
} | ||
flushSync () { | ||
this._writeSync() | ||
this._flushSync() | ||
} | ||
_flushSync () { | ||
if (this.flushing) { | ||
@@ -215,2 +228,4 @@ throw new Error('unable to flush while flushing') | ||
// process._rawDebug('flushSync started') | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
@@ -229,2 +244,3 @@ | ||
} | ||
// process._rawDebug('flushSync finished') | ||
} | ||
@@ -231,0 +247,0 @@ } |
{ | ||
"name": "thread-stream", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -21,9 +21,14 @@ # thread-stream | ||
filename: join(__dirname, 'worker.js'), | ||
workerData: { dest } | ||
workerData: { dest }, | ||
sync: false // default | ||
}) | ||
stream.write('hello') | ||
// Asynchronous flushing | ||
stream.flush(function () { | ||
stream.write(' ') | ||
stream.write('world') | ||
// Synchronous flushing | ||
stream.flushSync() | ||
@@ -30,0 +35,0 @@ stream.end() |
@@ -29,3 +29,3 @@ 'use strict' | ||
test('base', function (t) { | ||
test('base sync=true', function (t) { | ||
t.plan(7) | ||
@@ -36,3 +36,4 @@ | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest } | ||
workerData: { dest }, | ||
sync: true | ||
}) | ||
@@ -72,3 +73,4 @@ | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest } | ||
workerData: { dest }, | ||
sync: true | ||
}) | ||
@@ -159,3 +161,4 @@ | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest } | ||
workerData: { dest }, | ||
sync: true | ||
}) | ||
@@ -223,1 +226,37 @@ | ||
}) | ||
test('flushSync sync=false', function (t) { | ||
const dest = file() | ||
const stream = new ThreadStream({ | ||
bufferSize: 128, | ||
filename: join(__dirname, 'to-file'), | ||
workerData: { dest }, | ||
sync: false | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
for (let count = 0; count < 20; count++) { | ||
stream.write('aaaaaaaaaa') | ||
} | ||
stream.flushSync() | ||
}) | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
stream.end() | ||
}) | ||
stream.on('finish', () => { | ||
t.pass('finish emitted') | ||
}) | ||
stream.on('close', () => { | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data.length, 200) | ||
t.end() | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19396
13
635
58
6