@jakubneubauer/simple-streams
Advanced tools
Comparing version 0.0.1 to 0.0.2
@@ -35,5 +35,2 @@ export interface ReaderController { | ||
done: boolean; | ||
value?: undefined; | ||
} | { | ||
done: boolean; | ||
value: any; | ||
@@ -47,5 +44,2 @@ }>; | ||
done: boolean; | ||
value?: undefined; | ||
} | { | ||
done: boolean; | ||
value: any; | ||
@@ -52,0 +46,0 @@ }>; |
@@ -17,4 +17,2 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
} | ||
// TODO: rozdil mezi controller.close - jen udela EOF. a reader.close - musi ukoncit cteni. Asi bude potrebovat podporu | ||
// v LimitedBlockingQueue - pridat tam taky close(), ktery prerusi vsechny pending pull operace | ||
export class Reader { | ||
@@ -39,5 +37,4 @@ constructor(source, options) { | ||
return Promise.resolve(); | ||
this.reader.closed = true; | ||
this.reader.pullAlg = undefined; | ||
return this.reader.buffer.push(EOF).then(this.reader.cancelAlg); | ||
return this.reader.buffer.push(EOF).then(() => { this.reader.buffer.close(true); }); | ||
} | ||
@@ -63,10 +60,21 @@ }; | ||
if (this.closed && this.buffer.length === 0) { | ||
return { done: true }; | ||
return { done: true, value: undefined }; | ||
} | ||
const chunk = yield this.buffer.pull(); | ||
if (chunk === EOF) { | ||
return { done: true }; | ||
try { | ||
let chunk = yield this.buffer.pull(); | ||
if (chunk === EOF) { | ||
this.closed = true; | ||
return { done: true, value: undefined }; | ||
} | ||
else { | ||
return { done: false, value: chunk }; | ||
} | ||
} | ||
else { | ||
return { done: false, value: chunk }; | ||
catch (e) { | ||
if ((e === null || e === void 0 ? void 0 : e.message) == 'Queue is closed') { | ||
return { done: true, value: undefined }; | ||
} | ||
else { | ||
throw e; | ||
} | ||
} | ||
@@ -77,3 +85,3 @@ }); | ||
return __awaiter(this, void 0, void 0, function* () { | ||
yield this.controller.close(); | ||
this.closed = true; | ||
this.buffer.close(); | ||
@@ -80,0 +88,0 @@ }); |
@@ -5,5 +5,8 @@ { | ||
"types": "dist/index.d.ts", | ||
"version": "0.0.1", | ||
"version": "0.0.2", | ||
"type": "module", | ||
"repository": "jakubneubauer/js-simple-streams", | ||
"repository": { | ||
"type": "git", | ||
"url": "http://github.com/" | ||
}, | ||
"devDependencies": { | ||
@@ -16,3 +19,3 @@ "@types/jest": "^27.5.0", | ||
"dependencies": { | ||
"@jakubneubauer/limited-blocking-queue": "^1.0.1" | ||
"@jakubneubauer/limited-blocking-queue": "^1.0.2" | ||
}, | ||
@@ -19,0 +22,0 @@ "jest": { |
@@ -6,4 +6,6 @@ # simple-streams | ||
The API is similar, but not same. | ||
Uses async operations to buffer data. Implementation uses | ||
[Limited blocking queue]() as a buffer for reading/writing | ||
Implementation uses | ||
[Limited blocking queue](https://www.npmjs.com/package/@jakubneubauer/limited-blocking-queue) | ||
as a buffer for reading/writing | ||
and synchronization mechanism between readers/transformers/writers. | ||
@@ -74,2 +76,2 @@ | ||
![Performance Comparison](./doc/perf1.png) | ||
![Performance Comparison](./doc/perf1.jpg) |
@@ -34,4 +34,2 @@ import {LimitedBlockingQueue} from "@jakubneubauer/limited-blocking-queue"; | ||
// TODO: rozdil mezi controller.close - jen udela EOF. a reader.close - musi ukoncit cteni. Asi bude potrebovat podporu | ||
// v LimitedBlockingQueue - pridat tam taky close(), ktery prerusi vsechny pending pull operace | ||
export class Reader { | ||
@@ -61,5 +59,4 @@ private closed: boolean; | ||
if(this.reader.closed) return Promise.resolve() | ||
this.reader.closed = true; | ||
this.reader.pullAlg = undefined; | ||
return this.reader.buffer.push(EOF).then(this.reader.cancelAlg); | ||
return this.reader.buffer.push(EOF).then(() => {this.reader.buffer.close(true);}); | ||
} | ||
@@ -86,14 +83,23 @@ } | ||
if (this.closed && this.buffer.length === 0) { | ||
return {done: true}; | ||
return {done: true, value: undefined}; | ||
} | ||
const chunk = await this.buffer.pull(); | ||
if (chunk === EOF) { | ||
return {done: true}; | ||
} else { | ||
return {done: false, value: chunk}; | ||
try { | ||
let chunk = await this.buffer.pull() | ||
if (chunk === EOF) { | ||
this.closed = true; | ||
return {done: true, value: undefined}; | ||
} else { | ||
return {done: false, value: chunk}; | ||
} | ||
} catch(e) { | ||
if((e as Error)?.message == 'Queue is closed') { | ||
return {done: true, value: undefined}; | ||
} else { | ||
throw e; | ||
} | ||
} | ||
} | ||
async close() { | ||
await this.controller.close(); | ||
this.closed = true; | ||
this.buffer.close(); | ||
@@ -100,0 +106,0 @@ } |
@@ -206,2 +206,66 @@ import {Reader, Writer, Transformer, WriterSink, ReaderSource, ReaderController} from "../src"; | ||
test("reader - pending read will succeed after controller close", async() => { | ||
let pullsRequestedResolve: (_:any) => any = (_) => {}; | ||
let pullsRequestedProm = new Promise((resolve) => { | ||
pullsRequestedResolve = resolve; | ||
}) | ||
let r = new Reader({ | ||
async start(controller) { | ||
// wait for the pull calls. | ||
await pullsRequestedProm; | ||
await controller.enqueue(1); | ||
await controller.close(); | ||
} | ||
}, {bufferSize: 5}); | ||
let pullProm1 = r.read(); | ||
let pullProm2 = r.read(); | ||
let pullProm3 = r.read(); | ||
// notify the pull operations are called | ||
pullsRequestedResolve(undefined); | ||
expect((await pullProm1).value).toBe(1); | ||
expect((await pullProm2).done).toBe(true); | ||
expect((await pullProm3).done).toBe(true); | ||
}); | ||
test("reader - pending read will succeed after reader close", async() => { | ||
let r = new Reader({}, {bufferSize: 5}); | ||
let pullProm1 = r.read(); | ||
let pullProm2 = r.read(); | ||
let pullProm3 = r.read(); | ||
await r.close(); | ||
expect((await pullProm1).done).toBe(true); | ||
expect((await pullProm2).done).toBe(true); | ||
expect((await pullProm3).done).toBe(true); | ||
}); | ||
test("reader - pending controller enqueue will fail after close", async () => { | ||
let enqueueCalledResolve: (_:any) => any = (_) => {}; | ||
let enqueueCalledPromise = new Promise((resolve) => { | ||
enqueueCalledResolve = resolve; | ||
}) | ||
let enqueueProm1 = undefined; | ||
let enqueueProm2 = undefined; | ||
let r = new Reader({ | ||
async start(controller) { | ||
enqueueProm1 = controller.enqueue(1); | ||
enqueueProm2 = controller.enqueue(2); | ||
// notify that enqueue was called | ||
enqueueCalledResolve(undefined); | ||
} | ||
}, {bufferSize: 1}); | ||
// wait for controller.enqueue being called | ||
await enqueueCalledPromise; | ||
// close the reader | ||
await r.close(); | ||
// verify that first async enqueue was successful, | ||
await enqueueProm1; | ||
// verify that second async enqueue failed - was cancelled by close() | ||
await expect(enqueueProm2).rejects.toThrow(); | ||
}); | ||
async function readAllToString(reader: Reader) { | ||
@@ -208,0 +272,0 @@ let result = ""; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No License Found
License(Experimental) License information could not be found.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
14
0
752
76
0
38904