Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@databases/push-to-async-iterable

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@databases/push-to-async-iterable - npm Package Compare versions

Comparing version 1.0.0 to 2.0.0

231

lib/__tests__/index.test.js
"use strict";
var __awaiter = undefined && undefined.__awaiter || function (thisArg, _arguments, P, generator) {
function adopt(value) {
return value instanceof P ? value : new P(function (resolve) {
resolve(value);
});
}
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) {
try {
step(generator.next(value));
} catch (e) {
reject(e);
}
}
function rejected(value) {
try {
step(generator["throw"](value));
} catch (e) {
reject(e);
}
}
function step(result) {
result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected);
}
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
Object.defineProperty(exports, "__esModule", {
value: true
});
const __1 = require("../");
test('pushToAsyncIterable', () => __awaiter(void 0, void 0, void 0, function* () {
const pause = jest.fn();
const resume = jest.fn();
let onData;
let onError;
let onEnd;
const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
});
onData(1);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect((yield result.next())).toEqual({ done: false, value: 1 });
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
onData(2);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
onData(3);
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
onData(4);
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
expect((yield result.next())).toEqual({ done: false, value: 2 });
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
expect((yield result.next())).toEqual({ done: false, value: 3 });
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect((yield result.next())).toEqual({ done: false, value: 4 });
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
onEnd();
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect((yield result.next())).toEqual({ done: true });
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect(typeof onError).toBe('function');
}));
test('pushToAsyncIterable Error', () => __awaiter(void 0, void 0, void 0, function* () {
const pause = jest.fn();
const resume = jest.fn();
let onData;
let onError;
let onEnd;
const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
});
onData(1);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect((yield result.next())).toEqual({ done: false, value: 1 });
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
const TEST_ERROR = {};
onError(TEST_ERROR);
yield expect(result.next()).rejects.toBe(TEST_ERROR);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect(typeof onEnd).toBe('function');
}));
//# sourceMappingURL=index.test.js.map
test('pushToAsyncIterable', async () => {
const pause = jest.fn();
const resume = jest.fn();
let onData;
let onError;
let onEnd;
const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
});
onData(1);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect(await result.next()).toEqual({
done: false,
value: 1
});
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
onData(2);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
onData(3);
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
onData(4);
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
expect(await result.next()).toEqual({
done: false,
value: 2
});
expect(pause).toBeCalledTimes(1);
expect(resume).not.toBeCalled();
expect(await result.next()).toEqual({
done: false,
value: 3
});
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect(await result.next()).toEqual({
done: false,
value: 4
});
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
onEnd();
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect(await result.next()).toEqual({
done: true
});
expect(pause).toBeCalledTimes(1);
expect(resume).toBeCalledTimes(1);
expect(typeof onError).toBe('function');
});
test('pushToAsyncIterable Error', async () => {
const pause = jest.fn();
const resume = jest.fn();
let onData;
let onError;
let onEnd;
const result = __1.default({
onData(fn) {
onData = fn;
},
onError(fn) {
onError = fn;
},
onEnd(fn) {
onEnd = fn;
},
pause,
resume,
highWaterMark: 2
});
onData(1);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect(await result.next()).toEqual({
done: false,
value: 1
});
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
const TEST_ERROR = {};
onError(TEST_ERROR);
await expect(result.next()).rejects.toBe(TEST_ERROR);
expect(pause).not.toBeCalled();
expect(resume).not.toBeCalled();
expect(typeof onEnd).toBe('function');
});
"use strict";
var __await = undefined && undefined.__await || function (v) {
return this instanceof __await ? (this.v = v, this) : new __await(v);
};
var __asyncGenerator = undefined && undefined.__asyncGenerator || function (thisArg, _arguments, generator) {
if (!Symbol.asyncIterator) throw new TypeError("Symbol.asyncIterator is not defined.");
var g = generator.apply(thisArg, _arguments || []),
i,
q = [];
return i = {}, verb("next"), verb("throw"), verb("return"), i[Symbol.asyncIterator] = function () {
return this;
}, i;
function verb(n) {
if (g[n]) i[n] = function (v) {
return new Promise(function (a, b) {
q.push([n, v, a, b]) > 1 || resume(n, v);
});
};
Object.defineProperty(exports, "__esModule", {
value: true
});
const Queue = require('then-queue');
function pushToAsyncIterable(stream) {
const queue = new Queue();
let paused = false;
let ended = false;
stream.onData(value => {
if (!ended) {
queue.push({
done: false,
value
});
if (!paused && queue.length >= stream.highWaterMark) {
paused = true;
stream.pause();
}
}
function resume(n, v) {
try {
step(g[n](v));
} catch (e) {
settle(q[0][3], e);
}
});
stream.onError(err => {
if (!ended) {
ended = true;
queue.push({
done: true,
err
});
if (paused) {
paused = false;
stream.resume();
}
}
function step(r) {
r.value instanceof __await ? Promise.resolve(r.value.v).then(fulfill, reject) : settle(q[0][2], r);
});
stream.onEnd(() => {
if (!ended) {
ended = true;
queue.push({
done: true,
err: undefined
});
if (paused) {
paused = false;
stream.resume();
}
}
function fulfill(value) {
resume("next", value);
});
return queueConsumer(queue, () => {
if (paused && queue.length < stream.highWaterMark) {
paused = false;
stream.resume();
}
function reject(value) {
resume("throw", value);
}
function settle(f, v) {
if (f(v), q.shift(), q.length) resume(q[0][0], q[0][1]);
}
};
Object.defineProperty(exports, "__esModule", { value: true });
const Queue = require('then-queue');
function pushToAsyncIterable(stream) {
const queue = new Queue();
let paused = false;
let ended = false;
stream.onData(value => {
if (!ended) {
queue.push({ done: false, value });
if (!paused && queue.length >= stream.highWaterMark) {
paused = true;
stream.pause();
}
}
});
stream.onError(err => {
if (!ended) {
ended = true;
queue.push({ done: true, err });
if (paused) {
paused = false;
stream.resume();
}
}
});
stream.onEnd(() => {
if (!ended) {
ended = true;
queue.push({ done: true, err: undefined });
if (paused) {
paused = false;
stream.resume();
}
}
});
return queueConsumer(queue, () => {
if (paused && queue.length < stream.highWaterMark) {
paused = false;
stream.resume();
}
});
});
}
exports.default = pushToAsyncIterable;
function queueConsumer(queue, onPop) {
return __asyncGenerator(this, arguments, function* queueConsumer_1() {
let value = yield __await(queue.pop());
while (!value.done) {
yield yield __await(value.value);
const next = queue.pop();
onPop();
value = yield __await(next);
}
if (value.err) {
throw value.err;
}
});
}
//# sourceMappingURL=index.js.map
async function* queueConsumer(queue, onPop) {
let value = await queue.pop();
while (!value.done) {
yield value.value;
const next = queue.pop();
onPop();
value = await next;
}
if (value.err) {
throw value.err;
}
}
{
"name": "@databases/push-to-async-iterable",
"version": "1.0.0",
"version": "2.0.0",
"description": "",

@@ -5,0 +5,0 @@ "main": "./lib/index.js",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc