Comparing version 0.1.8 to 0.2.0
module.exports = function(grunt) { | ||
grunt.loadNpmTasks('grunt-contrib-clean'); | ||
grunt.loadNpmTasks('grunt-contrib-watch'); | ||
grunt.loadNpmTasks('grunt-notify'); | ||
grunt.loadNpmTasks('grunt-traceur'); | ||
@@ -22,8 +24,24 @@ | ||
grunt.config('watch', { | ||
files: [ 'src/**/*.js' ], | ||
tasks: [ 'build' ], | ||
options: { interrupt: true } | ||
}); | ||
grunt.config('clean', [ 'lib' ]); | ||
grunt.config.set('notify.notify_hooks', { | ||
options: { enabled: true } | ||
}); | ||
grunt.registerTask('build', [ 'clean', 'traceur' ]); | ||
grunt.registerTask('default', [ 'build' ]); | ||
grunt.config('notify.build', { | ||
options: { message: "Build complete!" } | ||
}); | ||
grunt.registerTask('build', "Compile source files from src/ into lib/ directory", | ||
[ 'clean', 'traceur', 'notify:build' ]); | ||
grunt.registerTask('default', "Continously compile source files (build and watch)", | ||
[ 'build', 'watch' ]); | ||
} |
@@ -56,9 +56,4 @@ var $__getProtoParent = function(superClass) { | ||
if (!queues) { | ||
try { | ||
throw undefined; | ||
} catch (config) { | ||
config = this._config || {}; | ||
queues = new Queues(this, config); | ||
this._queues = queues; | ||
} | ||
queues = new Queues(this); | ||
this._queues = queues; | ||
} | ||
@@ -65,0 +60,0 @@ return queues; |
@@ -39,14 +39,56 @@ var $__Object = Object, $__getOwnPropertyNames = $__Object.getOwnPropertyNames, $__getOwnPropertyDescriptor = $__Object.getOwnPropertyDescriptor, $__getDescriptors = function(object) { | ||
var ERROR_BACKOFF = ms('30s'); | ||
var Configuration = function() { | ||
'use strict'; | ||
var $Configuration = ($__createClassNoExtends)({ | ||
constructor: function(workers) { | ||
this._workers = workers; | ||
}, | ||
get config() { | ||
var config = this._config; | ||
if (!config) { | ||
try { | ||
throw undefined; | ||
} catch (source) { | ||
source = this._workers._config && this._workers._config.queues; | ||
if (!source) source = (process.env.NODE_ENV == 'test') ? {prefix: 'test-'}: {}; | ||
this._config = config = { | ||
hostname: source.hostname || 'localhost', | ||
port: source.port || 11300, | ||
prefix: source.prefix | ||
}; | ||
if (source.token) { | ||
config.authenticate = 'oauth ' + source.token + ' ' + source.projectID; | ||
config.webhookURL = 'https://' + source.hostname + '/1/projects/' + source.projectID + '/queues/{queueName}/messages/webhook?oauth=' + source.token; | ||
} else { | ||
config.authenticate = null; | ||
config.webhookURL = 'https://<host>/1/projects/<project>/queues/{queueName}/messages/webhook?oauth=<token>'; | ||
} | ||
} | ||
} | ||
return config; | ||
}, | ||
get hostname() { | ||
return this.config.hostname; | ||
}, | ||
get port() { | ||
return this.config.port; | ||
}, | ||
prefixedName: function(queueName) { | ||
return (this.config.prefix || '') + queueName; | ||
}, | ||
get authenticate() { | ||
return this.config.authenticate; | ||
}, | ||
webhookURL: function(queueName) { | ||
return this.config.webhookURL.replace('{queueName}', queueName); | ||
} | ||
}, {}); | ||
return $Configuration; | ||
}(); | ||
module.exports = (function() { | ||
'use strict'; | ||
var Queues = ($__createClassNoExtends)({ | ||
constructor: function() { | ||
var logger = arguments[0] !== (void 0) ? arguments[0]: console; | ||
var $__3 = arguments[1], host = "host"in $__3 ? $__3.host: 'localhost', port = "port"in $__3 ? $__3.port: 11300, projectID = $__3.projectID, token = $__3.token, prefix = $__3.prefix; | ||
this._logger = logger; | ||
if (token) this._authenticate = 'oauth ' + token + ' ' + projectID; | ||
this._host = host; | ||
this._port = port; | ||
this._prefix = prefix || ''; | ||
this._webhookURL = 'https://' + host + '/1/projects/' + projectID + '/queues/{queueName}/messages/webhook?oauth=' + token; | ||
var Server = ($__createClassNoExtends)({ | ||
constructor: function(workers) { | ||
this.notify = workers; | ||
this.config = new Configuration(workers); | ||
this._queues = Object.create({}); | ||
@@ -58,11 +100,3 @@ }, | ||
if (!queue) { | ||
queue = new Queue({ | ||
name: name, | ||
prefixedName: this._prefix + name, | ||
host: this._host, | ||
port: this._port, | ||
authenticate: this._authenticate, | ||
webhookURL: this._webhookURL.replace('{queueName}', name), | ||
logger: this._logger | ||
}); | ||
queue = new Queue(name, this); | ||
this._queues[name] = queue; | ||
@@ -73,3 +107,3 @@ } | ||
start: function() { | ||
this._logger.debug("Start all queues"); | ||
this.notify.debug("Start all queues"); | ||
this._foreachQueue((function(queue) { | ||
@@ -80,3 +114,3 @@ return queue.start(); | ||
stop: function(callback) { | ||
this._logger.debug("Stop all queues"); | ||
this.notify.debug("Stop all queues"); | ||
this._foreachQueue((function(queue) { | ||
@@ -87,3 +121,3 @@ return queue.stop(); | ||
reset: function(callback) { | ||
this._logger.debug("Clear all queues"); | ||
this.notify.debug("Clear all queues"); | ||
this._foreachQueue((function(queue, done) { | ||
@@ -94,3 +128,3 @@ return queue.reset(done); | ||
once: function(callback) { | ||
this._logger.debug("Process all queued jobs"); | ||
this.notify.debug("Process all queued jobs"); | ||
var queues = _.values(this._queues); | ||
@@ -113,3 +147,3 @@ function iterate() { | ||
}, {}); | ||
return Queues; | ||
return Server; | ||
}()); | ||
@@ -119,11 +153,8 @@ var Session = function() { | ||
var $Session = ($__createClassNoExtends)({ | ||
constructor: function($__3, setup) { | ||
var name = $__3.name, host = $__3.host, port = $__3.port, authenticate = $__3.authenticate, logger = $__3.logger; | ||
constructor: function(name, server, setup) { | ||
this.name = name; | ||
this.host = host; | ||
this.port = port; | ||
this.authenticate = authenticate; | ||
this.config = server.config; | ||
this.notify = server.notify; | ||
this.setup = setup; | ||
this.pending = []; | ||
this._logger = logger; | ||
}, | ||
@@ -146,3 +177,3 @@ request: function(command) { | ||
}), TIMEOUT_REQUEST); | ||
this._withClient(function(client) { | ||
this.withClient(function(client) { | ||
var $__4; | ||
@@ -154,6 +185,6 @@ ($__4 = client[command]).call.apply($__4, $__spread([client], args, [oncomplete])); | ||
var message = error.toString(); | ||
if (message != "Error: Connection closed") this._logger.error("Client error in queue %s: %s", this.name, message); | ||
if (this._deferred) { | ||
this._deferred.reject(error); | ||
this._deferred = null; | ||
if (message != "Error: Connection closed") this.notify.error("Client error in queue %s: %s", this.name, message); | ||
if (this.deferred) { | ||
this.deferred.reject(error); | ||
this.deferred = null; | ||
} | ||
@@ -167,5 +198,5 @@ var pending = this.pending.slice(); | ||
}, | ||
_withClient: function(fn) { | ||
if (!this._deferred) this._connect(); | ||
this._deferred.promise.then(fn, (function(error) { | ||
withClient: function(fn) { | ||
if (!this.deferred) this.connect(); | ||
this.deferred.promise.then(fn, (function(error) { | ||
this.fail(error); | ||
@@ -175,8 +206,8 @@ this.use(fn); | ||
}, | ||
_connect: function() { | ||
var client = new fivebeans.client(this.host, this.port); | ||
connect: function() { | ||
var client = new fivebeans.client(this.config.hostname, this.config.port); | ||
var deferred = Q.defer(); | ||
var authenticateAndSetup = (function() { | ||
if (this.authenticate) { | ||
client.put(0, 0, 0, this.authenticate, function(error) { | ||
if (this.config.authenticate) { | ||
client.put(0, 0, 0, this.config.authenticate, function(error) { | ||
if (error) { | ||
@@ -205,3 +236,3 @@ deferred.reject(error); | ||
client.connect(); | ||
this._deferred = deferred; | ||
this.deferred = deferred; | ||
} | ||
@@ -214,31 +245,34 @@ }, {}); | ||
var $Queue = ($__createClassNoExtends)({ | ||
constructor: function($__3) { | ||
var name = $__3.name, prefixedName = $__3.prefixedName, host = $__3.host, port = $__3.port, authenticate = $__3.authenticate, webhookURL = $__3.webhookURL, logger = $__3.logger; | ||
constructor: function(name, server) { | ||
this.name = name; | ||
this.webhookURL = webhookURL; | ||
this._logger = logger; | ||
this.notify = server.notify; | ||
this.webhookURL = server.config.webhookURL(name); | ||
this._server = server; | ||
this._prefixedName = server.config.prefixedName(name); | ||
this._processing = false; | ||
this._handler = null; | ||
this._putSession = new Session({ | ||
name: name, | ||
host: host, | ||
port: port, | ||
authenticate: authenticate, | ||
logger: logger | ||
}, function(client, callback) { | ||
client.use(prefixedName, callback); | ||
}); | ||
this._getSession = new Session({ | ||
name: name, | ||
host: host, | ||
port: port, | ||
authenticate: authenticate, | ||
logger: logger | ||
}, function(client, callback) { | ||
client.ignore('default', function() { | ||
client.watch(prefixedName, callback); | ||
}); | ||
}); | ||
}, | ||
put: function(job, options, callback) { | ||
get _put() { | ||
var session = this._putSession; | ||
if (!session) { | ||
session = new Session(this.name, this._server, (function(client, callback) { | ||
client.use(this._prefixedName, callback); | ||
}).bind(this)); | ||
this._putSession = session; | ||
} | ||
return session; | ||
}, | ||
get _reserve() { | ||
var session = this._reserveSession; | ||
if (!session) { | ||
session = new Session(this.name, this._server, (function(client, callback) { | ||
client.ignore('default', (function() { | ||
client.watch(this._prefixedName, callback); | ||
}).bind(this)); | ||
}).bind(this)); | ||
this._reserveSession = session; | ||
} | ||
return session; | ||
}, | ||
push: function(job, options, callback) { | ||
assert(job, "Missing job to queue"); | ||
@@ -251,4 +285,4 @@ if (typeof (options) == 'function') { | ||
var delay = (options && options.delay) || 0; | ||
this._putSession.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (function(error, jobID) { | ||
if (callback) callback(error); else if (error) this._logger.error("Error talking to Beanstalkd, queue %s: %s", this.name, error); | ||
this._put.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (function(error, jobID) { | ||
if (callback) callback(error); else if (error) this.notify.error("Error talking to Beanstalkd, queue %s: %s", this.name, error); | ||
}).bind(this)); | ||
@@ -277,4 +311,4 @@ }, | ||
} | ||
this._logger.debug("Waiting for jobs on queue %s", this.name); | ||
this._getSession.request('reserve_with_timeout', 0, (function(error, jobID, payload) { | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
this._reserve.request('reserve_with_timeout', 0, (function(error, jobID, payload) { | ||
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) callback(null, false); else if (error) callback(error); else if (payload) this._processJob(jobID, payload, function(error) { | ||
@@ -288,9 +322,9 @@ callback(error, !error); | ||
if (!this._processing) return; | ||
this._getSession.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (function(error, jobID, payload) { | ||
this._reserve.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (function(error, jobID, payload) { | ||
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) setImmediate(pickNextJob); else if (error) { | ||
this._logger.error(error); | ||
this.notify.error(error); | ||
setTimeout(pickNextJob, ERROR_BACKOFF); | ||
} else { | ||
this._processJob(jobID, payload, (function(error) { | ||
if (error) this._logger.error(error); | ||
if (error) this.notify.error(error); | ||
setImmediate(pickNextJob); | ||
@@ -301,3 +335,3 @@ }).bind(this)); | ||
}).bind(this); | ||
this._logger.debug("Waiting for jobs on queue %s", this.name); | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
pickNextJob(); | ||
@@ -316,6 +350,6 @@ }, | ||
outcomeDeferred.promise.then((function() { | ||
this._getSession.request('destroy', jobID, (function(error) { | ||
if (error) this._logger.error(error.stack); | ||
this._reserve.request('destroy', jobID, (function(error) { | ||
if (error) this.notify.error(error.stack); | ||
}).bind(this)); | ||
this._logger.info("Completed job %s from queue %s", jobID, this.name); | ||
this.notify.info("Completed job %s from queue %s", jobID, this.name); | ||
clearTimeout(errorOnTimeout); | ||
@@ -325,6 +359,6 @@ callback(); | ||
var delay = (process.env.NODE_ENV == 'test' ? 0: RELEASE_DELAY); | ||
this._getSession.request('release', jobID, 0, delay / 1000, (function(error) { | ||
if (error) this._logger.error(error.stack); | ||
this._reserve.request('release', jobID, 0, delay / 1000, (function(error) { | ||
if (error) this.notify.error(error.stack); | ||
}).bind(this)); | ||
this._logger.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack); | ||
this.notify.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack); | ||
clearTimeout(errorOnTimeout); | ||
@@ -334,4 +368,4 @@ callback(error); | ||
domain.run((function() { | ||
this._logger.info("Picked up job %s from queue %s", jobID, this.name); | ||
this._logger.debug("Processing %s: %s", jobID, payload.toString()); | ||
this.notify.info("Picked up job %s from queue %s", jobID, this.name); | ||
this.notify.debug("Processing %s: %s", jobID, payload.toString()); | ||
var job = payload; | ||
@@ -347,3 +381,3 @@ try { | ||
reset: function(callback) { | ||
this._putSession._withClient(function(client) { | ||
this._put.withClient(function(client) { | ||
function deleteNextJob() { | ||
@@ -350,0 +384,0 @@ client.reserve_with_timeout(0, function(error, jobID, payload) { |
{ | ||
"name": "ironium", | ||
"version": "0.1.8", | ||
"version": "0.2.0", | ||
"scripts": { | ||
"test": "./node_modules/.bin/mocha", | ||
"build": "grunt build", | ||
"prepublish": "grunt build" | ||
@@ -27,3 +26,5 @@ }, | ||
"grunt-traceur": "aaronfrost/grunt-traceur", | ||
"grunt-contrib-clean": "~0.5.0" | ||
"grunt-contrib-clean": "~0.5.0", | ||
"grunt-contrib-watch": "~0.5.3", | ||
"grunt-notify": "~0.2.16" | ||
}, | ||
@@ -30,0 +31,0 @@ "repository": { |
238
README.md
@@ -1,131 +0,223 @@ | ||
# Job Queues | ||
**[Ironium](https://github.com/assaf/ironium)** A simple API for working with | ||
job queues and scheduled jobs, using | ||
[Beanstalkd](http://kr.github.io/beanstalkd/) and/or | ||
[Iron.io](http://www.iron.io/). | ||
A simple API for working with job queues. Can use | ||
[Iron.io](http://www.iron.io/) or Beanstalkd. | ||
## The Why | ||
## Accessing Queues | ||
Building a modern Web application involves a lot of workload that runs outside | ||
the application request/response cycle. | ||
The API consists of a single function that returns the named queue: | ||
Some tasks take a long time to complete (e.g. updating 3rd party APIs), and you | ||
want to run after you've sent the response back to the user. But since you send | ||
back a response, you better get the task to complete, by retrying if necessary. | ||
This is where job queues become useful: | ||
- They can distribute the workload over multiple servers, and in time | ||
- They can smooth out transient errors by retrying every joy | ||
- They can be used for jobs queued by the application and 3rd party | ||
([Webhooks](http://www.webhooks.org/)) | ||
Since high quality job queues exists, might as well use them. One option is | ||
[Beanstalkd](http://kr.github.io/beanstalkd/) which is easy to install and use, | ||
and battle tested. | ||
Another option is [Iron.io](http://www.iron.io/), a paid service that supports | ||
the same API and can be used interchangeable with Beanstalkd. This allows you | ||
to use Beanstalkd for development/testing, and Iron.io for production instances. | ||
Besides managed up-time and a management GUI, Iron.io can also handle Webhooks | ||
(sending and receiving updates) for you. | ||
### The How | ||
Ironium has a simple API that exposes three primary methods: | ||
- `push` a job into a queue | ||
- `each` to process each job from a queue | ||
- `schedule` a job to run at given schedule | ||
There's more than that, so let's explore it. The main object provides access to | ||
all the workers, and in particular has the following methods: | ||
#### queue(name) | ||
Returns the named queue. Calling this method with the same name will always | ||
return the same queue. Queues are created on-the-fly, you don't need to setup | ||
anything before accessing a queue. | ||
You can immediately push new jobs into the queue. To process all jobs, you need | ||
to first start the workers. This distinction allows you to push jobs from any | ||
Node.js servers, but only process jobs from specific nodes. | ||
For example, your code may have: | ||
``` | ||
var queues = require("./lib/queues"); | ||
var myQueue = queues("my-queue"); | ||
``` | ||
var workers = require('ironium'); | ||
var sendWelcomeEmail = workers.queue('send-welcome-email'); | ||
Pushing and processing messages is done via the returned queue object (see | ||
below). | ||
// If this is a new customer, queue sending welcome email. | ||
customer.on('save', function(next) { | ||
if (this.isNew) | ||
sendWelcomeEmail.push(this.id, next); | ||
else | ||
next(); | ||
}); | ||
An additional method is available in test environment only, to discard of all | ||
queues before/after running test: | ||
sendWelcomeEmail.each(function(id, callback) { | ||
// Do something to render and send email | ||
callback(); | ||
}); | ||
``` | ||
before(queues.clearAll); | ||
``` | ||
As you can see from this example, each queue has two interesting methods, `push` | ||
and `each`. | ||
## Using Queues | ||
#### queue.push(job, callback) | ||
### put(job, options, callback) | ||
Pushes a new job into the queue. The job can be any JavaScript value that | ||
serializes into JSON, so object, arrays, string are all usable. | ||
Use the `put` method to put a job in the queue. | ||
If you call `push` with a callback, the callback will be notified after the job | ||
has been successfully added to the queue. Use this to make sure the job is | ||
queued before proceeding. | ||
- job - The object to place on the queue. This object is serialized into | ||
a JSON string, so must only contain data values, no circular references. | ||
- options - Control how the job is processed (see below) | ||
- callback - Optional | ||
#### queue.each(fn) | ||
Currently supported options: | ||
Processes each job from the queue. The function is called with two arguments, | ||
the job (see `push` above) and a callback. You must call that callback when | ||
done processing the job. | ||
- delay - How long before the job is available, in seconds, defaults to 0 | ||
If you call the callback with an error, the job is considered to have failed, is | ||
put back in the queue, and will be picked up again after a 1 minute delay. | ||
If you're queuing requests initiated by the end-user, you likely care whether | ||
the job was queued successfully. Use a callback to determine that. | ||
If you don't call the callback within a minute, the job will be considered to | ||
have failed, and will be put back in the queue as well. | ||
If one minute of processing doesn't sound like enough, consider breaking large | ||
jobs into smaller chunks. Also consider that time-outs are a necessary evil, | ||
given the likelihood of a bug resulting in jobs that never complete, and the | ||
halting problem being NP hard. | ||
### get(handler, concurrency) | ||
#### queue.name | ||
Use the `get` method to process jobs from the queue. | ||
The queue name (property, not a method). This name does not include the prefix. | ||
- handler - The function that will be called for each job | ||
#### queue.webhookURL | ||
The handler is called for each job with the following arguments: | ||
The URL for recieving Webhook requests and queuing them, using Iron.io. This | ||
URL is only valid if your configured the workers with a project ID and token. | ||
- job - The job to process | ||
- done - Called when done processing the job | ||
The job to process is typically an object de-serialized from the JSON | ||
representation (see `put` method). However, it may also be a string, e.g. | ||
FullContact pushes URL-encoded name/value pairs to the queue. | ||
#### schedule(name, time, job) | ||
Once the job has been processed successfully, the handler must call then `done` | ||
method. | ||
TBD Schedules the named job to run at the given schedule. | ||
If the job cannot be processed successfully, the handler calls `done` with an | ||
error: the error will be logged, and the job will be put back in the queue, from | ||
where it will be processed again after a short delay. | ||
If the handler does not complete in time (60 seconds), the job will be returned | ||
to the queue and the next job will be processed. | ||
#### configure(object) | ||
Handlers are invoked sequentially, except for the case where a handler is | ||
considered to have timed-out. | ||
Configure the workers (see below). | ||
### name | ||
#### start() | ||
This property returns the queue name. | ||
You must call this method to start the workers. Jobs can be queued, but will | ||
not be processed until the workers are started. | ||
This extra space allows you to load the same code in every environment, and | ||
queue jobs on any server, but only process jobs on dedicated worker servers. | ||
### webhookURL | ||
This property returns the URL of a Webhook end-point associated with this queue. | ||
Messages posted to this URL will show up as jobs in the queue. | ||
#### stop() | ||
You can call this method to stop the workers. | ||
### onDrain(callback) | ||
Called when there are no more messages to process. This is only availabe in | ||
test environment. | ||
#### once(callback) | ||
Use this when testing. It will run all scheduled jobs exactly once (regardless | ||
of schedule), process all queued jobs, and finally call the callback. | ||
### clear(callback) | ||
This method exists since you cannot reliably pair `start` and `stop`. | ||
Remove all jobs from the queue. This is only availabe in test environment. | ||
#### reset(callback) | ||
## Iron.io | ||
Use this when testing (setup and/or teardown). It will delete all queued jobs, | ||
then call the callback. | ||
To use [Iron.io](http://hud.iron.io/), the queue configuration must specify the | ||
following properties: | ||
* `host` - The Iron.io host name | ||
* `projectID` - The project identifier | ||
* `token` - The authentication token | ||
These can be specified in the configuration file, or by setting the environment | ||
variables `IRON_PROJECT_ID` and `IRONIO_TOKEN`. | ||
## Configurations | ||
Please use the testing project ID when testing the Iron.io integration. | ||
For development and testing you can typically get by with the default | ||
configuration. For production, you may want to set the server in use, as simple | ||
as passing a configuration object to `workers.configure`: | ||
``` | ||
var workers = require('ironium'); | ||
## Testing/Development | ||
if (process.env.NODE_ENV == 'production') | ||
workers.configure({ | ||
queues: { | ||
hostname: 'my.beanstalkd.server' | ||
} | ||
}); | ||
``` | ||
For regular testing and development, we use Beanstalkd. The queue configuration | ||
need only specify: | ||
Or load it form a JSON configuration file: | ||
* `host` - The hostname (defaults to "localhost") | ||
* `port` - The port number (defaults to 11300) | ||
``` | ||
var workers = require('ironium'); | ||
var config = require('./workers.json'); | ||
When testing, the `queues` function has two properties you can call on it that | ||
affect all queues. | ||
if (process.env.NODE_ENV == 'production') | ||
workers.configure(config); | ||
``` | ||
### onDrainAll(callback) | ||
The configuration options are: | ||
Called when there are no more messages to process on any queue. | ||
* `queues.hostname` - Hostname of the queue server (defaults to `localhost`) | ||
* `queues.port` - Port number for the queue server (defaults to 11300) | ||
* `queues.prefix` - Prefix all queue names (when `NODE_ENV == test`, | ||
defaults to `test-`) | ||
* `queues.token` - When using Iron.io, the API token (get it from the | ||
project's credentials page) | ||
* `queues.projectID` - When using Iron.io, the API project ID (get it from the | ||
project's credentials page) | ||
### clearAll(callback) | ||
If you're running in development or test environment with a local Beanstalkd | ||
server, you can use the default configuration, which points to `localhost` port | ||
`11300` and uses the prefix `test-` in test envrionment. | ||
Remove all jobs from all queues. | ||
If you're running in production against a Beanstalkd, you will likely need to | ||
set the hostname and port number. | ||
If you're running in production against an [Iron.io](https://hud.iron.io/) | ||
server, you will need to set the hostname to `"mq-aws-us-east-1.iron.io"`, and | ||
set the `token` and `projectID` based on the Iron.io project's credentials. | ||
## Development | ||
Ironium is written in ECMAScript 6, because better syntax. Specifically you'll | ||
notice that `let` and `const` replaced all usage of `var`, class definitions are | ||
easier to read in the new syntax, and fat arrows (`=>`) replace `that = this`. | ||
However, the code doesn't use any ES6 library improvements (`Map`, `startsWith`, | ||
etc), since these can't be added without polluting the global namespace. | ||
The source files live in the `src` directory, and compiled into the `lib` | ||
directory with Grunt. Specifically: | ||
``` | ||
grunt build # Compile source files from src/ into lib/ directory | ||
grunt watch # Continously compile source files on every change | ||
grunt clean # Clean compiled files in lib/ directory | ||
grunt # Shortcut for grunt build watch | ||
``` | ||
The test suite is non-existent at the moment, but if it were to exist, you would | ||
run it with `npm test` or `mocha`. | ||
@@ -16,4 +16,4 @@ const assert = require('assert'); | ||
// Returns the named queue. Returned objects has the methods `put` and | ||
// `process`. | ||
// Returns the named queue. Returned objects has the methods `push` and | ||
// `each`. | ||
queue(name) { | ||
@@ -26,4 +26,3 @@ return this.queues.getQueue(name); | ||
if (!queues) { | ||
let config = this._config || {}; | ||
queues = new Queues(this, config); | ||
queues = new Queues(this); | ||
this._queues = queues; | ||
@@ -30,0 +29,0 @@ } |
@@ -10,6 +10,6 @@ const _ = require('lodash'); | ||
// How long to wait when reserving a job (in seconds). | ||
// How long to wait when reserving a job. | ||
const RESERVE_TIMEOUT = ms('5m'); | ||
// How long before we consider a request failed due to timeout (in seconds). | ||
// How long before we consider a request failed due to timeout. | ||
// Should be longer than RESERVE_TIMEOUT. | ||
@@ -19,3 +19,3 @@ const TIMEOUT_REQUEST = RESERVE_TIMEOUT + ms('1s'); | ||
// Timeout for processing job before we consider it failed and release it back | ||
// to the queue (in seconds). | ||
// to the queue. | ||
const PROCESSING_TIMEOUT = ms('2m'); | ||
@@ -33,31 +33,63 @@ | ||
class Configuration { | ||
constructor(workers) { | ||
this._workers = workers; | ||
} | ||
// Abstracts an Iron.io project / Beanstalkd configuration. | ||
// | ||
// Configured with: | ||
// host - Host name (defaults to localhost) | ||
// port - Port number (defaults to 11300) | ||
// projectID - Iron.io project ID | ||
// token - Iron.io authentication token | ||
// prefix - Queue name prefix (e.g. test-) | ||
module.exports = class Queues { | ||
get config() { | ||
let config = this._config; | ||
if (!config) { | ||
let source = this._workers._config && this._workers._config.queues; | ||
if (!source) | ||
source = (process.env.NODE_ENV == 'test') ? { prefix: 'test-' } : {}; | ||
this._config = config = { | ||
hostname: source.hostname || 'localhost', | ||
port: source.port || 11300, | ||
prefix: source.prefix | ||
}; | ||
if (source.token) { | ||
config.authenticate = 'oauth ' + source.token + ' ' + source.projectID; | ||
config.webhookURL = 'https://' + source.hostname + '/1/projects/' + source.projectID + | ||
'/queues/{queueName}/messages/webhook?oauth=' + source.token; | ||
} else { | ||
config.authenticate = null; | ||
config.webhookURL = 'https://<host>/1/projects/<project>/queues/{queueName}/messages/webhook?oauth=<token>'; | ||
} | ||
} | ||
return config; | ||
} | ||
constructor(logger = console, { host = 'localhost', port = 11300, projectID, token, prefix }) { | ||
this._logger = logger; | ||
// When using Iron.io, we send an authentication string based on the token. | ||
// Not applicable with standalone Beanstalkd. | ||
if (token) | ||
this._authenticate = 'oauth ' + token + ' ' + projectID; | ||
// These are used to connect the client. | ||
this._host = host; | ||
this._port = port; | ||
// Prefix used in certain environment, e.g. "test-" | ||
this._prefix = prefix || ''; | ||
// Base URL for all Webhooks, queues set their name via interoplation. | ||
this._webhookURL = 'https://' + host + '/1/projects/' + projectID + | ||
'/queues/{queueName}/messages/webhook?oauth=' + token; | ||
// Map (un-prefixed) queue name to queue. | ||
this._queues = Object.create({}); | ||
get hostname() { | ||
return this.config.hostname; | ||
} | ||
get port() { | ||
return this.config.port; | ||
} | ||
prefixedName(queueName) { | ||
return (this.config.prefix || '') + queueName; | ||
} | ||
// When using Iron.io, we send an authentication string based on the token. | ||
// Not applicable with standalone Beanstalkd. | ||
get authenticate() { | ||
return this.config.authenticate; | ||
} | ||
webhookURL(queueName) { | ||
return this.config.webhookURL.replace('{queueName}', queueName); | ||
} | ||
} | ||
// Abstracts a queue server, Beanstalkd or compatible, specifically Iron.io. | ||
module.exports = class Server { | ||
constructor(workers) { | ||
this.notify = workers; | ||
this.config = new Configuration(workers); | ||
this._queues = Object.create({}); | ||
} | ||
// Returns a new queue. | ||
@@ -68,11 +100,3 @@ getQueue(name) { | ||
if (!queue) { | ||
queue = new Queue({ | ||
name, | ||
prefixedName: this._prefix + name, | ||
host: this._host, | ||
port: this._port, | ||
authenticate: this._authenticate, | ||
webhookURL: this._webhookURL.replace('{queueName}', name), | ||
logger: this._logger | ||
}); | ||
queue = new Queue(name, this); | ||
this._queues[name] = queue; | ||
@@ -84,3 +108,3 @@ } | ||
start() { | ||
this._logger.debug("Start all queues"); | ||
this.notify.debug("Start all queues"); | ||
this._foreachQueue((queue)=> queue.start()); | ||
@@ -90,3 +114,3 @@ } | ||
stop(callback) { | ||
this._logger.debug("Stop all queues"); | ||
this.notify.debug("Stop all queues"); | ||
this._foreachQueue((queue)=> queue.stop()); | ||
@@ -97,3 +121,3 @@ } | ||
reset(callback) { | ||
this._logger.debug("Clear all queues"); | ||
this.notify.debug("Clear all queues"); | ||
this._foreachQueue((queue, done)=> queue.reset(done), callback); | ||
@@ -104,3 +128,3 @@ } | ||
once(callback) { | ||
this._logger.debug("Process all queued jobs"); | ||
this.notify.debug("Process all queued jobs"); | ||
let queues = _.values(this._queues); | ||
@@ -159,10 +183,8 @@ function iterate() { | ||
// callback - Call with error or nothing | ||
constructor({ name, host, port, authenticate, logger }, setup) { | ||
constructor(name, server, setup) { | ||
this.name = name; | ||
this.host = host; | ||
this.port = port; | ||
this.authenticate = authenticate; | ||
this.config = server.config; | ||
this.notify = server.notify; | ||
this.setup = setup; | ||
this.pending = []; | ||
this._logger = logger; | ||
} | ||
@@ -199,3 +221,3 @@ | ||
// Get Fivebeans to execute this command. | ||
this._withClient(function(client) { | ||
this.withClient(function(client) { | ||
client[command].call(client, ...args, oncomplete); | ||
@@ -210,8 +232,8 @@ }); | ||
if (message != "Error: Connection closed") | ||
this._logger.error("Client error in queue %s: %s", this.name, message); | ||
this.notify.error("Client error in queue %s: %s", this.name, message); | ||
// If we're in the process of setting up a connection, reject the promise. | ||
// Discard the promise, next/recursive call to use(fn) will re-connect. | ||
if (this._deferred) { | ||
this._deferred.reject(error); | ||
this._deferred = null; | ||
if (this.deferred) { | ||
this.deferred.reject(error); | ||
this.deferred = null; | ||
} | ||
@@ -231,6 +253,6 @@ // Fail all pending requests. | ||
// function. | ||
_withClient(fn) { | ||
if (!this._deferred) | ||
this._connect(); | ||
this._deferred.promise.then(fn, (error)=> { | ||
withClient(fn) { | ||
if (!this.deferred) | ||
this.connect(); | ||
this.deferred.promise.then(fn, (error)=> { | ||
this.fail(error); | ||
@@ -243,5 +265,5 @@ this.use(fn); | ||
// to a fully setup Fivebeans client. | ||
_connect() { | ||
connect() { | ||
// This is the Fivebeans client is essentially a session. | ||
let client = new fivebeans.client(this.host, this.port); | ||
let client = new fivebeans.client(this.config.hostname, this.config.port); | ||
let deferred = Q.defer(); | ||
@@ -252,4 +274,4 @@ | ||
let authenticateAndSetup = ()=> { | ||
if (this.authenticate) { | ||
client.put(0, 0, 0, this.authenticate, function(error) { | ||
if (this.config.authenticate) { | ||
client.put(0, 0, 0, this.config.authenticate, function(error) { | ||
if (error) { | ||
@@ -293,3 +315,3 @@ deferred.reject(error); | ||
// Make sure use/fail methods have access to the promise. | ||
this._deferred = deferred; | ||
this.deferred = deferred; | ||
} | ||
@@ -305,26 +327,43 @@ | ||
// webhookURL - URL for receiving Webhook posts (Iron.io only) | ||
// put - Method for putting message in the queue | ||
// each - Method for processing messages from the queue | ||
// push - Method for pushing job to the queue | ||
// each - Method for processing jobs from the queue | ||
class Queue { | ||
constructor({ name, prefixedName, host, port, authenticate, webhookURL, logger }) { | ||
this.name = name; | ||
this.webhookURL = webhookURL; | ||
this._logger = logger; | ||
constructor(name, server) { | ||
this.name = name; | ||
this.notify = server.notify; | ||
this.webhookURL = server.config.webhookURL(name); | ||
this._server = server; | ||
this._prefixedName = server.config.prefixedName(name); | ||
this._processing = false; | ||
this._handler = null; | ||
} | ||
// Session for storing messages and other manipulations. | ||
// Setup: tell Beanstalkd which tube to use (persistent to session). | ||
this._putSession = new Session({ name, host, port, authenticate, logger }, function(client, callback) { | ||
client.use(prefixedName, callback); | ||
}); | ||
// Session for storing messages and other manipulations. | ||
get _put() { | ||
let session = this._putSession; | ||
if (!session) { | ||
// Setup: tell Beanstalkd which tube to use (persistent to session). | ||
session = new Session(this.name, this._server, (client, callback)=> { | ||
client.use(this._prefixedName, callback); | ||
}); | ||
this._putSession = session; | ||
} | ||
return session; | ||
} | ||
// Session for processing messages, continously blocks so don't use elsewhere. | ||
// Setup: tell Beanstalkd which tube we're watching (and ignore default tube). | ||
this._getSession = new Session({ name, host, port, authenticate, logger }, function(client, callback) { | ||
client.ignore('default', function() { | ||
client.watch(prefixedName, callback); | ||
// Session for processing messages, continously blocks so don't use elsewhere. | ||
get _reserve() { | ||
let session = this._reserveSession; | ||
if (!session) { | ||
// Setup: tell Beanstalkd which tube we're watching (and ignore default tube). | ||
session = new Session(this.name, this._server, (client, callback)=> { | ||
client.ignore('default', ()=> { | ||
client.watch(this._prefixedName, callback); | ||
}); | ||
}); | ||
}); | ||
this._reserveSession = session; | ||
} | ||
return session; | ||
} | ||
@@ -334,3 +373,3 @@ | ||
// Push job to queue. | ||
put(job, options, callback) { | ||
push(job, options, callback) { | ||
assert(job, "Missing job to queue"); | ||
@@ -345,3 +384,3 @@ if (typeof(options) == 'function') { | ||
this._putSession.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (error, jobID)=> { | ||
this._put.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (error, jobID)=> { | ||
// Don't pass jobID to callback, easy to use in test before hook, like | ||
@@ -353,3 +392,3 @@ // this: | ||
else if (error) | ||
this._logger.error("Error talking to Beanstalkd, queue %s: %s", this.name, error); | ||
this.notify.error("Error talking to Beanstalkd, queue %s: %s", this.name, error); | ||
}); | ||
@@ -390,4 +429,4 @@ } | ||
this._logger.debug("Waiting for jobs on queue %s", this.name); | ||
this._getSession.request('reserve_with_timeout', 0, (error, jobID, payload)=> { | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
this._reserve.request('reserve_with_timeout', 0, (error, jobID, payload)=> { | ||
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) | ||
@@ -414,7 +453,7 @@ callback(null, false); | ||
this._getSession.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (error, jobID, payload)=> { | ||
this._reserve.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (error, jobID, payload)=> { | ||
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) | ||
setImmediate(pickNextJob); | ||
else if (error) { | ||
this._logger.error(error); | ||
this.notify.error(error); | ||
setTimeout(pickNextJob, ERROR_BACKOFF); | ||
@@ -424,3 +463,3 @@ } else { | ||
if (error) | ||
this._logger.error(error); | ||
this.notify.error(error); | ||
setImmediate(pickNextJob); | ||
@@ -432,3 +471,3 @@ }); | ||
this._logger.debug("Waiting for jobs on queue %s", this.name); | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
pickNextJob(); | ||
@@ -466,7 +505,7 @@ } | ||
// Promise resolved on successful completion; we destroy the job. | ||
this._getSession.request('destroy', jobID, (error)=> { | ||
this._reserve.request('destroy', jobID, (error)=> { | ||
if (error) | ||
this._logger.error(error.stack); | ||
this.notify.error(error.stack); | ||
}); | ||
this._logger.info("Completed job %s from queue %s", jobID, this.name); | ||
this.notify.info("Completed job %s from queue %s", jobID, this.name); | ||
// Move on to process next job. | ||
@@ -481,7 +520,7 @@ clearTimeout(errorOnTimeout); | ||
let delay = (process.env.NODE_ENV == 'test' ? 0 : RELEASE_DELAY); | ||
this._getSession.request('release', jobID, 0, delay / 1000, (error)=> { | ||
this._reserve.request('release', jobID, 0, delay / 1000, (error)=> { | ||
if (error) | ||
this._logger.error(error.stack); | ||
this.notify.error(error.stack); | ||
}); | ||
this._logger.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack); | ||
this.notify.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack); | ||
// Move on to process next job. | ||
@@ -496,4 +535,4 @@ clearTimeout(errorOnTimeout); | ||
domain.run(()=> { | ||
this._logger.info("Picked up job %s from queue %s", jobID, this.name); | ||
this._logger.debug("Processing %s: %s", jobID, payload.toString()); | ||
this.notify.info("Picked up job %s from queue %s", jobID, this.name); | ||
this.notify.debug("Processing %s: %s", jobID, payload.toString()); | ||
// Typically we queue JSON objects, but the payload may be just a | ||
@@ -515,3 +554,3 @@ // string, e.g. some services send URL encoded name/value pairs, or MIME | ||
reset(callback) { | ||
this._putSession._withClient(function(client) { | ||
this._put.withClient(function(client) { | ||
function deleteNextJob() { | ||
@@ -518,0 +557,0 @@ client.reserve_with_timeout(0, function(error, jobID, payload) { |
52574
1241
224
8
6