@databases/push-to-async-iterable
Advanced tools
Comparing version 2.0.0 to 3.0.0
@@ -12,2 +12,3 @@ "use strict"; | ||
const resume = jest.fn(); | ||
const dispose = jest.fn(); | ||
let onData; | ||
@@ -17,18 +18,12 @@ let onError; | ||
const result = __1.default({ | ||
onData(fn) { | ||
onData = fn; | ||
}, | ||
onError(fn) { | ||
onError = fn; | ||
}, | ||
onEnd(fn) { | ||
onEnd = fn; | ||
}, | ||
pause, | ||
resume, | ||
highWaterMark: 2 | ||
const result = __1.default(handlers => { | ||
onData = handlers.onData; | ||
onError = handlers.onError; | ||
onEnd = handlers.onEnd; | ||
return { | ||
pause, | ||
resume, | ||
dispose, | ||
highWaterMark: 2 | ||
}; | ||
}); | ||
@@ -85,2 +80,3 @@ | ||
const resume = jest.fn(); | ||
const dispose = jest.fn(); | ||
let onData; | ||
@@ -90,18 +86,12 @@ let onError; | ||
const result = __1.default({ | ||
onData(fn) { | ||
onData = fn; | ||
}, | ||
onError(fn) { | ||
onError = fn; | ||
}, | ||
onEnd(fn) { | ||
onEnd = fn; | ||
}, | ||
pause, | ||
resume, | ||
highWaterMark: 2 | ||
const result = __1.default(handlers => { | ||
onData = handlers.onData; | ||
onError = handlers.onError; | ||
onEnd = handlers.onEnd; | ||
return { | ||
pause, | ||
resume, | ||
dispose, | ||
highWaterMark: 2 | ||
}; | ||
}); | ||
@@ -108,0 +98,0 @@ |
@@ -1,5 +0,8 @@ | ||
interface PushStream<T> { | ||
onData(fn: (value: T) => void): void; | ||
onError(fn: (err: any) => void): void; | ||
onEnd(fn: () => void): void; | ||
export interface PushStreamInput<T> { | ||
onData: (value: T) => void; | ||
onError: (err: any) => void; | ||
onEnd: () => void; | ||
} | ||
export interface PushStream { | ||
dispose(): void; | ||
pause(): void; | ||
@@ -9,3 +12,2 @@ resume(): void; | ||
} | ||
export default function pushToAsyncIterable<T>(stream: PushStream<T>): AsyncGenerator<T, void, unknown>; | ||
export {}; | ||
export default function pushToAsyncIterable<T>(getStream: (input: PushStreamInput<T>) => PushStream): AsyncGenerator<T, void, unknown>; |
126
lib/index.js
@@ -7,72 +7,90 @@ "use strict"; | ||
const Queue = require('then-queue'); | ||
const queue_1 = require("@databases/queue"); | ||
function pushToAsyncIterable(stream) { | ||
const queue = new Queue(); | ||
function pushToAsyncIterable(getStream) { | ||
const queue = new queue_1.AsyncQueue(); | ||
let bufferSize = 0; | ||
let paused = false; | ||
let ended = false; | ||
stream.onData(value => { | ||
if (!ended) { | ||
queue.push({ | ||
done: false, | ||
value | ||
}); | ||
const stream = getStream({ | ||
onData(value) { | ||
if (!ended) { | ||
queue.push({ | ||
done: false, | ||
value | ||
}); | ||
bufferSize++; | ||
if (!paused && queue.length >= stream.highWaterMark) { | ||
paused = true; | ||
stream.pause(); | ||
if (!paused && bufferSize >= stream.highWaterMark) { | ||
paused = true; | ||
stream.pause(); | ||
} | ||
} | ||
} | ||
}); | ||
stream.onError(err => { | ||
if (!ended) { | ||
ended = true; | ||
queue.push({ | ||
done: true, | ||
err | ||
}); | ||
}, | ||
if (paused) { | ||
paused = false; | ||
stream.resume(); | ||
onError(err) { | ||
if (!ended) { | ||
ended = true; | ||
queue.push({ | ||
done: true, | ||
err | ||
}); | ||
} | ||
}, | ||
onEnd() { | ||
if (!ended) { | ||
ended = true; | ||
queue.push({ | ||
done: true, | ||
err: undefined | ||
}); | ||
} | ||
} | ||
}); | ||
stream.onEnd(() => { | ||
if (!ended) { | ||
ended = true; | ||
queue.push({ | ||
done: true, | ||
err: undefined | ||
}); | ||
return { | ||
async next() { | ||
bufferSize--; | ||
if (paused) { | ||
if (paused && bufferSize < stream.highWaterMark) { | ||
paused = false; | ||
stream.resume(); | ||
} | ||
} | ||
}); | ||
return queueConsumer(queue, () => { | ||
if (paused && queue.length < stream.highWaterMark) { | ||
paused = false; | ||
stream.resume(); | ||
} | ||
}); | ||
} | ||
exports.default = pushToAsyncIterable; | ||
const next = await queue.shift(); | ||
async function* queueConsumer(queue, onPop) { | ||
let value = await queue.pop(); | ||
if (next.done && next.err) { | ||
throw next.err; | ||
} else if (next.done) { | ||
return { | ||
done: true, | ||
value: undefined | ||
}; | ||
} else { | ||
return next; | ||
} | ||
}, | ||
while (!value.done) { | ||
yield value.value; | ||
const next = queue.pop(); | ||
onPop(); | ||
value = await next; | ||
} | ||
async return() { | ||
stream.dispose(); | ||
return { | ||
done: true, | ||
value: undefined | ||
}; | ||
}, | ||
if (value.err) { | ||
throw value.err; | ||
} | ||
} | ||
async throw(e) { | ||
stream.dispose(); | ||
throw e; | ||
}, | ||
[Symbol.asyncIterator]() { | ||
// tslint:disable-next-line no-invalid-this | ||
return this; | ||
} | ||
}; | ||
} | ||
exports.default = pushToAsyncIterable; |
{ | ||
"name": "@databases/push-to-async-iterable", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "", | ||
@@ -8,3 +8,3 @@ "main": "./lib/index.js", | ||
"dependencies": { | ||
"then-queue": "^1.3.0" | ||
"@databases/queue": "^1.0.0" | ||
}, | ||
@@ -11,0 +11,0 @@ "scripts": {}, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
11622
194
+ Added@databases/queue@^1.0.0
+ Added@databases/queue@1.0.1(transitive)
- Removedthen-queue@^1.3.0
- Removedasap@1.0.0(transitive)
- Removedpromise@6.1.0(transitive)
- Removedthen-queue@1.3.0(transitive)