Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@jakubneubauer/simple-streams

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@jakubneubauer/simple-streams - npm Package Compare versions

Comparing version 0.0.1 to 0.0.2

doc/perf1.jpg

6

dist/simple-streams.d.ts

@@ -35,5 +35,2 @@ export interface ReaderController {

done: boolean;
value?: undefined;
} | {
done: boolean;
value: any;

@@ -47,5 +44,2 @@ }>;

done: boolean;
value?: undefined;
} | {
done: boolean;
value: any;

@@ -52,0 +46,0 @@ }>;

30

dist/simple-streams.js

@@ -17,4 +17,2 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {

}
// TODO: rozdil mezi controller.close - jen udela EOF. a reader.close - musi ukoncit cteni. Asi bude potrebovat podporu
// v LimitedBlockingQueue - pridat tam taky close(), ktery prerusi vsechny pending pull operace
export class Reader {

@@ -39,5 +37,4 @@ constructor(source, options) {

return Promise.resolve();
this.reader.closed = true;
this.reader.pullAlg = undefined;
return this.reader.buffer.push(EOF).then(this.reader.cancelAlg);
return this.reader.buffer.push(EOF).then(() => { this.reader.buffer.close(true); });
}

@@ -63,10 +60,21 @@ };

if (this.closed && this.buffer.length === 0) {
return { done: true };
return { done: true, value: undefined };
}
const chunk = yield this.buffer.pull();
if (chunk === EOF) {
return { done: true };
try {
let chunk = yield this.buffer.pull();
if (chunk === EOF) {
this.closed = true;
return { done: true, value: undefined };
}
else {
return { done: false, value: chunk };
}
}
else {
return { done: false, value: chunk };
catch (e) {
if ((e === null || e === void 0 ? void 0 : e.message) == 'Queue is closed') {
return { done: true, value: undefined };
}
else {
throw e;
}
}

@@ -77,3 +85,3 @@ });

return __awaiter(this, void 0, void 0, function* () {
yield this.controller.close();
this.closed = true;
this.buffer.close();

@@ -80,0 +88,0 @@ });

@@ -5,5 +5,8 @@ {

"types": "dist/index.d.ts",
"version": "0.0.1",
"version": "0.0.2",
"type": "module",
"repository": "jakubneubauer/js-simple-streams",
"repository": {
"type": "git",
"url": "http://github.com/"
},
"devDependencies": {

@@ -16,3 +19,3 @@ "@types/jest": "^27.5.0",

"dependencies": {
"@jakubneubauer/limited-blocking-queue": "^1.0.1"
"@jakubneubauer/limited-blocking-queue": "^1.0.2"
},

@@ -19,0 +22,0 @@ "jest": {

@@ -6,4 +6,6 @@ # simple-streams

The API is similar, but not same.
Uses async operations to buffer data. Implementation uses
[Limited blocking queue]() as a buffer for reading/writing
Implementation uses
[Limited blocking queue](https://www.npmjs.com/package/@jakubneubauer/limited-blocking-queue)
as a buffer for reading/writing
and synchronization mechanism between readers/transformers/writers.

@@ -74,2 +76,2 @@

![Performance Comparison](./doc/perf1.png)
![Performance Comparison](./doc/perf1.jpg)

@@ -34,4 +34,2 @@ import {LimitedBlockingQueue} from "@jakubneubauer/limited-blocking-queue";

// TODO: rozdil mezi controller.close - jen udela EOF. a reader.close - musi ukoncit cteni. Asi bude potrebovat podporu
// v LimitedBlockingQueue - pridat tam taky close(), ktery prerusi vsechny pending pull operace
export class Reader {

@@ -61,5 +59,4 @@ private closed: boolean;

if(this.reader.closed) return Promise.resolve()
this.reader.closed = true;
this.reader.pullAlg = undefined;
return this.reader.buffer.push(EOF).then(this.reader.cancelAlg);
return this.reader.buffer.push(EOF).then(() => {this.reader.buffer.close(true);});
}

@@ -86,14 +83,23 @@ }

if (this.closed && this.buffer.length === 0) {
return {done: true};
return {done: true, value: undefined};
}
const chunk = await this.buffer.pull();
if (chunk === EOF) {
return {done: true};
} else {
return {done: false, value: chunk};
try {
let chunk = await this.buffer.pull()
if (chunk === EOF) {
this.closed = true;
return {done: true, value: undefined};
} else {
return {done: false, value: chunk};
}
} catch(e) {
if((e as Error)?.message == 'Queue is closed') {
return {done: true, value: undefined};
} else {
throw e;
}
}
}
async close() {
await this.controller.close();
this.closed = true;
this.buffer.close();

@@ -100,0 +106,0 @@ }

@@ -206,2 +206,66 @@ import {Reader, Writer, Transformer, WriterSink, ReaderSource, ReaderController} from "../src";

test("reader - pending read will succeed after controller close", async() => {
let pullsRequestedResolve: (_:any) => any = (_) => {};
let pullsRequestedProm = new Promise((resolve) => {
pullsRequestedResolve = resolve;
})
let r = new Reader({
async start(controller) {
// wait for the pull calls.
await pullsRequestedProm;
await controller.enqueue(1);
await controller.close();
}
}, {bufferSize: 5});
let pullProm1 = r.read();
let pullProm2 = r.read();
let pullProm3 = r.read();
// notify the pull operations are called
pullsRequestedResolve(undefined);
expect((await pullProm1).value).toBe(1);
expect((await pullProm2).done).toBe(true);
expect((await pullProm3).done).toBe(true);
});
test("reader - pending read will succeed after reader close", async() => {
let r = new Reader({}, {bufferSize: 5});
let pullProm1 = r.read();
let pullProm2 = r.read();
let pullProm3 = r.read();
await r.close();
expect((await pullProm1).done).toBe(true);
expect((await pullProm2).done).toBe(true);
expect((await pullProm3).done).toBe(true);
});
test("reader - pending controller enqueue will fail after close", async () => {
let enqueueCalledResolve: (_:any) => any = (_) => {};
let enqueueCalledPromise = new Promise((resolve) => {
enqueueCalledResolve = resolve;
})
let enqueueProm1 = undefined;
let enqueueProm2 = undefined;
let r = new Reader({
async start(controller) {
enqueueProm1 = controller.enqueue(1);
enqueueProm2 = controller.enqueue(2);
// notify that enqueue was called
enqueueCalledResolve(undefined);
}
}, {bufferSize: 1});
// wait for controller.enqueue being called
await enqueueCalledPromise;
// close the reader
await r.close();
// verify that first async enqueue was successful,
await enqueueProm1;
// verify that second async enqueue failed - was cancelled by close()
await expect(enqueueProm2).rejects.toThrow();
});
async function readAllToString(reader: Reader) {

@@ -208,0 +272,0 @@ let result = "";

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc