Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

hyperdrive

Package Overview
Dependencies
Maintainers
1
Versions
273
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hyperdrive - npm Package Compare versions

Comparing version 1.0.4 to 1.1.0

lib/feed-state.js

10

index.js

@@ -5,3 +5,3 @@ var subleveldown = require('subleveldown')

var swarm = require('./lib/swarm')
var feedInfo = require('./lib/feed-info')
var feedState = require('./lib/feed-state')
var messages = require('./lib/messages')

@@ -52,6 +52,6 @@

var id = link.toString('hex')
var info = this._opened[id]
if (info) return info
this._opened[id] = feedInfo(link, opts)
return info
var state = this._opened[id]
if (state) return state
state = this._opened[id] = feedState(this, link, opts)
return state
}

@@ -5,2 +5,3 @@ var thunky = require('thunky')

var BLOCKS_PER_DIRECTORY = 8192
var MAX_BINARY_LENGTH = 65536

@@ -18,20 +19,25 @@ module.exports = Feed

this.index = opts.index || null
this.bitfield = null
this.drive = drive
this._blocks = opts.blocks
this._state = drive._open(link)
this._decode = !!opts.decode
this._ptr = null
this._channel = this.drive.swarm.join(link) // TODO: only join if not fully downloaded (on open)
this._subswarm = this.drive.swarm.join(link) // TODO: only join if not fully downloaded
this.open = thunky(function (cb) {
self._channel.ready(function () {
cb(self)
this.ready = thunky(function (cb) {
self._state.open(function onopen (err) {
if (err) return cb(err)
self.bitfield = self._state.bitfield
if (!self.index) self.index = self._state.index
if (!self.blocks) self.blocks = self._state.blocks
if (self.blocks) cb()
else self._state.once('update', onopen)
})
})
this.open()
this.ready()
}
Feed.prototype.cursor = function () {
if (this._decode) throw new Error('You can only create binary cursors for binary feeds')
return new Cursor(this)

@@ -41,21 +47,15 @@ }

Feed.prototype.get = function (index, cb) {
if (!this._state.opened) return this._state.open(this.get.bind(this, index, cb))
if (this._decode) cb = decoder(cb)
this.open(function (self) {
if (self._channel.blocks && index >= self._channel.blocks) return cb(null, null)
if (self._channel.bitfield.get(index)) return self._block(index, cb)
if (this._state.blocks && index >= this._state.blocks) return cb(null, null)
self._channel.want.push({block: index, cb: cb})
self._channel.fetch(index)
})
if (this._state.bitfield.get(index)) {
this._state.get(index, cb)
} else {
this._state.want.push({block: index, callback: cb})
this._subswarm.fetch()
}
}
Feed.prototype._block = function (index, cb) {
var self = this
this.drive._hashes.get(this._channel.prefix + (2 * index), function (err, hash) {
if (err) return cb(err)
self.drive._blocks.get(hash.toString('hex'), cb)
})
}
function decoder (cb) {

@@ -78,7 +78,7 @@ return function (err, value) {

if (this._feed._channel.blocks || this._feed._blocks) this._onblocks()
if (this._feed._state.blocks || this._feed.blocks) this._onblocks()
}
Cursor.prototype._onblocks = function () {
var blocks = this._feed._channel.blocks || this._feed._blocks
var blocks = this._feed._state.blocks || this._feed.blocks
this._indexSize = this._feed.index ? this._feed.index.length : Math.ceil(blocks / BLOCKS_PER_DIRECTORY)

@@ -102,3 +102,3 @@ this._end = blocks - this._indexSize

Cursor.prototype.next = function (cb) {
var inited = this._feed._channel.blocks || this._feed._blocks
var inited = this._feed._state.blocks || this._feed.blocks
if (!inited) return this.seekAndRead(offset, cb)

@@ -142,3 +142,3 @@

for (var j = 0; j < BLOCKS_PER_DIRECTORY; j++) {
len = dir.readUInt16BE(2 * j)
len = dir.readUInt16BE(2 * j) || MAX_BINARY_LENGTH
if (offset < len) break

@@ -160,3 +160,3 @@ offset -= len

if (this._feed._channel.blocks) loop(0)
if (this._feed._state.blocks) loop(0)
this._feed.get(0, retry)

@@ -188,5 +188,5 @@

Cursor.prototype._block = function (index, cb) {
var blocks = this._feed._channel.blocks || this._feed._blocks
var blocks = this._feed._state.blocks || this._feed.blocks
if (index >= blocks) return cb(null, null)
this._feed.get(index, cb)
}
var lpstream = require('length-prefixed-stream')
var duplexify = require('duplexify')
var ids = require('numeric-id-map')
var crypto = require('crypto')
var events = require('events')

@@ -144,4 +145,6 @@ var bitfield = require('bitfield')

function Protocol () {
if (!(this instanceof Protocol)) return new Protocol()
function Protocol (opts) {
if (!(this instanceof Protocol)) return new Protocol(opts)
if (!opts) opts = {}
duplexify.call(this)

@@ -152,2 +155,4 @@

this.inflight = 0
this.id = opts.id || crypto.randomBytes(32)
this.remoteId = null

@@ -164,3 +169,4 @@ this._parser = lpstream.decode({limit: MAX_MESSAGE})

var handshake = {
protocol: 'hyperdrive'
protocol: 'hyperdrive',
id: this.id
}

@@ -184,2 +190,3 @@

}
this.remoteId = handshake.id
this.emit('handshake')

@@ -186,0 +193,0 @@ }

var protocol = require('./protocol')
var hash = require('./hash')
var thunky = require('thunky')
var flat = require('flat-tree')
var equals = require('buffer-equals')
var debug = require('debug')('hyperdrive-swarm')

@@ -13,3 +9,3 @@

if (!opts) opts = {}
this._name = opts.name || 'unknown'
this.name = opts.name || 'unknown'
this.drive = drive

@@ -23,221 +19,39 @@ this.peers = []

var id = link.toString('hex')
var self = this
if (this.joined[id]) return this.joined[id]
var bitfield = require('bitfield')
var low = require('last-one-wins')
var prefix = require('sublevel-prefixer')()
var self = this
var ptr = null
var channels = {
var subswarm = {
id: id,
feed: this.drive._open(link),
link: link,
bitfield: bitfield(1, {grow: Infinity}),
blocks: 0,
want: [],
peers: [],
fetch: fetch,
put: put,
block: get,
ready: thunky(open),
sync: low(sync)
fetch: fetch
}
var first = true
this.joined[id] = subswarm
return subswarm
function sync (data, cb) {
debug('[%s] syncing bitfield', self._name)
if (first) self.drive._links.put(id, {id: link, blocks: channels.blocks})
first = false
self.drive._bitfields.put(id, channels.bitfield.buffer, cb)
}
var pending = {}
function put (block, data, proof, cb) {
if (!cb) cb = function () {}
var batch = []
var rootLength = 0
if (!channels.blocks) { // first ever response *MUST* include root proof
var blocks = validateRoots(proof, link)
if (!blocks) return cb(new Error('Invalid root proof'))
channels.blocks = blocks
rootLength = flat.fullRoots(2 * blocks).length
var roots = proof.slice(-rootLength)
for (var i = 0; i < roots.length; i++) {
pending['hash/' + roots[i].index] = roots[i].hash
batch.push({
type: 'put',
key: ptr + roots[i].index,
value: roots[i].hash
})
function fetch () {
if (!subswarm.feed.opened) return
debug('[%s] should fetch', self.name)
for (var i = 0; i < subswarm.peers.length; i++) {
var peer = subswarm.peers[i]
if (peer.stream.inflight >= 10) continue // max 10 inflight requests
var block = chooseBlock(peer)
if (block > -1) {
peer.request(block)
debug('[%s] peer #%d is fetching block %d', self.name, i, block)
}
}
var top = hash.data(data)
var want = 2 * block
var offset = 0
var swap = false
var digest = {
index: want,
hash: top
}
getHash(want, loop)
function write () {
pending['hash/' + digest.index] = digest.hash
batch.push({
type: 'put',
key: ptr + digest.index,
value: digest.hash
})
for (var i = 0; i < offset; i++) {
pending['hash/' + proof[i].index] = proof[i].hash
batch.push({
type: 'put',
key: ptr + proof[i].index,
value: proof[i].hash
})
}
self.drive._hashes.batch(batch, function (err) {
for (var i = 0; i < offset; i++) delete pending['hash/' + i]
for (var j = proof.length - rootLength; j < proof.length; j++) delete pending['hash/' + j]
if (err) return cb(err)
self.drive._blocks.put(digest.hash.toString('hex'), data, function (err) {
if (err) return cb(err)
channels.bitfield.set(block)
channels.sync()
for (var i = 0; i < channels.peers.length; i++) {
channels.peers[i].have(block)
}
var remove = []
for (var j = 0; j < channels.want.length; j++) {
if (channels.want[j].block === block) {
remove.push(j)
channels.want[j].cb(null, data)
}
}
for (var k = 0; k < remove.length; k++) {
channels.want.splice(k, 1)
}
cb(null)
})
})
}
function loop (_, trusted) {
if (trusted && equals(trusted, top)) return write()
var sibling = flat.sibling(want)
swap = sibling < want
want = flat.parent(sibling)
if (offset < proof.length && proof[offset].index === sibling) {
next(null, proof[offset++].hash)
} else {
getHash(sibling, next)
}
}
function next (err, sibling) {
if (err) return cb(err)
if (swap) top = hash.tree(sibling, top)
else top = hash.tree(top, sibling)
batch.push({
type: 'put',
key: ptr + want,
value: top
})
getHash(want, loop)
}
}
function getHash (index, cb) {
var tmp = pending['hash/' + index]
if (tmp) return cb(null, tmp)
self.drive._hashes.get(ptr + index, cb)
}
function get (block, proof, cb) {
if (!ptr) return cb(new Error('Must open first'))
getHash(2 * block, function (err, hash) {
if (err) return cb(err)
self.drive._blocks.get(hash.toString('hex'), function (err, data) {
if (err) return cb(err)
var i = 0
loop()
function loop () {
if (i === proof.length) return cb(null, data, proof)
self.drive._hashes.get(ptr + proof[i].index, next)
}
function next (err, hash) {
if (err) return cb(err)
proof[i++].hash = hash
loop()
}
})
})
}
function open (cb) {
self.drive._links.get(id, function (_, info) {
self.drive._bitfields.get(id, function (_, bits) {
if (info) {
first = false
channels.blocks = info.blocks
ptr = prefix((info.pointer || info.id).toString('hex'), '')
} else {
ptr = prefix(link.toString('hex'), '')
}
channels.prefix = ptr
if (bits) {
channels.bitfield = bitfield(bits, {grow: channels.blocks || Infinity})
}
cb()
})
})
}
function fetch () {
debug('[%s] should fetch', self._name)
channels.ready(function () {
for (var i = 0; i < channels.peers.length; i++) {
var peer = channels.peers[i]
if (peer.stream.inflight >= 10) continue // max 10 inflight requests
var block = chooseBlock(peer)
if (block > -1) {
peer.request(block)
debug('[%s] peer #%d is fetching block %d', self._name, i, block)
}
}
})
}
function chooseBlock (peer) {
var len = peer.remoteBitfield.buffer.length * 8
for (var j = 0; j < channels.want.length; j++) {
var block = channels.want[j].block
for (var j = 0; j < subswarm.feed.want.length; j++) {
var block = subswarm.feed.want[j].block
if (peer.amRequesting.get(block)) continue
if (peer.remoteBitfield.get(block) && !channels.bitfield.get(block)) {
debug('[%s] choosing prioritized block #%d', self._name, block)
if (peer.remoteBitfield.get(block) && !subswarm.feed.bitfield.get(block)) {
debug('[%s] choosing prioritized block #%d', self.name, block)
return block

@@ -249,3 +63,3 @@ }

if (peer.amRequesting.get(i)) continue
if (peer.remoteBitfield.get(i) && !channels.bitfield.get(i)) {
if (peer.remoteBitfield.get(i) && !subswarm.feed.bitfield.get(i)) {
return i

@@ -257,11 +71,7 @@ }

}
this.joined[id] = channels
return channels
}
Swarm.prototype.join = function (link) {
if (this.links.indexOf(link.toString('hex')) === -1) {
this.links.push(link.toString('hex'))
}
var id = link.toString('hex')
if (this.links.indexOf(id) === -1) this.links.push(id)

@@ -279,3 +89,3 @@ for (var i = 0; i < this.peers.length; i++) {

debug('[%s] new peer stream', this._name)
debug('[%s] new peer stream', this.name)

@@ -295,21 +105,22 @@ peer.on('channel', onchannel)

function add (ch) {
var state = self._get(ch.link)
state.peers.push(ch)
var subswarm = self._get(ch.link)
subswarm.peers.push(ch)
ch.on('leave', function () {
var i = state.peers.indexOf(ch)
if (i > -1) state.peers.splice(ch, 1)
var i = subswarm.peers.indexOf(ch)
if (i > -1) subswarm.peers.splice(ch, 1)
})
return state
return subswarm
}
function onchannel (ch) {
var name = ch.link.toString('hex').slice(0, 12) + '/' + self._name
var name = ch.link.toString('hex').slice(0, 12) + '/' + self.name
var subswarm = add(ch)
debug('[channel %s] joined channel', name)
var state = add(ch)
ch.on('response', function (block, data, proof) {
debug('[channel %s] rcvd response #%d (%d bytes, proof contained %d hashes)', name, block, data.length, proof.length)
state.put(block, data, proof, function (err) {
subswarm.feed.put(block, data, proof, function (err) {
if (err) ch.leave(err)
state.fetch()
subswarm.fetch(ch)
})

@@ -319,30 +130,10 @@ })

ch.on('request', function (block) {
if (!state.blocks) return
debug('[channel %s] rcvd request #%d', name, block)
// TODO: only send back what the peer is missing
var proof = []
var limit = 2 * state.blocks
var want = flat.sibling(2 * block)
while (flat.rightSpan(want) < limit) {
proof.push({
index: want,
hash: null
subswarm.feed.get(block, function (err, data) {
if (err) return ch.leave(err)
if (!data) return ch.leave(new Error('Remote peer wants a block that is out of bounds'))
subswarm.feed.proof(block, function (err, proof) {
if (err) return ch.leave(err)
ch.response(block, data, proof)
})
want = flat.sibling(flat.parent(want))
}
var roots = flat.fullRoots(limit)
for (var i = 0; i < roots.length; i++) {
proof.push({
index: roots[i],
hash: null
})
}
state.block(block, proof, function (err, data, proof) {
if (err) return ch.leave(err)
ch.response(block, data, proof)
})

@@ -356,8 +147,8 @@ })

ch.on('have', function () {
state.fetch()
subswarm.fetch(ch)
})
state.ready(function (err) {
subswarm.feed.open(function (err) {
if (err) return ch.leave(err)
ch.bitfield(state.bitfield)
ch.bitfield(subswarm.feed.bitfield)
})

@@ -371,14 +162,1 @@ }

}
function validateRoots (proof, id) {
if (!proof.length) return 0
var blocks = (flat.rightSpan(proof[proof.length - 1].index) + 2) / 2
var roots = flat.fullRoots(2 * blocks)
if (proof.length < roots.length) return 0
var proofRoots = proof.slice(-roots.length)
for (var i = 0; i < roots.length; i++) {
if (proofRoots[i].index !== roots[i]) return 0
}
if (!equals(id, hash.root(proofRoots))) return 0
return blocks
}
{
"name": "hyperdrive",
"version": "1.0.4",
"version": "1.1.0",
"description": "A file sharing network based on rabin file chunking and append only feeds of data verified by merkle trees.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -26,3 +26,3 @@ # hyperdrive

- [ ] Tests for internal logic
- [ ] A bit of refactoring
- [x] A bit of refactoring

@@ -29,0 +29,0 @@ ## Usage

Sorry, the diff of this file is not supported yet

Socket — SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc