Comparing version 1.2.0 to 1.3.1
29
index.js
@@ -9,2 +9,3 @@ const | ||
truncate : false, | ||
autocommit : true, | ||
path : os.tmpdir() | ||
@@ -15,3 +16,3 @@ } | ||
constructor(name,options) { | ||
this.queue = new Queue(name,options.path,options.truncate); | ||
this.queue = new Queue(name,options); | ||
} | ||
@@ -24,3 +25,3 @@ | ||
options = options || {}; | ||
options.truncate = options.truncate || DEF.truncate; | ||
options.truncate = options.truncate===true; | ||
options.path = options.path || DEF.path; | ||
@@ -42,7 +43,27 @@ REGISTRY[`${options.path}/${name}`] = new FileQueue(name,options); | ||
peek(callback,time) { | ||
return this.queue.peek(callback,time); | ||
poll(callback,time) { | ||
return this.queue.poll(callback,time); | ||
} | ||
head(callback,time) { | ||
return this.queue.head(callback,time); | ||
} | ||
peek(callback,time,commit) { | ||
return this.queue.peek(callback,time,commit); | ||
} | ||
get locked() { | ||
return this.queue.locked; | ||
} | ||
lock(callback) { | ||
return this.queue.lock(callback); | ||
} | ||
unlock() { | ||
return this.queue.unlock(); | ||
} | ||
} | ||
module.exports = FileQueue; |
@@ -33,5 +33,29 @@ const | ||
function remove(db,rw,id) { | ||
rw.read++; | ||
return Promise.all([db.del(id), save(db,rw)]); | ||
} | ||
async function awaitCommit(item,callback) { | ||
try { | ||
await new Promise((ok,rej)=>{ | ||
if(callback) callback(null,item,(err)=>{ | ||
if(err) rej(err); | ||
else ok() | ||
}); | ||
}); | ||
return true; | ||
}catch(err) { | ||
return false; | ||
} | ||
} | ||
class Queue { | ||
constructor(name,path,truncate) { | ||
path = path || os.tmpdir(); | ||
constructor(name,options) { | ||
let path = options.path || os.tmpdir(); | ||
let truncate = options.truncate; | ||
this.mutex = new Semaphore(1); | ||
this.commiter = new Semaphore(1); | ||
this.wm = fs. | ||
@@ -46,2 +70,15 @@ mkdirp(path). | ||
get locked() { | ||
return this.mutex.available <= 0; | ||
} | ||
async lock(callback) { | ||
await this.mutex.take(); | ||
if(callback) callback(); | ||
} | ||
unlock() { | ||
this.mutex.leave(); | ||
} | ||
async push(data,callback) { | ||
@@ -63,3 +100,7 @@ try { | ||
async peek(callback,time) { | ||
async poll(callback,time) { | ||
return this.peek(callback,time,false); | ||
} | ||
async head(callback,time) { | ||
if(typeof(callback)=="number") { | ||
@@ -71,2 +112,25 @@ time = callback; | ||
try { | ||
let item = await new Promise((ok,rej)=>{ | ||
this.peek((err,item,done)=>{ | ||
if(err) rej(err); | ||
else ok(item); | ||
done(true); | ||
},time,true); | ||
}); | ||
if(callback) callback(null,item); | ||
else return item; | ||
}catch(err) { | ||
if(callback) callback(err,null); | ||
else throw err; | ||
} | ||
} | ||
async peek(callback,time,commit) { | ||
if(typeof(callback)=="number") { | ||
time = callback; | ||
callback = null; | ||
} | ||
try { | ||
await this.commiter.take(); | ||
let rw = await this.wm; | ||
@@ -77,10 +141,18 @@ let db = this.db; | ||
let item = await db.get(id,{asBuffer:false}); | ||
rw.read++; | ||
await Promise.all([ | ||
db.del(id), | ||
save(db,rw) | ||
]); | ||
if(callback) callback(null,JSON.parse(item)); | ||
else return JSON.parse(item); | ||
item = JSON.parse(item); | ||
if(commit && callback) { | ||
await this.lock(); | ||
let docommit = await awaitCommit(item,callback); | ||
if(docommit) await remove(db,rw,id); | ||
this.commiter.leave(); | ||
} | ||
else { | ||
await remove(db,rw,id); | ||
this.commiter.leave(); | ||
if(callback) callback(null,item); | ||
else return item; | ||
} | ||
}catch(err) { | ||
this.commiter.leave(); | ||
if(callback) callback(err); | ||
@@ -87,0 +159,0 @@ else throw err; |
@@ -31,3 +31,3 @@ class Semaphore { | ||
leave(err) { | ||
leave() { | ||
this.count++; | ||
@@ -34,0 +34,0 @@ if(this.queue.length) { |
{ | ||
"name": "fileq", | ||
"version": "1.2.0", | ||
"version": "1.3.1", | ||
"description": "File based FIFO queue", | ||
@@ -16,3 +16,8 @@ "author": "David Gómez Matarrodona <solzimer@gmail.com>", | ||
"queue", | ||
"FIFO" | ||
"FIFO", | ||
"peek", | ||
"poll", | ||
"transactional", | ||
"transaction", | ||
"commit" | ||
], | ||
@@ -19,0 +24,0 @@ "dependencies": { |
114
README.md
@@ -8,2 +8,3 @@ # fileq | ||
* Multiple writers and readers on the same queue | ||
* Callback and promise modes | ||
* Can recover previous queue if process is restarted | ||
@@ -27,2 +28,3 @@ * Recover queue position on process restart | ||
// Callback mode | ||
setInterval(()=>{ | ||
@@ -38,2 +40,13 @@ queue.push({key:i, message:"This is the entry for "+i}); | ||
},100); | ||
// Promise mode | ||
setInterval(async ()=>{ | ||
await queue.push({key:i, message:"This is the entry for "+(i++)}); | ||
},100); | ||
setInterval(async ()=>{ | ||
let item = await queue.peek(); | ||
console.log(item); | ||
},100); | ||
``` | ||
@@ -55,9 +68,104 @@ | ||
### queue.peek(time,callback) => Promise | ||
### queue.peek(time,callback,commit) => Promise | ||
Retrieves a JSON object from the queue, in a FIFO manner. Callback takes the | ||
usual *err* and *result* arguments. If no callback is provided, it returns a | ||
promise. The argument **time** specifies a wait for data timeout. If no data | ||
cold be read before **time**, then callback is called with "timeout" error | ||
(promise is rejected). | ||
could be read before **time**, then callback is called with "timeout" error | ||
(promise is rejected). The *commit* parameter specifies a transactional requirement. | ||
When *commit* is *true*, callback function receives a third argument (function *done*) | ||
that must be called in order to remove the item from the queue. | ||
Commit mode only works when callback function is passed: | ||
```javascript | ||
// Promise mode | ||
// Waits forever for an entry. Entry is returned and removed from queue | ||
let item = await queue.peek(); | ||
// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error | ||
let item = await queue.peek(10); | ||
// Callback mode | ||
// Waits forever for an entry. Entry is returned and removed from queue | ||
queue.peek((err,item)=>{ | ||
console.log(item); | ||
}); | ||
// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error | ||
queue.peek((err,item)=>{ | ||
if(err) console.error(err); | ||
else console.log(entry); | ||
},100); | ||
// Waits forever for an entry. Entry is returned but not remove until done is called | ||
queue.peek((err,item,done)=>{ | ||
let error = doSomething(item); | ||
if(error) done(error); // If done is called with arguments, item is not removed | ||
else done(); // done called without arguments remove the item from the queue | ||
},100,true); | ||
``` | ||
**Important:** If commit mode is used, no more reads will be done until *done* | ||
has been called (queue will block further reads to avoid inconsistency): | ||
```javascript | ||
queue.peek((err,item,done)=>{ | ||
console.log(item); | ||
setTimeout(done,1000); // Queue will be locked 1 sec | ||
},0,true); | ||
// Cannot retrieve next item until previous call ends | ||
queue.peek((err,item)=>{ | ||
console.log(item); | ||
}); | ||
``` | ||
### queue.poll(time,callback) => Promise | ||
The same as **queue.peek** but without the *commit* feature | ||
### queue.head(time,callback) => Promise | ||
Retrieves the head of the queue, without removing the element, as | ||
oposed to **peek** and **poll** | ||
### queue.lock(callback) => Promise | ||
Locks the queue so no other callers can read from it until **queue.unlock** | ||
is called. Note that this is a soft lock (other readers can ignore the lock). The only time where a lock cannot be ignored if is **queue.peek** is called with *commit* feature (It's a different hard lock): | ||
```javascript | ||
// Locks the queue for 100 reads | ||
async function reader1() { | ||
await queue.lock(); | ||
for(let i=0;i<100;i++) { | ||
let item = await queue.poll(); | ||
} | ||
queue.unlock(); | ||
} | ||
// Same as reader1: If reader1 has the lock, reader2 must wait | ||
async function reader2() { | ||
await queue.lock(); | ||
for(let i=0;i<10;i++) { | ||
let item = await queue.poll(); | ||
} | ||
queue.unlock(); | ||
} | ||
// reader3 doesn't ask for lock, so it can read without waiting | ||
async function reader3() { | ||
for(let i=0;i<100;i++) { | ||
let item = await queue.poll(); | ||
} | ||
} | ||
// reader4 doesn't ask for lock, but uses commit feature, so nobody | ||
// can read until commit is applied | ||
async function reader4() { | ||
for(let i=0;i<10;i++) { | ||
queue.peek((err,item,done)=>{ | ||
setTimeout(done,1000); | ||
}); | ||
} | ||
} | ||
``` | ||
### queue.unlock() | ||
Unlocks queue reads | ||
### queue.locked => Boolean | ||
Returns *true* if queue has a virtual lock; *false* otherwise. | ||
## Options | ||
@@ -64,0 +172,0 @@ When creating a queue, data are stored in several files in a folder. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18799
10
445
177