map-reduce
Advanced tools
Comparing version 1.0.3 to 1.0.4
29
index.js
@@ -6,2 +6,3 @@ | ||
var liveStream = require('level-live-stream') | ||
var delayJob = require('./delay-job') | ||
@@ -60,2 +61,11 @@ module.exports = function (opts) { | ||
db.queue | ||
//simplify by doing the maps jobs atomically, | ||
//as part of the hook. | ||
//OH, map is async, because a single map can point to many. | ||
//and, it has implicit deletes. | ||
//it's a fan-out map. | ||
//and then reduce is fan-in. | ||
//there are two distict modules here which are conflated. | ||
.add('map:'+name, function (job, done) { | ||
@@ -66,21 +76,4 @@ db.get(job, function (err, doc) { | ||
}) | ||
.add('reduce:'+name, function (job, done) { | ||
job = JSON.parse(job) | ||
if('string' === typeof job) | ||
throw new Error(JSON.stringify(job)) | ||
var jsonKey = JSON.stringify(job) | ||
.add('reduce:'+name, delayJob(doReduce)) | ||
function go() { | ||
delete reducers[jsonKey] | ||
doReduce(job, done) | ||
} | ||
var old = reducers[jsonKey] | ||
if(old) clearTimeout(old.timeout) | ||
reducers[jsonKey] = {done: done, timeout: setTimeout(go, 500)} | ||
//mark the old job as done. | ||
if(old && 'function' === typeof old.done) old.done() | ||
}) | ||
var reducers = {} | ||
function doReduce (key, cb) { | ||
@@ -87,0 +80,0 @@ if(!Array.isArray(key)) |
{ | ||
"name": "map-reduce", | ||
"description": "map-reduce on leveldb", | ||
"version": "1.0.3", | ||
"version": "1.0.4", | ||
"homepage": "https://github.com/dominictarr/map-reduce", | ||
@@ -15,3 +15,4 @@ "repository": { | ||
"level-queue": "~0.0.4", | ||
"level-live-stream": "0.0.2" | ||
"level-live-stream": "0.0.2", | ||
"sha1sum": "0.0.1" | ||
}, | ||
@@ -18,0 +19,0 @@ "devDependencies": { |
19017
16
476
5
+ Addedsha1sum@0.0.1
+ Addedsha1sum@0.0.1(transitive)