Comparing version 0.3.1 to 0.4.0
@@ -8,3 +8,3 @@ const | ||
class FileManager { | ||
static initPath(path,size,callback) { | ||
static initPath(path,truncate,callback) { | ||
callback = callback || voidfn; | ||
@@ -18,4 +18,13 @@ return new Promise((resolve,reject)=>{ | ||
else { | ||
resolve(); | ||
callback(); | ||
if(truncate) { | ||
FileManager.listFiles(path).then(list=>{ | ||
list.forEach(f=>fs.unlink(path+"/"+f,()=>{})); | ||
resolve(); | ||
callback(); | ||
}); | ||
} | ||
else { | ||
resolve(); | ||
callback(); | ||
} | ||
} | ||
@@ -22,0 +31,0 @@ }); |
@@ -14,3 +14,4 @@ const | ||
bsize : 100, | ||
csize : 100 | ||
csize : 100, | ||
truncate : false, | ||
}; | ||
@@ -28,2 +29,4 @@ | ||
options = options || {}; | ||
var truncate = options.truncate===undefined? DEF_CONF.truncate : options.truncate; | ||
this.uid = "queue_"+Math.random(); | ||
@@ -42,3 +45,3 @@ this.path = path || options.path || DEF_CONF.path+"/"+this.uid; | ||
this.ready = FileManager. | ||
initPath(this.path). | ||
initPath(this.path,truncate). | ||
then(()=>FileManager.newFile(this.path)). | ||
@@ -180,2 +183,4 @@ then(fname=>QueueFile.create(fname,this.max,this.bsize)). | ||
from(path,options) { | ||
options = options || DEF_CONF; | ||
if(!path) { | ||
@@ -182,0 +187,0 @@ var queue = new Queue(null,options); |
{ | ||
"name": "fileq", | ||
"version": "0.3.1", | ||
"version": "0.4.0", | ||
"description": "File based FIFO queue", | ||
@@ -5,0 +5,0 @@ "author": "David Gómez Matarrodona <solzimer@gmail.com>", |
@@ -7,2 +7,3 @@ const fs = require("fs"); | ||
const BPAD = 1; // Next byte number | ||
const CODE = {EOL : 0, NEXT : 1, READ : 2} | ||
@@ -44,2 +45,3 @@ class QueueFile { | ||
var buffer = Buffer.allocUnsafe(this.bsize+1); | ||
var bread = Buffer.allocUnsafe(1).fill(CODE.READ); | ||
@@ -51,10 +53,20 @@ // Read block | ||
else { | ||
this.rpos += buffer.length; | ||
// Get content and "next" flag | ||
var sidx = buffer.indexOf(0); | ||
if(sidx<0) sidx = buffer.indexOf(1); | ||
var next = buffer.readUInt8(buffer.length-1); | ||
var code = buffer.readUInt8(buffer.length-1); | ||
// Read block, ignore | ||
if(code==CODE.READ) sidx=0; | ||
var nbuff = Buffer.concat([buffer],sidx); | ||
// Not previously read. Mark as a read block | ||
if(code!=CODE.READ) | ||
fs.write(this.fd,bread,0,1,this.rpos+this.bsize,()=>{}); | ||
// Move read pointer | ||
this.rpos += buffer.length; | ||
// If next, append content with the rest of the read | ||
if(next) { | ||
if(code!=CODE.EOL) { | ||
this._bread((err,res)=>{ | ||
@@ -87,2 +99,7 @@ if(!err) | ||
skip(n,i) { | ||
var bread = Buffer.allocUnsafe(1).fill(CODE.READ); | ||
for(var j=0;j<n;j++) { | ||
fs.write(this.fd,bread,0,1,this.rpos+(j+1)*(this.bsize+BPAD)-1,()=>{}); | ||
} | ||
this.rpos += n*(this.bsize+BPAD); | ||
@@ -170,3 +187,3 @@ this.rcount += i||1; | ||
return new Promise((resolve,reject)=>{ | ||
fs.open(path, "r", (err,fd)=>{ | ||
fs.open(path, "r+", (err,fd)=>{ | ||
if(err) { | ||
@@ -173,0 +190,0 @@ callback(err); |
@@ -9,2 +9,4 @@ # fileq | ||
* Can recover previous queue if process is restarted | ||
* Recover queue position on process restart | ||
* Persitent or truncate modes on process restart | ||
* In-memory direct access when reads are faster then writes | ||
@@ -70,2 +72,5 @@ * Customizable memory cache size | ||
the needs of our process: | ||
* **truncate** : If *true*, previous queue status is reset, and a new empty | ||
queue is created. If *false*, a previously created queue is recovered. By | ||
default is set to *false*. | ||
* **max** : Maximum number of objects that will be stored in each file of the | ||
@@ -72,0 +77,0 @@ queue. By default, *max* is 100. |
@@ -8,3 +8,3 @@ const | ||
var queue = Queue.from("./db"); | ||
var queue = null; | ||
var i = 0; | ||
@@ -35,5 +35,7 @@ | ||
.option('-R, --read [ms]', 'Read millisecons interval',"parseInt") | ||
.option('-T, --truncate', 'Truncate queue') | ||
.parse(process.argv); | ||
queue = Queue.from("./db",{truncate:program.truncate}); | ||
if(program.write>0) setTimeout(write,program.write || IWRITE); | ||
if(program.read>0) setTimeout(read,program.read || IREAD); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
17056
446
85