level-trigger
Advanced tools
Comparing version 0.1.3 to 1.0.0
94
index.js
@@ -0,55 +1,65 @@ | ||
var shasum = require('shasum') | ||
var queue = require('level-queue') | ||
var hooks = require('level-hooks') | ||
var uuid = require('node-uuid') | ||
var iterate = require('iterate') | ||
//if a job starts, and another is queued before the current job ends, | ||
//delay it, so that the job is only triggered once. | ||
function between(key, range) { | ||
return ( | ||
(!range.start || range.start <= key) && | ||
(!range.end || key <= range.end) | ||
) | ||
} | ||
module.exports = function (input, jobs, map, work) { | ||
if(!work) work = map, map = function (data) { return data.key } | ||
//create a subsection for the jobs, | ||
//if you don't pass in a separate db, | ||
//create a section inside the input | ||
var pending = {}, running = {} | ||
module.exports = function (db) { | ||
if('string' === typeof jobs) | ||
jobs = input.sublevel(jobs) | ||
if(db.trigger) return db | ||
var retry = [] | ||
var ranges = {} | ||
function doJob (data) { | ||
//don't process deletes! | ||
if(!data.value) return | ||
var hash = shasum(data.value) | ||
db.trigger = { | ||
add: function (range, job) { | ||
if(!job) job = range.job | ||
else range.job = job | ||
if(!running[hash]) | ||
running[hash] = true | ||
else return | ||
if(!range.name) | ||
throw new Error('expects trigger to have name') | ||
var done = false | ||
//TODO allow range to be string. | ||
range.start = range.start || '' | ||
range.end = range.end || '~' | ||
ranges[range.name] = range | ||
range.map = range.map || function (e) { return e.key } | ||
db.queue.add('trigger:'+range.name, job) | ||
} | ||
work(data.value, function (err) { | ||
if(done) return | ||
done = true | ||
if(err) { | ||
running[hash] | ||
return setTimeout(function () { | ||
doJob(data) | ||
}, 500) | ||
} | ||
jobs.del(data.key, function (err) { | ||
if(err) return retry.push(data) | ||
delete running[hash] | ||
if(pending[hash]) { | ||
delete pending[hash] | ||
doJob(data) | ||
} | ||
}) | ||
}) | ||
} | ||
queue()(db) | ||
hooks()(db) | ||
input.pre(function (ch, add) { | ||
var key = map(ch) | ||
var hash = shasum(key) | ||
console.log('KEY', key) | ||
if(!pending[hash]) | ||
add({key: Date.now(), value: key, type: 'put'}, jobs) | ||
else | ||
pending[hash] = (0 || pending[hash]) + 1 | ||
}) | ||
db.hooks.pre(function (batch) { | ||
var insert = [] | ||
//for each operation inside each range, | ||
//atomically add a record, or run a the job. | ||
iterate.join(batch, ranges, function (item, range, _, name) { | ||
var key = ''+item.key, mapped | ||
if(between(key, range) && (mapped = range.map(item))) { | ||
insert.push(db.queue('trigger:' + range.name, mapped, false)) | ||
} | ||
}) | ||
jobs.createReadStream().on('data', doJob) | ||
jobs.post(doJob) | ||
return batch.concat(insert) | ||
}) | ||
return jobs | ||
} | ||
{ | ||
"name": "level-trigger", | ||
"version": "0.1.3", | ||
"version": "1.0.0", | ||
"homepage": "https://github.com/dominictarr/level-trigger", | ||
@@ -10,11 +10,10 @@ "repository": { | ||
"dependencies": { | ||
"iterate": "~0.1.0", | ||
"level-hooks": "~1.1.1", | ||
"level-queue": "~1.0", | ||
"node-uuid": "~1.4.0" | ||
}, | ||
"devDependencies": { | ||
"levelup": "~0.3.2", | ||
"macgyver": "~1.10.1", | ||
"rimraf": "~2.1.4", | ||
"level-sublevel": "~2.1.0", | ||
"levelup": "~0.6", | ||
"rimraf": "~2.0.2", | ||
"macgyver": "~1.9.2" | ||
"shasum": "0.0.2" | ||
}, | ||
@@ -21,0 +20,0 @@ "scripts": { |
@@ -5,3 +5,4 @@ | ||
var mac = require('macgyver')().autoValidate() | ||
var trigger = require('..') | ||
var Trigger = require('..') | ||
var SubLevel = require('level-sublevel') | ||
@@ -13,13 +14,10 @@ var path = '/tmp/level-trigger-test' | ||
trigger(db) | ||
var reduced = null | ||
SubLevel(db) | ||
var reduceDb = db.sublevel('reduce') | ||
var reduced | ||
db.trigger.add({ | ||
name: 'test', | ||
map: function (item) { | ||
var trigDb = Trigger(db, 'test-trigger', function (item) { | ||
//map | ||
console.log('MAP', JSON.stringify({ | ||
key: ''+item.key, | ||
key: ''+item.key, | ||
type: item.type | ||
@@ -33,4 +31,3 @@ })) | ||
}, | ||
job: function (value, done) { | ||
function (value, done) { | ||
value = JSON.parse(value) | ||
@@ -45,16 +42,12 @@ | ||
reduced = reduce(reduced, value, put) | ||
console.log('thing', reduced) | ||
db.put('~trigger:reduced', JSON.stringify(reduced), done) | ||
console.log('reduced', reduced) | ||
reduceDb.put('reduced', JSON.stringify(reduced), done) | ||
db.emit('test:reduce', reduced) | ||
} | ||
}) | ||
}) | ||
//if we don't wait for the queue to drain, | ||
//then it will read puts from these twice. | ||
//TODO: add snapshot option for leveldb. | ||
db.once('queue:drain', function () { | ||
console.log('QUEUE') | ||
// db.once('queue:drain', function () { | ||
// console.log('QUEUE') | ||
db.put('hello-A', JSON.stringify({thing: 1})) | ||
@@ -65,3 +58,3 @@ db.put('hello-B', JSON.stringify({thing: 2})) | ||
db.del('hello-C') | ||
}) | ||
// }) | ||
@@ -68,0 +61,0 @@ db.on('test:reduce', mac().times(5)) |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
6854
0
8
142
0
5
- Removediterate@~0.1.0
- Removedlevel-hooks@~1.1.1
- Removedlevel-queue@~1.0
- Removednode-uuid@~1.4.0
- Removediterate@0.1.1(transitive)
- Removedlevel-hooks@1.1.2(transitive)
- Removedlevel-queue@1.0.2(transitive)
- Removedmonotonic-timestamp@0.0.7(transitive)
- Removednode-uuid@1.4.8(transitive)
- Removedsha1sum@0.0.1(transitive)