Socket
Socket
Sign inDemoInstall

couchdb-worker

Package Overview
Dependencies
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

couchdb-worker - npm Package Compare versions

Comparing version 1.1.0 to 1.2.0

23

Gruntfile.js

@@ -45,5 +45,26 @@ 'use strict';

grunt.registerTask('clean', 'Clean up lost test databases.', function() {
var nano = require('nano')(process.env.COUCH_URL || 'http://localhost:5984');
var done = this.async();
function destroy(db, next) {
nano.db.destroy(db, function(err, resp) {
if (err) {
grunt.log.err('Failed to delete ' + db + ': ' + err.error);
} else {
grunt.log.ok('Deleted ' + db);
}
next(err, resp);
});
}
nano.db.list(function(err, body) {
var dbs = body.filter(function(db) { return db.match(/^couchdb-worker-test-/); });
grunt.util.async.map(dbs, destroy, done);
});
});
// Default task.
grunt.registerTask('default', ['jshint', 'nodeunit']);
};

89

lib/couchdb-worker.js

@@ -14,19 +14,20 @@ /*

options.follow = options.follow || {};
options.status = options.status || {};
// defaults
options.follow.include_docs = true;
options.status.key = options.status.key || 'worker_status';
options.status.db = options.status.db || options.db;
options.status.id = options.status.id || 'worker-status/' + options.id;
// worker id
// mandatory options
if (typeof options.id !== 'string') {
throw('worker needs an id.');
}
var id = options.id;
delete options.id;
// process function
if (typeof options.process !== 'function') {
throw('worker needs a process function.');
}
var process = options.process;
delete options.process;
// check if document is processable, that is
// * it is not the status document
// * it has no worker status at all (no worker touched it ever)

@@ -36,9 +37,13 @@ // * there is no `triggered` status (no other worker has already taken over)

function isProcessable(doc) {
// status document
if (doc._id === options.status.id) {
return false;
}
// no worker status at all
if (!doc.worker_status) {
if (!doc[options.status.key]) {
return true;
}
// no worker took over
for (var key in doc.worker_status) {
if (doc.worker_status[key].status === 'triggered') {
for (var key in doc[options.status.key]) {
if (doc[options.status.key][key].status === 'triggered') {
return false;

@@ -48,11 +53,24 @@ }

// no own worker status
return !doc.worker_status[id];
return !doc[options.status.key][options.id];
}
// initialize database connector
var db = require('nano')(options.db);
// initialize status database connector
var statusDb = require('nano')(options.status.db);
// initialize feed
var feed = db.follow(options.follow);
feed.status = {
_id: options.status.id
};
function storeStatus() {
statusDb.insert(feed.status, function(err, body) {
if (!err) {
feed.status._rev = body.rev;
}
});
}
// context for processor function evaluation

@@ -67,11 +85,14 @@ var ctx = {

feed.status.checked = feed.status.checked || 0;
feed.status.checked++;
function ondone(err, next) {
if (err) {
doc.worker_status[id].status = 'error';
doc.worker_status[id].error = err;
doc[options.status.key][options.id].status = 'error';
doc[options.status.key][options.id].error = err;
} else {
doc.worker_status[id].status = 'complete';
doc[options.status.key][options.id].status = 'complete';
}
db.insert(doc, doc._rev, function(err, body) {
db.insert(doc, function(err, body) {
if (err) {

@@ -84,2 +105,5 @@ feed.emit('worker:error', err, doc);

feed.status.completed = feed.status.completed || 0;
feed.status.completed++;
feed.resume();

@@ -90,2 +114,5 @@

}
// store status
storeStatus();
});

@@ -100,8 +127,8 @@ }

doc.worker_status = doc.worker_status || {};
doc.worker_status[id] = doc.worker_status[id] || {};
doc.worker_status[id].status = 'triggered';
delete doc.worker_status[id].error;
doc[options.status.key] = doc[options.status.key] || {};
doc[options.status.key][options.id] = doc[options.status.key][options.id] || {};
doc[options.status.key][options.id].status = 'triggered';
delete doc[options.status.key][options.id].error;
db.insert(doc, doc._rev, function(err, body) {
db.insert(doc, function(err, body) {
if (err) {

@@ -114,5 +141,10 @@ if (err.error !== 'conflict') {

feed.status.seq = change.seq;
feed.status.last = doc._id;
feed.status.triggered = feed.status.triggered || 0;
feed.status.triggered++;
doc._rev = body.rev;
process.apply(ctx, [doc, ondone]);
options.process.apply(ctx, [doc, ondone]);
});

@@ -123,5 +155,14 @@ }

feed.on('change', onchange);
feed.on('stop', storeStatus);
// start listening
feed.follow();
statusDb.get(feed.status._id, function(err, doc) {
if (!err && doc) {
feed.status = doc;
}
if (feed.status.seq) {
feed.since = feed.status.seq;
}
// start listening
feed.follow();
});

@@ -128,0 +169,0 @@ // return feed object

{
"name": "couchdb-worker",
"description": "CouchDB worker module that manages state",
"version": "1.1.0",
"version": "1.2.0",
"homepage": "https://github.com/jo/couchdb-worker",

@@ -6,0 +6,0 @@ "author": {

@@ -42,2 +42,6 @@ # couchdb-worker

* `follow` | [follow](https://github.com/iriscouch/follow) options
* `status` | status options (optional)
* `status.key` | Property to store status inside documents. Default is `worker_status`.
* `status.db` | [nano](https://github.com/dscape/nano) options for status database connection. Default is to use the `db` connection.
* `status.id` | id for status document. Only used if `statusDb` is given. Default is `worker-status/<id>`.

@@ -44,0 +48,0 @@ ## `process(doc, done)`

@@ -67,8 +67,10 @@ 'use strict';

nano.db.create(this.dbname, function(err) {
if (err) {
throw(err);
}
that.db = nano.use(that.dbname);
done();
process.nextTick(function() {
nano.db.create(that.dbname, function(err) {
if (err) {
throw(err);
}
that.db = nano.use(that.dbname);
done();
});
});

@@ -82,9 +84,11 @@ },

var w = worker.listen({ db: this.url, id: 'myworker', process: function() {} });
test.equal(typeof w, 'object', 'should return an object');
test.equal(typeof w.pause, 'function', 'should expose `pause` function');
test.equal(typeof w.resume, 'function', 'should expose `resume` function');
test.equal(typeof w.stop, 'function', 'should expose `stop` function');
test.equal(typeof w.on, 'function', 'should expose `on` function');
w.stop();
test.done();
w.on('start', function() {
test.equal(typeof w, 'object', 'should return an object');
test.equal(typeof w.pause, 'function', 'should expose `pause` function');
test.equal(typeof w.resume, 'function', 'should expose `resume` function');
test.equal(typeof w.stop, 'function', 'should expose `stop` function');
test.equal(typeof w.on, 'function', 'should expose `on` function');
w.stop();
});
w.on('stop', test.done);
},

@@ -99,5 +103,5 @@ 'process callback arguments': function(test) {

w.stop();
test.done();
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({}, 'mydoc');

@@ -113,6 +117,6 @@ },

w.stop();
test.done();
});
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({}, 'mydoc');

@@ -128,6 +132,6 @@ },

w.stop();
test.done();
});
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({});

@@ -147,4 +151,4 @@ },

test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`');
test.done();
});
w.on('stop', test.done);
this.db.insert({}, 'mydoc');

@@ -161,6 +165,6 @@ },

w.stop();
test.done();
});
}
var w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({});

@@ -184,7 +188,7 @@ },

test.equal(doc._id, 'mydoc', 'doc _id should be `mydoc`');
test.done();
});
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'global worker status': function(test) {
'global worker document status': function(test) {
test.expect(1);

@@ -198,6 +202,6 @@ var w;

w.stop();
test.done();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;

@@ -208,3 +212,3 @@ this.db.insert({ worker_status: { otherworker: { status: 'triggered' } } }, function() {

},
'own worker status triggered': function(test) {
'own worker document status triggered': function(test) {
test.expect(1);

@@ -218,6 +222,6 @@ var w;

w.stop();
test.done();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;

@@ -228,3 +232,3 @@ this.db.insert({ worker_status: { myworker: { status: 'triggered' } } }, function() {

},
'own worker status complete': function(test) {
'own worker document status complete': function(test) {
test.expect(1);

@@ -238,6 +242,6 @@ var w;

w.stop();
test.done();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;

@@ -248,3 +252,3 @@ this.db.insert({ worker_status: { myworker: { status: 'complete' } } }, function() {

},
'own worker status error': function(test) {
'own worker document status error': function(test) {
test.expect(1);

@@ -258,6 +262,6 @@ var w;

w.stop();
test.done();
}
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
var db = this.db;

@@ -268,3 +272,3 @@ this.db.insert({ worker_status: { myworker: { status: 'error' } } }, function() {

},
'worker status': function(test) {
'document status': function(test) {
test.expect(3);

@@ -276,3 +280,2 @@ var w;

w.stop();
test.done();
}

@@ -284,4 +287,71 @@ function process(doc, next) {

w = worker.listen({ db: this.url, id: 'myworker', process: process });
w.on('stop', test.done);
this.db.insert({});
},
'worker status default id': function(test) {
test.expect(1);
var w;
function done() {
w.stop();
}
function process(doc, next) {
next(null, done);
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
test.equal(w.status && w.status._id, 'worker-status/myworker', 'status should have default id');
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'worker status custom id': function(test) {
test.expect(1);
var w;
function done() {
w.stop();
}
function process(doc, next) {
next(null, done);
}
w = worker.listen({ db: this.url, id: 'myworker', process: process, status: { id: 'mystatus' } });
test.equal(w.status && w.status._id, 'mystatus', 'status should have custom id');
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'worker status updates': function(test) {
test.expect(6);
var w;
function done() {
w.stop();
}
function process(doc, next) {
test.equal(w.status.checked, 1, 'status should have checked one doc');
test.equal(w.status.seq, 1, 'status should have curren seq');
test.equal(w.status.last, 'mydoc', 'status should have used last `mydoc`');
test.equal(w.status.triggered, 1, 'status should have triggered one doc');
next(null, done);
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
test.equal(typeof w.status, 'object', 'status should be an object');
test.equal(w.status._id, 'worker-status/myworker', 'status should have default id');
w.on('stop', test.done);
this.db.insert({}, 'mydoc');
},
'worker status storage': function(test) {
test.expect(1);
var w;
function process(doc, next) {
next(null);
}
w = worker.listen({ db: this.url, id: 'myworker', process: process });
var feed = this.db.follow();
feed.on('change', function(change) {
if (change.id === 'worker-status/myworker') {
test.ok(true, 'status has been stored');
feed.stop();
w.stop();
// test.done();
}
});
feed.follow();
this.db.insert({}, 'mydoc');
},
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc