catchment
A Node.js package to collect stream data into a catchment and return it either as a buffer or a string.
yarn add -E catchment
API
The package exports the default Catchment class and the collect method.
import Catchment, { collect } from 'catchment'
Catchment Class
Catchment extends Writable, and pushes incoming data into an internal array. When the stream finishes, a promise referenced in the promise property is fulfilled with concatenated data. If an error occurred, the promise is rejected.
A new Catchment can be created with a constructor, which accepts optional options.
import { Readable } from 'stream'
import Catchment from 'catchment'
const DATA = 'test-data'
const rs = new Readable({
  read() {
    for (let i = 0; i < DATA.length; i++) {
      const c = DATA.charAt(i)
      this.push(c)
    }
    this.push(null)
  },
})
;(async () => {
  try {
    const catchment = new Catchment()
    rs.pipe(catchment)
    const res = await catchment.promise
    console.log(res)
  } catch (err) {
    console.log(err)
  }
})()
test-data
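The behaviour described above (a Writable that stores chunks in an array and settles a promise when the stream ends) can be sketched with the standard library alone. This is only an illustration of the idea, not the package's source: the MiniCatchment class and its chunks property are hypothetical names, and require is used so that the snippet runs in plain Node.js without a transpiler.

```javascript
const { Writable, Readable } = require('stream')

// A hypothetical stand-in for Catchment, not the package's actual code.
class MiniCatchment extends Writable {
  constructor(options) {
    super(options)
    this.chunks = [] // the internal array of incoming chunks
    // Settle the promise once the stream finishes or fails.
    this.promise = new Promise((resolve, reject) => {
      this.on('finish', () => resolve(this.chunks.join('')))
      this.on('error', reject)
    })
  }
  _write(chunk, encoding, callback) {
    this.chunks.push(chunk)
    callback()
  }
}

const miniCatchment = new MiniCatchment()
Readable.from(['test', '-', 'data']).pipe(miniCatchment)
miniCatchment.promise.then(console.log) // test-data
```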
constructor(
options?: Options,
): Catchment
Options
An optional options object can be passed to the constructor.
Readable: import('stream').Readable
Options: Options to pass to the Writable super constructor, and others shown below.
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rs | Readable | A readable stream to automatically pipe into the catchment. If an error occurs while reading this stream, the catchment promise will be rejected with it. | - |
| binary | boolean | Whether to return a raw buffer instead of a string. The string is created by joining all incoming chunks together with the .join('') method. | false |
Collect Buffer
To receive a buffer, the binary option should be set to true:
import Catchment from 'catchment'
import { createReadable } from './lib'
;(async () => {
  try {
    const rs = createReadable('test-data')
    const catchment = new Catchment({ binary: true })
    rs.pipe(catchment)
    const res = await catchment.promise
    console.log(res)
  } catch (err) {
    console.log(err)
  }
})()
<Buffer 74 65 73 74 2d 64 61 74 61>
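One practical reason to ask for a buffer is multi-byte text. The sketch below assumes that chunks arrive as buffers and that the string is built with .join('') as the options table describes. In that case each chunk is decoded on its own, so a character whose bytes are split between two chunks does not survive, whereas concatenating the raw bytes first keeps it intact. The variable names are illustrative.

```javascript
// '€' takes three bytes in UTF-8; split it across two chunks.
const euro = Buffer.from('€')
const chunks = [euro.subarray(0, 1), euro.subarray(1)]

// Joining decodes every chunk separately, so the character is lost.
const joined = chunks.join('')
// Concatenating keeps the bytes together until they are decoded.
const concatenated = Buffer.concat(chunks)

console.log(joined === '€') // false
console.log(concatenated.toString() === '€') // true
```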
Pipe Readable
To automatically pipe a Readable, and reject the promise if an error occurs there, the rs option can be passed:
import Catchment from 'catchment'
import { createReadStream } from 'fs'
;(async () => {
  try {
    const rs = createReadStream('missing-file.txt')
    const { promise } = new Catchment({ rs })
    const res = await promise
    console.log(res)
  } catch ({ message }) {
    console.log(message)
  }
})()
ENOENT: no such file or directory, open 'missing-file.txt'
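The rs option matters because readable.pipe() does not forward errors from the source to the destination, so a plain pipe would leave the promise pending after a read failure. A minimal sketch of the extra wiring, using only the standard library, might look as follows. The pipeAndCollect helper is a hypothetical name and is not part of the package.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical helper: pipes `rs` into a collecting Writable and
// rejects when the source errors, because pipe() does not forward errors.
const pipeAndCollect = (rs) => new Promise((resolve, reject) => {
  const chunks = []
  const ws = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk)
      callback()
    },
  })
  ws.on('finish', () => resolve(Buffer.concat(chunks).toString()))
  ws.on('error', reject)
  rs.on('error', reject) // roughly what the rs option is described as doing
  rs.pipe(ws)
})

// A source that fails as soon as it is read.
const failing = new Readable({
  read() {
    this.destroy(new Error('read failed'))
  },
})
const failure = pipeAndCollect(failing).catch((err) => err.message)
failure.then(console.log) // read failed
```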
async collect(
readable: Readable,
options?: CollectOptions,
): string|Buffer
The collect method is a shorthand for creating a new catchment, and piping a readable stream into it. It will accumulate all data from the read stream, and asynchronously return when the stream finishes. If an error occurs in the stream, the promise will be rejected.
Some options can be passed to the collect method. The proxyError option is described in the Proxy Error section.
CollectOptions: Options when collecting data into a catchment. They can extend Writable options which will be passed to the Catchment constructor.
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| binary | boolean | Whether to return a raw buffer instead of a string. The string is created by joining all incoming chunks together with the .join('') method. | false |
| proxyError | boolean | Sets whether an error emitted by the stream will have its stack start at the line where collect was called rather than inside of the stream. In other words, hides the implementation of the stream. | false |
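For readers who want to see what such a shorthand amounts to, here is a rough stdlib-only equivalent with a binary switch. It deliberately uses a different technique (async iteration over the readable rather than a Writable), and collectSketch is a hypothetical name, so it should be read as an approximation of the behaviour and not as the package's implementation.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for collect(), built on async iteration
// instead of piping into a Writable.
const collectSketch = async (readable, { binary = false } = {}) => {
  const chunks = []
  for await (const chunk of readable) {
    chunks.push(Buffer.from(chunk)) // accept both strings and buffers
  }
  const buffer = Buffer.concat(chunks)
  return binary ? buffer : buffer.toString()
}

const asString = collectSketch(Readable.from(['test-', 'data']))
const asBuffer = collectSketch(Readable.from(['test-', 'data']), { binary: true })
asString.then(console.log) // test-data
asBuffer.then(console.log) // <Buffer 74 65 73 74 2d 64 61 74 61>
```

An error emitted by the readable makes the for await loop throw, so the returned promise is rejected in that case as well.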
Error Handling
Whenever an error is encountered while reading a readable stream, either piped into a Catchment via the rs option, or passed as an argument to the collect method, it will result in a rejected promise.
If the error has a stack, the stack will be cleaned of internal Node.js lines, such as _module.
import { Readable } from 'stream'
import Catchment from 'catchment'
const rs = new Readable({
  read() {
    const er = new Error('example-error')
    this.emit('error', er)
    this.push(null)
  },
})
;(async () => {
  try {
    // The rs option already pipes the stream, so no extra rs.pipe() call is needed.
    const catchment = new Catchment({
      rs,
    })
    await catchment.promise
  } catch ({ stack }) {
    console.log(stack)
  }
})()
Error: example-error
at Readable.read [as _read] (/Users/zavr/adc/catchment/example/error-catchment.js:6:16)
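The cleaning step can be pictured as a filter over the lines of the stack. The sketch below is an assumption about how such a filter could work, not the rule the package applies: the INTERNAL pattern, the cleanStack name, and the sample frames are all made up for illustration. It keeps the message line and drops frames whose location is a node: URL, an internal/ path, or a bare core file name without a directory.

```javascript
// Hypothetical pattern for frames that live inside Node.js itself.
const INTERNAL = /\((?:node:|internal\/|[^/\\]+\.js:\d+:\d+\))/

// Keep the message line (index 0) and every frame outside of Node.js internals.
const cleanStack = (stack) => stack
  .split('\n')
  .filter((line, i) => i === 0 || !INTERNAL.test(line))
  .join('\n')

// Made-up frames in the shape Node.js prints them.
const sample = [
  'Error: example-error',
  '    at Readable.read [as _read] (/app/example.js:6:16)',
  '    at Readable.read (_stream_readable.js:453:10)',
  '    at Module._compile (node:internal/modules/cjs/loader:1105:14)',
].join('\n')

console.log(cleanStack(sample))
// Error: example-error
//     at Readable.read [as _read] (/app/example.js:6:16)
```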
If the error does not have the stack (which can happen when using createReadStream from the fs module), it will appear as thrown at the point of either creating an instance of Catchment, or calling the collect method.
import { createReadStream } from 'fs'
import { collect } from 'catchment'
;(async () => {
  try {
    const rs = createReadStream('missing-file.txt')
    await collect(rs)
  } catch ({ stack }) {
    console.log(stack)
  }
})()
Error: ENOENT: no such file or directory, open 'missing-file.txt'
at /Users/zavr/adc/catchment/example/error-collect.js:7:11
at Object.<anonymous> (/Users/zavr/adc/catchment/example/error-collect.js:11:3)
Proxy Error
The collect method can throw an error whose stack is updated to point at the place where collect was called. This can be useful when using third-party streams, as there is no need to look into the details of their internal stack. By setting the proxyError option, all internal lines of the stream will be hidden, and the error will appear to be thrown by the call to the collect method.
import { collect } from 'catchment'
import { createReadable } from './lib'
import frame from 'frame-of-mind'
function read() {
  const err = new Error('Whatever error happens')
  setTimeout(() => {
    this.emit('error', err)
    this.push(null)
  }, 10)
}
const Collect = async ({ proxyError } = {}) => {
  try {
    const rs = createReadable(read)
    await collect(rs, { proxyError })
  } catch ({ stack }) {
    console.log('COLLECT %s \n%s', proxyError ? 'WITH PROXY' : '', frame(stack))
  }
}
const Listeners = async () => {
  try {
    const rs = createReadable(read)
    const p = collect(rs).catch(() => {})
    await new Promise((r, j) => {
      rs.on('finish', r)
      rs.on('error', j)
    })
    await p
  } catch ({ stack }) {
    console.log('LISTENERS:\n%s', frame(stack))
  }
}
;(async () => {
  await Collect()
  await Listeners()
  await Collect({ proxyError: true })
})()
COLLECT
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ Error: Whatever error happens │
│ at Readable.read (/Users/zavr/adc/catchment/example/error-collect2.js:8:15) │
│ at Readable.read [as _read] (/Users/zavr/adc/catchment/example/lib/index.js:11:16) │
└────────────────────────────────────────────────────────────────────────────────────────┘
LISTENERS:
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ Error: Whatever error happens │
│ at Readable.read (/Users/zavr/adc/catchment/example/error-collect2.js:8:15) │
│ at Readable.read [as _read] (/Users/zavr/adc/catchment/example/lib/index.js:11:16) │
└────────────────────────────────────────────────────────────────────────────────────────┘
COLLECT WITH PROXY
┌────────────────────────────────────────────────────────────────────────────┐
│ Error: Whatever error happens │
│ at Collect (/Users/zavr/adc/catchment/example/error-collect2.js:18:11) │
│ at /Users/zavr/adc/catchment/example/error-collect2.js:41:9 │
│ at <anonymous> │
└────────────────────────────────────────────────────────────────────────────┘
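One way to obtain this effect is to record a stack trace synchronously at the moment of the call and to swap it into whatever error arrives later. The sketch below shows the idea with Error.captureStackTrace, which Node.js provides. The withCallerStack and callSite names are hypothetical, and the package's own mechanism may differ.

```javascript
// Hypothetical helper illustrating the idea of proxying an error.
const withCallerStack = (promise) => {
  // Captured synchronously, and the second argument hides this helper,
  // so the first recorded frame belongs to the caller.
  const holder = {}
  Error.captureStackTrace(holder, withCallerStack)
  return promise.catch((err) => {
    const frames = holder.stack.split('\n').slice(1) // drop the header line
    err.stack = [`${err.name}: ${err.message}`, ...frames].join('\n')
    throw err
  })
}

function callSite() {
  // Stands in for a stream that fails later, inside a timer callback.
  const failing = new Promise((resolve, reject) => {
    setTimeout(() => reject(new Error('Whatever error happens')), 10)
  })
  return withCallerStack(failing)
}

const proxied = callSite().catch(({ stack }) => stack)
proxied.then(console.log) // the first frame is callSite, not the timer callback
```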
Copyright