New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

streamx

Package Overview
Dependencies
Maintainers
1
Versions
66
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamx - npm Package Compare versions

Comparing version 2.6.6 to 2.6.7

36

index.js

@@ -677,8 +677,8 @@ const { EventEmitter } = require('events')

let error = null
let promiseResolve
let promiseReject
let promiseResolve = null
let promiseReject = null
this.on('error', (err) => { error = err })
this.on('readable', () => call(null, stream.read()))
this.on('close', () => call(error, null))
this.on('readable', onreadable)
this.on('close', onclose)

@@ -694,4 +694,4 @@ return {

const data = stream.read()
if (data !== null) call(null, data)
else if ((stream._duplexState & DESTROYED) !== 0) call(error, null)
if (data !== null) ondata(data)
else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
})

@@ -707,2 +707,18 @@ },

function onreadable () {
if (promiseResolve !== null) ondata(stream.read())
}
function onclose () {
if (promiseResolve !== null) ondata(null)
}
function ondata (data) {
if (promiseReject === null) return
if (error) promiseReject(error)
else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
else promiseResolve({ value: data, done: data === null })
promiseReject = promiseResolve = null
}
function destroy (err) {

@@ -718,10 +734,2 @@ stream.destroy(err)

}
function call (err, data) {
if (promiseReject === null) return
if (err) promiseReject(err)
else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
else promiseResolve({ value: data, done: data === null })
promiseReject = promiseResolve = null
}
}

@@ -728,0 +736,0 @@ }

{
"name": "streamx",
"version": "2.6.6",
"version": "2.6.7",
"description": "An iteration of the Node.js core streams with a series of improvements",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -70,1 +70,63 @@ const tape = require('tape')

})
tape('intertesting timing', async function (t) {
const r = new Readable({
read (cb) {
setImmediate(() => {
this.push('b')
this.push('c')
this.push(null)
cb(null)
})
},
destroy (cb) {
t.pass('destroying')
cb(null)
}
})
r.push('a')
const iterated = []
for await (const chunk of r) {
iterated.push(chunk)
await new Promise(resolve => setTimeout(resolve, 10))
}
t.same(iterated, ['a', 'b', 'c'])
t.end()
})
tape('intertesting timing with close', async function (t) {
t.plan(3)
const r = new Readable({
read (cb) {
setImmediate(() => {
this.destroy(new Error('stop'))
cb(null)
})
},
destroy (cb) {
t.pass('destroying')
cb(null)
}
})
r.push('a')
const iterated = []
try {
for await (const chunk of r) {
iterated.push(chunk)
await new Promise(resolve => setTimeout(resolve, 10))
}
} catch (err) {
t.same(err, new Error('stop'))
}
t.same(iterated, ['a'])
t.end()
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc