couchdb-worker
Advanced tools
Comparing version 1.2.4 to 2.0.0
@@ -18,5 +18,5 @@ /* | ||
options.follow.include_docs = true; | ||
options.status.key = options.status.key || 'worker_status'; | ||
options.status.db = options.status.db || options.db; | ||
options.status.id = options.status.id || 'worker-status/' + options.id; | ||
options.status.prefix = options.status.prefix || 'worker-lock/' + options.id + '/'; | ||
@@ -30,37 +30,50 @@ // mandatory options | ||
} | ||
// database connector | ||
var db = require('nano')(options.db); | ||
// status database connector | ||
var statusDb = require('nano')(options.status.db); | ||
// changes feed | ||
var feed = db.follow(options.follow); | ||
// check if document is processable, that is | ||
// * it is not the status document | ||
// * it has no worker status at all (no worker touched it ever) | ||
// * there is no `triggered` status (no other worker has already taken over) | ||
// * no own worker status for this document (I haven't touched it) | ||
function isProcessable(doc) { | ||
// status document | ||
// capture a document | ||
function capture(doc, done) { | ||
statusDb.insert({}, options.status.prefix + doc._id, done); | ||
} | ||
// release a document | ||
function release(lock, done) { | ||
statusDb.destroy(lock.id, lock.rev, function(err, data) { | ||
if (!err) { | ||
return done(); | ||
} | ||
// force delete in case of conflict | ||
if (err.error === 'conflict') { | ||
statusDb.get(lock._id, function(err, doc) { | ||
if (err) { | ||
return feed.emit('worker:release-error', err, lock); | ||
} | ||
lock.rev = doc._rev; | ||
release(lock, done); | ||
}); | ||
} | ||
}); | ||
} | ||
function discard(doc) { | ||
// discard status | ||
if (doc._id === options.status.id) { | ||
return false; | ||
return true; | ||
} | ||
// no worker status at all | ||
if (!doc[options.status.key]) { | ||
// discard lock | ||
var match = doc._id.match(options.status.prefix); | ||
if (match && match.index === 0) { | ||
return true; | ||
} | ||
// no worker took over | ||
for (var key in doc[options.status.key]) { | ||
if (doc[options.status.key][key].status === 'triggered') { | ||
return false; | ||
} | ||
} | ||
// no own worker status | ||
return !doc[options.status.key][options.id]; | ||
} | ||
// initialize database connector | ||
var db = require('nano')(options.db); | ||
// initialize feed | ||
var feed = db.follow(options.follow); | ||
// initialize status database connector | ||
var statusDb = require('nano')(options.status.db); | ||
var statusDoc = { | ||
_id: options.status.id | ||
_id: options.status.id, | ||
worker_id: options.id | ||
}; | ||
@@ -87,3 +100,3 @@ var statusDiff = { | ||
} | ||
statusDoc.last = statusDiff.last; | ||
statusDoc.last_doc_id = statusDiff.last_doc_id; | ||
statusDoc.checked = statusDoc.checked + statusDiff.checked; | ||
@@ -108,3 +121,3 @@ statusDoc.triggered = statusDoc.triggered + statusDiff.triggered; | ||
delete statusDiff.seq; | ||
delete statusDiff.last; | ||
delete statusDiff.last_doc_id; | ||
statusDiff.checked = 0; | ||
@@ -118,57 +131,16 @@ statusDiff.triggered = 0; | ||
// context for processor function evaluation | ||
var ctx = { | ||
db: db, | ||
feed: feed | ||
}; | ||
function onchange(change) { | ||
var doc = change.doc; | ||
statusDiff.checked++; | ||
function ondone(err, next) { | ||
if (err) { | ||
doc[options.status.key][options.id].status = 'error'; | ||
doc[options.status.key][options.id].error = err; | ||
} else { | ||
doc[options.status.key][options.id].status = 'complete'; | ||
} | ||
db.insert(doc, doc._id, function(err, body) { | ||
if (err) { | ||
feed.emit('worker:error', err, doc); | ||
} else { | ||
doc._rev = body.rev; | ||
feed.emit('worker:complete', doc); | ||
} | ||
statusDiff.completed++; | ||
feed.resume(); | ||
if (typeof next === 'function') { | ||
next.apply(ctx, [err, doc]); | ||
} | ||
storeStatus(); | ||
}); | ||
} | ||
if (!isProcessable(doc)) { | ||
if (discard(doc)) { | ||
return; | ||
} | ||
statusDiff.checked++; | ||
feed.pause(); | ||
doc[options.status.key] = doc[options.status.key] || {}; | ||
doc[options.status.key][options.id] = doc[options.status.key][options.id] || {}; | ||
doc[options.status.key][options.id].status = 'triggered'; | ||
delete doc[options.status.key][options.id].error; | ||
db.insert(doc, doc._id, function(err, body) { | ||
capture(doc, function(err, lock) { | ||
if (err) { | ||
if (err.error !== 'conflict') { | ||
feed.emit('worker:error', err, doc); | ||
} | ||
feed.emit('worker:skip', doc); | ||
return; | ||
@@ -178,8 +150,22 @@ } | ||
statusDiff.seq = change.seq; | ||
statusDiff.last = change.id; | ||
statusDiff.last_doc_id = change.id; | ||
statusDiff.triggered++; | ||
doc._rev = body.rev; | ||
options.process(doc, function(err) { | ||
if (err) { | ||
statusDiff.failed++; | ||
feed.emit('worker:error', err, doc); | ||
} | ||
options.process.apply(ctx, [doc, ondone]); | ||
release(lock, function() { | ||
if (!err) { | ||
feed.emit('worker:complete', err, doc); | ||
statusDiff.completed++; | ||
} | ||
feed.resume(); | ||
storeStatus(); | ||
}); | ||
}); | ||
}); | ||
@@ -186,0 +172,0 @@ } |
{ | ||
"name": "couchdb-worker", | ||
"description": "CouchDB worker module that manages state", | ||
"version": "1.2.4", | ||
"version": "2.0.0", | ||
"homepage": "https://github.com/jo/couchdb-worker", | ||
@@ -6,0 +6,0 @@ "author": { |
@@ -43,5 +43,5 @@ # couchdb-worker | ||
* `status` | status options (optional) | ||
* `status.key` | Property to store status inside documents. Default is `worker_status`. | ||
* `status.db` | [nano](https://github.com/dscape/nano) options for status database connection. Default is to use the `db` connection. | ||
* `status.id` | id for status document. Only used if `statusDb` is given. Default is `worker-status/<id>`. | ||
* `status.id` | id for status document. Default is `worker-status/<id>`. | ||
* `status.prefix` | prefix for lock document ids. Default is `worker-lock/<id>/`. | ||
@@ -54,24 +54,28 @@ ## `process(doc, done)` | ||
The `done` callback accepts itself two arguments: an `error` property, | ||
where you can inform couchdb-worker about any errors (it will also be stored inside the document) | ||
and a `next` callback function which is called when the modified document is saved. | ||
The `done` callback accepts itself one `error` argument, | ||
where you can inform couchdb-worker about any errors (it will also be stored inside the document). | ||
## Lock | ||
To prevent two same workers from processing the same document twice, | ||
couchdb-worker keeps a lock on the document. | ||
This is achieved by putting an empty doc inside the `status.db` while processing, | ||
which will be deleted when done. | ||
The id of that lock document is calculated by appending the documents id to `status.prefix`. | ||
## Status | ||
* couchdb-worker stores its status inside the document in an object called `worker_status`. | ||
* Each worker manages its own status inside this object, eg `worker_status.myworker`. | ||
* The status can be `triggered`, `error` or `complete`. | ||
* Only one worker can run at a time on one document. | ||
* You can store your own worker status information (a retry count for example) | ||
inside the `worker_status` object. | ||
* If the processing failed, `worker_status.myworker.error` will contain the error. | ||
couchdb-worker maintains a status document, where some stats are stored: | ||
A status object can be | ||
```javascript | ||
{ | ||
worker_status: { | ||
myworker: { | ||
status: 'complete' | ||
} | ||
} | ||
"_id": "worker-status/my-worker", | ||
"worker_id": "my-worker", | ||
"seq": 123, | ||
"last_doc_id": "mydoc", | ||
"checked": 42, | ||
"triggered": 42, | ||
"completed": 40, | ||
"failed": 2 | ||
} | ||
@@ -129,2 +133,3 @@ ``` | ||
## Release History | ||
* `2.0.0`: do not store worker status in documents, store lock in extra documents | ||
* `1.0.0`: complete rewrite and new (functional) API using [nano](https://github.com/dscape/nano) | ||
@@ -131,0 +136,0 @@ (and [follow](https://github.com/iriscouch/follow)) - _currently no attachment support_ |
@@ -9,7 +9,2 @@ 'use strict'; | ||
// https://gist.github.com/jed/982883 | ||
var uuid = function b(a){return a?(a^Math.random()*16>>a/4).toString(16):([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,b);}; | ||
var noop = function() {}; | ||
/* | ||
@@ -35,2 +30,26 @@ ======== A Handy Little Nodeunit Reference ======== | ||
// https://gist.github.com/jed/982883 | ||
var uuid = function b(a){return a?(a^Math.random()*16>>a/4).toString(16):([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,b);}; | ||
var setUp = function(done) { | ||
var that = this; | ||
this.dbname = 'couchdb-worker-test-' + encodeURIComponent(uuid()); | ||
this.url = url.resolve(server, this.dbname); | ||
process.nextTick(function() { | ||
nano.db.create(that.dbname, function(err) { | ||
if (err) { | ||
console.error('Could not create test database', that.dbname); | ||
throw(err); | ||
} | ||
that.db = nano.use(that.dbname); | ||
done(); | ||
}); | ||
}); | ||
}; | ||
var tearDown = function(done) { | ||
nano.db.destroy(this.dbname, done); | ||
}; | ||
exports.api = { | ||
@@ -65,25 +84,11 @@ 'typeof': function(test) { | ||
exports.listen = { | ||
setUp: function(done) { | ||
var that = this; | ||
this.dbname = 'couchdb-worker-test-' + encodeURIComponent(uuid()); | ||
this.url = url.resolve(server, this.dbname); | ||
exports['callback arguments'] = { | ||
setUp: setUp, | ||
tearDown: tearDown, | ||
process.nextTick(function() { | ||
nano.db.create(that.dbname, function(err) { | ||
if (err) { | ||
throw(err); | ||
} | ||
that.db = nano.use(that.dbname); | ||
done(); | ||
}); | ||
}); | ||
}, | ||
tearDown: function(done) { | ||
nano.db.destroy(this.dbname, done); | ||
}, | ||
'feed object': function(test) { | ||
test.expect(5); | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: noop }); | ||
w.on('start', function() { | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: function() {} }); | ||
w.on('stop', test.done); | ||
w.on('confirm', function() { | ||
test.equal(typeof w, 'object', 'should return an object'); | ||
@@ -94,5 +99,6 @@ test.equal(typeof w.pause, 'function', 'should expose `pause` function'); | ||
test.equal(typeof w.on, 'function', 'should expose `on` function'); | ||
w.stop(); | ||
setTimeout(function() { | ||
w.stop(); | ||
}, 10); | ||
}); | ||
w.on('stop', test.done); | ||
}, | ||
@@ -111,26 +117,15 @@ 'process callback arguments': function(test) { | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'next callback arguments': function(test) { | ||
test.expect(3); | ||
function process(doc, next) { | ||
next(null, function(err, doc) { | ||
test.ok(!err, 'error should be null'); | ||
test.equal(typeof doc, 'object', 'doc should be an object'); | ||
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`'); | ||
w.stop(); | ||
}); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'pause during process': function(test) { | ||
test.expect(2); | ||
} | ||
}; | ||
exports.pause = { | ||
setUp: setUp, | ||
tearDown: tearDown, | ||
'during process': function(test) { | ||
test.expect(1); | ||
var w; | ||
function process(doc, next) { | ||
test.ok(w.is_paused, 'feed should be paused'); | ||
next(null, function() { | ||
test.ok(!w.is_paused, 'feed should be resumed'); | ||
w.stop(); | ||
}); | ||
w.stop(); | ||
} | ||
@@ -140,144 +135,48 @@ w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
this.db.insert({}); | ||
}, | ||
'event worker:complete': function(test) { | ||
} | ||
}; | ||
exports.events = { | ||
setUp: setUp, | ||
tearDown: tearDown, | ||
'complete': function(test) { | ||
test.expect(3); | ||
function process(doc, next) { | ||
next(null, function() { | ||
w.stop(); | ||
}); | ||
next(null); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('worker:complete', function(doc) { | ||
w.on('stop', test.done); | ||
w.on('worker:complete', function(err, doc) { | ||
test.ok(true, 'worker:complete event should have been fired'); | ||
test.equal(typeof doc, 'object', 'doc should be an object'); | ||
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`'); | ||
w.stop(); | ||
}); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'worker status error info': function(test) { | ||
test.expect(4); | ||
'error': function(test) { | ||
test.expect(2); | ||
var error = 'this is an error'; | ||
function process(doc, next) { | ||
next({ failed: true, reason: 'my_reason' }, function(err, doc) { | ||
test.equal(doc.worker_status.myworker.status, 'error', 'worker status should be set to `error`'); | ||
test.equal(typeof doc.worker_status.myworker.error, 'object', 'worker status error should be an object'); | ||
test.ok(doc.worker_status.myworker.error && doc.worker_status.myworker.error.failed, 'worker status error should be failed'); | ||
test.equal(doc.worker_status.myworker.error && doc.worker_status.myworker.error.reason, 'my_reason', 'worker status error reason should be set'); | ||
w.stop(); | ||
}); | ||
next(error); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}); | ||
}, | ||
'event worker:error': function(test) { | ||
test.expect(5); | ||
function process(doc, next) { | ||
// let the status update fail | ||
delete doc._rev; | ||
next(null, function() { | ||
w.stop(); | ||
}); | ||
} | ||
var w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('worker:error', function(err, doc) { | ||
test.ok(true, 'worker:error event should have been fired'); | ||
test.equal(typeof err, 'object', 'err should be an object'); | ||
test.equal(err.error, 'conflict', 'err should be a `conflict`'); | ||
test.equal(typeof doc, 'object', 'doc should be an object'); | ||
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`'); | ||
test.equal(err, error, 'error should be returned'); | ||
w.stop(); | ||
}); | ||
w.on('stop', test.done); | ||
this.db.insert({}, 'mydoc'); | ||
}, | ||
'global worker document status': function(test) { | ||
test.expect(1); | ||
var w; | ||
var count = 0; | ||
function process(doc) { | ||
count++; | ||
if (doc._id === 'mydoc') { | ||
test.equal(count, 1, 'only mydoc should have been processed'); | ||
w.stop(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
this.db.insert({ worker_status: { otherworker: { status: 'triggered' } } }, function() { | ||
db.insert({}, 'mydoc'); | ||
}); | ||
}, | ||
'own worker document status triggered': function(test) { | ||
test.expect(1); | ||
var w; | ||
var count = 0; | ||
function process(doc) { | ||
count++; | ||
if (doc._id === 'mydoc') { | ||
test.equal(count, 1, 'only mydoc should have been processed'); | ||
w.stop(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
this.db.insert({ worker_status: { myworker: { status: 'triggered' } } }, function() { | ||
db.insert({}, 'mydoc'); | ||
}); | ||
}, | ||
'own worker document status complete': function(test) { | ||
test.expect(1); | ||
var w; | ||
var count = 0; | ||
function process(doc) { | ||
count++; | ||
if (doc._id === 'mydoc') { | ||
test.equal(count, 1, 'only mydoc should have been processed'); | ||
w.stop(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
this.db.insert({ worker_status: { myworker: { status: 'complete' } } }, function() { | ||
db.insert({}, 'mydoc'); | ||
}); | ||
}, | ||
'own worker document status error': function(test) { | ||
test.expect(1); | ||
var w; | ||
var count = 0; | ||
function process(doc) { | ||
count++; | ||
if (doc._id === 'mydoc') { | ||
test.equal(count, 1, 'only mydoc should have been processed'); | ||
w.stop(); | ||
} | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
var db = this.db; | ||
this.db.insert({ worker_status: { myworker: { status: 'error' } } }, function() { | ||
db.insert({}, 'mydoc'); | ||
}); | ||
}, | ||
'document status': function(test) { | ||
test.expect(3); | ||
var w; | ||
function done(err, doc) { | ||
test.ok(!err, 'no error occured'); | ||
test.equal(doc.worker_status.myworker.status, 'complete', 'doc should be in complete state'); | ||
w.stop(); | ||
} | ||
function process(doc, next) { | ||
test.equal(doc.worker_status.myworker.status, 'triggered', 'doc should be in triggered state'); | ||
next(null, done); | ||
} | ||
w = worker.listen({ db: this.url, id: 'myworker', process: process }); | ||
w.on('stop', test.done); | ||
this.db.insert({}); | ||
}, | ||
} | ||
// TODO: test skip event | ||
}; | ||
exports.status = { | ||
setUp: setUp, | ||
tearDown: tearDown, | ||
'worker status': function(test) { | ||
test.expect(7); | ||
test.expect(8); | ||
function process(doc, next) { | ||
@@ -291,4 +190,5 @@ next(null); | ||
test.ok(true, 'status has been stored'); | ||
test.equal(change.doc.worker_id, 'myworker', 'status should have worker_id'); | ||
test.equal(change.doc.seq, 1, 'status should have curren seq'); | ||
test.equal(change.doc.last, 'mydoc', 'status should have used last `mydoc`'); | ||
test.equal(change.doc.last_doc_id, 'mydoc', 'status should have used last `mydoc`'); | ||
test.equal(change.doc.checked, 1, 'status should have one checked doc'); | ||
@@ -295,0 +195,0 @@ test.equal(change.doc.triggered, 1, 'status should have one triggered doc'); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
140
18773
400