Comparing version 0.1.5 to 0.2.1
@@ -15,2 +15,4 @@ // this module assumes it will be used as a .prototype (e.g. uses `this`) | ||
var through = require('through') | ||
var batcher = require('level-batcher') | ||
var combiner = require('stream-combiner') | ||
var EOL = require('os').EOL | ||
@@ -22,3 +24,3 @@ var storage = require(path.join(__dirname, 'storage')) | ||
valueEncoding: 'json', | ||
writeBufferSize: 1024 * 1024 * 16 // 16MB | ||
writeBufferSize: 1024 * 1024 * 4 // 4MB | ||
} | ||
@@ -206,13 +208,41 @@ | ||
var ended = false | ||
return through(function onWrite(row) { | ||
var writeStream = this | ||
var batchStream = batcher(dbOptions.writeBufferSize) | ||
var writeStream = through(onWrite, onEnd) | ||
return combiner(batchStream, writeStream) | ||
function onWrite(rows) { | ||
var store = self._storage(options, function(err) { | ||
if (err) return writeStream.queue({error: err.message, row: row}) | ||
store.put(row, function(err, saved) { | ||
if (err) return writeStream.queue({error: err.message, row: row}) | ||
writeStream.queue({success: true, row: saved}) | ||
if (ended) writeStream.queue(null) | ||
}) | ||
if (err) return console.error('error opening storage', err) | ||
var batch = store.db.batch() | ||
var len = rows.length | ||
var pending = len | ||
for (var i = 0; i < len; i++) { | ||
updateClosure(i) | ||
function updateClosure(i) { | ||
store.updateRevision(rows[i], function(err, row) { | ||
if (err) return rows[i] = {err: err, row: rows[i]} | ||
rows[i] = {success: true, row: row} | ||
var keys = store.rowKeys(row._id, row._seq) | ||
batch.put(keys.seq, {_id: row, _seq: row._seq, _rev: row._rev}) | ||
batch.put(keys.row, row) | ||
pending-- | ||
if (pending === 0) commit() | ||
}) | ||
} | ||
} | ||
function commit() { | ||
batch.write(function(err) { | ||
if (err) console.log('batch write err', err) | ||
for (var i = 0; i < len; i++) writeStream.queue(rows[i]) | ||
batchStream.next() | ||
if (ended) writeStream.queue(null) | ||
}) | ||
} | ||
}) | ||
}, function(){ ended = true }) | ||
} | ||
function onEnd() { ended = true } | ||
} | ||
@@ -219,0 +249,0 @@ |
@@ -76,3 +76,2 @@ // this file is adapted from mikeal/level-sleep/index.js | ||
if (e) return cb(new Error('not found.')) | ||
// console.log('next', startKey, key, value) | ||
cb(false, value._id) | ||
@@ -83,3 +82,3 @@ }) | ||
Database.prototype.put = function (doc, cb) { | ||
Database.prototype.updateRevision = function (doc, cb) { | ||
var self = this | ||
@@ -89,3 +88,3 @@ if (typeof doc._id !== 'string') doc._id = uuid() | ||
if (err) old = {} | ||
if (old._rev !== doc._rev) return cb(false, 'Error: Supplied _rev out of date. Existing _rev: ' + old._rev + ', supplied _rev: ' + doc._rev + '.') | ||
if (old._rev !== doc._rev) return cb('Error: Supplied _rev out of date. Existing _rev: ' + old._rev + ', supplied _rev: ' + doc._rev + '.') | ||
var prev = row.revision(old._rev) | ||
@@ -95,9 +94,22 @@ doc._rev = prev + 1 + '-' + row.hash(doc) | ||
doc._seq = seq | ||
var seqKey = self._key(self.seqKey, lexint(seq, 'hex')) | ||
var rowKey = self._key(self.dataKey, doc._id + seqKey) | ||
self.mutex.put(seqKey, {_id: doc._id, _seq: seq, _rev: doc._rev}, noop) | ||
self.mutex.put(rowKey, doc, function (e) { | ||
cb(false, doc) | ||
}) | ||
} | ||
Database.prototype.rowKeys = function(id, seq) { | ||
var seqKey = this._key(this.seqKey, lexint(seq, 'hex')) | ||
return { | ||
seq: seqKey, | ||
row: this._key(this.dataKey, id + seqKey) | ||
} | ||
} | ||
Database.prototype.put = function (rawDoc, cb) { | ||
var self = this | ||
this.updateRevision(rawDoc, function(err, doc) { | ||
var keys = self.rowKeys(doc._id, doc._seq) | ||
self.mutex.put(keys.seq, {_id: doc._id, _seq: doc._seq, _rev: doc._rev}, noop) | ||
self.mutex.put(keys.row, doc, function (e) { | ||
if (e) return cb(e) | ||
cb(null, doc) | ||
// self.emit('entry', {seq: seq, data: doc}) | ||
}) | ||
@@ -114,3 +126,2 @@ }) | ||
cb(null, seq) | ||
// self.emit('entry', {seq: seq, id: key, data: value, deleted: true}) | ||
}) | ||
@@ -117,0 +128,0 @@ } |
{ | ||
"name": "dat", | ||
"version": "0.1.5", | ||
"version": "0.2.1", | ||
"description": "data sharing and replication tool", | ||
@@ -26,7 +26,9 @@ "main": "index.js", | ||
"untar": "~0.2.3", | ||
"lexicographic-integer": "0.0.0", | ||
"lexicographic-integer": "~0.0.0", | ||
"through": "~2.3.4", | ||
"split": "~0.2.10", | ||
"request": "~2.27.0", | ||
"hat": "0.0.3" | ||
"hat": "~0.0.3", | ||
"level-batcher": "~0.0.2", | ||
"stream-combiner": "~0.0.2" | ||
}, | ||
@@ -33,0 +35,0 @@ "devDependencies": { |
356924
676
15
+ Addedlevel-batcher@~0.0.2
+ Addedstream-combiner@~0.0.2
+ Addedduplexer@0.1.2(transitive)
+ Addedlevel-batcher@0.0.3(transitive)
+ Addedstream-combiner@0.0.4(transitive)
Updatedhat@~0.0.3
Updatedlexicographic-integer@~0.0.0