Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

fileq

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fileq - npm Package Compare versions

Comparing version 1.2.0 to 1.3.1

test/test_002.js

29

index.js

@@ -9,2 +9,3 @@ const

truncate : false,
autocommit : true,
path : os.tmpdir()

@@ -15,3 +16,3 @@ }

constructor(name,options) {
this.queue = new Queue(name,options.path,options.truncate);
this.queue = new Queue(name,options);
}

@@ -24,3 +25,3 @@

options = options || {};
options.truncate = options.truncate || DEF.truncate;
options.truncate = options.truncate===true;
options.path = options.path || DEF.path;

@@ -42,7 +43,27 @@ REGISTRY[`${options.path}/${name}`] = new FileQueue(name,options);

peek(callback,time) {
return this.queue.peek(callback,time);
poll(callback,time) {
return this.queue.poll(callback,time);
}
head(callback,time) {
return this.queue.head(callback,time);
}
peek(callback,time,commit) {
return this.queue.peek(callback,time,commit);
}
get locked() {
return this.queue.locked;
}
lock(callback) {
return this.queue.lock(callback);
}
unlock() {
return this.queue.unlock();
}
}
module.exports = FileQueue;

@@ -33,5 +33,29 @@ const

function remove(db,rw,id) {
rw.read++;
return Promise.all([db.del(id), save(db,rw)]);
}
async function awaitCommit(item,callback) {
try {
await new Promise((ok,rej)=>{
if(callback) callback(null,item,(err)=>{
if(err) rej(err);
else ok()
});
});
return true;
}catch(err) {
return false;
}
}
class Queue {
constructor(name,path,truncate) {
path = path || os.tmpdir();
constructor(name,options) {
let path = options.path || os.tmpdir();
let truncate = options.truncate;
this.mutex = new Semaphore(1);
this.commiter = new Semaphore(1);
this.wm = fs.

@@ -46,2 +70,15 @@ mkdirp(path).

get locked() {
return this.mutex.available <= 0;
}
async lock(callback) {
await this.mutex.take();
if(callback) callback();
}
unlock() {
this.mutex.leave();
}
async push(data,callback) {

@@ -63,3 +100,7 @@ try {

async peek(callback,time) {
async poll(callback,time) {
return this.peek(callback,time,false);
}
async head(callback,time) {
if(typeof(callback)=="number") {

@@ -71,2 +112,25 @@ time = callback;

try {
let item = await new Promise((ok,rej)=>{
this.peek((err,item,done)=>{
if(err) rej(err);
else ok(item);
done(true);
},time,true);
});
if(callback) callback(null,item);
else return item;
}catch(err) {
if(callback) callback(err,null);
else throw err;
}
}
async peek(callback,time,commit) {
if(typeof(callback)=="number") {
time = callback;
callback = null;
}
try {
await this.commiter.take();
let rw = await this.wm;

@@ -77,10 +141,18 @@ let db = this.db;

let item = await db.get(id,{asBuffer:false});
rw.read++;
await Promise.all([
db.del(id),
save(db,rw)
]);
if(callback) callback(null,JSON.parse(item));
else return JSON.parse(item);
item = JSON.parse(item);
if(commit && callback) {
await this.lock();
let docommit = await awaitCommit(item,callback);
if(docommit) await remove(db,rw,id);
this.commiter.leave();
}
else {
await remove(db,rw,id);
this.commiter.leave();
if(callback) callback(null,item);
else return item;
}
}catch(err) {
this.commiter.leave();
if(callback) callback(err);

@@ -87,0 +159,0 @@ else throw err;

2

lib/semaphore.js

@@ -31,3 +31,3 @@ class Semaphore {

leave(err) {
leave() {
this.count++;

@@ -34,0 +34,0 @@ if(this.queue.length) {

{
"name": "fileq",
"version": "1.2.0",
"version": "1.3.1",
"description": "File based FIFO queue",

@@ -16,3 +16,8 @@ "author": "David Gómez Matarrodona <solzimer@gmail.com>",

"queue",
"FIFO"
"FIFO",
"peek",
"poll",
"transactional",
"transaction",
"commit"
],

@@ -19,0 +24,0 @@ "dependencies": {

@@ -8,2 +8,3 @@ # fileq

* Multiple writers and readers on the same queue
* Callback and promise modes
* Can recover previous queue if process is restarted

@@ -27,2 +28,3 @@ * Recover queue position on process restart

// Callback mode
setInterval(()=>{

@@ -38,2 +40,13 @@ queue.push({key:i, message:"This is the entry for "+i});

},100);
// Promise mode
setInterval(async ()=>{
await queue.push({key:i, message:"This is the entry for "+(i++)});
},100);
setInterval(async ()=>{
let item = await queue.peek();
console.log(item);
},100);
```

@@ -55,9 +68,104 @@

### queue.peek(time,callback) => Promise
### queue.peek(time,callback,commit) => Promise
Retrieves a JSON object from the queue, in a FIFO manner. Callback takes the
usual *err* and *result* arguments. If no callback is provided, it returns a
promise. The argument **time** specifies a wait for data timeout. If no data
cold be read before **time**, then callback is called with "timeout" error
(promise is rejected).
could be read before **time**, then callback is called with "timeout" error
(promise is rejected). The *commit* parameter specifies a transactional requirement.
When *commit* is *true*, callback function receives a third argument (function *done*)
that must be called in order to remove the item from the queue.
Commit mode only works when callback function is passed:
```javascript
// Promise mode
// Waits forever for an entry. Entry is returned and removed from queue
let item = await queue.peek();
// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
let item = await queue.peek(10);
// Callback mode
// Waits forever for an entry. Entry is returned and removed from queue
queue.peek((err,item)=>{
console.log(item);
});
// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
queue.peek((err,item)=>{
if(err) console.error(err);
else console.log(entry);
},100);
// Waits forever for an entry. Entry is returned but not remove until done is called
queue.peek((err,item,done)=>{
let error = doSomething(item);
if(error) done(error); // If done is called with arguments, item is not removed
else done(); // done called without arguments remove the item from the queue
},100,true);
```
**Important:** If commit mode is used, no more reads will be done until *done*
has been called (queue will block further reads to avoid inconsistency):
```javascript
queue.peek((err,item,done)=>{
console.log(item);
setTimeout(done,1000); // Queue will be locked 1 sec
},0,true);
// Cannot retrieve next item until previous call ends
queue.peek((err,item)=>{
console.log(item);
});
```
### queue.poll(time,callback) => Promise
The same as **queue.peek** but without the *commit* feature
### queue.head(time,callback) => Promise
Retrieves the head of the queue, without removing the element, as
oposed to **peek** and **poll**
### queue.lock(callback) => Promise
Locks the queue so no other callers can read from it until **queue.unlock**
is called. Note that this is a soft lock (other readers can ignore the lock). The only time where a lock cannot be ignored if is **queue.peek** is called with *commit* feature (It's a different hard lock):
```javascript
// Locks the queue for 100 reads
async function reader1() {
await queue.lock();
for(let i=0;i<100;i++) {
let item = await queue.poll();
}
queue.unlock();
}
// Same as reader1: If reader1 has the lock, reader2 must wait
async function reader2() {
await queue.lock();
for(let i=0;i<10;i++) {
let item = await queue.poll();
}
queue.unlock();
}
// reader3 doesn't ask for lock, so it can read without waiting
async function reader3() {
for(let i=0;i<100;i++) {
let item = await queue.poll();
}
}
// reader4 doesn't ask for lock, but uses commit feature, so nobody
// can read until commit is applied
async function reader4() {
for(let i=0;i<10;i++) {
queue.peek((err,item,done)=>{
setTimeout(done,1000);
});
}
}
```
### queue.unlock()
Unlocks queue reads
### queue.locked => Boolean
Returns *true* if queue has a virtual lock; *false* otherwise.
## Options

@@ -64,0 +172,0 @@ When creating a queue, data are stored in several files in a folder.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc