Comparing version 1.3.12 to 2.0.0
118
index.js
@@ -12,2 +12,4 @@ var raf = require('random-access-file') | ||
PROTOCOL_VERSION = '1.0.0' | ||
module.exports = Multifeed | ||
@@ -158,7 +160,7 @@ | ||
var filtered = keys.filter(function (key) { | ||
return Buffer.isBuffer(key) && key.length === 32 | ||
return !Number.isNaN(parseInt(key, 16)) && key.length === 64 | ||
}) | ||
filtered.forEach(function (key) { | ||
var feeds = Object.values(self._feeds).filter(function (feed) { | ||
return feed.key.equals(key) | ||
return feed.key.toString('hex') === key | ||
}) | ||
@@ -172,3 +174,3 @@ if (!feeds.length) { | ||
debug('[REPLICATION] trying to create new local hypercore, key=' + key.toString('hex')) | ||
feed = self._hypercore(storage, key, self._opts) | ||
feed = self._hypercore(storage, Buffer.from(key, 'hex'), self._opts) | ||
} catch (e) { | ||
@@ -189,5 +191,2 @@ debug('[REPLICATION] failed to create new local hypercore, key=' + key.toString('hex')) | ||
debug('[REPLICATION] able to share ' + Object.values(this._feeds).length + ' keys') | ||
var feedWriteBuf = serializeFeedBuf(Object.values(this._feeds)) | ||
var firstWrite = true | ||
@@ -197,3 +196,6 @@ var writeStream = through(function (buf, _, next) { | ||
firstWrite = false | ||
this.push(feedWriteBuf) | ||
debug('[REPLICATION] able to share ' + Object.values(self._feeds).length + ' keys') | ||
var keys = Object.values(self._feeds).map(function (feed) { return feed.key.toString('hex') }) | ||
var headerBuf = serializeHeader(PROTOCOL_VERSION, keys) | ||
this.push(headerBuf) | ||
} | ||
@@ -204,29 +206,35 @@ this.push(buf) | ||
var firstRead = true | ||
var readingHeader = true | ||
var headerAccum = Buffer.alloc(0) | ||
var readStream = through(function (buf, _, next) { | ||
var self = this | ||
if (firstRead) { | ||
firstRead = false | ||
var res = deserializeFeedBuf(buf) | ||
if (!res) { | ||
// probably replicating with a non-multifeed peer: abort | ||
return next(new Error('replicating with non-multifeed peer')) | ||
if (readingHeader) { | ||
headerAccum = Buffer.concat([headerAccum, buf]) | ||
var expectedLen = headerAccum.readUInt32LE(0) | ||
if (headerAccum.length >= expectedLen + 4) { | ||
readingHeader = false | ||
try { | ||
var header = deserializeHeader(buf) | ||
debug('[REPLICATION] recv\'d header: ' + JSON.stringify(header)) | ||
if (!compatibleVersions(header.version, PROTOCOL_VERSION)) { | ||
debug('[REPLICATION] aborting; version mismatch (us='+PROTOCOL_VERSION+')') | ||
self.emit('error', new Error('protocol version mismatch! us='+PROTOCOL_VERSION + ' them=' + header.version)) | ||
return | ||
} | ||
addMissingKeys(header.keys, function () { | ||
// push remainder of buffer | ||
var leftover = buf.slice(expectedLen + 4) | ||
self.push(leftover) | ||
debug('[REPLICATION] starting hypercore replication') | ||
process.nextTick(startSync) | ||
next() | ||
}) | ||
} catch (e) { | ||
debug('[REPLICATION] aborting (bad header)') | ||
self.emit('error', e) | ||
return | ||
} | ||
} else { | ||
next() | ||
} | ||
var keys = res[0] | ||
var size = res[1] | ||
if (!Array.isArray(keys)) { | ||
// probably replicating with a non-multifeed peer: abort | ||
return next(new Error('replicating with non-multifeed peer')) | ||
} | ||
addMissingKeys(keys, function () { | ||
// push remainder of buffer | ||
var leftover = buf.slice(size) | ||
self.push(leftover) | ||
process.nextTick(function () { | ||
startSync() | ||
}) | ||
next() | ||
}) | ||
} else { | ||
@@ -272,24 +280,27 @@ this.push(buf) | ||
function serializeFeedBuf (feeds) { | ||
var myFeedKeys = feeds.map(function (feed) { | ||
return feed.key | ||
}) | ||
var numFeedsBuf = Buffer.alloc(2) | ||
numFeedsBuf.writeUInt16LE(myFeedKeys.length, 0) | ||
return Buffer.concat([numFeedsBuf].concat(myFeedKeys)) | ||
function serializeHeader (version, keys) { | ||
var header = { | ||
version: version, | ||
keys: keys | ||
} | ||
var json = JSON.stringify(header) | ||
var lenBuf = Buffer.alloc(4) | ||
lenBuf.writeUInt32LE(json.length, 0) | ||
var jsonBuf = Buffer.from(json, 'utf8') | ||
return Buffer.concat([ | ||
lenBuf, | ||
jsonBuf | ||
]) | ||
} | ||
function deserializeFeedBuf (buf) { | ||
var numFeeds = buf.readUInt16LE(0) | ||
var res = [] | ||
for (var i=0; i < numFeeds; i++) { | ||
var offset = 2 + i * 32 | ||
var key = buf.slice(offset, offset + 32) | ||
res.push(key) | ||
function deserializeHeader (buf) { | ||
var len = buf.readUInt32LE(0) | ||
var jsonBuf = buf.slice(4) | ||
try { | ||
var header = JSON.parse(jsonBuf.toString('utf8')) | ||
return header | ||
return header | ||
} catch (e) { | ||
return new Error('failed to parse header') | ||
} | ||
return [res, 2 + numFeeds * 32] | ||
} | ||
@@ -330,1 +341,8 @@ | ||
} | ||
// String, String -> Boolean | ||
function compatibleVersions (v1, v2) { | ||
var major1 = v1.split('.')[0] | ||
var major2 = v2.split('.')[0] | ||
return parseInt(major1) === parseInt(major2) | ||
} |
@@ -5,3 +5,3 @@ { | ||
"author": "Stephen Whitmore <sww@eight.net>", | ||
"version": "1.3.12", | ||
"version": "2.0.0", | ||
"repository": { | ||
@@ -8,0 +8,0 @@ "url": "git://github.com/noffle/multifeed.git" |
@@ -196,101 +196,1 @@ var test = require('tape') | ||
}) | ||
test('regression: concurrency of writer creation', function (t) { | ||
t.plan(3) | ||
var storage = tmp() | ||
var key | ||
var multi = multifeed(hypercore, storage, { valueEncoding: 'json' }) | ||
multi.writer('minuette', function (err, w) { | ||
t.error(err) | ||
t.ok(w.key) | ||
key = w.key | ||
}) | ||
multi.ready(function () { | ||
t.equals(multi.feeds().length, 0) | ||
}) | ||
}) | ||
test('regression: MF with no writer replicate to MF with 1 writer', function (t) { | ||
var m1 = multifeed(hypercore, ram, { valueEncoding: 'json' }) | ||
var m2 = multifeed(hypercore, ram, { valueEncoding: 'json' }) | ||
function setup1 (m, buf, cb) { | ||
m.writer(function (err, w) { | ||
t.error(err) | ||
var bufs = [] | ||
for(var i=0; i < 1000; i++) { | ||
bufs.push(buf) | ||
} | ||
w.append(bufs, function (err) { | ||
t.error(err) | ||
w.get(13, function (err, data) { | ||
t.error(err) | ||
t.equals(data, buf) | ||
t.deepEquals(m.feeds(), [w], 'read matches write') | ||
cb() | ||
}) | ||
}) | ||
}) | ||
} | ||
function setup2 (m, buf, cb) { | ||
m.writer(function (err, w) { | ||
t.error(err) | ||
var bufs = [] | ||
for(var i=0; i < 10; i++) { | ||
bufs.push(buf) | ||
} | ||
w.append(bufs, function (err) { | ||
t.error(err) | ||
w.get(3, function (err, data) { | ||
t.error(err) | ||
t.equals(data, buf) | ||
t.deepEquals(m.feeds(), [w], 'read matches write') | ||
cb() | ||
}) | ||
}) | ||
}) | ||
//cb() | ||
//m.writer(function (err, w) { | ||
// t.error(err) | ||
// cb() | ||
//}) | ||
} | ||
setup1(m1, 'foo', function () { | ||
setup2(m2, 'bar', function () { | ||
var r = m1.replicate() | ||
r.once('end', done) | ||
var s = m2.replicate() | ||
s.once('end', done) | ||
r.pipe(s).pipe(r) | ||
var pending = 2 | ||
function done () { | ||
if (!--pending) check() | ||
} | ||
}) | ||
}) | ||
function check () { | ||
t.equals(m1.feeds().length, 2, '2 feeds') | ||
t.equals(m2.feeds().length, 2, '2 feeds') | ||
t.equals(m1.feeds()[0].length, 1000, 'writer sees 1000 entries') | ||
t.equals(m1.feeds()[1].length, 10, 'writer sees 10 entries') | ||
t.equals(m2.feeds()[0].length, 10, 'receiver sees 10 entries') | ||
t.equals(m2.feeds()[1].length, 1000, 'receiver sees 1000 entries') | ||
m1.feeds()[1].get(0, function (err, data) { | ||
t.error(err) | ||
t.equals(data, 'bar', 'feed 1 has feed 2 data') | ||
m2.feeds()[1].get(0, function (err, data) { | ||
t.error(err) | ||
t.equals(data, 'foo', 'feed 2 has feed 1 data') | ||
t.end() | ||
}) | ||
}) | ||
} | ||
}) |
@@ -7,2 +7,102 @@ var test = require('tape') | ||
test('regression: concurrency of writer creation', function (t) { | ||
t.plan(3) | ||
var storage = tmp() | ||
var key | ||
var multi = multifeed(hypercore, storage, { valueEncoding: 'json' }) | ||
multi.writer('minuette', function (err, w) { | ||
t.error(err) | ||
t.ok(w.key) | ||
key = w.key | ||
}) | ||
multi.ready(function () { | ||
t.equals(multi.feeds().length, 0) | ||
}) | ||
}) | ||
test('regression: MF with no writer replicate to MF with 1 writer', function (t) { | ||
var m1 = multifeed(hypercore, ram, { valueEncoding: 'json' }) | ||
var m2 = multifeed(hypercore, ram, { valueEncoding: 'json' }) | ||
function setup1 (m, buf, cb) { | ||
m.writer(function (err, w) { | ||
t.error(err) | ||
var bufs = [] | ||
for(var i=0; i < 1000; i++) { | ||
bufs.push(buf) | ||
} | ||
w.append(bufs, function (err) { | ||
t.error(err) | ||
w.get(13, function (err, data) { | ||
t.error(err) | ||
t.equals(data, buf) | ||
t.deepEquals(m.feeds(), [w], 'read matches write') | ||
cb() | ||
}) | ||
}) | ||
}) | ||
} | ||
function setup2 (m, buf, cb) { | ||
m.writer(function (err, w) { | ||
t.error(err) | ||
var bufs = [] | ||
for(var i=0; i < 10; i++) { | ||
bufs.push(buf) | ||
} | ||
w.append(bufs, function (err) { | ||
t.error(err) | ||
w.get(3, function (err, data) { | ||
t.error(err) | ||
t.equals(data, buf) | ||
t.deepEquals(m.feeds(), [w], 'read matches write') | ||
cb() | ||
}) | ||
}) | ||
}) | ||
//cb() | ||
//m.writer(function (err, w) { | ||
// t.error(err) | ||
// cb() | ||
//}) | ||
} | ||
setup1(m1, 'foo', function () { | ||
setup2(m2, 'bar', function () { | ||
var r = m1.replicate() | ||
r.once('end', done) | ||
var s = m2.replicate() | ||
s.once('end', done) | ||
r.pipe(s).pipe(r) | ||
var pending = 2 | ||
function done () { | ||
if (!--pending) check() | ||
} | ||
}) | ||
}) | ||
function check () { | ||
t.equals(m1.feeds().length, 2, '2 feeds') | ||
t.equals(m2.feeds().length, 2, '2 feeds') | ||
t.equals(m1.feeds()[0].length, 1000, 'writer sees 1000 entries') | ||
t.equals(m1.feeds()[1].length, 10, 'writer sees 10 entries') | ||
t.equals(m2.feeds()[0].length, 10, 'receiver sees 10 entries') | ||
t.equals(m2.feeds()[1].length, 1000, 'receiver sees 1000 entries') | ||
m1.feeds()[1].get(0, function (err, data) { | ||
t.error(err) | ||
t.equals(data, 'bar', 'feed 1 has feed 2 data') | ||
m2.feeds()[1].get(0, function (err, data) { | ||
t.error(err) | ||
t.equals(data, 'foo', 'feed 2 has feed 1 data') | ||
t.end() | ||
}) | ||
}) | ||
} | ||
}) | ||
test('regression: start replicating before feeds are loaded', function (t) { | ||
@@ -9,0 +109,0 @@ t.plan(22) |
23871
631
8