Comparing version 1.3.1 to 1.3.2
@@ -14,5 +14,5 @@ const | ||
]); | ||
return {read : rw[0], write : rw[1], mutex : new Semaphore(rw[1]-rw[0])} | ||
return {read : rw[0], write : rw[1], mutex : new Semaphore(rw[1]-rw[0]), commit: new Semaphore(1)} | ||
}catch(err) { | ||
let rw = {read : 0, write : 0, mutex : new Semaphore(0)}; | ||
let rw = {read : 0, write : 0, mutex : new Semaphore(0), commit: new Semaphore(1)}; | ||
await Promise.all([ | ||
@@ -26,15 +26,33 @@ db.put("$read",`${rw.read}`), | ||
async function save(db,rw) { | ||
await Promise.all([ | ||
function commit(db,rw) { | ||
return Promise.all([ | ||
db.put("$read",`${rw.read}`), | ||
db.put("$write",`${rw.write}`) | ||
]); | ||
rw.mutex.capacity(rw.write-rw.read); | ||
} | ||
function remove(db,rw,id) { | ||
rw.read++; | ||
return Promise.all([db.del(id), save(db,rw)]); | ||
async function save(db,rw) { | ||
try { | ||
await rw.commit.take(); | ||
rw.write++; | ||
await commit(db,rw); | ||
rw.commit.leave(); | ||
}catch(err) { | ||
rw.commit.leave(); | ||
throw err; | ||
} | ||
} | ||
async function remove(db,rw,id) { | ||
try { | ||
await rw.commit.take(); | ||
rw.read++; | ||
await Promise.all([db.del(id), commit(db,rw)]); | ||
rw.commit.leave(); | ||
}catch(err) { | ||
rw.commit.leave(); | ||
throw err; | ||
} | ||
} | ||
async function awaitCommit(item,callback) { | ||
@@ -59,4 +77,6 @@ try { | ||
this.pusher = new Semaphore(1); | ||
this.mutex = new Semaphore(1); | ||
this.commiter = new Semaphore(1); | ||
this.path = `${path}/${name}`; | ||
@@ -66,5 +86,5 @@ this.wm = fs. | ||
then(()=>{ | ||
if(truncate) return fs.remove(`${path}/${name}`); | ||
if(truncate) return fs.remove(this.path); | ||
}). | ||
then(()=>this.db=levelup(leveldown(`${path}/${name}`))). | ||
then(()=>this.db=levelup(leveldown(this.path))). | ||
then(()=>init(this.db)); | ||
@@ -88,11 +108,13 @@ } | ||
try { | ||
await this.pusher.take(); | ||
let rw = await this.wm; | ||
let db = this.db; | ||
let id = rw.write++; | ||
await Promise.all([ | ||
db.put(id,JSON.stringify(data)), | ||
save(db,rw) | ||
]); | ||
let id = rw.write; | ||
await db.put(id,JSON.stringify(data)), | ||
await save(db,rw) | ||
rw.mutex.capacity(rw.write-rw.read); | ||
this.pusher.leave(); | ||
if(callback) callback(); | ||
}catch(err) { | ||
this.pusher.leave(); | ||
if(callback) callback(err); | ||
@@ -139,3 +161,3 @@ else throw err; | ||
let db = this.db; | ||
let id = rw.read; | ||
var id = rw.read; | ||
await rw.mutex.take(time); | ||
@@ -142,0 +164,0 @@ let item = await db.get(id,{asBuffer:false}); |
{ | ||
"name": "fileq", | ||
"version": "1.3.1", | ||
"version": "1.3.2", | ||
"description": "File based FIFO queue", | ||
@@ -5,0 +5,0 @@ "author": "David Gómez Matarrodona <solzimer@gmail.com>", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19268
466