New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@jakubneubauer/limited-blocking-queue

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@jakubneubauer/limited-blocking-queue - npm Package Compare versions

Comparing version 0.0.8 to 1.0.0

2

package.json

@@ -5,3 +5,3 @@ {

"types": "dist/index.d.ts",
"version": "0.0.8",
"version": "1.0.0",
"type": "module",

@@ -8,0 +8,0 @@ "repository": "jakubneubauer/js-limited-blocking-queue",

export class LimitedBlockingQueue {
private closed: boolean;
private readonly limit: number;
private readonly queue: any[];
private readonly notEmptyResolves: ((_:any)=>any)[];
private readonly notFullResolves: ((_:any)=>any)[];
// list of resolve,reject tuples.
private readonly notEmptyResolves: ([(_:any)=>any, (_:any)=>any])[];
// list of resolve,reject tuples.
private readonly notFullResolves: ([(_:any)=>any, (_:any)=>any])[];
constructor(size = 1) {
this.closed = false;
this.queue = [];

@@ -18,2 +22,5 @@ this.limit = size;

push(item: any): Promise<any> {
if(this.closed) {
return Promise.reject(new Error("Queue is closed"));
}
if (this.notEmptyResolves.length) {

@@ -23,8 +30,8 @@ // assert(this.queue.length === 0)

// consumer through the resolve of promise he obtained.
this.notEmptyResolves.shift()!(item);
this.notEmptyResolves.shift()![0](item);
return Promise.resolve();
} else {
if (this.queue.length === this.limit) {
return new Promise((resolve) => {
this.notFullResolves.push(resolve);
return new Promise((resolve, reject) => {
this.notFullResolves.push([resolve, reject]);
}).then(() => {

@@ -42,5 +49,8 @@ this.queue.push(item);

pull() {
if(this.closed) {
return Promise.reject(new Error("Queue is closed"));
}
if (this.queue.length === 0) {
return new Promise((resolve) => {
this.notEmptyResolves.push(resolve);
return new Promise((resolve, reject) => {
this.notEmptyResolves.push([resolve, reject]);
});

@@ -50,3 +60,3 @@ } else {

if (this.notFullResolves.length) {
this.notFullResolves.shift()!(undefined);
this.notFullResolves.shift()![0](undefined);
}

@@ -56,2 +66,9 @@ return Promise.resolve(item);

}
close() {
this.closed = true;
// reject waiting promises - both the waiting pull and push operations
this.notFullResolves.forEach(([_,reject]) => {reject(new Error("Queue is closed"))});
this.notEmptyResolves.forEach(([_,reject]) => {reject(new Error("Queue is closed"))});
}

@@ -58,0 +75,0 @@ get length() {

@@ -20,1 +20,21 @@ import {LimitedBlockingQueue} from "../src/limited-blocking-queue";

});
test('close will reject all waiting pull operations', async () => {
let q = new LimitedBlockingQueue(2);
let pullPromise1 = q.pull();
let pullPromise2 = q.pull();
q.close();
await expect(pullPromise1).rejects.toThrow('Queue is closed');
await expect(pullPromise2).rejects.toThrow('Queue is closed');
});
test('close will reject all waiting push operations', async () => {
let q = new LimitedBlockingQueue(1);
await q.push(1);
let pushPromise1 = q.push(2);
let pushPromise2 = q.push(2);
q.close();
await expect(pushPromise1).rejects.toThrow('Queue is closed');
await expect(pushPromise2).rejects.toThrow('Queue is closed');
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc