Comparing version 2.7.2 to 2.8.0
21
index.js
@@ -520,2 +520,5 @@ const { EventEmitter } = require('events') | ||
if (opts.predestroy) this._predestroy = opts.predestroy | ||
if (opts.signal) { | ||
opts.signal.addEventListener('abort', abort.bind(this)) | ||
} | ||
} | ||
@@ -637,6 +640,7 @@ } | ||
static _fromAsyncIterator (ite) { | ||
static _fromAsyncIterator (ite, opts) { | ||
let destroy | ||
const rs = new Readable({ | ||
...opts, | ||
read (cb) { | ||
@@ -661,4 +665,4 @@ ite.next().then(push).then(cb.bind(null, null)).catch(cb) | ||
static from (data) { | ||
if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator]()) | ||
static from (data, opts) { | ||
if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts) | ||
if (!Array.isArray(data)) data = data === undefined ? [] : [data] | ||
@@ -668,2 +672,3 @@ | ||
return new Readable({ | ||
...opts, | ||
read (cb) { | ||
@@ -881,4 +886,8 @@ this.push(i === data.length ? null : data[i++]) | ||
function isTypedArray (data) { | ||
return typeof data === 'object' && data !== null && typeof data.byteLength === 'number' | ||
} | ||
function defaultByteLength (data) { | ||
return Buffer.isBuffer(data) ? data.length : 1024 | ||
return isTypedArray(data) ? data.byteLength : 1024 | ||
} | ||
@@ -888,2 +897,6 @@ | ||
function abort () { | ||
this.destroy(new Error('Stream aborted.')) | ||
} | ||
module.exports = { | ||
@@ -890,0 +903,0 @@ isStream, |
{ | ||
"name": "streamx", | ||
"version": "2.7.2", | ||
"version": "2.8.0", | ||
"description": "An iteration of the Node.js core streams with a series of improvements", | ||
@@ -11,2 +11,3 @@ "main": "index.js", | ||
"devDependencies": { | ||
"abort-controller": "^3.0.0", | ||
"end-of-stream": "^1.4.1", | ||
@@ -21,3 +22,3 @@ "standard": "^14.3.1", | ||
"type": "git", | ||
"url": "https://github.com/mafintosh/streamx.git" | ||
"url": "https://github.com/streamxorg/streamx.git" | ||
}, | ||
@@ -27,5 +28,5 @@ "author": "Mathias Buus (@mafintosh)", | ||
"bugs": { | ||
"url": "https://github.com/mafintosh/streamx/issues" | ||
"url": "https://github.com/streamxorg/streamx/issues" | ||
}, | ||
"homepage": "https://github.com/mafintosh/streamx" | ||
"homepage": "https://github.com/streamxorg/streamx" | ||
} |
@@ -9,3 +9,3 @@ # streamx | ||
[![Build Status](https://travis-ci.org/mafintosh/streamx.svg?branch=master)](https://travis-ci.org/mafintosh/streamx) | ||
[![Build Status](https://github.com/streamxorg/streamx/workflows/Build%20Status/badge.svg)](https://github.com/streamxorg/streamx/actions?query=workflow%3A%22Build+Status%22) | ||
@@ -76,2 +76,8 @@ ## Main improvements from Node.js core stream | ||
#### AbortSignal support | ||
To make it easier to integrate streams in a `async/await` flow, all streams support a `signal` option | ||
that accepts a [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to as an | ||
alternative means to `.destroy` streams. | ||
## Usage | ||
@@ -108,3 +114,4 @@ | ||
map: (data) => data, // optional function to map input data | ||
byteLength: (data) => size // optional function that calculates the byte size of input data | ||
byteLength: (data) => size, // optional function that calculates the byte size of input data | ||
signal: abortController.signal // optional AbortSignal that triggers `.destroy` when on `abort` | ||
} | ||
@@ -257,3 +264,4 @@ ``` | ||
map: (data) => data, // optional function to map input data | ||
byteLength: (data) => size // optional function that calculates the byte size of input data | ||
byteLength: (data) => size, // optional function that calculates the byte size of input data | ||
signal: abortController.signal // optional AbortSignal that triggers `.destroy` when on `abort` | ||
} | ||
@@ -260,0 +268,0 @@ ``` |
const tape = require('tape') | ||
const { Readable } = require('../') | ||
const { AbortController } = require('abort-controller') | ||
@@ -149,1 +150,24 @@ tape('streams are async iterators', async function (t) { | ||
}) | ||
tape('using abort controller', async function (t) { | ||
function createInfinite (signal) { | ||
let count = 0 | ||
const r = new Readable({ signal }) | ||
r.push(count) | ||
const int = setInterval(() => r.push(count++), 5000) | ||
r.once('close', () => clearInterval(int)) | ||
return r | ||
} | ||
const controller = new AbortController() | ||
const inc = [] | ||
setTimeout(() => controller.abort(), 10) | ||
try { | ||
for await (const chunk of createInfinite(controller.signal)) { | ||
inc.push(chunk) | ||
} | ||
} catch (err) { | ||
t.same(err.message, 'Stream aborted.') | ||
} | ||
t.same(inc, [0]) | ||
t.end() | ||
}) |
@@ -120,1 +120,17 @@ const tape = require('tape') | ||
}) | ||
tape('from array with highWaterMark', function (t) { | ||
const r = Readable.from([1, 2, 3], { highWaterMark: 1 }) | ||
t.same(r._readableState.highWaterMark, 1) | ||
t.end() | ||
}) | ||
tape('from async iterator with highWaterMark', function (t) { | ||
async function * test () { | ||
yield 1 | ||
} | ||
const r = Readable.from(test(), { highWaterMark: 1 }) | ||
t.same(r._readableState.highWaterMark, 1) | ||
t.end() | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
59459
15
1562
404
4