Socket
Socket
Sign inDemoInstall

couchdb-worker

Package Overview
Dependencies
23
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.2.4 to 2.0.0

144

lib/couchdb-worker.js

@@ -18,5 +18,5 @@ /*

options.follow.include_docs = true;
options.status.key = options.status.key || 'worker_status';
options.status.db = options.status.db || options.db;
options.status.id = options.status.id || 'worker-status/' + options.id;
options.status.prefix = options.status.prefix || 'worker-lock/' + options.id + '/';

@@ -30,37 +30,50 @@ // mandatory options

}
// database connector
var db = require('nano')(options.db);
// status database connector
var statusDb = require('nano')(options.status.db);
// changes feed
var feed = db.follow(options.follow);
// check if document is processable, that is
// * it is not the status document
// * it has no worker status at all (no worker touched it ever)
// * there is no `triggered` status (no other worker has already taken over)
// * no own worker status for this document (I haven't touched it)
function isProcessable(doc) {
// status document
// capture a document
function capture(doc, done) {
statusDb.insert({}, options.status.prefix + doc._id, done);
}
// release a document
function release(lock, done) {
statusDb.destroy(lock.id, lock.rev, function(err, data) {
if (!err) {
return done();
}
// force delete in case of conflict
if (err.error === 'conflict') {
statusDb.get(lock._id, function(err, doc) {
if (err) {
return feed.emit('worker:release-error', err, lock);
}
lock.rev = doc._rev;
release(lock, done);
});
}
});
}
function discard(doc) {
// discard status
if (doc._id === options.status.id) {
return false;
return true;
}
// no worker status at all
if (!doc[options.status.key]) {
// discard lock
var match = doc._id.match(options.status.prefix);
if (match && match.index === 0) {
return true;
}
// no worker took over
for (var key in doc[options.status.key]) {
if (doc[options.status.key][key].status === 'triggered') {
return false;
}
}
// no own worker status
return !doc[options.status.key][options.id];
}
// initialize database connector
var db = require('nano')(options.db);
// initialize feed
var feed = db.follow(options.follow);
// initialize status database connector
var statusDb = require('nano')(options.status.db);
var statusDoc = {
_id: options.status.id
_id: options.status.id,
worker_id: options.id
};

@@ -87,3 +100,3 @@ var statusDiff = {

}
statusDoc.last = statusDiff.last;
statusDoc.last_doc_id = statusDiff.last_doc_id;
statusDoc.checked = statusDoc.checked + statusDiff.checked;

@@ -108,3 +121,3 @@ statusDoc.triggered = statusDoc.triggered + statusDiff.triggered;

delete statusDiff.seq;
delete statusDiff.last;
delete statusDiff.last_doc_id;
statusDiff.checked = 0;

@@ -118,57 +131,16 @@ statusDiff.triggered = 0;

// context for processor function evaluation
var ctx = {
db: db,
feed: feed
};
function onchange(change) {
var doc = change.doc;
statusDiff.checked++;
function ondone(err, next) {
if (err) {
doc[options.status.key][options.id].status = 'error';
doc[options.status.key][options.id].error = err;
} else {
doc[options.status.key][options.id].status = 'complete';
}
db.insert(doc, doc._id, function(err, body) {
if (err) {
feed.emit('worker:error', err, doc);
} else {
doc._rev = body.rev;
feed.emit('worker:complete', doc);
}
statusDiff.completed++;
feed.resume();
if (typeof next === 'function') {
next.apply(ctx, [err, doc]);
}
storeStatus();
});
}
if (!isProcessable(doc)) {
if (discard(doc)) {
return;
}
statusDiff.checked++;
feed.pause();
doc[options.status.key] = doc[options.status.key] || {};
doc[options.status.key][options.id] = doc[options.status.key][options.id] || {};
doc[options.status.key][options.id].status = 'triggered';
delete doc[options.status.key][options.id].error;
db.insert(doc, doc._id, function(err, body) {
capture(doc, function(err, lock) {
if (err) {
if (err.error !== 'conflict') {
feed.emit('worker:error', err, doc);
}
feed.emit('worker:skip', doc);
return;

@@ -178,8 +150,22 @@ }

statusDiff.seq = change.seq;
statusDiff.last = change.id;
statusDiff.last_doc_id = change.id;
statusDiff.triggered++;
doc._rev = body.rev;
options.process(doc, function(err) {
if (err) {
statusDiff.failed++;
feed.emit('worker:error', err, doc);
}
options.process.apply(ctx, [doc, ondone]);
release(lock, function() {
if (!err) {
feed.emit('worker:complete', err, doc);
statusDiff.completed++;
}
feed.resume();
storeStatus();
});
});
});

@@ -186,0 +172,0 @@ }

{
"name": "couchdb-worker",
"description": "CouchDB worker module that manages state",
"version": "1.2.4",
"version": "2.0.0",
"homepage": "https://github.com/jo/couchdb-worker",

@@ -6,0 +6,0 @@ "author": {

@@ -43,5 +43,5 @@ # couchdb-worker

* `status` | status options (optional)
* `status.key` | Property to store status inside documents. Default is `worker_status`.
* `status.db` | [nano](https://github.com/dscape/nano) options for status database connection. Default is to use the `db` connection.
* `status.id` | id for status document. Only used if `statusDb` is given. Default is `worker-status/<id>`.
* `status.id` | id for status document. Default is `worker-status/<id>`.
* `status.prefix` | prefix for lock document ids. Default is `worker-lock/<id>/`.

@@ -54,24 +54,28 @@ ## `process(doc, done)`

The `done` callback accepts itself two arguments: an `error` property,
where you can inform couchdb-worker about any errors (it will also be stored inside the document)
and a `next` callback function which is called when the modified document is saved.
The `done` callback accepts itself one `error` argument,
where you can inform couchdb-worker about any errors (it will also be stored inside the document).
## Lock
To prevent two same workers from processing the same document twice,
couchdb-worker keeps a lock on the document.
This is achieved by putting an empty doc inside the `status.db` while processing,
which will be deleted when done.
The id of that lock document is calculated by appending the documents id to `status.prefix`.
## Status
* couchdb-worker stores its status inside the document in an object called `worker_status`.
* Each worker manages its own status inside this object, eg `worker_status.myworker`.
* The status can be `triggered`, `error` or `complete`.
* Only one worker can run at a time on one document.
* You can store your own worker status information (a retry count for example)
inside the `worker_status` object.
* If the processing failed, `worker_status.myworker.error` will contain the error.
couchdb-worker maintains a status document, where some stats are stored:
A status object can be
```javascript
{
worker_status: {
myworker: {
status: 'complete'
}
}
"_id": "worker-status/my-worker",
"worker_id": "my-worker",
"seq": 123,
"last_doc_id": "mydoc",
"checked": 42,
"triggered": 42,
"completed": 40,
"failed": 2
}

@@ -129,2 +133,3 @@ ```

## Release History
* `2.0.0`: do not store worker status in documents, store lock in extra documents
* `1.0.0`: complete rewrite and new (functional) API using [nano](https://github.com/dscape/nano)

@@ -131,0 +136,0 @@ (and [follow](https://github.com/iriscouch/follow)) - _currently no attachment support_

@@ -9,7 +9,2 @@ 'use strict';

// https://gist.github.com/jed/982883
var uuid = function b(a){return a?(a^Math.random()*16>>a/4).toString(16):([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,b);};
var noop = function() {};
/*

@@ -35,2 +30,26 @@ ======== A Handy Little Nodeunit Reference ========

// https://gist.github.com/jed/982883
var uuid = function b(a){return a?(a^Math.random()*16>>a/4).toString(16):([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,b);};
var setUp = function(done) {
var that = this;
this.dbname = 'couchdb-worker-test-' + encodeURIComponent(uuid());
this.url = url.resolve(server, this.dbname);
process.nextTick(function() {
nano.db.create(that.dbname, function(err) {
if (err) {
console.error('Could not create test database', that.dbname);
throw(err);
}
that.db = nano.use(that.dbname);
done();
});
});
};
var tearDown = function(done) {
nano.db.destroy(this.dbname, done);
};
exports.api = {

@@ -65,25 +84,11 @@ 'typeof': function(test) {

exports.listen = {
setUp: function(done) {
var that = this;
this.dbname = 'couchdb-worker-test-' + encodeURIComponent(uuid());
this.url = url.resolve(server, this.dbname);
exports['callback arguments'] = {
setUp: setUp,
tearDown: tearDown,
process.nextTick(function() {
nano.db.create(that.dbname, function(err) {
if (err) {
throw(err);
}
that.db = nano.use(that.dbname);
done();
});
});
},
tearDown: function(done) {
nano.db.destroy(this.dbname, done);
},
'feed object': function(test) {
test.expect(5);
var w = worker.listen({ db: this.url, id: 'myworker', process: noop });
w.on('start', function() {
var w = worker.listen({ db: this.url, id: 'myworker', process: function() {} });
w.on('stop', test.done);
w.on('confirm', function() {
test.equal(typeof w, 'object', 'should return an object');

@@ -94,5 +99,6 @@ test.equal(typeof w.pause, 'function', 'should expose `pause` function');

test.equal(typeof w.on, 'function', 'should expose `on` function');
w.stop();
setTimeout(function() {
w.stop();
}, 10);
});
w.on('stop', test.done);
},

@@ -111,26 +117,15 @@ 'process callback arguments': function(test) {

this.db.insert({}, 'mydoc');
},
'next callback arguments': function(test) {
test.expect(3);
function process(doc, next) {
next(null, function(err, doc) {
test.ok(!err, 'error should be null');
test.equal(typeof doc, 'object', 'doc should be an object');
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`');
w.stop();
});
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'pause during process': function(test) {
test.expect(2);
}
};
exports.pause = {
setUp: setUp,
tearDown: tearDown,
'during process': function(test) {
test.expect(1);
var w;
function process(doc, next) {
test.ok(w.is_paused, 'feed should be paused');
next(null, function() {
test.ok(!w.is_paused, 'feed should be resumed');
w.stop();
});
w.stop();
}

@@ -140,144 +135,48 @@ w = worker.listen({ db: this.url, id: 'myworker', process: process });

this.db.insert({});
},
'event worker:complete': function(test) {
}
};
exports.events = {
setUp: setUp,
tearDown: tearDown,
'complete': function(test) {
test.expect(3);
function process(doc, next) {
next(null, function() {
w.stop();
});
next(null);
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('worker:complete', function(doc) {
w.on('stop', test.done);
w.on('worker:complete', function(err, doc) {
test.ok(true, 'worker:complete event should have been fired');
test.equal(typeof doc, 'object', 'doc should be an object');
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`');
w.stop();
});
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'worker status error info': function(test) {
test.expect(4);
'error': function(test) {
test.expect(2);
var error = 'this is an error';
function process(doc, next) {
next({ failed: true, reason: 'my_reason' }, function(err, doc) {
test.equal(doc.worker_status.myworker.status, 'error', 'worker status should be set to `error`');
test.equal(typeof doc.worker_status.myworker.error, 'object', 'worker status error should be an object');
test.ok(doc.worker_status.myworker.error && doc.worker_status.myworker.error.failed, 'worker status error should be failed');
test.equal(doc.worker_status.myworker.error && doc.worker_status.myworker.error.reason, 'my_reason', 'worker status error reason should be set');
w.stop();
});
next(error);
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({});
},
'event worker:error': function(test) {
test.expect(5);
function process(doc, next) {
// let the status update fail
delete doc._rev;
next(null, function() {
w.stop();
});
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('worker:error', function(err, doc) {
test.ok(true, 'worker:error event should have been fired');
test.equal(typeof err, 'object', 'err should be an object');
test.equal(err.error, 'conflict', 'err should be a `conflict`');
test.equal(typeof doc, 'object', 'doc should be an object');
test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`');
test.equal(err, error, 'error should be returned');
w.stop();
});
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'global worker document status': function(test) {
test.expect(1);
var w;
var count = 0;
function process(doc) {
count++;
if (doc._id === 'mydoc') {
test.equal(count, 1, 'only mydoc should have been processed');
w.stop();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;
this.db.insert({ worker_status: { otherworker: { status: 'triggered' } } }, function() {
db.insert({}, 'mydoc');
});
},
'own worker document status triggered': function(test) {
test.expect(1);
var w;
var count = 0;
function process(doc) {
count++;
if (doc._id === 'mydoc') {
test.equal(count, 1, 'only mydoc should have been processed');
w.stop();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;
this.db.insert({ worker_status: { myworker: { status: 'triggered' } } }, function() {
db.insert({}, 'mydoc');
});
},
'own worker document status complete': function(test) {
test.expect(1);
var w;
var count = 0;
function process(doc) {
count++;
if (doc._id === 'mydoc') {
test.equal(count, 1, 'only mydoc should have been processed');
w.stop();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;
this.db.insert({ worker_status: { myworker: { status: 'complete' } } }, function() {
db.insert({}, 'mydoc');
});
},
'own worker document status error': function(test) {
test.expect(1);
var w;
var count = 0;
function process(doc) {
count++;
if (doc._id === 'mydoc') {
test.equal(count, 1, 'only mydoc should have been processed');
w.stop();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;
this.db.insert({ worker_status: { myworker: { status: 'error' } } }, function() {
db.insert({}, 'mydoc');
});
},
'document status': function(test) {
test.expect(3);
var w;
function done(err, doc) {
test.ok(!err, 'no error occured');
test.equal(doc.worker_status.myworker.status, 'complete', 'doc should be in complete state');
w.stop();
}
function process(doc, next) {
test.equal(doc.worker_status.myworker.status, 'triggered', 'doc should be in triggered state');
next(null, done);
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({});
},
}
// TODO: test skip event
};
exports.status = {
setUp: setUp,
tearDown: tearDown,
'worker status': function(test) {
test.expect(7);
test.expect(8);
function process(doc, next) {

@@ -291,4 +190,5 @@ next(null);

test.ok(true, 'status has been stored');
test.equal(change.doc.worker_id, 'myworker', 'status should have worker_id');
test.equal(change.doc.seq, 1, 'status should have curren seq');
test.equal(change.doc.last, 'mydoc', 'status should have used last `mydoc`');
test.equal(change.doc.last_doc_id, 'mydoc', 'status should have used last `mydoc`');
test.equal(change.doc.checked, 1, 'status should have one checked doc');

@@ -295,0 +195,0 @@ test.equal(change.doc.triggered, 1, 'status should have one triggered doc');

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc