Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@databases/push-to-async-iterable

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@databases/push-to-async-iterable - npm Package Compare versions

Comparing version 2.0.0 to 3.0.0

54

lib/__tests__/index.test.js

@@ -12,2 +12,3 @@ "use strict";

const resume = jest.fn();
const dispose = jest.fn();
let onData;

@@ -17,18 +18,12 @@ let onError;

const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
const result = __1.default(handlers => {
onData = handlers.onData;
onError = handlers.onError;
onEnd = handlers.onEnd;
return {
pause,
resume,
dispose,
highWaterMark: 2
};
});

@@ -85,2 +80,3 @@

const resume = jest.fn();
const dispose = jest.fn();
let onData;

@@ -90,18 +86,12 @@ let onError;

const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
const result = __1.default(handlers => {
onData = handlers.onData;
onError = handlers.onError;
onEnd = handlers.onEnd;
return {
pause,
resume,
dispose,
highWaterMark: 2
};
});

@@ -108,0 +98,0 @@

@@ -1,5 +0,8 @@

interface PushStream<T> {
onData(fn: (value: T) => void): void;
onError(fn: (err: any) => void): void;
onEnd(fn: () => void): void;
export interface PushStreamInput<T> {
onData: (value: T) => void;
onError: (err: any) => void;
onEnd: () => void;
}
export interface PushStream {
dispose(): void;
pause(): void;

@@ -9,3 +12,2 @@ resume(): void;

}
export default function pushToAsyncIterable<T>(stream: PushStream<T>): AsyncGenerator<T, void, unknown>;
export {};
export default function pushToAsyncIterable<T>(getStream: (input: PushStreamInput<T>) => PushStream): AsyncGenerator<T, void, unknown>;

@@ -7,72 +7,90 @@ "use strict";

const Queue = require('then-queue');
const queue_1 = require("@databases/queue");
function pushToAsyncIterable(stream) {
const queue = new Queue();
function pushToAsyncIterable(getStream) {
const queue = new queue_1.AsyncQueue();
let bufferSize = 0;
let paused = false;
let ended = false;
stream.onData(value => {
if (!ended) {
queue.push({
done: false,
value
});
const stream = getStream({
onData(value) {
if (!ended) {
queue.push({
done: false,
value
});
bufferSize++;
if (!paused && queue.length >= stream.highWaterMark) {
paused = true;
stream.pause();
if (!paused && bufferSize >= stream.highWaterMark) {
paused = true;
stream.pause();
}
}
}
});
stream.onError(err => {
if (!ended) {
ended = true;
queue.push({
done: true,
err
});
},
if (paused) {
paused = false;
stream.resume();
onError(err) {
if (!ended) {
ended = true;
queue.push({
done: true,
err
});
}
},
onEnd() {
if (!ended) {
ended = true;
queue.push({
done: true,
err: undefined
});
}
}
});
stream.onEnd(() => {
if (!ended) {
ended = true;
queue.push({
done: true,
err: undefined
});
return {
async next() {
bufferSize--;
if (paused) {
if (paused && bufferSize < stream.highWaterMark) {
paused = false;
stream.resume();
}
}
});
return queueConsumer(queue, () => {
if (paused && queue.length < stream.highWaterMark) {
paused = false;
stream.resume();
}
});
}
exports.default = pushToAsyncIterable;
const next = await queue.shift();
async function* queueConsumer(queue, onPop) {
let value = await queue.pop();
if (next.done && next.err) {
throw next.err;
} else if (next.done) {
return {
done: true,
value: undefined
};
} else {
return next;
}
},
while (!value.done) {
yield value.value;
const next = queue.pop();
onPop();
value = await next;
}
async return() {
stream.dispose();
return {
done: true,
value: undefined
};
},
if (value.err) {
throw value.err;
}
}
async throw(e) {
stream.dispose();
throw e;
},
[Symbol.asyncIterator]() {
// tslint:disable-next-line no-invalid-this
return this;
}
};
}
exports.default = pushToAsyncIterable;
{
"name": "@databases/push-to-async-iterable",
"version": "2.0.0",
"version": "3.0.0",
"description": "",

@@ -8,3 +8,3 @@ "main": "./lib/index.js",

"dependencies": {
"then-queue": "^1.3.0"
"@databases/queue": "^1.0.0"
},

@@ -11,0 +11,0 @@ "scripts": {},

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc