Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

promise-readable

Package Overview
Dependencies
Maintainers
1
Versions
41
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

promise-readable - npm Package Compare versions

Comparing version 2.1.1 to 3.0.0

6

CHANGELOG.md
# Changelog
## v3.0.0 2018-02-03
* No support for streams v1.
* New method `destroy`.
* Bugfix when `PromiseReadable` could ignore `error` event.
## v2.1.1 2018-01-18

@@ -4,0 +10,0 @@

2

lib/promise-readable.d.ts

@@ -13,2 +13,4 @@ /// <reference types="node" />

destroy (): void
once (event: 'close' | 'end' | 'error'): Promise<void>

@@ -15,0 +17,0 @@ once (event: 'open'): Promise<number>

217

lib/promise-readable.js

@@ -6,57 +6,20 @@ 'use strict'

this.stream = stream
this._errorHandler = (err) => {
this._errored = err
}
stream.on('error', this._errorHandler)
}
read (size) {
return typeof this.stream.read === 'function' ? this._read2(size) : this._read1()
}
_read1 () {
const stream = this.stream
return new Promise((resolve, reject) => {
if (!stream.readable || stream.closed || stream.destroyed) {
return resolve()
if (this._errored) {
const err = this._errored
delete this._errored
return reject(err)
}
const onceClose = () => {
stream.removeListener('data', onceData)
stream.removeListener('end', onceEnd)
stream.removeListener('error', onceError)
resolve(stream.bytesWritten || 0)
}
const onceData = (chunk) => {
stream.pause()
stream.removeListener('close', onceClose)
stream.removeListener('end', onceEnd)
stream.removeListener('error', onceError)
resolve(chunk)
}
const onceEnd = () => {
stream.removeListener('close', onceClose)
stream.removeListener('data', onceData)
stream.removeListener('error', onceError)
resolve()
}
const onceError = (err) => {
stream.removeListener('close', onceClose)
stream.removeListener('data', onceData)
stream.removeListener('end', onceEnd)
reject(err)
}
stream.once('close', onceClose)
stream.once('data', onceData)
stream.once('end', onceEnd)
stream.once('error', onceError)
stream.resume()
})
}
_read2 (size) {
const stream = this.stream
return new Promise((resolve, reject) => {
if (!stream.readable || stream.closed || stream.destroyed) {

@@ -66,10 +29,10 @@ return resolve()

const onReadable = () => {
const readableHandler = () => {
let chunk = stream.read(size)
if (chunk != null) {
stream.removeListener('close', onceClose)
stream.removeListener('error', onceError)
stream.removeListener('end', onceEnd)
stream.removeListener('readable', onReadable)
stream.removeListener('close', closeHandler)
stream.removeListener('error', errorHandler)
stream.removeListener('end', endHandler)
stream.removeListener('readable', readableHandler)
resolve(chunk)

@@ -79,29 +42,30 @@ }

const onceClose = () => {
stream.removeListener('end', onceEnd)
stream.removeListener('error', onceError)
stream.removeListener('readable', onReadable)
const closeHandler = () => {
stream.removeListener('end', endHandler)
stream.removeListener('error', errorHandler)
stream.removeListener('readable', readableHandler)
resolve(stream.bytesWritten || 0)
}
const onceEnd = () => {
stream.removeListener('close', onceClose)
stream.removeListener('error', onceError)
stream.removeListener('readable', onReadable)
const endHandler = () => {
stream.removeListener('close', closeHandler)
stream.removeListener('error', errorHandler)
stream.removeListener('readable', readableHandler)
resolve()
}
const onceError = (err) => {
stream.removeListener('close', onceClose)
stream.removeListener('end', onceEnd)
stream.removeListener('readable', onReadable)
const errorHandler = (err) => {
delete this._errored
stream.removeListener('close', closeHandler)
stream.removeListener('end', endHandler)
stream.removeListener('readable', readableHandler)
reject(err)
}
stream.once('close', onceClose)
stream.on('readable', onReadable)
stream.once('end', onceEnd)
stream.once('error', onceError)
stream.once('close', closeHandler)
stream.once('end', endHandler)
stream.once('error', errorHandler)
stream.on('readable', readableHandler)
onReadable()
readableHandler()
})

@@ -115,2 +79,8 @@ }

return new Promise((resolve, reject) => {
if (this._errored) {
const err = this._errored
delete this._errored
return reject(err)
}
if (!stream.readable || stream.closed || stream.destroyed) {

@@ -120,31 +90,32 @@ return resolve()

const onData = (chunk) => {
const dataHandler = (chunk) => {
bufferArray.push(chunk)
}
const onceClose = () => {
stream.removeListener('data', onData)
stream.removeListener('end', onceEnd)
stream.removeListener('error', onceError)
const closeHandler = () => {
stream.removeListener('data', dataHandler)
stream.removeListener('end', endHandler)
stream.removeListener('error', errorHandler)
resolve(stream.bytesWritten || 0)
}
const onceEnd = () => {
stream.removeListener('close', onceClose)
stream.removeListener('data', onData)
stream.removeListener('error', onceError)
const endHandler = () => {
stream.removeListener('close', closeHandler)
stream.removeListener('data', dataHandler)
stream.removeListener('error', errorHandler)
resolve(Buffer.concat(bufferArray))
}
const onceError = (err) => {
stream.removeListener('close', onceClose)
stream.removeListener('data', onData)
stream.removeListener('end', onceEnd)
const errorHandler = (err) => {
delete this._errored
stream.removeListener('close', closeHandler)
stream.removeListener('data', dataHandler)
stream.removeListener('end', endHandler)
reject(err)
}
stream.once('close', onceClose)
stream.on('data', onData)
stream.once('end', onceEnd)
stream.once('error', onceError)
stream.once('close', closeHandler)
stream.on('data', dataHandler)
stream.once('end', endHandler)
stream.once('error', errorHandler)

@@ -160,4 +131,8 @@ stream.resume()

if (this._errored) {
return reject(this._errored)
} else if (stream.closed) {
const err = this._errored
delete this._errored
return reject(err)
}
if (stream.closed) {
if (event === 'close') {

@@ -176,9 +151,9 @@ return resolve()

const onceClose = () => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
const closeHandler = () => {
if (eventHandler) {
stream.removeListener(event, eventHandler)
}
stream.removeListener('error', onceError)
if (onceEnd) {
stream.removeListener('end', onceEnd)
stream.removeListener('error', errorHandler)
if (endHandler) {
stream.removeListener('end', endHandler)
}

@@ -188,7 +163,7 @@ resolve()

const onceEvent = event !== 'close' && event !== 'end' && event !== 'error' ? (argument) => {
stream.removeListener('close', onceClose)
stream.removeListener('error', onceError)
if (onceEnd) {
stream.removeListener('end', onceEnd)
const eventHandler = event !== 'close' && event !== 'end' && event !== 'error' ? (argument) => {
stream.removeListener('close', closeHandler)
stream.removeListener('error', errorHandler)
if (endHandler) {
stream.removeListener('end', endHandler)
}

@@ -198,33 +173,41 @@ resolve(argument)

const onceEnd = event !== 'close' ? () => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
const endHandler = event !== 'close' ? () => {
if (eventHandler) {
stream.removeListener(event, eventHandler)
}
stream.removeListener('close', onceClose)
stream.removeListener('error', onceError)
stream.removeListener('close', closeHandler)
stream.removeListener('error', errorHandler)
resolve()
} : undefined
const onceError = (err) => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
const errorHandler = (err) => {
delete this._errored
if (eventHandler) {
stream.removeListener(event, eventHandler)
}
stream.removeListener('close', onceClose)
if (onceEnd) {
stream.removeListener('end', onceEnd)
stream.removeListener('close', closeHandler)
if (endHandler) {
stream.removeListener('end', endHandler)
}
this._errored = err
reject(err)
}
if (onceEvent) {
stream.once(event, onceEvent)
if (eventHandler) {
stream.once(event, eventHandler)
}
stream.once('close', onceClose)
if (onceEnd) {
stream.once('end', onceEnd)
stream.once('close', closeHandler)
if (endHandler) {
stream.once('end', endHandler)
}
stream.once('error', onceError)
stream.once('error', errorHandler)
})
}
destroy () {
this.stream.removeListener('error', this._errorHandler)
if (typeof this.stream.destroy === 'function') {
this.stream.destroy()
}
delete this.stream
}
}

@@ -231,0 +214,0 @@

{
"name": "promise-readable",
"version": "2.1.1",
"version": "3.0.0",
"description": "Return promise for readable stream",

@@ -5,0 +5,0 @@ "main": "lib/promise-readable.js",

@@ -7,5 +7,5 @@ ## promise-readable

[`Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable)
stream into its promisified version, which returns [`Promise`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise)
object fulfilled when
[`open`](https://nodejs.org/api/fs.html#fs_event_open),
stream into its promisified version, which returns
[`Promise`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise)
object fulfilled when [`open`](https://nodejs.org/api/fs.html#fs_event_open),
[`data`](https://nodejs.org/api/stream.html#stream_event_data),

@@ -137,9 +137,17 @@ [`close`](https://nodejs.org/api/fs.html#fs_event_close),

await promiseReadable.once('error') // undefined if already ended or throws error
await promiseReadable.once('error') // throws error, undefined if ended
```
#### destroy
```js
promiseReadable.destroy()
```
This method calls `destroy` method on stream and cleans up all own handlers.
### License
Copyright (c) 2017 Piotr Roszatycki <piotr.roszatycki@gmail.com>
Copyright (c) 2017-2018 Piotr Roszatycki <piotr.roszatycki@gmail.com>
[MIT](https://opensource.org/licenses/MIT)

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc