Comparing version 0.5.0 to 0.5.1
@@ -0,1 +1,8 @@ | ||
# 0.5.1 | ||
Added `workers.fulfill` to ease using generators with callbacks. | ||
Added documentation for logging. | ||
# 0.5.0 | ||
@@ -5,4 +12,5 @@ | ||
# 0.4.0 | ||
Use native Promise instead of Q. |
@@ -42,2 +42,3 @@ var $__getProtoParent = function(superClass) { | ||
var Scheduler = require('./scheduler'); | ||
var runner = require('./runner'); | ||
function returnPromiseOrCallback(promise, callback) { | ||
@@ -76,2 +77,7 @@ if (callback) promise.then(function() { | ||
}, | ||
fulfill: function() { | ||
var $__6; | ||
for (var args = [], $__1 = 0; $__1 < arguments.length; $__1++) args[$__1] = arguments[$__1]; | ||
return ($__6 = runner).fulfill.apply($__6, $__toObject(args)); | ||
}, | ||
once: function(callback) { | ||
@@ -90,11 +96,11 @@ var promise = this._scheduler.once().then((function() { | ||
debug: function(message) { | ||
for (var args = [], $__1 = 1; $__1 < arguments.length; $__1++) args[$__1 - 1] = arguments[$__1]; | ||
for (var args = [], $__2 = 1; $__2 < arguments.length; $__2++) args[$__2 - 1] = arguments[$__2]; | ||
this.emit('debug', format.apply(null, $__spread([message], args))); | ||
}, | ||
info: function(message) { | ||
for (var args = [], $__2 = 1; $__2 < arguments.length; $__2++) args[$__2 - 1] = arguments[$__2]; | ||
for (var args = [], $__3 = 1; $__3 < arguments.length; $__3++) args[$__3 - 1] = arguments[$__3]; | ||
this.emit('info', format.apply(null, $__spread([message], args))); | ||
}, | ||
error: function(messageOrError) { | ||
for (var args = [], $__3 = 1; $__3 < arguments.length; $__3++) args[$__3 - 1] = arguments[$__3]; | ||
for (var args = [], $__4 = 1; $__4 < arguments.length; $__4++) args[$__4 - 1] = arguments[$__4]; | ||
if (messageOrError instanceof Error) this.emit('error', messageOrError); else { | ||
@@ -101,0 +107,0 @@ try { |
@@ -31,3 +31,3 @@ var $__Object = Object, $__getOwnPropertyNames = $__Object.getOwnPropertyNames, $__getOwnPropertyDescriptor = $__Object.getOwnPropertyDescriptor, $__getDescriptors = function(object) { | ||
var ms = require('ms'); | ||
var runJob = require('./runner'); | ||
var runJob = require('./runner').runJob; | ||
var RESERVE_TIMEOUT = ms('30s'); | ||
@@ -34,0 +34,0 @@ var RESERVE_BACKOFF = ms('30s'); |
@@ -0,5 +1,6 @@ | ||
var assert = require('assert'); | ||
var createDomain = require('domain').createDomain; | ||
module.exports = function runJob($__0) { | ||
var id = $__0.id, notify = $__0.notify, timeout = $__0.timeout, fn = $__0.fn; | ||
notify.debug("Processing job %s", id); | ||
function runJob($__1) { | ||
var id = $__1.id, notify = $__1.notify, timeout = $__1.timeout, fn = $__1.fn; | ||
notify.notify("Processing job %s", id); | ||
var domain = createDomain(); | ||
@@ -34,3 +35,3 @@ var errorOnTimeout; | ||
return promise; | ||
}; | ||
} | ||
function resolveWithGenerator(generator, resolve, reject) { | ||
@@ -47,9 +48,17 @@ function nextFromYield(valueToYield) { | ||
throw undefined; | ||
} catch ($__1) { | ||
} catch ($__2) { | ||
{ | ||
$__1 = generator.next(valueToYield); | ||
value = $__1.value; | ||
done = $__1.done; | ||
$__2 = generator.next(valueToYield); | ||
value = $__2.value; | ||
done = $__2.done; | ||
} | ||
if (done) resolve(); else nextValue(value); | ||
if (done) { | ||
resolve(); | ||
} else if (value && typeof (value.then) == 'function') { | ||
value.then(nextFromYield, (function(error) { | ||
return generator.throw (error); | ||
})); | ||
} else { | ||
generator.throw (new Error("Expected yield promise, received " + value)); | ||
} | ||
} | ||
@@ -63,42 +72,40 @@ } | ||
nextFromYield(); | ||
function nextValue(value) { | ||
if (value && value.then && value) { | ||
value.then((function(resolvedValue) { | ||
return nextFromYield(resolvedValue); | ||
}), (function(error) { | ||
return generator.throw (error); | ||
})); | ||
} else if (value === undefined) { | ||
} | ||
function fulfill() { | ||
for (var args = [], $__0 = 0; $__0 < arguments.length; $__0++) args[$__0] = arguments[$__0]; | ||
var fn; | ||
if (args.length > 1) { | ||
try { | ||
throw undefined; | ||
} catch (method) { | ||
try { | ||
throw undefined; | ||
} catch (object) { | ||
try { | ||
throw undefined; | ||
} catch (done) { | ||
try { | ||
throw undefined; | ||
} catch (value) { | ||
try { | ||
throw undefined; | ||
} catch ($__1) { | ||
try { | ||
throw undefined; | ||
} catch (callback) { | ||
callback = function(error) { | ||
if (error) generator.throw (error); else nextValue(value, done); | ||
}; | ||
{ | ||
$__1 = generator.next(callback); | ||
value = $__1.value; | ||
done = $__1.done; | ||
} | ||
} | ||
} | ||
} catch ($__2) { | ||
{ | ||
$__2 = args; | ||
object = $__2[0]; | ||
method = $__2[1]; | ||
} | ||
if (typeof (method) == 'function') fn = method.bind(object); else fn = object[method].bind(object); | ||
} | ||
} catch (error) { | ||
reject(error); | ||
} | ||
} else { | ||
nextFromYield(); | ||
} | ||
} | ||
} else fn = args[0]; | ||
assert(typeof (fn) == 'function', "Must call callback with a function"); | ||
var promise = new Promise(function(resolve, reject) { | ||
function callback(error, value) { | ||
if (error) reject(error); else resolve(value); | ||
} | ||
setImmediate(function() { | ||
fn(callback); | ||
}); | ||
}); | ||
return promise; | ||
} | ||
module.exports = { | ||
runJob: runJob, | ||
fulfill: fulfill | ||
}; |
@@ -18,3 +18,3 @@ var $__Object = Object, $__getOwnPropertyNames = $__Object.getOwnPropertyNames, $__getOwnPropertyDescriptor = $__Object.getOwnPropertyDescriptor, $__getDescriptors = function(object) { | ||
var CronJob = require('cron'); | ||
var runJob = require('./runner'); | ||
var runJob = require('./runner').runJob; | ||
var DEVELOPMENT_CRON_TIME = '*/5 * * * * *'; | ||
@@ -21,0 +21,0 @@ module.exports = (function() { |
{ | ||
"name": "ironium", | ||
"version": "0.5.0", | ||
"version": "0.5.1", | ||
"scripts": { | ||
@@ -5,0 +5,0 @@ "test": "./node_modules/.bin/mocha", |
139
README.md
@@ -128,21 +128,6 @@ # [Ironium](https://github.com/assaf/ironium) | ||
Alternatively, the function can return a promise, in which case the job is | ||
discarded when the promise resolved, or returned to the queue if the promise is | ||
rejected. | ||
Alternatively, the function can return a promise or a generator. We discuss | ||
promises and generators later on. | ||
For example: | ||
``` | ||
workers.queue('delayed-echo').each(job) { | ||
var defered = new Promise(); | ||
setTimeout(function() { | ||
console.log('Echo', job.message); | ||
promise.resolve(); | ||
}, 5000); | ||
return promise; | ||
}); | ||
``` | ||
You must use either callback or promise to indicate completion, and do so within | ||
@@ -255,3 +240,17 @@ 10 minutes. Jobs that don't complete within that time frame are considered to | ||
#### fulfill(fn) | ||
Calls the function with a callback that fulfills a promise, returns that | ||
promise. | ||
Use this with yield expressions to wrap a function that takes a callback, and | ||
yield its promise. For example: | ||
``` | ||
var contents = yield workers.fulfill(function(callback) { | ||
File.readFile(filename, callback); | ||
}); | ||
``` | ||
#### reset(callback) | ||
@@ -274,2 +273,108 @@ | ||
## Using Promises | ||
If you prefer, your jobs can return promises instead of using callbacks. The | ||
job is considered complete when the promise resolves, and failed if the promise | ||
gets rejected. In the case of queues, the failed job will return to the queue | ||
and processed again. | ||
For example: | ||
``` | ||
workers.queue('delayed-echo').each(function(job) { | ||
var promise = new Promise(function(resolve, reject) { | ||
console.log('Echo', job.message); | ||
resolve(); | ||
}); | ||
return promise; | ||
}); | ||
``` | ||
## Using Generators | ||
Jobs that have multiple steps can also use generators in combination with | ||
promises and callback. At each step the job can yield with a promise, and act | ||
on the value of that promise. For example: | ||
``` | ||
workers.queue('update-name').each(function(job) { | ||
var customer = yield Customer.findById(job.customerID); | ||
// At this point customer is set | ||
customer.set('firstName', job.firstName); | ||
customer.set('lastName', job.lastName); | ||
assert(customer.isModified()); | ||
yield customer.save(); | ||
// Customer has been saved in the database | ||
assert(!customer.isModified()); | ||
}); | ||
``` | ||
If you need to use callbacks, you can call `workers.fulfill()` to get a promise, | ||
and run code with a callback that fulfills that promise. | ||
For example, Mongoose finders return promises, but the save method doesn't, so | ||
you'll need to write your code like this: | ||
``` | ||
workers.queue('update-name').each(function* updateName(job) { | ||
// You must call exec() to turn query into a promise | ||
var customer = yield Customer.findById(job.customerID).exec(); | ||
// At this point customer is set | ||
customer.set('firstName', job.firstName); | ||
customer.set('lastName', job.lastName); | ||
assert(customer.isModified()); | ||
// customer.save needs a callback, yields needs a promise | ||
yield workers.fulfill(function(callback) { | ||
customer.save(callback); | ||
}); | ||
// Customer has been saved in the database | ||
assert(!customer.isModified()); | ||
}); | ||
``` | ||
Another example: | ||
``` | ||
workers.queue('echo-file').each(function* writeFile(job) { | ||
var contents = yield workers.fulfill(function(callback) { | ||
File.readFile(job.filename, callback); | ||
}); | ||
console.log(contents); | ||
}); | ||
``` | ||
## Logging | ||
Don't work blind! There are three events you can listen to: | ||
`error` - Emitted on any error (processing job, connection, etc) | ||
`info` - Logs job getting processed and successful completion | ||
`debug` - Way more information, useful for troubleshooting | ||
For example: | ||
``` | ||
if (process.env.DEBUG) | ||
workers.on('debug', function(message) { | ||
console.log(message); | ||
}); | ||
workers.on('info', function(message) { | ||
console.log(message); | ||
}); | ||
workers.on('error', function(error) { | ||
console.error(error.stack); | ||
errorService.notify(error); | ||
}); | ||
``` | ||
## Configurations | ||
@@ -276,0 +381,0 @@ |
@@ -6,2 +6,3 @@ const assert = require('assert'); | ||
const Scheduler = require('./scheduler'); | ||
const runner = require('./runner'); | ||
@@ -63,2 +64,8 @@ | ||
// Calls the function with a callback that fulfills a promise, returns that | ||
// promise. | ||
fulfill(...args) { | ||
return runner.fulfill(...args); | ||
} | ||
// Used in testing: run all scheduled jobs once (immediately), run all queued | ||
@@ -65,0 +72,0 @@ // jobs, finally call callback. If called with no arguments, returns a promise. |
@@ -5,3 +5,3 @@ const _ = require('lodash'); | ||
const ms = require('ms'); | ||
const runJob = require('./runner'); | ||
const { runJob } = require('./runner'); | ||
@@ -8,0 +8,0 @@ |
@@ -0,1 +1,2 @@ | ||
const assert = require('assert'); | ||
const { createDomain } = require('domain'); | ||
@@ -8,4 +9,4 @@ | ||
// fn - The function to execute | ||
module.exports = function runJob({ id, notify, timeout, fn }) { | ||
notify.debug("Processing job %s", id); | ||
function runJob({ id, notify, timeout, fn }) { | ||
notify.notify("Processing job %s", id); | ||
// Ideally we call the function, function calls the callback, all is well. | ||
@@ -76,7 +77,13 @@ // But the handler may throw an exception, or suffer some other | ||
try { | ||
let { value, done } = generator.next(valueToYield); | ||
if (done) | ||
if (done) { | ||
resolve(); | ||
else | ||
nextValue(value); | ||
} else if (value && typeof(value.then) == 'function') { | ||
// It's a promise! Resolve it and pass result back to generator | ||
value.then(nextFromYield, (error)=> generator.throw(error)); | ||
} else { | ||
generator.throw(new Error("Expected yield promise, received " + value)); | ||
} | ||
} catch (error) { | ||
@@ -90,42 +97,39 @@ reject(error); | ||
} | ||
// Handle the result of a yield, which can be one of: | ||
// - We're done with this generator, resolve this job. | ||
// - It's a promise! resolve it and pass the result back to the | ||
// generator. | ||
// - Empty yield is our cue to provide a callback. | ||
// - Ignore any other value | ||
function nextValue(value) { | ||
if (value && value.then && value) { | ||
// It's a promise! Resolve it and pass result back to generator | ||
value.then((resolvedValue)=> nextFromYield(resolvedValue), | ||
(error)=> generator.throw(error)); | ||
} else if (value === undefined) { | ||
// The generator does something like: | ||
// var callback = yield; | ||
// setTimeout(callback, 1000); | ||
// | ||
// This is the callback function that we pass through yield, | ||
// and we do so by calling next, which executes the code that | ||
// includes setTimeout. We're going to wait on the callback to | ||
// complete before doing anything with the value provided by | ||
// the generator. | ||
try { | ||
function callback(error) { | ||
if (error) | ||
generator.throw(error); | ||
else | ||
nextValue(value, done); | ||
} | ||
let { value, done } = generator.next(callback); | ||
} catch (error) { | ||
function fulfill(...args) { | ||
let fn; | ||
if (args.length > 1) { | ||
let [object, method] = args; | ||
if (typeof(method) == 'function') | ||
fn = method.bind(object); | ||
else | ||
fn = object[method].bind(object); | ||
} else | ||
fn = args[0]; | ||
assert(typeof(fn) == 'function', "Must call callback with a function"); | ||
let promise = new Promise(function(resolve, reject) { | ||
function callback(error, value) { | ||
if (error) | ||
reject(error); | ||
} | ||
} else { | ||
// yield noop essentially, useful for waiting for callbacks. | ||
nextFromYield(); | ||
else | ||
resolve(value); | ||
} | ||
} | ||
// Make sure to call function *after* we returned the promise. | ||
setImmediate(function() { | ||
fn(callback); | ||
}) | ||
}); | ||
return promise; | ||
} | ||
module.exports = { | ||
runJob, | ||
fulfill | ||
} | ||
// Essentially cron for scheduling tasks in Node. | ||
const _ = require('lodash'); | ||
const assert = require('assert'); | ||
const CronJob = require('cron'); | ||
const runJob = require('./runner'); | ||
const _ = require('lodash'); | ||
const assert = require('assert'); | ||
const CronJob = require('cron'); | ||
const { runJob } = require('./runner'); | ||
@@ -8,0 +8,0 @@ |
65790
1460
455