Sorry, the diff of this file is not supported yet
+30
-13
@@ -29,7 +29,23 @@ # | ||
| $.extend module.exports, { | ||
| push: (item) -> ready.wait( (err, queue) -> queue.push item ); @ | ||
| push: (item) -> | ||
| unless 'type' of item | ||
| throw new Error("a 'type' is required on work items") | ||
| ready.wait( (err, queue) -> queue.push item ); @ | ||
| pop: (cb) -> ready.wait( (err, queue) -> queue.pop cb ); @ | ||
| clear: -> ready.wait( (err, queue) -> queue.clear() ); @ | ||
| close: -> $.log("close wait"); return ready.wait (err, queue) -> queue.close() | ||
| close: -> ready.wait (err, queue) -> queue.close() | ||
| count: (cb) -> ready.wait( (err, queue) -> queue.count cb ); @ | ||
| register: (type, handler) -> | ||
| ready.wait (err, queue) -> queue.register type, handler | ||
| createWorker: (opts) -> | ||
| w = $.Promise() | ||
| ready.wait (err, queue) -> | ||
| if err then w.reject err | ||
| else w.resolve queue.createWorker() | ||
| return { | ||
| pause: -> | ||
| w.then (worker) -> worker.pause() | ||
| resume: -> | ||
| w.then (worker) -> worker.resume() | ||
| } | ||
@@ -49,3 +65,3 @@ connect: (url, opts) -> | ||
| q = db.collection(opts.collection) | ||
| q.ensureIndex { status: 1, readyAt: 1 }, unsafe | ||
| q.ensureIndex { status: 1, readyAt: 1 }, unsafe, (err) -> | ||
@@ -97,4 +113,3 @@ getNewReaderId = -> $.random.string opts.id[0].length + opts.id[1], opts.id[0] | ||
| mtime: $.now | ||
| }, | ||
| $inc: { retryCount: 1 } | ||
| }, "$inc": { retryCount: 1 } | ||
| }, { multi: false, safe: true }, cb | ||
@@ -120,3 +135,3 @@ | ||
| $.log "closing" | ||
| db.close() | ||
| db.close -> $.log "closed" | ||
| $.log "resetting", ready.promiseId | ||
@@ -130,3 +145,3 @@ ready.reset() | ||
| $.log "clearing" | ||
| q.remove {}, -> $.log "clear result:", arguments | ||
| q.remove {}, (err) -> $.log "clear result:", err | ||
@@ -136,2 +151,4 @@ count: (cb) -> q.count { status: "new" }, cb | ||
| log "inserting", item | ||
| unless 'type' of item | ||
| throw new Error("a 'type' is required on work items") | ||
| # TODO: if item has an _id that is already in the db | ||
@@ -186,5 +203,5 @@ # and the doc in the db is status: "failed", allow overwrite | ||
| worker_opts = $.extend { | ||
| idle_delay: 100 | ||
| busy_delay: 500 | ||
| busy_max: 10 | ||
| idle_delay: 100 # polling interval when the queue is empty | ||
| busy_delay: 500 # delay interval if we try to fetch new items while still working on an old one | ||
| busy_max: 10 # give up on a work item if we work on it for longer than (busy_max * busy_delay) ms | ||
| }, worker_opts | ||
@@ -197,3 +214,3 @@ nextItem = (err) => | ||
| when err then switch err | ||
| # Busy: we still have outstanding items | ||
| # Busy: we still have outstanding items | ||
| when "busy" | ||
@@ -203,7 +220,7 @@ if ++busy_count > worker_opts.busy_max | ||
| readerId = getNewReaderId() | ||
| log "starting over with new worker id", readerId | ||
| log = $.logger "worker #{readerId}" | ||
| log "starting over with new reader id" | ||
| $.delay worker_opts.busy_delay, nextItem | ||
| # Unknown Error: just log it and move on | ||
| else $.log "pop error:", err | ||
| else log "pop error:", err | ||
| # Idle: wait for idle_delay and poll the next item | ||
@@ -210,0 +227,0 @@ when (not item?) and (not done?) then $.delay worker_opts.idle_delay, nextItem |
+1
-1
| MOCHA=node_modules/.bin/mocha | ||
| MOCHA_FMT=spec | ||
| MOCHA_OPTS=--compilers coffee:coffee-script --globals Bling,$$ -R ${MOCHA_FMT} -s 100 | ||
| MOCHA_OPTS=--compilers coffee:coffee-script/register --globals Bling,$$ -R ${MOCHA_FMT} -s 100 --bail | ||
@@ -6,0 +6,0 @@ all: test |
+3
-2
| { | ||
| "name": "work-queue", | ||
| "version": "0.0.1", | ||
| "version": "0.0.2", | ||
| "description": "Process scheduled items from a queue held in MongoDB.", | ||
@@ -15,4 +15,5 @@ "main": "index.coffee", | ||
| "bling": "latest", | ||
| "mongodb": "latest" | ||
| "mongodb": "latest", | ||
| "optimist": "latest" | ||
| } | ||
| } |
+53
-24
@@ -13,11 +13,11 @@ The Work Queue | ||
| ```coffee | ||
| ``` | ||
| WorkQueue = require 'work-queue' | ||
| WorkQueue = require('work-queue') | ||
| WorkQueue.connect "mongodb://localhost:27017/test", { | ||
| WorkQueue.connect("mongodb://localhost:27017/test", { | ||
| collection: "workQueue", | ||
| readerId: [ "reader-", 5 ], # 5 chars of randomness | ||
| # consider, readerId: "app-server-56" | ||
| } | ||
| readerId: [ "reader-", 5 ], // 5 chars of randomness | ||
| // consider, readerId: "app-server-56" | ||
| }) | ||
@@ -31,6 +31,6 @@ ``` | ||
| WorkQueue.push { | ||
| type: "my-type" | ||
| WorkQueue.push({ | ||
| type: "my-type", | ||
| schedule: { at: timestamp } | ||
| } | ||
| }) | ||
@@ -42,9 +42,9 @@ ``` | ||
| * `every` with an interval in ms, example: | ||
| ```coffee | ||
| WorkQueue.push { type: "foo", schedule: { every: 30*1000} } | ||
| ``` | ||
| WorkQueue.push({ type: "foo", schedule: { every: 30*1000} }) | ||
| ``` | ||
| * `after` with a delay in ms, example: | ||
| ```coffee | ||
| WorkQueue.push { type: "foo", schedule: { after: 5*60*1000 } } | ||
| ``` | ||
| WorkQueue.push({ type: "foo", schedule: { after: 5*60*1000 } }) | ||
| ``` | ||
@@ -72,16 +72,18 @@ You can combine `every` and `after`, to control when the first iteration occurs. | ||
| ```coffee | ||
| WorkQueue.register 'my-type', (item, done) -> | ||
| # item has all the fields shown above | ||
| doWorkOnItem item, (err) -> | ||
| if err then done(err) # fail | ||
| else done() # all ok! | ||
| ``` | ||
| WorkQueue.register('my-type', function(item, done) { | ||
| // item has all the fields shown above | ||
| doWorkOnItem(item, function (err) { | ||
| if(err) { done(err) } | ||
| else { done() } | ||
| }) | ||
| }) | ||
| worker = WorkQueue.createWorker { | ||
| idle_delay: 100 # polling interval if nothing to do | ||
| } | ||
| worker = WorkQueue.createWorker({ | ||
| idle_delay: 100 // polling interval if nothing to do | ||
| }) | ||
| worker.resume() | ||
| # run this example for 10 seconds, then pause | ||
| setTimeout worker.pause, 10000 | ||
| // run this example for 10 seconds, then pause | ||
| setTimeout(worker.pause, 10000) | ||
@@ -91,1 +93,28 @@ ``` | ||
| A usable example can be found in `bin/queue-reader.coffee`. | ||
| bin/queue-reader.coffee | ||
| ----------------------- | ||
| ``` | ||
| Usage: queue-reader [options...] mongodb://host:port/db_name | ||
| Options: | ||
| -c, --collection the collection to hold work orders in [default: "workQueue"] | ||
| -i, --interval when idle, how often to look for new work [default: 100] | ||
| -r, --require require this/these module(s), which should export type handlers [default: ""] | ||
| -d, --demo DANGEROUS: load an example queue as a test, will flush all jobs in the specificed collection [default: false] | ||
| ``` | ||
| The `-r` or `--require` option is the most important if you want to do real work. It can be given multiple times, and each string given to it will be passed to `require()` within the reader script. | ||
| Each module required in this way should export an object full of `{ type: handler }` pairs. | ||
| Example: | ||
| ``` | ||
| module.exports['echo'] = function (item, done) { | ||
| console.log(item) | ||
| done() | ||
| } | ||
| ``` | ||
| The `-i` or `--interval` option is only meaningful when the queue is empty. When each work item is completed, a check for new work is performed immediately. |
+44
-20
@@ -16,9 +16,10 @@ $ = require 'bling' | ||
| ''' | ||
| describe '.push()', -> | ||
| it "adds work items (chainable)", (pass) -> | ||
| Q.clear().push( type: "my-type", schedule: { at: $.now } ).count (err, count) -> | ||
| assert.equal err, null | ||
| assert.equal count, 1 | ||
| pass() | ||
| Q.clear().push( type: "my-type", schedule: { at: $.now } ) | ||
| $.delay 30, -> | ||
| Q.count (err, count) -> | ||
| assert.equal err, null | ||
| assert.equal count, 1 | ||
| pass() | ||
| it "rejects items without a type", -> | ||
@@ -29,21 +30,44 @@ assert.throws -> Q.push( notype: "haha" ) | ||
| it "yields the next item, and a finalizer", (pass) -> | ||
| Q.clear().push( type: "pop-test", schedule: { at: $.now } ).pop (item, done) -> | ||
| assert.equal item.type, "pop-test" | ||
| done() | ||
| Q.count (err, count) -> | ||
| Q.clear().push( type: "pop-test", schedule: { at: $.now } ) | ||
| $.delay 30, -> # make sure that the new item is really 'due' | ||
| Q.pop (err, item, done) -> | ||
| assert.equal err, null | ||
| assert.equal count, 0 | ||
| pass() | ||
| assert.notEqual item, null | ||
| assert $.is 'function', done | ||
| assert.equal item.type, "pop-test" | ||
| done() | ||
| $.delay 30, -> | ||
| Q.count (err, count) -> | ||
| assert.equal err, null | ||
| assert.equal count, 0 | ||
| pass() | ||
| describe ".register()", -> | ||
| it "can register handlers by type", -> | ||
| Q.register "my-type", (err, item, done) -> | ||
| describe ".createWorker()", -> | ||
| it "creates a Worker" | ||
| it "creates a Worker", -> | ||
| worker = Q.createWorker() | ||
| assert $.is 'function', worker.pause | ||
| assert $.is 'function', worker.resume | ||
| describe "a Worker", -> | ||
| it "can pause" | ||
| it "can resume" | ||
| it "can register handlers by type" | ||
| it "processes jobs when running" | ||
| it "processes jobs when running", (pass) -> | ||
| Q.register "test-type", (item, done) -> | ||
| assert.equal $.type(done), 'function' | ||
| done() | ||
| assert.notEqual item, null | ||
| assert.equal item.type, "test-type" | ||
| pass() | ||
| ''' | ||
| it "can be closed", (pass) -> | ||
| assert $.is 'function', Q.close | ||
| assert $.is 'promise', Q.close().wait pass | ||
| Q.clear().push( { type: "test-type", schedule: { at: $.now } } ) | ||
| worker = Q.createWorker() | ||
| worker.resume() | ||
| $.delay 500, worker.pause | ||
| describe ".close()", -> | ||
| it "is a function", -> | ||
| assert $.is 'function', Q.close | ||
| it "returns a promise", (done) -> | ||
| Q.close().wait (err) -> done() | ||
| it "should pause all workers" |
| Optimist = require('optimist') | ||
| $.log "Options:", opts = Optimist \ | ||
| .options('c', { | ||
| alias: 'collection', | ||
| default: 'workQueue', | ||
| describe: 'the collection to hold work orders in' | ||
| }) \ | ||
| .options('i', { | ||
| alias: 'interval', | ||
| default: 100 | ||
| describe: 'when idle, how often to look for new work' | ||
| }) \ | ||
| .options('r', { | ||
| alias: 'require', | ||
| default: '', | ||
| describe: 'require this/these module(s), which should export type handlers' | ||
| }) \ | ||
| .option('d', { | ||
| alias: 'demo', | ||
| default: false | ||
| describe: 'DANGEROUS: load an example queue as a test, will flush all jobs in the specificed collection' | ||
| }) \ | ||
| .demand(1) | ||
| .check( (argv) -> | ||
| unless /^mongodb:/.test argv._ | ||
| throw new Error("url must begin with mongodb://") | ||
| ) | ||
| .usage("Usage: $0 [options...] mongodb://host:port/db_name") | ||
| .argv | ||
| url = opts._ | ||
| mins = 60*1000 | ||
| W = require "../index.coffee" | ||
| if $.is 'array', opts.require | ||
| for r in opts.require | ||
| $.log "Requiring '#{r}'..." | ||
| for type, handler of require(r) | ||
| $.assert ($.is 'string', type), "type must be string: #{type}" | ||
| $.assert ($.is 'function', handler), "handler must be function: #{handler}" | ||
| W.register type, handler | ||
| W.connect url, { collection: opts.collection } | ||
| worker = W.createWorker { | ||
| idle_delay: opts.interval | ||
| } | ||
| worker.resume() | ||
| if opts.demo | ||
| W.register 'echo', (item, done) -> | ||
| $.log "ECHO:", item.message | ||
| done() | ||
| W.clear().push( | ||
| type: "echo" | ||
| schedule: { every: .1*mins, maxFail: Infinity } | ||
| message: "Should recur every six seconds" | ||
| _id: "only_one" | ||
| ).push( | ||
| type: "echo" | ||
| schedule: { after: .5*mins } | ||
| message: "Once after thirty seconds" | ||
| ).push( | ||
| type: "echo" | ||
| schedule: { at: $.now + 3000 } | ||
| message: "Once after three seconds" | ||
| ) | ||
| $.delay 32000, -> | ||
| $.log "Ending demo." | ||
| worker.pause() |
15389
22.44%116
33.33%4
33.33%+ Added
+ Added
+ Added
+ Added