promise-stream-queue
Advanced tools
Comparing version 0.0.3 to 0.1.0
147
main.js
const | ||
EventEmitter = require('events').EventEmitter; | ||
Util = require('util'); | ||
Util = require('util'), | ||
Item = require('./item.js'); | ||
const voidfn = function(){}; | ||
function Item(id,pr,timeout) { | ||
this.id = id; | ||
this.ts = Date.now(); | ||
this.timeout = timeout||0; | ||
this.pr = pr; | ||
var self = this; | ||
var onsuccess = []; | ||
var onerror = []; | ||
var oncatch = []; | ||
var status = "pending"; | ||
var result = null; | ||
var to = null; | ||
function checkNotify() { | ||
if(status=="resolve") thenhdl(result); | ||
else if(status=="reject") errhdl(result); | ||
else if(status=="catch") catchhdl(result); | ||
else if(status=="killed") killhdl(result); | ||
} | ||
function set(st,res) { | ||
status = st; | ||
result = res; | ||
if(to) clearTimeout(to); | ||
to = null; | ||
} | ||
this.kill = function(msg) { | ||
killhdl(msg||"Promise killed"); | ||
} | ||
this.then = function(cbsuccess,cberror) { | ||
if(cbsuccess) onsuccess.unshift(cbsuccess); | ||
if(cberror) onerror.unshift(cberror); | ||
checkNotify(); | ||
return this; | ||
} | ||
this.catch = function(cbcatch) { | ||
if(cbcatch) oncatch.unshift(cbcatch); | ||
checkNotify(); | ||
return this; | ||
} | ||
function thenhdl(data) { | ||
if(status=="killed") return; | ||
set("resolve",data); | ||
while(onsuccess.length) { | ||
onsuccess.pop()(data); | ||
} | ||
} | ||
function errhdl(error) { | ||
if(status=="killed") return; | ||
set("reject",error); | ||
while(onerror.length) { | ||
onerror.pop()(error); | ||
} | ||
} | ||
function catchhdl(error) { | ||
if(status=="killed") return; | ||
set("catch",error); | ||
while(oncatch.length) { | ||
oncatch.pop()(error); | ||
} | ||
} | ||
function killhdl(error) { | ||
set("killed",error); | ||
while(onerror.length) { | ||
onerror.pop()(error); | ||
} | ||
} | ||
this.pr.then(thenhdl,errhdl).catch(catchhdl); | ||
if(this.timeout>0) { | ||
to = setTimeout(()=>{ | ||
self.kill("Promise timeout"); | ||
},this.timeout); | ||
}; | ||
} | ||
function Stream(timeout) { | ||
@@ -105,3 +20,22 @@ EventEmitter.call(this); | ||
pending = false; | ||
process.nextTick(fetchNext); | ||
var res = event=="resolve"? data : null; | ||
var err = event!="resolve"? data : null; | ||
var ex = event=="catch"? data : null; | ||
var defs = []; | ||
callbacks.forEach(cb=>{ | ||
if(!cb.sync) cb.fn(err,res,ex); | ||
else { | ||
defs.push(new Promise((resolve,reject)=>{ | ||
cb.fn(err,res,ex,()=>{ | ||
resolve(); | ||
}); | ||
})); | ||
} | ||
}); | ||
Promise.all(defs).then(()=>{ | ||
process.nextTick(fetchNext); | ||
}); | ||
} | ||
@@ -118,9 +52,6 @@ | ||
notify("resolve",data); | ||
callbacks.forEach(cb=>cb(null,data,null)); | ||
},err=>{ | ||
notify("reject",err); | ||
callbacks.forEach(cb=>cb(err,null,null)); | ||
}).catch(err=>{ | ||
notify("catch",err); | ||
callbacks.forEach(cb=>cb(err,null,err)); | ||
}); | ||
@@ -152,4 +83,8 @@ } | ||
this.forEach = function(callback) { | ||
callbacks.push(callback); | ||
callbacks.push({fn:callback,sync:false}); | ||
} | ||
this.forEachSync = function(callback) { | ||
callbacks.push({fn:callback,sync:true}); | ||
} | ||
} | ||
@@ -159,11 +94,31 @@ Util.inherits(Stream, EventEmitter); | ||
if(!module.parent) { | ||
const readline = require('readline'); | ||
var stream = new Stream(5000); | ||
var nums = []; | ||
stream.forEach((err,data,errcatch)=>{ | ||
if(errcatch) console.log("RESULT => Catch\t",err); | ||
else if(err) console.log("RESULT => Reject\t",err); | ||
else console.log("RESULT => Resolve\t",data); | ||
const rl = readline.createInterface({ | ||
input: process.stdin, | ||
output: process.stdout | ||
}); | ||
if(process.argv[2]=="sync") { | ||
stream.forEachSync((err,data,errcatch,next)=>{ | ||
if(errcatch) console.log("RESULT => Catch\t",err); | ||
else if(err) console.log("RESULT => Reject\t",err); | ||
else console.log("RESULT => Resolve\t",data); | ||
rl.question('Next? <yes> / no : ', (answer) => { | ||
if(!answer||answer=="next") next(); | ||
else process.exit(0); | ||
}); | ||
}); | ||
} | ||
else { | ||
stream.forEach((err,data,errcatch)=>{ | ||
if(errcatch) console.log("RESULT => Catch\t",err); | ||
else if(err) console.log("RESULT => Reject\t",err); | ||
else console.log("RESULT => Resolve\t",data); | ||
}); | ||
} | ||
for(var i=0;i<100;i++) nums.push(i); | ||
@@ -170,0 +125,0 @@ |
{ | ||
"name": "promise-stream-queue", | ||
"version": "0.0.3", | ||
"version": "0.1.0", | ||
"description": "Promise Stream. Queue promises and retrieve the resolved/rejected ones in the inserted order", | ||
@@ -5,0 +5,0 @@ "author": "David Gómez Matarrodona <solzimer@gmail.com>", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
7103
6
190