thread-stream
Advanced tools
Comparing version 0.11.2 to 0.12.0
210
index.js
@@ -13,2 +13,3 @@ 'use strict' | ||
const buffer = require('buffer') | ||
const assert = require('assert') | ||
@@ -71,4 +72,7 @@ // V8 limit for string size | ||
function drain (stream) { | ||
stream.needDrain = false | ||
stream.emit('drain') | ||
assert(!stream._sync) | ||
if (stream.needDrain) { | ||
stream.needDrain = false | ||
stream.emit('drain') | ||
} | ||
} | ||
@@ -83,7 +87,9 @@ | ||
stream.flushing = false | ||
if (!stream.needDrain) { | ||
// process._rawDebug('emitting drain') | ||
stream.needDrain = true | ||
if (stream.ending) { | ||
stream._end() | ||
} else if (stream.needDrain) { | ||
process.nextTick(drain, stream) | ||
} | ||
return | ||
@@ -146,25 +152,10 @@ } | ||
this.stream = new WeakRef(stream) | ||
if (stream._sync) { | ||
stream.ready = true | ||
stream.flushSync() | ||
stream.flush(() => { | ||
this.ready = true | ||
stream.emit('ready') | ||
} else { | ||
stream.once('drain', function () { | ||
stream.flush(() => { | ||
stream.ready = true | ||
stream.emit('ready') | ||
}) | ||
}) | ||
nextFlush(stream) | ||
} | ||
}) | ||
break | ||
case 'ERROR': | ||
stream.closed = true | ||
stream.worker.exited = true | ||
// TODO only remove our own | ||
stream.worker.removeAllListeners('exit') | ||
stream.worker.terminate().then(null, () => {}) | ||
process.nextTick(() => { | ||
stream.emit('error', msg.err) | ||
}) | ||
stream._destroy(msg.err) | ||
break | ||
@@ -183,10 +174,5 @@ default: | ||
registry.unregister(stream) | ||
stream.closed = true | ||
stream.worker.exited = true | ||
setImmediate(function () { | ||
if (code !== 0) { | ||
stream.emit('error', new Error('The worker thread exited')) | ||
} | ||
stream.emit('close') | ||
}) | ||
stream.worker.off('exit', onWorkerExit) | ||
stream._destroy(code !== 0 ? new Error('The worker thread exited') : null) | ||
} | ||
@@ -208,6 +194,8 @@ | ||
this.worker = createWorker(this, opts) | ||
this.ready = false | ||
this.ending = false | ||
this.ended = false | ||
this.needDrain = false | ||
this.closed = false | ||
this.destroyed = false | ||
this.flushing = false | ||
this.ready = false | ||
@@ -217,2 +205,25 @@ this.buf = '' | ||
_destroy (err) { | ||
if (this.destroyed) { | ||
return | ||
} | ||
this.destroyed = true | ||
if (err) { | ||
this.emit('error', err) | ||
} | ||
if (!this.worker.exited) { | ||
this.worker.terminate() | ||
.catch(() => {}) | ||
.then(() => { | ||
this.emit('close') | ||
}) | ||
} else { | ||
setImmediate(() => { | ||
this.emit('close') | ||
}) | ||
} | ||
} | ||
_write (data, cb) { | ||
@@ -229,81 +240,89 @@ // data is smaller than the shared buffer length | ||
_hasSpace () { | ||
const current = Atomics.load(this._state, WRITE_INDEX) | ||
return this._data.length - this.buf.length - current > 0 | ||
} | ||
write (data) { | ||
if (this.closed) { | ||
if (this.destroyed) { | ||
throw new Error('the worker has exited') | ||
} | ||
if (this.flushing && this.buf.length + data.length >= MAX_STRING) { | ||
// process._rawDebug('write: flushing') | ||
this._writeSync() | ||
this.flushing = true // we are still flushing | ||
if (this.ending) { | ||
throw new Error('the worker is ending') | ||
} | ||
if (!this.ready || this.flushing) { | ||
this.buf += data | ||
return this._hasSpace() | ||
if (this.flushing && this.buf.length + data.length >= MAX_STRING) { | ||
try { | ||
this._writeSync() | ||
this.flushing = true | ||
} catch (err) { | ||
this._destroy(err) | ||
return false | ||
} | ||
} | ||
this.buf += data | ||
if (this._sync) { | ||
this.buf += data | ||
this._writeSync() | ||
try { | ||
this._writeSync() | ||
return true | ||
} catch (err) { | ||
this._destroy(err) | ||
return false | ||
} | ||
} | ||
return true | ||
if (!this.flushing) { | ||
this.flushing = true | ||
setImmediate(nextFlush, this) | ||
} | ||
this.buf = data | ||
this.flushing = true | ||
setImmediate(nextFlush, this) | ||
return this._hasSpace() | ||
this.needDrain = this._data.length - this.buf.length - Atomics.load(this._state, WRITE_INDEX) <= 0 | ||
return !this.needDrain | ||
} | ||
end () { | ||
if (this.closed) { | ||
if (this.destroyed) { | ||
throw new Error('the worker has exited') | ||
} | ||
if (!this.ready) { | ||
this.once('ready', this.end.bind(this)) | ||
return | ||
} | ||
this.ending = true | ||
this._end() | ||
} | ||
if (this.flushing) { | ||
this.once('drain', this.end.bind(this)) | ||
_end () { | ||
if (this.ended || !this.ending || this.flushing) { | ||
return | ||
} | ||
this.ended = true | ||
if (this.ending) { | ||
return | ||
} | ||
this.ending = true | ||
try { | ||
this.flushSync() | ||
this.flushSync() | ||
let readIndex = Atomics.load(this._state, READ_INDEX) | ||
let read = Atomics.load(this._state, READ_INDEX) | ||
// process._rawDebug('writing index') | ||
Atomics.store(this._state, WRITE_INDEX, -1) | ||
// process._rawDebug(`(end) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
// process._rawDebug('writing index') | ||
Atomics.store(this._state, WRITE_INDEX, -1) | ||
// process._rawDebug(`(end) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`) | ||
Atomics.notify(this._state, WRITE_INDEX) | ||
// Wait for the process to complete | ||
let spins = 0 | ||
while (readIndex !== -1) { | ||
// process._rawDebug(`read = ${read}`) | ||
Atomics.wait(this._state, READ_INDEX, readIndex, 1000) | ||
readIndex = Atomics.load(this._state, READ_INDEX) | ||
// Wait for the process to complete | ||
let spins = 0 | ||
while (read !== -1) { | ||
// process._rawDebug(`read = ${read}`) | ||
Atomics.wait(this._state, READ_INDEX, read, 1000) | ||
read = Atomics.load(this._state, READ_INDEX) | ||
if (readIndex === -2) { | ||
throw new Error('end() failed') | ||
} | ||
if (++spins === 10) { | ||
throw new Error('end() took too long (10s)') | ||
if (++spins === 10) { | ||
throw new Error('end() took too long (10s)') | ||
} | ||
} | ||
process.nextTick(() => { | ||
this.emit('finish') | ||
}) | ||
} catch (err) { | ||
this._destroy(err) | ||
} | ||
process.nextTick(() => { | ||
this.emit('finish') | ||
}) | ||
// process._rawDebug('end finished...') | ||
@@ -313,3 +332,3 @@ } | ||
flush (cb) { | ||
if (this.closed) { | ||
if (this.destroyed) { | ||
throw new Error('the worker has exited') | ||
@@ -323,4 +342,4 @@ } | ||
if (err) { | ||
this.emit('error', err) | ||
cb(err) | ||
this._destroy(err) | ||
process.nextTick(cb, err) | ||
return | ||
@@ -333,3 +352,3 @@ } | ||
} | ||
cb() | ||
process.nextTick(cb) | ||
}) | ||
@@ -340,5 +359,5 @@ } | ||
const cb = () => { | ||
if (!this.needDrain) { | ||
// process._rawDebug('emitting drain') | ||
this.needDrain = true | ||
if (this.ending) { | ||
this._end() | ||
} else if (this.needDrain) { | ||
process.nextTick(drain, this) | ||
@@ -389,3 +408,3 @@ } | ||
flushSync () { | ||
if (this.closed) { | ||
if (this.destroyed) { | ||
throw new Error('the worker has exited') | ||
@@ -412,2 +431,7 @@ } | ||
const readIndex = Atomics.load(this._state, READ_INDEX) | ||
if (readIndex === -2) { | ||
throw new Error('_flushSync failed') | ||
} | ||
// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) | ||
@@ -437,3 +461,3 @@ if (readIndex !== writeIndex) { | ||
get writable () { | ||
return !this.closed | ||
return !this.destroyed && !this.ending | ||
} | ||
@@ -440,0 +464,0 @@ } |
@@ -40,2 +40,8 @@ 'use strict' | ||
destination.on('error', function (err) { | ||
Atomics.store(state, WRITE_INDEX, -2) | ||
Atomics.notify(state, WRITE_INDEX) | ||
Atomics.store(state, READ_INDEX, -2) | ||
Atomics.notify(state, READ_INDEX) | ||
parentPort.postMessage({ | ||
@@ -42,0 +48,0 @@ code: 'ERROR', |
{ | ||
"name": "thread-stream", | ||
"version": "0.11.2", | ||
"version": "0.12.0", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -12,3 +12,3 @@ 'use strict' | ||
test('base sync=true', function (t) { | ||
t.plan(9) | ||
t.plan(7) | ||
@@ -22,16 +22,2 @@ const dest = file() | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
stream.end() | ||
stream.on('finish', () => { | ||
@@ -48,6 +34,12 @@ readFile(dest, 'utf8', (err, data) => { | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
stream.end() | ||
}) | ||
test('overflow sync=true', function (t) { | ||
t.plan(4) | ||
t.plan(3) | ||
@@ -62,7 +54,2 @@ const dest = file() | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
write() | ||
}) | ||
let count = 0 | ||
@@ -82,2 +69,4 @@ | ||
write() | ||
stream.on('finish', () => { | ||
@@ -104,7 +93,2 @@ t.pass('finish emitted') | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
write() | ||
}) | ||
let count = 0 | ||
@@ -125,2 +109,4 @@ | ||
write() | ||
stream.on('drain', () => { | ||
@@ -144,3 +130,3 @@ t.pass('drain') | ||
test('over the bufferSize at startup', function (t) { | ||
t.plan(8) | ||
t.plan(6) | ||
@@ -155,16 +141,2 @@ const dest = file() | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
t.ok(stream.write('hello')) | ||
t.notOk(stream.write(' world\n')) | ||
t.notOk(stream.write('something else\n')) | ||
stream.end() | ||
stream.on('finish', () => { | ||
@@ -180,6 +152,12 @@ readFile(dest, 'utf8', (err, data) => { | ||
}) | ||
t.ok(stream.write('hello')) | ||
t.ok(stream.write(' world\n')) | ||
t.ok(stream.write('something else\n')) | ||
stream.end() | ||
}) | ||
test('over the bufferSize at startup (async)', function (t) { | ||
t.plan(8) | ||
t.plan(6) | ||
@@ -194,10 +172,2 @@ const dest = file() | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
}) | ||
t.ok(stream.write('hello')) | ||
@@ -230,11 +200,2 @@ t.notOk(stream.write(' world\n')) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
for (let count = 0; count < 20; count++) { | ||
stream.write('aaaaaaaaaa') | ||
} | ||
stream.flushSync() | ||
}) | ||
stream.on('drain', () => { | ||
@@ -256,48 +217,9 @@ t.pass('drain') | ||
}) | ||
}) | ||
test('flushSync on ready if sync=true', function (t) { | ||
t.plan(4) | ||
const dest = file() | ||
const stream = new ThreadStream({ | ||
filename: join(__dirname, 'to-file.js'), | ||
workerData: { dest }, | ||
sync: true | ||
}) | ||
stream.on('ready', () => { | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
stream.end() | ||
}) | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
for (let count = 0; count < 20; count++) { | ||
stream.write('aaaaaaaaaa') | ||
} | ||
stream.flushSync() | ||
}) | ||
test('flush on ready if sync=false', function (t) { | ||
t.plan(4) | ||
const dest = file() | ||
const stream = new ThreadStream({ | ||
filename: join(__dirname, 'to-file.js'), | ||
workerData: { dest }, | ||
sync: false | ||
}) | ||
stream.on('ready', () => { | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
stream.end() | ||
}) | ||
}) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
}) | ||
test('pass down MessagePorts', async function (t) { | ||
@@ -304,0 +226,0 @@ t.plan(3) |
@@ -18,12 +18,10 @@ 'use strict' | ||
stream.on('ready', function () { | ||
stream.write('hello') | ||
stream.write(' ') | ||
stream.write('world\n') | ||
stream.flushSync() | ||
stream.unref() | ||
stream.write('hello') | ||
stream.write(' ') | ||
stream.write('world\n') | ||
stream.flushSync() | ||
stream.unref() | ||
// the stream object goes out of scope here | ||
setImmediate(gc) // eslint-disable-line | ||
}) | ||
// the stream object goes out of scope here | ||
setImmediate(gc) // eslint-disable-line | ||
} | ||
@@ -30,0 +28,0 @@ |
@@ -9,3 +9,3 @@ 'use strict' | ||
test('yarn module resolution', { skip: !isYarnPnp }, t => { | ||
t.plan(5) | ||
t.plan(4) | ||
@@ -21,5 +21,4 @@ const modulePath = require.resolve('pino-elasticsearch') | ||
stream.on('error', (err) => { | ||
stream.on('error', () => { | ||
t.pass('error emitted') | ||
t.equal(err.message, 'Missing node(s) option', 'module custom error') | ||
}) | ||
@@ -29,4 +28,3 @@ | ||
t.ok(stream.writable) | ||
stream.end() | ||
}) |
@@ -12,8 +12,6 @@ 'use strict' | ||
stream.on('ready', function () { | ||
stream.write('hello') | ||
stream.write(' ') | ||
stream.write('world\n') | ||
stream.flushSync() | ||
stream.unref() | ||
}) | ||
stream.write('hello') | ||
stream.write(' ') | ||
stream.write('world\n') | ||
stream.flushSync() | ||
stream.unref() |
@@ -10,3 +10,3 @@ 'use strict' | ||
test('destroy support', function (t) { | ||
t.plan(9) | ||
t.plan(7) | ||
@@ -20,29 +20,21 @@ const dest = file() | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
stream.on('close', () => { | ||
t.notOk(stream.writable) | ||
t.pass('close emitted') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
stream.end() | ||
stream.end() | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
stream.on('close', () => { | ||
t.notOk(stream.writable) | ||
t.pass('close emitted') | ||
}) | ||
}) | ||
test('synchronous _final support', function (t) { | ||
t.plan(9) | ||
t.plan(7) | ||
@@ -56,25 +48,17 @@ const dest = file() | ||
stream.on('drain', () => { | ||
t.pass('drain') | ||
stream.on('close', () => { | ||
t.notOk(stream.writable) | ||
t.pass('close emitted') | ||
}) | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
t.ok(stream.write('hello world\n')) | ||
t.ok(stream.write('something else\n')) | ||
t.ok(stream.writable) | ||
stream.end() | ||
stream.end() | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
readFile(dest, 'utf8', (err, data) => { | ||
t.error(err) | ||
t.equal(data, 'hello world\nsomething else\n') | ||
}) | ||
stream.on('close', () => { | ||
t.notOk(stream.writable) | ||
t.pass('close emitted') | ||
}) | ||
}) |
@@ -20,16 +20,2 @@ 'use strict' | ||
stream.on('ready', () => { | ||
t.pass('ready emitted') | ||
const buf = Buffer.alloc(1024).fill('x').toString() // 1 KB | ||
// This writes 1 GB of data | ||
for (let i = 0; i < 1024 * 1024; i++) { | ||
length += buf.length | ||
stream.write(buf) | ||
} | ||
stream.end() | ||
}) | ||
stream.on('close', () => { | ||
@@ -42,1 +28,11 @@ stat(dest, (err, f) => { | ||
}) | ||
const buf = Buffer.alloc(1024).fill('x').toString() // 1 KB | ||
// This writes 1 GB of data | ||
for (let i = 0; i < 1024 * 1024; i++) { | ||
length += buf.length | ||
stream.write(buf) | ||
} | ||
stream.end() |
@@ -28,3 +28,3 @@ 'use strict' | ||
stream.on('ready', function () { | ||
stream.on('ready', () => { | ||
stream.write('hello world\n') | ||
@@ -57,3 +57,3 @@ }) | ||
stream.on('ready', function () { | ||
stream.on('ready', () => { | ||
stream.write('hello world\n') | ||
@@ -86,3 +86,3 @@ }) | ||
stream.on('ready', function () { | ||
stream.on('ready', () => { | ||
stream.write('hello world\n') | ||
@@ -115,3 +115,3 @@ }) | ||
stream.on('ready', function () { | ||
stream.on('ready', () => { | ||
stream.write('hello world\n') | ||
@@ -144,3 +144,3 @@ }) | ||
stream.on('ready', function () { | ||
stream.on('ready', () => { | ||
stream.write('hello world\n') | ||
@@ -147,0 +147,0 @@ }) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
44142
1364