@jakubneubauer/limited-blocking-queue
Advanced tools
Comparing version 0.0.8 to 1.0.0
@@ -5,3 +5,3 @@ { | ||
"types": "dist/index.d.ts", | ||
"version": "0.0.8", | ||
"version": "1.0.0", | ||
"type": "module", | ||
@@ -8,0 +8,0 @@ "repository": "jakubneubauer/js-limited-blocking-queue", |
export class LimitedBlockingQueue { | ||
private closed: boolean; | ||
private readonly limit: number; | ||
private readonly queue: any[]; | ||
private readonly notEmptyResolves: ((_:any)=>any)[]; | ||
private readonly notFullResolves: ((_:any)=>any)[]; | ||
// list of resolve,reject tuples. | ||
private readonly notEmptyResolves: ([(_:any)=>any, (_:any)=>any])[]; | ||
// list of resolve,reject tuples. | ||
private readonly notFullResolves: ([(_:any)=>any, (_:any)=>any])[]; | ||
constructor(size = 1) { | ||
this.closed = false; | ||
this.queue = []; | ||
@@ -18,2 +22,5 @@ this.limit = size; | ||
push(item: any): Promise<any> { | ||
if(this.closed) { | ||
return Promise.reject(new Error("Queue is closed")); | ||
} | ||
if (this.notEmptyResolves.length) { | ||
@@ -23,8 +30,8 @@ // assert(this.queue.length === 0) | ||
// consumer through the resolve of promise he obtained. | ||
this.notEmptyResolves.shift()!(item); | ||
this.notEmptyResolves.shift(); | ||
return Promise.resolve(); | ||
} else { | ||
if (this.queue.length === this.limit) { | ||
return new Promise((resolve) => { | ||
this.notFullResolves.push(resolve); | ||
return new Promise((resolve, reject) => { | ||
this.notFullResolves.push([resolve, reject]); | ||
}).then(() => { | ||
@@ -42,5 +49,8 @@ this.queue.push(item); | ||
pull() { | ||
if(this.closed) { | ||
return Promise.reject(new Error("Queue is closed")); | ||
} | ||
if (this.queue.length === 0) { | ||
return new Promise((resolve) => { | ||
this.notEmptyResolves.push(resolve); | ||
return new Promise((resolve, reject) => { | ||
this.notEmptyResolves.push([resolve, reject]); | ||
}); | ||
@@ -50,3 +60,3 @@ } else { | ||
if (this.notFullResolves.length) { | ||
this.notFullResolves.shift()!(undefined); | ||
this.notFullResolves.shift(); | ||
} | ||
@@ -56,2 +66,9 @@ return Promise.resolve(item); | ||
} | ||
close() { | ||
this.closed = true; | ||
// reject waiting promises - both the waiting pull and push operations | ||
this.notFullResolves.forEach(([_,reject]) => {reject(new Error("Queue is closed"))}); | ||
this.notEmptyResolves.forEach(([_,reject]) => {reject(new Error("Queue is closed"))}); | ||
} | ||
@@ -58,0 +75,0 @@ get length() { |
@@ -20,1 +20,21 @@ import {LimitedBlockingQueue} from "../src/limited-blocking-queue"; | ||
}); | ||
test('close will reject all waiting pull operations', async () => { | ||
let q = new LimitedBlockingQueue(2); | ||
let pullPromise1 = q.pull(); | ||
let pullPromise2 = q.pull(); | ||
q.close(); | ||
await expect(pullPromise1).rejects.toThrow('Queue is closed'); | ||
await expect(pullPromise2).rejects.toThrow('Queue is closed'); | ||
}); | ||
test('close will reject all waiting push operations', async () => { | ||
let q = new LimitedBlockingQueue(1); | ||
await q.push(1); | ||
let pushPromise1 = q.push(2); | ||
let pushPromise2 = q.push(2); | ||
q.close(); | ||
await expect(pushPromise1).rejects.toThrow('Queue is closed'); | ||
await expect(pushPromise2).rejects.toThrow('Queue is closed'); | ||
}); | ||
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
9634
185
1