Comparing version 2.6.6 to 2.6.7
36
index.js
@@ -677,8 +677,8 @@ const { EventEmitter } = require('events') | ||
let error = null | ||
let promiseResolve | ||
let promiseReject | ||
let promiseResolve = null | ||
let promiseReject = null | ||
this.on('error', (err) => { error = err }) | ||
this.on('readable', () => call(null, stream.read())) | ||
this.on('close', () => call(error, null)) | ||
this.on('readable', onreadable) | ||
this.on('close', onclose) | ||
@@ -694,4 +694,4 @@ return { | ||
const data = stream.read() | ||
if (data !== null) call(null, data) | ||
else if ((stream._duplexState & DESTROYED) !== 0) call(error, null) | ||
if (data !== null) ondata(data) | ||
else if ((stream._duplexState & DESTROYED) !== 0) ondata(null) | ||
}) | ||
@@ -707,2 +707,18 @@ }, | ||
function onreadable () { | ||
if (promiseResolve !== null) ondata(stream.read()) | ||
} | ||
function onclose () { | ||
if (promiseResolve !== null) ondata(null) | ||
} | ||
function ondata (data) { | ||
if (promiseReject === null) return | ||
if (error) promiseReject(error) | ||
else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED) | ||
else promiseResolve({ value: data, done: data === null }) | ||
promiseReject = promiseResolve = null | ||
} | ||
function destroy (err) { | ||
@@ -718,10 +734,2 @@ stream.destroy(err) | ||
} | ||
function call (err, data) { | ||
if (promiseReject === null) return | ||
if (err) promiseReject(err) | ||
else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED) | ||
else promiseResolve({ value: data, done: data === null }) | ||
promiseReject = promiseResolve = null | ||
} | ||
} | ||
@@ -728,0 +736,0 @@ } |
{ | ||
"name": "streamx", | ||
"version": "2.6.6", | ||
"version": "2.6.7", | ||
"description": "An iteration of the Node.js core streams with a series of improvements", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -70,1 +70,63 @@ const tape = require('tape') | ||
}) | ||
tape('intertesting timing', async function (t) { | ||
const r = new Readable({ | ||
read (cb) { | ||
setImmediate(() => { | ||
this.push('b') | ||
this.push('c') | ||
this.push(null) | ||
cb(null) | ||
}) | ||
}, | ||
destroy (cb) { | ||
t.pass('destroying') | ||
cb(null) | ||
} | ||
}) | ||
r.push('a') | ||
const iterated = [] | ||
for await (const chunk of r) { | ||
iterated.push(chunk) | ||
await new Promise(resolve => setTimeout(resolve, 10)) | ||
} | ||
t.same(iterated, ['a', 'b', 'c']) | ||
t.end() | ||
}) | ||
tape('intertesting timing with close', async function (t) { | ||
t.plan(3) | ||
const r = new Readable({ | ||
read (cb) { | ||
setImmediate(() => { | ||
this.destroy(new Error('stop')) | ||
cb(null) | ||
}) | ||
}, | ||
destroy (cb) { | ||
t.pass('destroying') | ||
cb(null) | ||
} | ||
}) | ||
r.push('a') | ||
const iterated = [] | ||
try { | ||
for await (const chunk of r) { | ||
iterated.push(chunk) | ||
await new Promise(resolve => setTimeout(resolve, 10)) | ||
} | ||
} catch (err) { | ||
t.same(err, new Error('stop')) | ||
} | ||
t.same(iterated, ['a']) | ||
t.end() | ||
}) |
53565
1410