hyperdrive
Advanced tools
Comparing version 1.0.4 to 1.1.0
10
index.js
@@ -5,3 +5,3 @@ var subleveldown = require('subleveldown') | ||
var swarm = require('./lib/swarm') | ||
var feedInfo = require('./lib/feed-info') | ||
var feedState = require('./lib/feed-state') | ||
var messages = require('./lib/messages') | ||
@@ -52,6 +52,6 @@ | ||
var id = link.toString('hex') | ||
var info = this._opened[id] | ||
if (info) return info | ||
this._opened[id] = feedInfo(link, opts) | ||
return info | ||
var state = this._opened[id] | ||
if (state) return state | ||
state = this._opened[id] = feedState(this, link, opts) | ||
return state | ||
} |
@@ -5,2 +5,3 @@ var thunky = require('thunky') | ||
var BLOCKS_PER_DIRECTORY = 8192 | ||
var MAX_BINARY_LENGTH = 65536 | ||
@@ -18,20 +19,25 @@ module.exports = Feed | ||
this.index = opts.index || null | ||
this.bitfield = null | ||
this.drive = drive | ||
this._blocks = opts.blocks | ||
this._state = drive._open(link) | ||
this._decode = !!opts.decode | ||
this._ptr = null | ||
this._channel = this.drive.swarm.join(link) // TODO: only join if not fully downloaded (on open) | ||
this._subswarm = this.drive.swarm.join(link) // TODO: only join if not fully downloaded | ||
this.open = thunky(function (cb) { | ||
self._channel.ready(function () { | ||
cb(self) | ||
this.ready = thunky(function (cb) { | ||
self._state.open(function onopen (err) { | ||
if (err) return cb(err) | ||
self.bitfield = self._state.bitfield | ||
if (!self.index) self.index = self._state.index | ||
if (!self.blocks) self.blocks = self._state.blocks | ||
if (self.blocks) cb() | ||
else self._state.once('update', onopen) | ||
}) | ||
}) | ||
this.open() | ||
this.ready() | ||
} | ||
Feed.prototype.cursor = function () { | ||
if (this._decode) throw new Error('You can only create binary cursors for binary feeds') | ||
return new Cursor(this) | ||
@@ -41,21 +47,15 @@ } | ||
Feed.prototype.get = function (index, cb) { | ||
if (!this._state.opened) return this._state.open(this.get.bind(this, index, cb)) | ||
if (this._decode) cb = decoder(cb) | ||
this.open(function (self) { | ||
if (self._channel.blocks && index >= self._channel.blocks) return cb(null, null) | ||
if (self._channel.bitfield.get(index)) return self._block(index, cb) | ||
if (this._state.blocks && index >= this._state.blocks) return cb(null, null) | ||
self._channel.want.push({block: index, cb: cb}) | ||
self._channel.fetch(index) | ||
}) | ||
if (this._state.bitfield.get(index)) { | ||
this._state.get(index, cb) | ||
} else { | ||
this._state.want.push({block: index, callback: cb}) | ||
this._subswarm.fetch() | ||
} | ||
} | ||
Feed.prototype._block = function (index, cb) { | ||
var self = this | ||
this.drive._hashes.get(this._channel.prefix + (2 * index), function (err, hash) { | ||
if (err) return cb(err) | ||
self.drive._blocks.get(hash.toString('hex'), cb) | ||
}) | ||
} | ||
function decoder (cb) { | ||
@@ -78,7 +78,7 @@ return function (err, value) { | ||
if (this._feed._channel.blocks || this._feed._blocks) this._onblocks() | ||
if (this._feed._state.blocks || this._feed.blocks) this._onblocks() | ||
} | ||
Cursor.prototype._onblocks = function () { | ||
var blocks = this._feed._channel.blocks || this._feed._blocks | ||
var blocks = this._feed._state.blocks || this._feed.blocks | ||
this._indexSize = this._feed.index ? this._feed.index.length : Math.ceil(blocks / BLOCKS_PER_DIRECTORY) | ||
@@ -102,3 +102,3 @@ this._end = blocks - this._indexSize | ||
Cursor.prototype.next = function (cb) { | ||
var inited = this._feed._channel.blocks || this._feed._blocks | ||
var inited = this._feed._state.blocks || this._feed.blocks | ||
if (!inited) return this.seekAndRead(offset, cb) | ||
@@ -142,3 +142,3 @@ | ||
for (var j = 0; j < BLOCKS_PER_DIRECTORY; j++) { | ||
len = dir.readUInt16BE(2 * j) | ||
len = dir.readUInt16BE(2 * j) || MAX_BINARY_LENGTH | ||
if (offset < len) break | ||
@@ -160,3 +160,3 @@ offset -= len | ||
if (this._feed._channel.blocks) loop(0) | ||
if (this._feed._state.blocks) loop(0) | ||
this._feed.get(0, retry) | ||
@@ -188,5 +188,5 @@ | ||
Cursor.prototype._block = function (index, cb) { | ||
var blocks = this._feed._channel.blocks || this._feed._blocks | ||
var blocks = this._feed._state.blocks || this._feed.blocks | ||
if (index >= blocks) return cb(null, null) | ||
this._feed.get(index, cb) | ||
} |
var lpstream = require('length-prefixed-stream') | ||
var duplexify = require('duplexify') | ||
var ids = require('numeric-id-map') | ||
var crypto = require('crypto') | ||
var events = require('events') | ||
@@ -144,4 +145,6 @@ var bitfield = require('bitfield') | ||
function Protocol () { | ||
if (!(this instanceof Protocol)) return new Protocol() | ||
function Protocol (opts) { | ||
if (!(this instanceof Protocol)) return new Protocol(opts) | ||
if (!opts) opts = {} | ||
duplexify.call(this) | ||
@@ -152,2 +155,4 @@ | ||
this.inflight = 0 | ||
this.id = opts.id || crypto.randomBytes(32) | ||
this.remoteId = null | ||
@@ -164,3 +169,4 @@ this._parser = lpstream.decode({limit: MAX_MESSAGE}) | ||
var handshake = { | ||
protocol: 'hyperdrive' | ||
protocol: 'hyperdrive', | ||
id: this.id | ||
} | ||
@@ -184,2 +190,3 @@ | ||
} | ||
this.remoteId = handshake.id | ||
this.emit('handshake') | ||
@@ -186,0 +193,0 @@ } |
310
lib/swarm.js
var protocol = require('./protocol') | ||
var hash = require('./hash') | ||
var thunky = require('thunky') | ||
var flat = require('flat-tree') | ||
var equals = require('buffer-equals') | ||
var debug = require('debug')('hyperdrive-swarm') | ||
@@ -13,3 +9,3 @@ | ||
if (!opts) opts = {} | ||
this._name = opts.name || 'unknown' | ||
this.name = opts.name || 'unknown' | ||
this.drive = drive | ||
@@ -23,221 +19,39 @@ this.peers = [] | ||
var id = link.toString('hex') | ||
var self = this | ||
if (this.joined[id]) return this.joined[id] | ||
var bitfield = require('bitfield') | ||
var low = require('last-one-wins') | ||
var prefix = require('sublevel-prefixer')() | ||
var self = this | ||
var ptr = null | ||
var channels = { | ||
var subswarm = { | ||
id: id, | ||
feed: this.drive._open(link), | ||
link: link, | ||
bitfield: bitfield(1, {grow: Infinity}), | ||
blocks: 0, | ||
want: [], | ||
peers: [], | ||
fetch: fetch, | ||
put: put, | ||
block: get, | ||
ready: thunky(open), | ||
sync: low(sync) | ||
fetch: fetch | ||
} | ||
var first = true | ||
this.joined[id] = subswarm | ||
return subswarm | ||
function sync (data, cb) { | ||
debug('[%s] syncing bitfield', self._name) | ||
if (first) self.drive._links.put(id, {id: link, blocks: channels.blocks}) | ||
first = false | ||
self.drive._bitfields.put(id, channels.bitfield.buffer, cb) | ||
} | ||
var pending = {} | ||
function put (block, data, proof, cb) { | ||
if (!cb) cb = function () {} | ||
var batch = [] | ||
var rootLength = 0 | ||
if (!channels.blocks) { // first ever response *MUST* include root proof | ||
var blocks = validateRoots(proof, link) | ||
if (!blocks) return cb(new Error('Invalid root proof')) | ||
channels.blocks = blocks | ||
rootLength = flat.fullRoots(2 * blocks).length | ||
var roots = proof.slice(-rootLength) | ||
for (var i = 0; i < roots.length; i++) { | ||
pending['hash/' + roots[i].index] = roots[i].hash | ||
batch.push({ | ||
type: 'put', | ||
key: ptr + roots[i].index, | ||
value: roots[i].hash | ||
}) | ||
function fetch () { | ||
if (!subswarm.feed.opened) return | ||
debug('[%s] should fetch', self.name) | ||
for (var i = 0; i < subswarm.peers.length; i++) { | ||
var peer = subswarm.peers[i] | ||
if (peer.stream.inflight >= 10) continue // max 10 inflight requests | ||
var block = chooseBlock(peer) | ||
if (block > -1) { | ||
peer.request(block) | ||
debug('[%s] peer #%d is fetching block %d', self.name, i, block) | ||
} | ||
} | ||
var top = hash.data(data) | ||
var want = 2 * block | ||
var offset = 0 | ||
var swap = false | ||
var digest = { | ||
index: want, | ||
hash: top | ||
} | ||
getHash(want, loop) | ||
function write () { | ||
pending['hash/' + digest.index] = digest.hash | ||
batch.push({ | ||
type: 'put', | ||
key: ptr + digest.index, | ||
value: digest.hash | ||
}) | ||
for (var i = 0; i < offset; i++) { | ||
pending['hash/' + proof[i].index] = proof[i].hash | ||
batch.push({ | ||
type: 'put', | ||
key: ptr + proof[i].index, | ||
value: proof[i].hash | ||
}) | ||
} | ||
self.drive._hashes.batch(batch, function (err) { | ||
for (var i = 0; i < offset; i++) delete pending['hash/' + i] | ||
for (var j = proof.length - rootLength; j < proof.length; j++) delete pending['hash/' + j] | ||
if (err) return cb(err) | ||
self.drive._blocks.put(digest.hash.toString('hex'), data, function (err) { | ||
if (err) return cb(err) | ||
channels.bitfield.set(block) | ||
channels.sync() | ||
for (var i = 0; i < channels.peers.length; i++) { | ||
channels.peers[i].have(block) | ||
} | ||
var remove = [] | ||
for (var j = 0; j < channels.want.length; j++) { | ||
if (channels.want[j].block === block) { | ||
remove.push(j) | ||
channels.want[j].cb(null, data) | ||
} | ||
} | ||
for (var k = 0; k < remove.length; k++) { | ||
channels.want.splice(k, 1) | ||
} | ||
cb(null) | ||
}) | ||
}) | ||
} | ||
function loop (_, trusted) { | ||
if (trusted && equals(trusted, top)) return write() | ||
var sibling = flat.sibling(want) | ||
swap = sibling < want | ||
want = flat.parent(sibling) | ||
if (offset < proof.length && proof[offset].index === sibling) { | ||
next(null, proof[offset++].hash) | ||
} else { | ||
getHash(sibling, next) | ||
} | ||
} | ||
function next (err, sibling) { | ||
if (err) return cb(err) | ||
if (swap) top = hash.tree(sibling, top) | ||
else top = hash.tree(top, sibling) | ||
batch.push({ | ||
type: 'put', | ||
key: ptr + want, | ||
value: top | ||
}) | ||
getHash(want, loop) | ||
} | ||
} | ||
function getHash (index, cb) { | ||
var tmp = pending['hash/' + index] | ||
if (tmp) return cb(null, tmp) | ||
self.drive._hashes.get(ptr + index, cb) | ||
} | ||
function get (block, proof, cb) { | ||
if (!ptr) return cb(new Error('Must open first')) | ||
getHash(2 * block, function (err, hash) { | ||
if (err) return cb(err) | ||
self.drive._blocks.get(hash.toString('hex'), function (err, data) { | ||
if (err) return cb(err) | ||
var i = 0 | ||
loop() | ||
function loop () { | ||
if (i === proof.length) return cb(null, data, proof) | ||
self.drive._hashes.get(ptr + proof[i].index, next) | ||
} | ||
function next (err, hash) { | ||
if (err) return cb(err) | ||
proof[i++].hash = hash | ||
loop() | ||
} | ||
}) | ||
}) | ||
} | ||
function open (cb) { | ||
self.drive._links.get(id, function (_, info) { | ||
self.drive._bitfields.get(id, function (_, bits) { | ||
if (info) { | ||
first = false | ||
channels.blocks = info.blocks | ||
ptr = prefix((info.pointer || info.id).toString('hex'), '') | ||
} else { | ||
ptr = prefix(link.toString('hex'), '') | ||
} | ||
channels.prefix = ptr | ||
if (bits) { | ||
channels.bitfield = bitfield(bits, {grow: channels.blocks || Infinity}) | ||
} | ||
cb() | ||
}) | ||
}) | ||
} | ||
function fetch () { | ||
debug('[%s] should fetch', self._name) | ||
channels.ready(function () { | ||
for (var i = 0; i < channels.peers.length; i++) { | ||
var peer = channels.peers[i] | ||
if (peer.stream.inflight >= 10) continue // max 10 inflight requests | ||
var block = chooseBlock(peer) | ||
if (block > -1) { | ||
peer.request(block) | ||
debug('[%s] peer #%d is fetching block %d', self._name, i, block) | ||
} | ||
} | ||
}) | ||
} | ||
function chooseBlock (peer) { | ||
var len = peer.remoteBitfield.buffer.length * 8 | ||
for (var j = 0; j < channels.want.length; j++) { | ||
var block = channels.want[j].block | ||
for (var j = 0; j < subswarm.feed.want.length; j++) { | ||
var block = subswarm.feed.want[j].block | ||
if (peer.amRequesting.get(block)) continue | ||
if (peer.remoteBitfield.get(block) && !channels.bitfield.get(block)) { | ||
debug('[%s] choosing prioritized block #%d', self._name, block) | ||
if (peer.remoteBitfield.get(block) && !subswarm.feed.bitfield.get(block)) { | ||
debug('[%s] choosing prioritized block #%d', self.name, block) | ||
return block | ||
@@ -249,3 +63,3 @@ } | ||
if (peer.amRequesting.get(i)) continue | ||
if (peer.remoteBitfield.get(i) && !channels.bitfield.get(i)) { | ||
if (peer.remoteBitfield.get(i) && !subswarm.feed.bitfield.get(i)) { | ||
return i | ||
@@ -257,11 +71,7 @@ } | ||
} | ||
this.joined[id] = channels | ||
return channels | ||
} | ||
Swarm.prototype.join = function (link) { | ||
if (this.links.indexOf(link.toString('hex')) === -1) { | ||
this.links.push(link.toString('hex')) | ||
} | ||
var id = link.toString('hex') | ||
if (this.links.indexOf(id) === -1) this.links.push(id) | ||
@@ -279,3 +89,3 @@ for (var i = 0; i < this.peers.length; i++) { | ||
debug('[%s] new peer stream', this._name) | ||
debug('[%s] new peer stream', this.name) | ||
@@ -295,21 +105,22 @@ peer.on('channel', onchannel) | ||
function add (ch) { | ||
var state = self._get(ch.link) | ||
state.peers.push(ch) | ||
var subswarm = self._get(ch.link) | ||
subswarm.peers.push(ch) | ||
ch.on('leave', function () { | ||
var i = state.peers.indexOf(ch) | ||
if (i > -1) state.peers.splice(ch, 1) | ||
var i = subswarm.peers.indexOf(ch) | ||
if (i > -1) subswarm.peers.splice(ch, 1) | ||
}) | ||
return state | ||
return subswarm | ||
} | ||
function onchannel (ch) { | ||
var name = ch.link.toString('hex').slice(0, 12) + '/' + self._name | ||
var name = ch.link.toString('hex').slice(0, 12) + '/' + self.name | ||
var subswarm = add(ch) | ||
debug('[channel %s] joined channel', name) | ||
var state = add(ch) | ||
ch.on('response', function (block, data, proof) { | ||
debug('[channel %s] rcvd response #%d (%d bytes, proof contained %d hashes)', name, block, data.length, proof.length) | ||
state.put(block, data, proof, function (err) { | ||
subswarm.feed.put(block, data, proof, function (err) { | ||
if (err) ch.leave(err) | ||
state.fetch() | ||
subswarm.fetch(ch) | ||
}) | ||
@@ -319,30 +130,10 @@ }) | ||
ch.on('request', function (block) { | ||
if (!state.blocks) return | ||
debug('[channel %s] rcvd request #%d', name, block) | ||
// TODO: only send back what the peer is missing | ||
var proof = [] | ||
var limit = 2 * state.blocks | ||
var want = flat.sibling(2 * block) | ||
while (flat.rightSpan(want) < limit) { | ||
proof.push({ | ||
index: want, | ||
hash: null | ||
subswarm.feed.get(block, function (err, data) { | ||
if (err) return ch.leave(err) | ||
if (!data) return ch.leave(new Error('Remote peer wants a block that is out of bounds')) | ||
subswarm.feed.proof(block, function (err, proof) { | ||
if (err) return ch.leave(err) | ||
ch.response(block, data, proof) | ||
}) | ||
want = flat.sibling(flat.parent(want)) | ||
} | ||
var roots = flat.fullRoots(limit) | ||
for (var i = 0; i < roots.length; i++) { | ||
proof.push({ | ||
index: roots[i], | ||
hash: null | ||
}) | ||
} | ||
state.block(block, proof, function (err, data, proof) { | ||
if (err) return ch.leave(err) | ||
ch.response(block, data, proof) | ||
}) | ||
@@ -356,8 +147,8 @@ }) | ||
ch.on('have', function () { | ||
state.fetch() | ||
subswarm.fetch(ch) | ||
}) | ||
state.ready(function (err) { | ||
subswarm.feed.open(function (err) { | ||
if (err) return ch.leave(err) | ||
ch.bitfield(state.bitfield) | ||
ch.bitfield(subswarm.feed.bitfield) | ||
}) | ||
@@ -371,14 +162,1 @@ } | ||
} | ||
function validateRoots (proof, id) { | ||
if (!proof.length) return 0 | ||
var blocks = (flat.rightSpan(proof[proof.length - 1].index) + 2) / 2 | ||
var roots = flat.fullRoots(2 * blocks) | ||
if (proof.length < roots.length) return 0 | ||
var proofRoots = proof.slice(-roots.length) | ||
for (var i = 0; i < roots.length; i++) { | ||
if (proofRoots[i].index !== roots[i]) return 0 | ||
} | ||
if (!equals(id, hash.root(proofRoots))) return 0 | ||
return blocks | ||
} |
{ | ||
"name": "hyperdrive", | ||
"version": "1.0.4", | ||
"version": "1.1.0", | ||
"description": "A file sharing network based on rabin file chunking and append only feeds of data verified by merkle trees.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -26,3 +26,3 @@ # hyperdrive | ||
- [ ] Tests for internal logic | ||
- [ ] A bit of refactoring | ||
- [x] A bit of refactoring | ||
@@ -29,0 +29,0 @@ ## Usage |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
42113
17
1182