couchdb-worker
Advanced tools
Comparing version 1.1.0 to 1.2.0
@@ -45,5 +45,26 @@ 'use strict'; | ||
grunt.registerTask('clean', 'Clean up lost test databases.', function() { | ||
var nano = require('nano')(process.env.COUCH_URL || 'http://localhost:5984'); | ||
var done = this.async(); | ||
function destroy(db, next) { | ||
nano.db.destroy(db, function(err, resp) { | ||
if (err) { | ||
grunt.log.err('Failed to delete ' + db + ': ' + err.error); | ||
} else { | ||
grunt.log.ok('Deleted ' + db); | ||
} | ||
next(err, resp); | ||
}); | ||
} | ||
nano.db.list(function(err, body) { | ||
var dbs = body.filter(function(db) { return db.match(/^couchdb-worker-test-/); }); | ||
grunt.util.async.map(dbs, destroy, done); | ||
}); | ||
}); | ||
// Default task. | ||
grunt.registerTask('default', ['jshint', 'nodeunit']); | ||
}; |
@@ -14,19 +14,20 @@ /* | ||
options.follow = options.follow || {}; | ||
options.status = options.status || {}; | ||
// defaults | ||
options.follow.include_docs = true; | ||
options.status.key = options.status.key || 'worker_status'; | ||
options.status.db = options.status.db || options.db; | ||
options.status.id = options.status.id || 'worker-status/' + options.id; | ||
// worker id | ||
// mandatory options | ||
if (typeof options.id !== 'string') { | ||
throw('worker needs an id.'); | ||
} | ||
var id = options.id; | ||
delete options.id; | ||
// process function | ||
if (typeof options.process !== 'function') { | ||
throw('worker needs a process function.'); | ||
} | ||
var process = options.process; | ||
delete options.process; | ||
// check if document is processable, that is | ||
// * it is not the status document | ||
// * it has no worker status at all (no worker touched it ever) | ||
@@ -36,9 +37,13 @@ // * there is no `triggered` status (no other worker has already taken over) | ||
function isProcessable(doc) { | ||
// status document | ||
if (doc._id === options.status.id) { | ||
return false; | ||
} | ||
// no worker status at all | ||
if (!doc.worker_status) { | ||
if (!doc[options.status.key]) { | ||
return true; | ||
} | ||
// no worker took over | ||
for (var key in doc.worker_status) { | ||
if (doc.worker_status[key].status === 'triggered') { | ||
for (var key in doc[options.status.key]) { | ||
if (doc[options.status.key][key].status === 'triggered') { | ||
return false; | ||
@@ -48,11 +53,24 @@ } | ||
// no own worker status | ||
return !doc.worker_status[id]; | ||
return !doc[options.status.key][options.id]; | ||
} | ||
// initialize database connector | ||
var db = require('nano')(options.db); | ||
// initialize status database connector | ||
var statusDb = require('nano')(options.status.db); | ||
// initialize feed | ||
var feed = db.follow(options.follow); | ||
feed.status = { | ||
_id: options.status.id | ||
}; | ||
function storeStatus() { | ||
statusDb.insert(feed.status, function(err, body) { | ||
if (!err) { | ||
feed.status._rev = body.rev; | ||
} | ||
}); | ||
} | ||
// context for processor function evaluation | ||
@@ -67,11 +85,14 @@ var ctx = { | ||
feed.status.checked = feed.status.checked || 0; | ||
feed.status.checked++; | ||
function ondone(err, next) { | ||
if (err) { | ||
doc.worker_status[id].status = 'error'; | ||
doc.worker_status[id].error = err; | ||
doc[options.status.key][options.id].status = 'error'; | ||
doc[options.status.key][options.id].error = err; | ||
} else { | ||
doc.worker_status[id].status = 'complete'; | ||
doc[options.status.key][options.id].status = 'complete'; | ||
} | ||
db.insert(doc, doc._rev, function(err, body) { | ||
db.insert(doc, function(err, body) { | ||
if (err) { | ||
@@ -84,2 +105,5 @@ feed.emit('worker:error', err, doc); | ||
feed.status.completed = feed.status.completed || 0; | ||
feed.status.completed++; | ||
feed.resume(); | ||
@@ -90,2 +114,5 @@ | ||
} | ||
// store status | ||
storeStatus(); | ||
}); | ||
@@ -100,8 +127,8 @@ } | ||
doc.worker_status = doc.worker_status || {}; | ||
doc.worker_status[id] = doc.worker_status[id] || {}; | ||
doc.worker_status[id].status = 'triggered'; | ||
delete doc.worker_status[id].error; | ||
doc[options.status.key] = doc[options.status.key] || {}; | ||
doc[options.status.key][options.id] = doc[options.status.key][options.id] || {}; | ||
doc[options.status.key][options.id].status = 'triggered'; | ||
delete doc[options.status.key][options.id].error; | ||
db.insert(doc, doc._rev, function(err, body) { | ||
db.insert(doc, function(err, body) { | ||
if (err) { | ||
@@ -114,5 +141,10 @@ if (err.error !== 'conflict') { | ||
feed.status.seq = change.seq; | ||
feed.status.last = doc._id; | ||
feed.status.triggered = feed.status.triggered || 0; | ||
feed.status.triggered++; | ||
doc._rev = body.rev; | ||
process.apply(ctx, [doc, ondone]); | ||
options.process.apply(ctx, [doc, ondone]); | ||
}); | ||
@@ -123,5 +155,14 @@ } | ||
feed.on('change', onchange); | ||
feed.on('stop', storeStatus); | ||
// start listening | ||
feed.follow(); | ||
statusDb.get(feed.status._id, function(err, doc) { | ||
if (!err && doc) { | ||
feed.status = doc; | ||
} | ||
if (feed.status.seq) { | ||
feed.since = feed.status.seq; | ||
} | ||
// start listening | ||
feed.follow(); | ||
}); | ||
@@ -128,0 +169,0 @@ // return feed object |
{ | ||
"name": "couchdb-worker", | ||
"description": "CouchDB worker module that manages state", | ||
"version": "1.1.0", | ||
"version": "1.2.0", | ||
"homepage": "https://github.com/jo/couchdb-worker", | ||
@@ -6,0 +6,0 @@ "author": { |
@@ -42,2 +42,6 @@ # couchdb-worker | ||
* `follow` | [follow](https://github.com/iriscouch/follow) options | ||
* `status` | status options (optional) | ||
* `status.key` | Property to store status inside documents. Default is `worker_status`. | ||
* `status.db` | [nano](https://github.com/dscape/nano) options for status database connection. Default is to use the `db` connection. | ||
* `status.id` | id for status document. Only used if `statusDb` is given. Default is `worker-status/<id>`. | ||
@@ -44,0 +48,0 @@ ## `process(doc, done)` |
@@ -67,8 +67,10 @@ 'use strict'; | ||
nano.db.create(this.dbname, function(err) { | ||
if (err) { | ||
throw(err); | ||
} | ||
that.db = nano.use(that.dbname); | ||
done(); | ||
process.nextTick(function() { | ||
nano.db.create(that.dbname, function(err) { | ||
if (err) { | ||
throw(err); | ||
} | ||
that.db = nano.use(that.dbname); | ||
done(); | ||
}); | ||
}); | ||
@@ -82,9 +84,11 @@ }, | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: function() {} }); | ||
test.equal(typeof w, 'object', 'should return an object'); | ||
test.equal(typeof w.pause, 'function', 'should expose `pause` function'); | ||
test.equal(typeof w.resume, 'function', 'should expose `resume` function'); | ||
test.equal(typeof w.stop, 'function', 'should expose `stop` function'); | ||
test.equal(typeof w.on, 'function', 'should expose `on` function'); | ||
w.stop(); | ||
test.done(); | ||
w.on('start', function() { | ||
test.equal(typeof w, 'object', 'should return an object'); | ||
test.equal(typeof w.pause, 'function', 'should expose `pause` function'); | ||
test.equal(typeof w.resume, 'function', 'should expose `resume` function'); | ||
test.equal(typeof w.stop, 'function', 'should expose `stop` function'); | ||
test.equal(typeof w.on, 'function', 'should expose `on` function'); | ||
w.stop(); | ||
}); | ||
w.on('stop', test.done); | ||
}, | ||
@@ -99,5 +103,5 @@ 'process callback arguments': function(test) { | ||
w.stop(); | ||
test.done(); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
@@ -113,6 +117,6 @@ }, | ||
w.stop(); | ||
test.done(); | ||
}); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
@@ -128,6 +132,6 @@ }, | ||
w.stop(); | ||
test.done(); | ||
}); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}); | ||
@@ -147,4 +151,4 @@ }, | ||
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`'); | ||
test.done(); | ||
}); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
@@ -161,6 +165,6 @@ }, | ||
w.stop(); | ||
test.done(); | ||
}); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}); | ||
@@ -184,7 +188,7 @@ }, | ||
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`'); | ||
test.done(); | ||
}); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'global worker status': function(test) { | ||
'global worker document status': function(test) { | ||
test.expect(1); | ||
@@ -198,6 +202,6 @@ var w; | ||
w.stop(); | ||
test.done(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
@@ -208,3 +212,3 @@ this.db.insert({ worker_status: { otherworker: { status: 'triggered' } } }, function() { | ||
}, | ||
'own worker status triggered': function(test) { | ||
'own worker document status triggered': function(test) { | ||
test.expect(1); | ||
@@ -218,6 +222,6 @@ var w; | ||
w.stop(); | ||
test.done(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
@@ -228,3 +232,3 @@ this.db.insert({ worker_status: { myworker: { status: 'triggered' } } }, function() { | ||
}, | ||
'own worker status complete': function(test) { | ||
'own worker document status complete': function(test) { | ||
test.expect(1); | ||
@@ -238,6 +242,6 @@ var w; | ||
w.stop(); | ||
test.done(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
@@ -248,3 +252,3 @@ this.db.insert({ worker_status: { myworker: { status: 'complete' } } }, function() { | ||
}, | ||
'own worker status error': function(test) { | ||
'own worker document status error': function(test) { | ||
test.expect(1); | ||
@@ -258,6 +262,6 @@ var w; | ||
w.stop(); | ||
test.done(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
@@ -268,3 +272,3 @@ this.db.insert({ worker_status: { myworker: { status: 'error' } } }, function() { | ||
}, | ||
'worker status': function(test) { | ||
'document status': function(test) { | ||
test.expect(3); | ||
@@ -276,3 +280,2 @@ var w; | ||
w.stop(); | ||
test.done(); | ||
} | ||
@@ -284,4 +287,71 @@ function process(doc, next) { | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}); | ||
}, | ||
'worker status default id': function(test) { | ||
test.expect(1); | ||
var w; | ||
function done() { | ||
w.stop(); | ||
} | ||
function process(doc, next) { | ||
next(null, done); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
test.equal(w.status && w.status._id, 'worker-status/myworker', 'status should have default id'); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'worker status custom id': function(test) { | ||
test.expect(1); | ||
var w; | ||
function done() { | ||
w.stop(); | ||
} | ||
function process(doc, next) { | ||
next(null, done); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process, status: { id: 'mystatus' } }); | ||
test.equal(w.status && w.status._id, 'mystatus', 'status should have custom id'); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'worker status updates': function(test) { | ||
test.expect(6); | ||
var w; | ||
function done() { | ||
w.stop(); | ||
} | ||
function process(doc, next) { | ||
test.equal(w.status.checked, 1, 'status should have checked one doc'); | ||
test.equal(w.status.seq, 1, 'status should have curren seq'); | ||
test.equal(w.status.last, 'mydoc', 'status should have used last `mydoc`'); | ||
test.equal(w.status.triggered, 1, 'status should have triggered one doc'); | ||
next(null, done); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
test.equal(typeof w.status, 'object', 'status should be an object'); | ||
test.equal(w.status._id, 'worker-status/myworker', 'status should have default id'); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'worker status storage': function(test) { | ||
test.expect(1); | ||
var w; | ||
function process(doc, next) { | ||
next(null); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
var feed = this.db.follow(); | ||
feed.on('change', function(change) { | ||
if (change.id === 'worker-status/myworker') { | ||
test.ok(true, 'status has been stored'); | ||
feed.stop(); | ||
w.stop(); | ||
// test.done(); | ||
} | ||
}); | ||
feed.follow(); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23639
527
135
2