stream-to-it
Advanced tools
+12
-11
| { | ||
| "name": "stream-to-it", | ||
| "version": "0.2.0", | ||
| "version": "0.2.1", | ||
| "description": "Convert Node.js streams to streaming iterables", | ||
| "main": "index.js", | ||
| "files": [ | ||
| "duplex.js", | ||
| "sink.js", | ||
| "source.js", | ||
| "transform.js" | ||
| ], | ||
| "scripts": { | ||
@@ -21,18 +27,13 @@ "test": "ava test/*.test.js --verbose", | ||
| "dependencies": { | ||
| "get-iterator": "^1.0.2", | ||
| "p-defer": "^3.0.0" | ||
| "get-iterator": "^1.0.2" | ||
| }, | ||
| "devDependencies": { | ||
| "ava": "^2.2.0", | ||
| "ava": "^3.10.1", | ||
| "bl": "^4.0.0", | ||
| "it-pipe": "^1.0.1", | ||
| "nyc": "^14.1.1", | ||
| "it-pipe": "^1.1.0", | ||
| "nyc": "^15.1.0", | ||
| "p-fifo": "^1.0.0", | ||
| "standard": "^14.3.1", | ||
| "streaming-iterables": "^4.1.0" | ||
| "streaming-iterables": "^5.0.2" | ||
| }, | ||
| "ava": { | ||
| "babel": false, | ||
| "compileEnhancements": false | ||
| }, | ||
| "directories": { | ||
@@ -39,0 +40,0 @@ "test": "test" |
+4
-4
@@ -33,3 +33,3 @@ # stream-to-it | ||
| Also works with browser [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream): | ||
| Also works with browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream): | ||
@@ -84,7 +84,7 @@ ```js | ||
| ### `toIterable.source(stream): Function` | ||
| ### `toIterable.source(readable): Function` | ||
| Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable. | ||
| Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable. | ||
| ### `toIterable.sink(sink): Function` | ||
| ### `toIterable.sink(writable): Function` | ||
@@ -91,0 +91,0 @@ Convert a [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams) stream to a [sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it) iterable. |
+71
-88
| const getIterator = require('get-iterator') | ||
| const defer = require('p-defer') | ||
@@ -7,107 +6,91 @@ module.exports = writable => async source => { | ||
| const errPromise = defer() | ||
| const closePromise = defer() | ||
| const endingPromise = defer() | ||
| const finishPromise = defer() | ||
| let drainPromise | ||
| const endSource = (source) => { | ||
| if (typeof source.return === 'function') source.return() | ||
| } | ||
| const errorHandler = err => errPromise.reject(err) | ||
| const closeHandler = () => closePromise.resolve({ closed: true }) | ||
| const finishHandler = () => finishPromise.resolve({ finished: true }) | ||
| const drainHandler = () => { | ||
| if (drainPromise) drainPromise.resolve({ drained: true }) | ||
| let error = null | ||
| let errCb = null | ||
| const errorHandler = (err) => { | ||
| error = err | ||
| if (errCb) errCb(err) | ||
| // When the writable errors, end the source to exit iteration early | ||
| endSource(source) | ||
| } | ||
| // There's no event to determine the start of a call to .end() | ||
| const _end = writable.end.bind(writable) | ||
| writable.end = (...args) => { | ||
| endingPromise.resolve({ ending: true }) | ||
| return _end(...args) | ||
| let closeCb = null | ||
| let closed = false | ||
| const closeHandler = () => { | ||
| closed = true | ||
| if (closeCb) closeCb() | ||
| } | ||
| writable | ||
| .on('error', errorHandler) | ||
| .on('close', closeHandler) | ||
| .on('finish', finishHandler) | ||
| .on('drain', drainHandler) | ||
| let finishCb = null | ||
| let finished = false | ||
| const finishHandler = () => { | ||
| finished = true | ||
| if (finishCb) finishCb() | ||
| } | ||
| const getNext = async () => { | ||
| try { | ||
| return source.next() | ||
| } catch (err) { | ||
| writable.destroy(err) | ||
| return errPromise.promise | ||
| } | ||
| let drainCb = null | ||
| const drainHandler = () => { | ||
| if (drainCb) drainCb() | ||
| } | ||
| try { | ||
| while (true) { | ||
| // Race the iterator and the error, close and finish listener | ||
| const result = await Promise.race([ | ||
| errPromise.promise, | ||
| closePromise.promise, | ||
| endingPromise.promise, | ||
| finishPromise.promise, | ||
| getNext() | ||
| ]) | ||
| const waitForDrainOrClose = () => { | ||
| return new Promise((resolve, reject) => { | ||
| closeCb = drainCb = resolve | ||
| errCb = reject | ||
| writable.once('drain', drainHandler) | ||
| }) | ||
| } | ||
| if (result.closed || result.finished) { | ||
| break | ||
| } | ||
| const waitForDone = () => { | ||
| // Immediately end the source | ||
| endSource(source) | ||
| return new Promise((resolve, reject) => { | ||
| if (closed || finished) return resolve() | ||
| finishCb = closeCb = resolve | ||
| errCb = reject | ||
| }) | ||
| } | ||
| // .end() was called, waiting on flush (finish event) | ||
| if (result.ending) { | ||
| await Promise.race([ | ||
| errPromise.promise, | ||
| // TODO: do we need to wait on close? If slow end and destroy is | ||
| // called then what is emitted? close or finish? | ||
| closePromise.promise, | ||
| finishPromise.promise | ||
| ]) | ||
| break | ||
| } | ||
| const cleanup = () => { | ||
| writable.removeListener('error', errorHandler) | ||
| writable.removeListener('close', closeHandler) | ||
| writable.removeListener('finish', finishHandler) | ||
| writable.removeListener('drain', drainHandler) | ||
| } | ||
| // If destroyed, race err & close to determine reason & then throw/break | ||
| if (writable.destroyed) { | ||
| await Promise.race([ | ||
| errPromise.promise, | ||
| closePromise.promise | ||
| ]) | ||
| break | ||
| } | ||
| writable.once('error', errorHandler) | ||
| writable.once('close', closeHandler) | ||
| writable.once('finish', finishHandler) | ||
| if (result.done) { | ||
| writable.end() | ||
| await Promise.race([ | ||
| errPromise.promise, | ||
| // TODO: do we need to wait on close? If slow end and destroy is | ||
| // called then what is emitted? close or finish? | ||
| closePromise.promise, | ||
| finishPromise.promise | ||
| ]) | ||
| break | ||
| } | ||
| try { | ||
| for await (const value of source) { | ||
| if (!writable.writable || writable.destroyed) break | ||
| if (!writable.write(result.value)) { | ||
| drainPromise = defer() | ||
| await Promise.race([ | ||
| errPromise.promise, | ||
| closePromise.promise, | ||
| finishPromise.promise, | ||
| drainPromise.promise | ||
| ]) | ||
| if (writable.write(value) === false) { | ||
| await waitForDrainOrClose() | ||
| } | ||
| } | ||
| } finally { | ||
| writable | ||
| .removeListener('error', errorHandler) | ||
| .removeListener('close', closeHandler) | ||
| .removeListener('finish', finishHandler) | ||
| .removeListener('drain', drainHandler) | ||
| } catch (err) { | ||
| // The writable did not error, give it the error | ||
| writable.destroy(err) | ||
| } | ||
| // End the iterator if it is a generator | ||
| if (typeof source.return === 'function') { | ||
| await source.return() | ||
| try { | ||
| // Everything is good and we're done writing, end everything | ||
| if (!error && writable.writable) { | ||
| writable.end() | ||
| } | ||
| // Wait until we close or finish. This supports halfClosed streams | ||
| await waitForDone() | ||
| } finally { | ||
| // Clean up listeners | ||
| cleanup() | ||
| } | ||
| // Notify the user an error occurred | ||
| if (error) throw error | ||
| } |
| language: node_js | ||
| node_js: | ||
| - "stable" |
| const test = require('ava') | ||
| const { Duplex } = require('stream') | ||
| const pipe = require('it-pipe') | ||
| const { collect } = require('streaming-iterables') | ||
| const Fifo = require('p-fifo') | ||
| const toIterable = require('../') | ||
| const { randomInt, randomBytes } = require('./helpers/random') | ||
| test('should convert to duplex iterable', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const fifo = new Fifo() | ||
| const output = await pipe( | ||
| input, | ||
| toIterable.duplex(new Duplex({ | ||
| objectMode: true, | ||
| write (chunk, enc, cb) { | ||
| fifo.push(chunk).then(cb) | ||
| }, | ||
| final (cb) { | ||
| fifo.push(null).then(cb) | ||
| }, | ||
| async read (size) { | ||
| while (true) { | ||
| const chunk = await fifo.shift() | ||
| if (!this.push(chunk)) break | ||
| } | ||
| } | ||
| })), | ||
| collect | ||
| ) | ||
| t.deepEqual(output, input) | ||
| }) |
| const Crypto = require('crypto') | ||
| // Maximum is exclusive and the minimum is inclusive | ||
| const randomInt = (min, max) => { | ||
| min = Math.ceil(min) | ||
| max = Math.floor(max) | ||
| return Math.floor(Math.random() * (max - min)) + min | ||
| } | ||
| exports.randomInt = randomInt | ||
| const randomBytes = (min, max) => Crypto.randomBytes(randomInt(min, max)) | ||
| exports.randomBytes = randomBytes |
| const test = require('ava') | ||
| const { Writable } = require('stream') | ||
| const toIterable = require('../') | ||
| const pipe = require('it-pipe') | ||
| const { randomInt, randomBytes } = require('./helpers/random') | ||
| test('should convert to sink iterable', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| await pipe( | ||
| input, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| } | ||
| })) | ||
| ) | ||
| t.deepEqual(output, input) | ||
| }) | ||
| test('should convert to sink iterable and call return on end', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| let i = 0 | ||
| let returnCalled = false | ||
| const output = [] | ||
| await pipe( | ||
| { | ||
| [Symbol.iterator] () { | ||
| return this | ||
| }, | ||
| next () { | ||
| const value = input[i++] | ||
| return { done: !value, value } | ||
| }, | ||
| return () { | ||
| returnCalled = true | ||
| } | ||
| }, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| } | ||
| })) | ||
| ) | ||
| t.is(returnCalled, true) | ||
| t.deepEqual(output, input) | ||
| }) | ||
| test('should end mid stream', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| await pipe( | ||
| input, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| this.end() | ||
| } | ||
| })) | ||
| ) | ||
| t.deepEqual(output, input.slice(0, 1)) | ||
| }) | ||
| test('should destroy mid stream', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| await pipe( | ||
| input, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| this.destroy() | ||
| } | ||
| })) | ||
| ) | ||
| t.deepEqual(output, input.slice(0, 1)) | ||
| }) | ||
| test('should destroy mid stream with error', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| const err = await t.throwsAsync( | ||
| pipe( | ||
| input, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| this.destroy(new Error('boom')) | ||
| } | ||
| })) | ||
| ) | ||
| ) | ||
| t.is(err.message, 'boom') | ||
| }) | ||
| test('should throw mid stream', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| const err = await t.throwsAsync( | ||
| pipe( | ||
| input, | ||
| toIterable.sink(new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| throw new Error('boom') | ||
| } | ||
| })) | ||
| ) | ||
| ) | ||
| t.is(err.message, 'boom') | ||
| }) | ||
| test('should destroy writable stream if source throws', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = [] | ||
| const source = { | ||
| [Symbol.iterator]: function * () { | ||
| yield * input[Symbol.iterator]() | ||
| throw new Error('boom') | ||
| } | ||
| } | ||
| const stream = new Writable({ | ||
| write (chunk, enc, cb) { | ||
| output.push(chunk) | ||
| cb() | ||
| } | ||
| }) | ||
| const err = await t.throwsAsync(pipe(source, toIterable.sink(stream))) | ||
| t.is(err.message, 'boom') | ||
| t.true(stream.destroyed) | ||
| }) |
| const test = require('ava') | ||
| const { Readable } = require('stream') | ||
| const toIterable = require('../') | ||
| const { collect } = require('streaming-iterables') | ||
| const { randomInt, randomBytes } = require('./helpers/random') | ||
| test('should convert to source iterable', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| let i = 0 | ||
| const readable = new Readable({ | ||
| objectMode: true, | ||
| read () { | ||
| while (true) { | ||
| const data = input[i++] || null | ||
| if (!this.push(data)) break | ||
| } | ||
| } | ||
| }) | ||
| const output = await collect(toIterable.source(readable)) | ||
| t.deepEqual(input, output) | ||
| }) | ||
| test('should convert browser ReadableStream to source iterable', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| let i = 0 | ||
| // Like a Response object you get from a call to fetch() in the browser | ||
| const response = { | ||
| body: { | ||
| getReader () { | ||
| return { | ||
| read () { | ||
| const value = input[i++] | ||
| return value == null ? { done: true } : { value } | ||
| }, | ||
| releaseLock () {} | ||
| } | ||
| } | ||
| } | ||
| } | ||
| const output = await collect(toIterable.source(response.body)) | ||
| t.deepEqual(input, output) | ||
| }) |
| const test = require('ava') | ||
| const { Transform } = require('stream') | ||
| const { collect } = require('streaming-iterables') | ||
| const pipe = require('it-pipe') | ||
| const bl = require('bl') | ||
| const toIterable = require('../') | ||
| const { randomInt, randomBytes } = require('./helpers/random') | ||
| test('should convert to transform iterable', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const suffix = Buffer.from(`${Date.now()}`) | ||
| const output = await pipe( | ||
| input, | ||
| // Transform every chunk to have a "suffix" | ||
| toIterable.transform(new Transform({ | ||
| transform (chunk, enc, cb) { | ||
| cb(null, Buffer.concat([chunk, suffix])) | ||
| } | ||
| })), | ||
| collect | ||
| ) | ||
| t.deepEqual( | ||
| bl(input.map(d => Buffer.concat([d, suffix]))).slice(), | ||
| bl(output).slice() | ||
| ) | ||
| }) | ||
| test('should transform single chunk into multiple chunks', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const separator = Buffer.from(`${Date.now()}`) | ||
| const output = await pipe( | ||
| input, | ||
| // Transform every chunk to have a "suffix" | ||
| toIterable.transform(new Transform({ | ||
| transform (chunk, enc, cb) { | ||
| this.push(chunk) | ||
| setTimeout(() => { | ||
| this.push(separator) | ||
| cb() | ||
| }) | ||
| } | ||
| })), | ||
| collect | ||
| ) | ||
| t.deepEqual( | ||
| bl(input.map(d => Buffer.concat([d, separator]))).slice(), | ||
| bl(output).slice() | ||
| ) | ||
| }) | ||
| test('should transform single chunk into no chunks', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| const output = await pipe( | ||
| input, | ||
| toIterable.transform(new Transform({ | ||
| transform (chunk, enc, cb) { | ||
| cb() | ||
| } | ||
| })), | ||
| collect | ||
| ) | ||
| t.is(output.length, 0) | ||
| }) | ||
| test('should error the iterator when transform stream errors', async t => { | ||
| const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512)) | ||
| let i = 0 | ||
| const err = await t.throwsAsync( | ||
| pipe( | ||
| input, | ||
| toIterable.transform(new Transform({ | ||
| transform (chunk, enc, cb) { | ||
| i++ | ||
| if (i > 2) return cb(new Error('boom')) | ||
| cb(null, chunk) | ||
| } | ||
| })), | ||
| collect | ||
| ) | ||
| ) | ||
| t.is(err.message, 'boom') | ||
| }) |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
1
-50%0
-100%9076
-49.03%8
-42.86%117
-72.47%- Removed
- Removed