Socket
Socket
Sign inDemoInstall

thread-stream

Package Overview
Dependencies
1
Maintainers
4
Versions
40
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.1 to 2.0.0

70

index.d.ts
import { EventEmitter } from 'events'
import workerThreads from 'worker_threads'
interface ThreadStreamOptions {
/**
* The size (in bytes) of the buffer.
* Must be greater than 4 (i.e. it must at least fit a 4-byte utf-8 char).
*
* Default: `4 * 1024 * 1024` = `4194304`
*/
bufferSize?: number,
/**
* The path to the Worker's main script or module.
* Must be either an absolute path or a relative path (i.e. relative to the current working directory) starting with ./ or ../, or a WHATWG URL object using file: or data: protocol.
* When using a data: URL, the data is interpreted based on MIME type using the ECMAScript module loader.
*
* {@link workerThreads.Worker()}
*/
filename: string | URL,
/**
* If `true`, write data synchronously; otherwise write data asynchronously.
*
* Default: `false`.
*/
sync?: boolean,
/**
* {@link workerThreads.WorkerOptions.workerData}
*
* Default: `{}`
*/
workerData?: any,
/**
* {@link workerThreads.WorkerOptions}
*
* Default: `{}`
*/
workerOpts?: workerThreads.WorkerOptions
}
declare class ThreadStream extends EventEmitter {
constructor(opts: {})
write (data: string): boolean
end (): void
/**
* @param {ThreadStreamOptions} opts
*/
constructor(opts: ThreadStreamOptions)
/**
* Write some data to the stream.
*
* **Please note that this method should not throw an {Error} if something goes wrong but emit an error event.**
* @param {string} data data to write.
* @returns {boolean} false if the stream wishes for the calling code to wait for the 'drain' event to be emitted before continuing to write additional data or if it fails to write; otherwise true.
*/
write(data: string): boolean
/**
* Signal that no more data will be written.
*
* **Please note that this method should not throw an {Error} if something goes wrong but emit an error event.**
*
* Calling the {@link write()} method after calling {@link end()} will emit an error.
*/
end(): void
/**
* Flush the stream synchronously.
* This method should be called in the shutdown phase to make sure that all data has been flushed.
*
* **Please note that this method will throw an {Error} if something goes wrong.**
*
* @throws {Error} if the stream is already flushing, if it fails to flush or if it takes more than 10 seconds to flush.
*/
flushSync(): void
}
export = ThreadStream;

28

index.js

@@ -139,3 +139,3 @@ 'use strict'

// This should never happen
throw new Error('overwritten')
destroy(stream, new Error('overwritten'))
}

@@ -168,3 +168,3 @@ }

default:
throw new Error('this should not happen: ' + msg.code)
destroy(stream, new Error('this should not happen: ' + msg.code))
}

@@ -182,3 +182,3 @@ }

stream.worker.off('exit', onWorkerExit)
destroy(stream, code !== 0 ? new Error('The worker thread exited') : null)
destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
}

@@ -217,7 +217,9 @@

if (this[kImpl].destroyed) {
throw new Error('the worker has exited')
error(this, new Error('the worker has exited'))
return false
}
if (this[kImpl].ending) {
throw new Error('the worker is ending')
error(this, new Error('the worker is ending'))
return false
}

@@ -345,2 +347,8 @@

function error (stream, err) {
setImmediate(() => {
stream.emit('error', err)
})
}
function destroy (stream, err) {

@@ -354,3 +362,3 @@ if (stream[kImpl].destroyed) {

stream[kImpl].errored = err
stream.emit('error', err)
error(stream, err)
}

@@ -408,7 +416,9 @@

if (readIndex === -2) {
throw new Error('end() failed')
destroy(stream, new Error('end() failed'))
return
}
if (++spins === 10) {
throw new Error('end() took too long (10s)')
destroy(stream, new Error('end() took too long (10s)'))
return
}

@@ -492,3 +502,3 @@ }

if (readIndex === -2) {
throw new Error('_flushSync failed')
throw Error('_flushSync failed')
}

@@ -495,0 +505,0 @@

{
"name": "thread-stream",
"version": "1.0.1",
"version": "2.0.0",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -266,3 +266,3 @@ 'use strict'

stream.on('error', (err) => {
t.equal(err.message, 'The worker thread exited')
t.equal(err.message, 'the worker thread exited')
stream.flush((err) => {

@@ -269,0 +269,0 @@ t.equal(err.message, 'the worker has exited')

'use strict'
const t = require('tap')
if (process.env.CI) {
t.skip('skip on CI')
process.exit(0)
}
const { join } = require('path')

@@ -5,0 +11,0 @@ const { file } = require('./helper')

'use strict'
const t = require('tap')
if (process.env.CI) {
t.skip('skip on CI')
process.exit(0)
}
const { join } = require('path')

@@ -5,0 +11,0 @@ const { file } = require('./helper')

@@ -32,48 +32,14 @@ 'use strict'

const [err] = await once(stream, 'error')
t.equal(err.message, 'The worker thread exited')
let [err] = await once(stream, 'error')
t.equal(err.message, 'the worker thread exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})
test('emit error if thread exits', async function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'exit.js'),
sync: true
})
stream.on('ready', () => {
stream.write('hello world\n')
})
const [err] = await once(stream, 'error')
t.equal(err.message, 'The worker thread exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
})
test('emit error if thread have unhandledRejection', async function (t) {

@@ -89,18 +55,12 @@ const stream = new ThreadStream({

const [err] = await once(stream, 'error')
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})

@@ -118,18 +78,12 @@

const [err] = await once(stream, 'error')
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})

@@ -147,18 +101,12 @@

const [err] = await once(stream, 'error')
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
try {
stream.write('noop')
t.fail('unreacheable')
} catch (err) {
t.equal(err.message, 'the worker has exited')
}
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})

@@ -165,0 +113,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc