deferred-queue
Node.js project
Series control flow library
Version: 0.5.0
This module provides a very lightweight control flow mechanism meant to be the glue between user calls and the asynchronous nature of your module. It provides a fluent interface, so if your module has an asynchronous API that tends to create the callback pyramid of doom, a deferred queue may help. It can also be used as a standalone module.
For example, suppose you have an api like the following one:
var r = new Reader ("file");
r.read (10, function (error, bytesRead, buffer){
  if (error) return console.error (error);
  fn1 (bytesRead, buffer);
  r.read (20, function (error, bytesRead, buffer){
    if (error) return console.error (error);
    fn2 (bytesRead, buffer);
    r.close (function (error){
      if (error) return console.error (error);
      fn3 ();
    });
  });
});
The above example has two problems: the callback nesting and the error handling. With a deferred queue the example can be rewritten as follows:
var r = new Reader ("file");
r.on ("error", function (error){
  console.error (error);
});
r.on ("close", fn3);
r.read (10, fn1);
r.read (20, fn2);
r.close ();
Look at the reader example for further details.
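To make the idea concrete, here is a minimal sketch of how a module could expose such a fluent API. `SketchReader` and its internal array-based queue are hypothetical stand-ins written for this illustration; they are neither the binary-reader source nor this library's implementation, and the "reads" only slice an in-memory buffer. A real reader would call `done` from an I/O callback instead.

```javascript
const { EventEmitter } = require("events");

// Hypothetical reader: every call is queued and runs only after the previous
// one has finished, so the caller never has to nest callbacks.
class SketchReader extends EventEmitter {
  constructor(data) {
    super();
    this._data = Buffer.from(data);
    this._offset = 0;
    this._steps = [];
    this._busy = false;
  }

  read(bytes, cb) {
    return this._enqueue((done) => {
      const chunk = this._data.subarray(this._offset, this._offset + bytes);
      this._offset += chunk.length;
      cb(chunk.length, chunk);
      done();
    });
  }

  close() {
    return this._enqueue((done) => {
      this.emit("close");
      done();
    });
  }

  _enqueue(step) {
    this._steps.push(step);
    this._next();
    return this; // keeps the interface fluent
  }

  _next() {
    if (this._busy || this._steps.length === 0) return;
    this._busy = true;
    const step = this._steps.shift();
    step((error) => {
      this._busy = false;
      if (error) {
        // Drop the remaining steps and report the failure in one place.
        this._steps.length = 0;
        this.emit("error", error);
        return;
      }
      this._next();
    });
  }
}

const calls = [];
const r = new SketchReader("hello world");
r.on("error", (error) => console.error(error));
r.on("close", () => calls.push("closed"));
r.read(5, (bytesRead, buffer) => calls.push(buffer.toString()));
r.read(20, (bytesRead, buffer) => calls.push(bytesRead));
r.close();
console.log(calls);
```

The second read asks for 20 bytes but only 6 remain, so its callback receives a shorter `bytesRead`, much like a read that reaches the end of a file.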
Projects using this library:
- binary-reader: A real project based on the previous example.
- seraphim: Configuration management made easy.
Installation
npm install deferred-queue
Documentation
Functions
Objects
async vs deferred-queue
async.waterfall() is the function most similar to a deferred queue.
// With async.waterfall()
async.waterfall ([
  function (cb){
    cb (null, 1, 2);
  },
  function (n1, n2, cb){
    cb ();
  }
], function (error){
  if (error) return console.error (error);
});
// With deferred-queue
dq ()
  .on ("error", function (error){
    console.error (error);
  })
  .push (function (cb){
    cb (null, 1, 2);
  }, function (error, n1, n2){
  })
  .push (function (){});
Both are very similar, but there are three big differences:
- async's error handling has a major flaw. When something fails you don't know which task the error comes from, so you cannot apply rollback or fallback strategies.
This library separates a task from its result. Look at the first task above: push() receives a second function as a parameter. It is the result callback of the task and is executed between two tasks, the current one and the next. If the current task fails, the error is passed to this function and then the error event is emitted (unless preventDefault() is called).
- async.waterfall() forwards the result values to the next task. That's fine until you need to use these values in some other task. JavaScript has closures, so let's use them. There's no need to pass the values to the next task; simply capture them in the result callback (the second parameter of push()) and let the user decide where to save them.
var myValue;
dq ()
  .on ("error", function (error){
    console.error (error);
  })
  .push (function (cb){
    cb (null, 1);
  }, function (error, n){
    if (error) return;
    myValue = n;
  })
  .push (function (){})
  .push (function (){
  });
- async internally uses process.nextTick() to call the next task. On the other hand, deferred-queue doesn't make any assumptions: you decide how to enqueue the tasks, synchronously, asynchronously or both.
dq ()
  .on ("error", function (error){
    console.error (error);
  })
  .push (function (){})
  .push (function (cb){
    process.nextTick (cb);
  })
  .push (function (cb){
    cb ();
  });
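The third difference comes down to when a callback fires. As a plain Node.js illustration that does not use this library (the helper names `syncTask` and `asyncTask` are made up for the example), the sketch below shows the same callback running at a different moment depending on how the task chooses to invoke it:

```javascript
const order = [];

// Calls the callback before returning.
function syncTask(cb) {
  order.push("sync task");
  cb();
}

// Defers the callback until the current operation has completed.
function asyncTask(cb) {
  order.push("async task");
  process.nextTick(cb);
}

syncTask(() => order.push("sync done"));
order.push("after sync");

asyncTask(() => order.push("async done"));
order.push("after async");

// At this point "async done" has not been recorded yet; it is added once
// the current synchronous code has finished running.
console.log(order);
```

Leaving this choice to each task is what lets a single queue mix both styles.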
module() : DeferredQueue
Returns a new DeferredQueue instance.
var dq = require ("deferred-queue");
var q = dq ();
DeferredQueue
Methods
DeferredQueue#pause() : undefined
Pauses the queue execution. Look at the async-function-between-tasks.js example for further details.
q.push (function (){
}, function (){
  this.pause ();
});
DeferredQueue#pending() : Number
Returns the number of pending tasks in the queue.
DeferredQueue#preventDefault() : undefined
Prevents the propagation of the error, that is, the error event is not emitted. It must be called from inside the result callback passed to push() or unshift().
q.push (function (){
}, function (error){
  if (error){
    this.preventDefault ();
  }
});
DeferredQueue#push(task[, result]) : DeferredQueue
Adds a task to the end of the queue and tries to execute it. If there are pending tasks, it simply waits until all the previous tasks have been executed.
The task is the function that you want to execute. The result is a callback that is executed when the task finishes.
The tasks can be synchronous or asynchronous.
Synchronous
You can only return one value. To signal an error, throw it; it will be caught. Both the error and the value are passed to the result callback, if any.
q.push (function (){
  return 1;
}, function (error, value){
});
q.push (function (){
  throw 1;
}, function (error, value){
});
Asynchronous
If you want to execute an asynchronous task, you must call the cb parameter when you are ready to continue. As usual, the error is the first parameter.
q.push (function (cb){
  process.nextTick (function (){
    cb (null, 1, 2);
  });
}, function (error, v1, v2){
});
q.push (function (cb){
  cb (1);
}, function (error, v1, v2){
});
Note: Whether a task is synchronous or asynchronous depends exclusively on the user, but if the task uses the cb parameter, a different internal strategy is used. In other words, you can execute a synchronous task using the cb parameter. This is useful when you need to return more than one value.
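One way to picture the two strategies is the sketch below. It is only a model of the behaviour described in this section: `runTask` is a hypothetical helper, and choosing the strategy from the task's declared parameter count (`task.length`) is an assumption, not something this document confirms about the library's internals.

```javascript
// Hypothetical helper: runs a task and reports (error, ...values) to result.
function runTask(task, result) {
  if (task.length === 0) {
    // No cb declared: treat the task as synchronous (assumed detection rule).
    let value;
    try {
      value = task();
    } catch (error) {
      return result(error);
    }
    return result(null, value);
  }
  // cb declared: the task decides when to continue and may pass several values.
  task((error, ...values) => {
    if (error) return result(error);
    result(null, ...values);
  });
}

const seen = [];
// Returned value becomes the single result value.
runTask(() => 1, (error, value) => seen.push([error, value]));
// Thrown error is caught and handed to the result callback.
runTask(() => { throw new Error("boom"); }, (error) => seen.push([error.message]));
// Synchronous task that uses cb to report two values.
runTask((cb) => cb(null, 1, 2), (error, v1, v2) => seen.push([error, v1, v2]));
console.log(seen);
```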
There are subtle differences in execution order depending on whether the tasks are synchronous or asynchronous:
q.push (A);
q.push (function (){
  q.push (C);
  q.push (D);
});
q.push (B);
If A, B, C, D are asynchronous: A → B → C → D. Asynchronous example.
If A, B, C, D are synchronous: A → C → D → B. Synchronous example.
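Both orders follow from plain first-in, first-out queueing. The sketch below is a simplified model rather than the library's code: `createQueue`, `run`, `syncTask`, `asyncTask` and `flush` are hypothetical names, every task receives a `done` callback, and asynchronous completion is simulated by parking `done` in an array that `flush` drains later.

```javascript
// Simplified model: one task runs at a time, the rest wait in FIFO order.
function createQueue() {
  const tasks = [];
  let running = false;

  function next() {
    if (running || tasks.length === 0) return;
    running = true;
    const task = tasks.shift();
    task(() => {
      running = false;
      next();
    });
  }

  return {
    push(task) {
      tasks.push(task);
      next();
    }
  };
}

// Replays the snippet above and returns the order in which tasks started.
function run(makeTask, flush) {
  const log = [];
  const q = createQueue();
  q.push(makeTask("A", log));
  q.push((done) => {
    q.push(makeTask("C", log));
    q.push(makeTask("D", log));
    done();
  });
  q.push(makeTask("B", log));
  flush();
  return log;
}

// Synchronous tasks finish before push() returns.
const syncTask = (name, log) => (done) => { log.push(name); done(); };

// Simulated asynchronous tasks finish later, when flush() is called.
const parked = [];
const asyncTask = (name, log) => (done) => { log.push(name); parked.push(done); };
const flush = () => { while (parked.length) parked.shift()(); };

const syncOrder = run(syncTask, () => {});
const asyncOrder = run(asyncTask, flush);
console.log(syncOrder.join(" → "));
console.log(asyncOrder.join(" → "));
```

In the synchronous run the queue is empty when C and D are pushed, so they run before B is even added. In the asynchronous run B is already waiting behind A, so C and D line up after it.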
When an error occurs, it is passed to the result callback and then emitted. If you don't want the error to be emitted, call preventDefault():
q
  .on ("error", function (error){
  })
  .push (function (){
    throw new Error ();
  }, function (error){
    if (error) this.preventDefault ();
  });
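The sequence described above (result callback first, `error` event second, unless suppressed) can be modelled in a few lines. `deliverError` is a hypothetical helper written for this illustration on top of Node's `EventEmitter`; it is not the library's implementation, and its `this` context is a bare object rather than a queue.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: hands the error to the result callback, then emits it
// on the emitter unless the callback called this.preventDefault().
function deliverError(emitter, error, result) {
  let prevented = false;
  const context = {
    preventDefault() {
      prevented = true;
    }
  };
  if (result) result.call(context, error);
  if (!prevented) emitter.emit("error", error);
}

const emitted = [];
const q = new EventEmitter();
q.on("error", (error) => emitted.push(error.message));

// Not suppressed: the listener receives the error.
deliverError(q, new Error("first"), function (error) {});

// Suppressed: the listener is not called.
deliverError(q, new Error("second"), function (error) {
  if (error) this.preventDefault();
});

console.log(emitted);
```

Only the first error reaches the listener, which is the hook that makes per-task fallback strategies possible.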
I want to execute an asynchronous function inside the result callback
You can. The last parameter of the result callback, cb, is another callback that you must call in order to continue executing the remaining tasks.
q.push (function (){
  return 1;
}, function (error, v, cb){
  process.nextTick (function (){
    cb (new Error ());
  });
});
DeferredQueue#resume() : undefined
Resumes the queue execution. Look at the async-function-between-tasks.js example for further details.
DeferredQueue#unshift(task[, result]) : DeferredQueue
Adds a task to the beginning of the queue. It has the same functionality as the push() function.