New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

hypercore-protocol

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hypercore-protocol - npm Package Compare versions

Comparing version 6.12.0 to 7.0.0

109

example.js

@@ -1,65 +0,71 @@

var protocol = require('./')
var bufferFrom = require('buffer-from')
const Protocol = require('./')
var a = protocol({id: 'a'})
var b = protocol({id: 'b'})
const a = new Protocol(true, {
onauthenticate (remotePublicKey, done) {
console.log('verifying the public key of b', remotePublicKey)
done(null)
},
onhandshake () {
console.log('onhandshake()')
}
})
const b = new Protocol(false, {
onauthenticate (remotePublicKey, done) {
console.log('verifying the public key of a', remotePublicKey)
done(null)
}
})
a.pipe(b).pipe(a)
var key = bufferFrom('This is a 32 byte key, 012345678')
var missing = 5
a.on('close', () => console.log('a closed', a))
b.on('close', () => console.log('b closed', b))
var channel = a.feed(key)
var remoteChannel = b.feed(key)
const key = Buffer.from('This is a 32 byte key, 012345678')
let missing = 5
a.on('end', function () {
console.log('peer a ended')
})
const channel = a.open(key, {
onhave (have) {
console.log('channel.onhave()', have)
b.on('end', function () {
console.log('peer b ended')
})
for (var i = 0; i < 5; i++) {
channel.request({
index: i
})
}
},
ondata (data) {
console.log('channel.ondata()', data)
channel.on('have', function (have) {
console.log('channel.onhave()', have)
for (var i = 0; i < 5; i++) {
channel.request({
index: i
})
if (!--missing) {
channel.status({
uploading: false,
download: false
})
}
}
})
channel.on('data', function (data) {
console.log('channel.ondata()', data)
if (!--missing) {
channel.info({
uploading: false,
download: false
const remoteChannel = b.open(key, {
onrequest (request) {
console.log('remoteChannel.onrequest()', request)
remoteChannel.data({
index: request.index,
value: 'sup'
})
},
onwant (want) {
console.log('remoteChannel.onwant()', want)
remoteChannel.have({
start: 0,
length: 1000
})
},
onstatus (status) {
console.log('remoteChannel.onstatus', status)
remoteChannel.close()
}
})
remoteChannel.on('request', function (request) {
console.log('remoteChannel.onrequest()', request)
remoteChannel.data({
index: request.index,
value: 'sup'
})
})
remoteChannel.on('want', function (want) {
console.log('remoteChannel.onwant()', want)
remoteChannel.have({
start: 0,
length: 1000
})
})
remoteChannel.on('info', function (info) {
console.log('remoteChannel.oninfo', info)
b.finalize()
})
channel.want({

@@ -69,1 +75,6 @@ start: 0,

})
console.log('a:')
console.log(a)
console.log('b:')
console.log(b)

@@ -1,513 +0,424 @@

var stream = require('readable-stream')
var inherits = require('inherits')
var varint = require('varint')
var sodium = require('sodium-universal')
var indexOf = require('sorted-indexof')
var feed = require('./feed')
var messages = require('./messages')
var bufferAlloc = require('buffer-alloc-unsafe')
var bufferFrom = require('buffer-from')
const SHP = require('simple-hypercore-protocol')
const crypto = require('hypercore-crypto')
const timeout = require('timeout-refresh')
const inspect = require('inspect-custom-symbol')
const Nanoguard = require('nanoguard')
const pretty = require('pretty-hash')
const { Duplex } = require('streamx')
module.exports = Protocol
class Channelizer {
constructor (stream, encrypted, keyPair) {
this.stream = stream
this.created = new Map()
this.local = []
this.remote = []
this.encrypted = encrypted !== false
this.keyPair = keyPair
}
function Protocol (opts) {
if (!(this instanceof Protocol)) return new Protocol(opts)
if (!opts) opts = {}
allocLocal () {
const id = this.local.indexOf(null)
if (id > -1) return id
this.local.push(null)
return this.local.length - 1
}
stream.Duplex.call(this)
var self = this
attachLocal (ch) {
const id = this.allocLocal()
this.local[id] = ch
ch.localId = id
}
this.id = opts.id || randomBytes(32)
this.live = !!opts.live
this.ack = !!opts.ack
this.userData = opts.userData || null
this.remoteId = null
this.remoteLive = false
this.remoteUserData = null
attachRemote (ch, id) {
if (this.remote.length === id) this.remote.push(null)
this.remote[id] = ch
ch.remoteId = id
}
this.destroyed = false
this.encrypted = opts.encrypt !== false
this.key = null
this.discoveryKey = null
this.remoteDiscoveryKey = null
this.feeds = []
this.expectedFeeds = opts.expectedFeeds || 0
this.extensions = opts.extensions || []
this.remoteExtensions = null
this.maxFeeds = opts.maxFeeds || 256
getChannel (dk) {
return this.created.get(dk.toString('hex'))
}
this._localFeeds = []
this._remoteFeeds = []
this._feeds = {}
createChannel (dk) {
const hex = dk.toString('hex')
this._nonce = null
this._remoteNonce = null
this._xor = null
this._remoteXor = null
this._needsKey = false
this._length = bufferAlloc(varint.encodingLength(8388608))
this._missing = 0
this._buf = null
this._pointer = 0
this._data = null
this._start = 0
this._cb = null
this._interval = null
this._keepAlive = 0
this._remoteKeepAlive = 0
this._maybeFinalize = maybeFinalize
this._utp = null
const old = this.created.get(hex)
if (old) return old
if (opts.timeout !== 0 && opts.timeout !== false) this.setTimeout(opts.timeout || 5000, this._ontimeout)
this.on('finish', this.finalize)
this.on('pipe', this._onpipe)
const fresh = new Channel(this.stream.state, this.stream, dk)
this.created.set(hex, fresh)
return fresh
}
function maybeFinalize (err) {
if (err) return self.destroy(err)
if (!self.expectedFeeds) self.finalize()
onauthenticate (key, done) {
if (this.stream.handlers && this.stream.handlers.onauthenticate) this.stream.handlers.onauthenticate(key, done)
else done(null)
}
}
inherits(Protocol, stream.Duplex)
onhandshake () {
if (this.stream.handlers && this.stream.handlers.onhandshake) this.stream.handlers.onhandshake()
}
Protocol.prototype._onpipe = function (stream) {
if (typeof stream.setContentSize === 'function') this._utp = stream
}
onopen (channelId, message) {
const ch = this.createChannel(message.discoveryKey)
ch.remoteCapability = message.capability
this.attachRemote(ch, channelId)
if (ch.localId === -1) {
if (this.stream.handlers.ondiscoverykey) this.stream.handlers.ondiscoverykey(ch.discoveryKey)
this.stream.emit('discovery-key', ch.discoveryKey)
} else {
if (!ch.remoteVerified) {
// We are leaking metadata here that the remote cap was bad which means the remote prob can figure
// out that we indeed had the key. Since we were the one to initialise the channel that's ok, as
// that already kinda leaks that.
this.stream.destroy(new Error('Invalid remote channel capability'))
return
}
this.stream.emit('duplex-channel', ch)
}
if (ch.handlers && ch.handlers.onopen) ch.handlers.onopen()
}
Protocol.prototype._prefinalize = function () {
if (!this.emit('prefinalize', this._maybeFinalize)) this.finalize()
}
onoptions (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onoptions) ch.handlers.onoptions(message)
}
Protocol.prototype.setTimeout = function (ms, ontimeout) {
if (this.destroyed) return
if (ontimeout) this.once('timeout', ontimeout)
onstatus (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onstatus) ch.handlers.onstatus(message)
}
var self = this
onhave (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onhave) ch.handlers.onhave(message)
}
this._keepAlive = 0
this._remoteKeepAlive = 0
onunhave (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onunhave) ch.handlers.onunhave(message)
}
clearInterval(this._interval)
if (!ms) return
onwant (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onwant) ch.handlers.onwant(message)
}
this._interval = setInterval(kick, (ms / 4) | 0)
if (this._interval.unref) this._interval.unref()
onunwant (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onunwant) ch.handlers.onunwant(message)
}
function kick () {
self._kick()
onrequest (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onrequest) ch.handlers.onrequest(message)
}
}
Protocol.prototype.has = function (key) {
var hex = discoveryKey(key).toString('hex')
var ch = this._feeds[hex]
return !!ch
}
Protocol.prototype.feed = function (key, opts) {
if (this.destroyed) return null
if (!opts) opts = {}
var dk = opts.discoveryKey || discoveryKey(key)
var ch = this._feed(dk)
if (ch.id > -1) {
if (opts.peer) ch.peer = opts.peer
return ch
oncancel (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.oncancel) ch.handlers.oncancel(message)
}
if (this._localFeeds.length >= this.maxFeeds) {
this._tooManyFeeds()
return null
ondata (channelId, message) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.ondata) ch.handlers.ondata(message)
}
ch.id = this._localFeeds.push(ch) - 1
ch.header = ch.id << 4
ch.headerLength = varint.encodingLength(ch.header)
ch.key = key
ch.discoveryKey = dk
if (opts.peer) ch.peer = opts.peer
this.feeds.push(ch)
var first = !this.key
var feed = {
discoveryKey: dk,
nonce: null
onextension (channelId, id, buf) {
const ch = this.remote[channelId]
if (ch && ch.handlers && ch.handlers.onextension) ch.handlers.onextension(id, buf)
}
if (first) {
this.key = key
this.discoveryKey = dk
onclose (channelId, message) {
const ch = channelId < this.remote.length ? this.remote[channelId] : null
if (!this._sameKey()) return null
if (ch) {
this.remote[channelId] = null
if (ch.handlers && ch.handlers.onclose) ch.handlers.onclose()
} else if (message.discoveryKey) {
const local = this.getChannel(message.discoveryKey)
if (local && local.handlers && local.handlers.onclose) local.handlers.onclose()
}
if (this.encrypted) {
feed.nonce = this._nonce = randomBytes(24)
this._xor = sodium.crypto_stream_xor_instance(this._nonce, this.key)
if (this._remoteNonce) {
this._remoteXor = sodium.crypto_stream_xor_instance(this._remoteNonce, this.key)
}
if (ch && ch.localId > -1) {
this.local[ch.localId] = null
}
if (this._needsKey) {
this._needsKey = false
this._resume()
if (ch) {
this.created.delete(ch.discoveryKey.toString('hex'))
this.stream._prefinalize()
}
}
var box = encodeFeed(feed, ch.id)
if (!feed.nonce && this.encrypted) this._xor.update(box, box)
this._keepAlive = 0
this.push(box)
// called by the state machine
send (data) {
if (this.stream.keepAlive !== null) this.stream.keepAlive.refresh()
this.stream.bytesSent += data.length
return this.stream.push(data)
}
if (this.destroyed) return null
if (first) {
ch.handshake({
id: this.id,
live: this.live,
userData: this.userData,
extensions: this.extensions,
ack: this.ack
})
// called by the state machine
destroy (err) {
this.stream.destroy(err)
this.local = []
this.remote = []
for (const ch of this.created.values()) {
ch.localId = ch.remoteId = -1
if (ch.handlers && ch.handlers.onclose) ch.handlers.onclose()
}
this.created.clear()
}
if (ch._buffer.length) ch._resume()
else ch._buffer = null
return ch
}
Protocol.prototype._resume = function () {
var self = this
process.nextTick(resume)
class Channel {
constructor (state, stream, dk) {
this.key = null
this.discoveryKey = dk
this.localId = -1
this.remoteId = -1
this.remoteCapability = null
this.handlers = null
this.state = state
this.stream = stream
}
function resume () {
if (!self._data) return
get opened () {
return this.localId > -1
}
var data = self._data
var start = self._start
var cb = self._cb
get closed () {
return this.localId === -1
}
self._data = null
self._start = 0
self._cb = null
self._parse(data, start, cb)
get remoteOpened () {
return this.remoteId > -1
}
}
Protocol.prototype._kick = function () {
if (this._remoteKeepAlive > 4) {
clearInterval(this._interval)
this.emit('timeout')
return
get remoteVerified () {
return this.localId > -1 &&
this.remoteId > -1 &&
!!this.remoteCapability &&
this.remoteCapability.equals(this.state.remoteCapability(this.key))
}
for (var i = 0; i < this.feeds.length; i++) {
var ch = this.feeds[i]
if (ch.peer) ch.peer.ontick()
else ch.emit('tick')
options (message) {
return this.state.options(this.localId, message)
}
this._remoteKeepAlive++
status (message) {
return this.state.status(this.localId, message)
}
if (this._keepAlive > 2) {
this.ping()
this._keepAlive = 0
} else {
this._keepAlive++
have (message) {
return this.state.have(this.localId, message)
}
}
Protocol.prototype.ping = function () {
if (!this.key) return true
var ping = bufferFrom([0])
if (this._xor) this._xor.update(ping, ping)
return this.push(ping)
}
unhave (message) {
return this.state.unhave(this.localId, message)
}
Protocol.prototype.destroy = function (err) {
if (this.destroyed) return
this.destroyed = true
if (err) this.emit('error', err)
this._close()
this.emit('close')
}
want (message) {
return this.state.want(this.localId, message)
}
Protocol.prototype.finalize = function () {
if (this.destroyed) return
this.destroyed = true
this._close()
this.push(null)
}
unwant (message) {
return this.state.unwant(this.localId, message)
}
Protocol.prototype._close = function () {
clearInterval(this._interval)
request (message) {
return this.state.request(this.localId, message)
}
var feeds = this.feeds
this.feeds = []
for (var i = 0; i < feeds.length; i++) feeds[i]._onclose()
cancel (message) {
return this.state.cancel(this.localId, message)
}
if (this._xor) {
this._xor.final()
this._xor = null
data (message) {
return this.state.data(this.localId, message)
}
}
Protocol.prototype._read = function () {
// do nothing, user back-pressures
}
extension (id, buf) {
return this.state.extension(this.localId, id, buf)
}
Protocol.prototype._push = function (data) {
if (this.destroyed) return
this._keepAlive = 0
if (this._xor) this._xor.update(data, data)
return this.push(data)
}
close () {
if (this.closed) return
this.state.close(this.localId, {})
}
Protocol.prototype._write = function (data, enc, cb) {
this._remoteKeepAlive = 0
this._parse(data, 0, cb)
destroy (err) {
this.stream.destroy(err)
}
}
Protocol.prototype._feed = function (dk) {
var hex = dk.toString('hex')
var ch = this._feeds[hex]
if (ch) return ch
ch = this._feeds[hex] = feed(this)
return ch
}
module.exports = class ProtocolStream extends Duplex {
constructor (initiator, handlers = {}) {
super()
Protocol.prototype.remoteSupports = function (name) {
var i = this.extensions.indexOf(name)
return i > -1 && !!this.remoteExtensions && this.remoteExtensions.indexOf(i) > -1
}
this.initiator = initiator
this.handlers = handlers
this.channelizer = new Channelizer(this, handlers.encrypted, handlers.keyPair)
this.state = new SHP(initiator, this.channelizer)
this.timeout = null
this.keepAlive = null
this.prefinalize = new Nanoguard()
this.bytesSent = 0
this.bytesReceived = 0
Protocol.prototype._onhandshake = function (handshake) {
if (this.remoteId) return
this.once('finish', this.push.bind(this, null))
this.remoteId = handshake.id || randomBytes(32)
this.remoteLive = handshake.live
this.remoteUserData = handshake.userData
this.remoteExtensions = indexOf(this.extensions, handshake.extensions)
this.remoteAck = handshake.ack
if (handlers.timeout !== false && handlers.timeout !== 0) {
const timeout = handlers.timeout || 20000
this.setTimeout(timeout, () => this.destroy(new Error('ETIMEDOUT')))
this.setKeepAlive(Math.ceil(timeout / 2))
}
}
this.emit('handshake')
}
Protocol.prototype._onopen = function (id, data, start, end) {
var feed = decodeFeed(data, start, end)
if (!feed) return this._badFeed()
if (!this.remoteDiscoveryKey) {
this.remoteDiscoveryKey = feed.discoveryKey
if (!this._sameKey()) return
if (this.encrypted && !this._remoteNonce) {
if (!feed.nonce) {
this.destroy(new Error('Remote did not include a nonce'))
return
}
this._remoteNonce = feed.nonce
[inspect] (depth, opts) {
let indent = ''
if (typeof opts.indentationLvl === 'number') {
while (indent.length < opts.indentationLvl) indent += ' '
}
if (this.encrypted && this.key && !this._remoteXor) {
this._remoteXor = sodium.crypto_stream_xor_instance(this._remoteNonce, this.key)
}
return 'HypercoreProtocolStream(\n' +
indent + ' publicKey: ' + opts.stylize((this.publicKey && pretty(this.publicKey)), 'string') + '\n' +
indent + ' remotePublicKey: ' + opts.stylize((this.remotePublicKey && pretty(this.remotePublicKey)), 'string') + '\n' +
indent + ' initiator: ' + opts.stylize(this.initiator, 'boolean') + '\n' +
indent + ' channelCount: ' + opts.stylize(this.channelCount, 'number') + '\n' +
indent + ' destroyed: ' + opts.stylize(this.destroyed, 'boolean') + '\n' +
indent + ' prefinalized: ' + opts.stylize(!this.prefinalize.waiting, 'boolean') + '\n' +
indent + ' bytesSent: ' + opts.stylize(this.bytesSent, 'number') + '\n' +
indent + ' bytesReceived: ' + opts.stylize(this.bytesReceived, 'number') + '\n' +
indent + ')'
}
this._remoteFeeds[id] = this._feed(feed.discoveryKey)
this._remoteFeeds[id].remoteId = id
static isProtocolStream (s) {
return !!(s && typeof s.initiator === 'boolean' && typeof s.pipe === 'function' && s.state)
}
this.emit('feed', feed.discoveryKey)
}
static keyPair () {
return SHP.keyPair()
}
Protocol.prototype._onmessage = function (data, start, end) {
if (end - start < 2) return
get publicKey () {
return this.state.publicKey
}
var header = decodeHeader(data, start)
if (header === -1) return this.destroy(new Error('Remote sent invalid header'))
get remotePublicKey () {
return this.state.remotePublicKey
}
start += varint.decode.bytes
_write (data, cb) {
if (this.timeout !== null) this.timeout.refresh()
this.bytesReceived += data.length
this.state.recv(data)
cb(null)
}
var id = header >> 4
var type = header & 15
_destroy (cb) {
this.channelizer.destroy()
this.state.destroy()
cb(null)
}
if (id >= this.maxFeeds) return this._tooManyFeeds()
while (this._remoteFeeds.length < id) this._remoteFeeds.push(null)
_predestroy () {
if (this.timeout !== null) {
this.timeout.destroy()
this.timeout = null
}
if (this.keepAlive !== null) {
this.keepAlive.destroy()
this.keepAlive = null
}
this.prefinalize.destroy()
}
var ch = this._remoteFeeds[id]
_prefinalize () {
this.emit('prefinalize')
this.prefinalize.ready(() => {
if (this.destroyed) return
if (this.channelCount) return
this.finalize()
})
}
if (type === 0) {
if (ch) ch._onclose()
return this._onopen(id, data, start, end)
remoteOpened (key) {
const ch = this.channelizer.getChannel(crypto.discoveryKey(key))
return !!(ch && ch.remoteId > -1)
}
if (!ch) return this._badFeed()
if (type === 15) return ch._onextension(data, start, end)
ch._onmessage(type, data, start, end)
}
remoteVerified (key) {
const ch = this.channelizer.getChannel(crypto.discoveryKey(key))
return !!ch && !!ch.remoteCapability && ch.remoteCapability.equals(this.state.remoteCapability(key))
}
Protocol.prototype._parse = function (data, start, cb) {
var decrypted = !!this._remoteXor
opened (key) {
const ch = this.channelizer.getChannel(crypto.discoveryKey(key))
return !!(ch && ch.localId > -1)
}
if (start) {
data = data.slice(start)
start = 0
ping () {
return this.state.ping()
}
if (this._remoteXor) this._remoteXor.update(data, data)
while (start < data.length && !this.destroyed) {
if (this._missing) start = this._parseMessage(data, start)
else start = this._parseLength(data, start)
if (this._needsKey) {
this._data = data
this._start = start
this._cb = cb
setKeepAlive (ms) {
if (this.keepAlive) this.keepAlive.destroy()
if (!ms) {
this.keepAlive = null
return
}
this.keepAlive = timeout(ms, ping, this)
if (!decrypted && this._remoteXor) {
return this._parse(data, start, cb)
function ping () {
this.ping()
this.keepAlive = timeout(ms, ping, this)
}
}
cb()
}
Protocol.prototype._parseMessage = function (data, start) {
var end = start + this._missing
if (end <= data.length) {
var ret = end
if (this._buf) {
data.copy(this._buf, this._pointer, start)
data = this._buf
start = 0
end = data.length
this._buf = null
setTimeout (ms, ontimeout) {
if (this.timeout) this.timeout.destroy()
if (!ms) {
this.timeout = null
return
}
this.timeout = timeout(ms, this.emit.bind(this, 'timeout'))
if (ontimeout) this.once('timeout', ontimeout)
}
this._missing = 0
this._pointer = 0
if (this.encrypted && !this.key) this._needsKey = true
this._onmessage(data, start, end)
return ret
get channelCount () {
return this.channelizer.created.size
}
if (!this._buf) {
this._buf = bufferAlloc(this._missing)
this._pointer = 0
get channels () {
return this.channelizer.created.values()
}
var rem = data.length - start
open (key, handlers) {
const discoveryKey = crypto.discoveryKey(key)
const ch = this.channelizer.createChannel(discoveryKey)
data.copy(this._buf, this._pointer, start)
this._pointer += rem
this._missing -= rem
if (ch.key === null) {
ch.key = key
this.channelizer.attachLocal(ch)
this.state.open(ch.localId, { key, discoveryKey })
}
return data.length
}
if (handlers) ch.handlers = handlers
Protocol.prototype._parseLength = function (data, start) {
while (!this._missing && start < data.length) {
var byte = this._length[this._pointer++] = data[start++]
if (ch.remoteId > -1) this.emit('duplex-channel', ch)
if (!(byte & 0x80)) {
this._missing = varint.decode(this._length)
this._pointer = 0
if (this._missing > 8388608) return this._tooBig(data.length)
if (this._utp) {
var reallyMissing = this._missing - (data.length - start)
if (reallyMissing > 0 && !this._needsKey) this._utp.setContentSize(reallyMissing)
}
return start
}
if (this._pointer >= this._length.length) return this._tooBig(data.length)
return ch
}
return start
}
close (key) {
const discoveryKey = crypto.discoveryKey(key)
const ch = this.channelizer.getChannel(discoveryKey)
Protocol.prototype._sameKey = function () {
if (!this.encrypted) return true
if (!this.discoveryKey || !this.remoteDiscoveryKey) return true
if (this.remoteDiscoveryKey.toString('hex') === this.discoveryKey.toString('hex')) return true
this.destroy(new Error('First shared hypercore must be the same'))
return false
}
Protocol.prototype._tooManyFeeds = function () {
this.destroy(new Error('Only ' + this.maxFeeds + ' feeds currently supported. Open a Github issue if you need more'))
}
Protocol.prototype._tooBig = function (len) {
this.destroy(new Error('Remote message is larger than 8MB (max allowed)'))
return len
}
Protocol.prototype._badFeed = function () {
this.destroy(new Error('Remote sent invalid feed message'))
}
Protocol.prototype._ontimeout = function () {
this.destroy(new Error('Remote timed out'))
}
function decodeHeader (data, start) {
try {
return varint.decode(data, start)
} catch (err) {
return -1
if (ch) ch.close()
else this.state.close(this.channelizer.allocLocal(), { discoveryKey })
}
}
function decodeFeed (data, start, end) {
var feed = null
try {
feed = messages.Feed.decode(data, start, end)
} catch (err) {
return null
finalize () {
this.push(null)
}
if (feed.discoveryKey.length !== 32) return null
if (feed.nonce && feed.nonce.length !== 24) return null
return feed
}
function encodeFeed (feed, id) {
var header = id << 4
var len = varint.encodingLength(header) + messages.Feed.encodingLength(feed)
var box = bufferAlloc(varint.encodingLength(len) + len)
var offset = 0
varint.encode(len, box, offset)
offset += varint.encode.bytes
varint.encode(header, box, offset)
offset += varint.encode.bytes
messages.Feed.encode(feed, box, offset)
return box
}
function discoveryKey (key) {
var buf = bufferAlloc(32)
sodium.crypto_generichash(buf, bufferFrom('hypercore'), key)
return buf
}
function randomBytes (n) {
var buf = bufferAlloc(n)
sodium.randombytes_buf(buf)
return buf
}
{
"name": "hypercore-protocol",
"version": "6.12.0",
"version": "7.0.0",
"description": "Stream that implements the hypercore protocol",
"main": "index.js",
"dependencies": {
"buffer-alloc-unsafe": "^1.0.0",
"buffer-from": "^1.0.0",
"inherits": "^2.0.3",
"protocol-buffers-encodings": "^1.1.0",
"readable-stream": "^2.2.6",
"sodium-universal": "^2.0.0",
"sorted-indexof": "^1.0.0",
"varint": "^5.0.0"
"hypercore-crypto": "^1.0.0",
"inspect-custom-symbol": "^1.1.0",
"nanoguard": "^1.2.1",
"pretty-hash": "^1.0.1",
"simple-hypercore-protocol": "^1.1.2",
"streamx": "^2.1.0",
"timeout-refresh": "^1.0.0"
},
"devDependencies": {
"choppa": "^1.0.2",
"protocol-buffers": "^4.0.2",
"standard": "^9.0.1",
"tape": "^4.6.3"
"standard": "^14.1.0",
"tape": "^4.11.0"
},
"scripts": {
"test": "standard && tape test.js",
"protobuf": "protocol-buffers schema.proto -o messages.js"
"test": "standard && tape test.js"
},

@@ -26,0 +22,0 @@ "repository": {

@@ -11,15 +11,24 @@ # hypercore-protocol

For detailed info on the messages sent on each channel see [simple-hypercore-protocol](https://github.com/mafintosh/simple-hypercore-protocol)
Note that the latest version of this is Hypercore Wire Protocol 7, which is not compatible with earlier versions.
## Usage
``` js
var protocol = require('hypercore-protocol')
const Protocol = require('hypercore-protocol')
// create two streams with hypercore protocol
var streamA = protocol({ id: 'a' })
var streamB = protocol({ id: 'b' })
const streamA = new Protocol(true) // true indicates this is the initiator
const streamB = new Protocol(false) // false indicates this is not the initiator
// open two feeds specified by a 32 byte key
var key = Buffer.from('deadbeefdeadbeefdeadbeefdeadbeef')
var feed = streamA.feed(key)
var remoteFeed = streamB.feed(key)
const key = Buffer.from('deadbeefdeadbeefdeadbeefdeadbeef')
const feed = streamA.open(key)
const remoteFeed = streamB.open(key, {
// listen for data in remote feed
ondata (message) {
console.log(message.value.toString())
}
})

@@ -29,7 +38,2 @@ // add data to feed

// listen data in remoteFeed
remoteFeed.on('data', function (message) {
console.log(message.value.toString())
})
streamA.pipe(streamB).pipe(streamA)

@@ -42,3 +46,3 @@ ```

#### `var stream = protocol([options])`
#### `const stream = new Protocol(initiator, [options])`

@@ -51,37 +55,67 @@ Create a new protocol duplex stream.

{
id: optionalPeerId, // you can use this to detect if you connect to yourself
live: keepStreamOpen, // signal to the other peer that you want to keep this stream open forever
ack: false, // Explicitly ask a peer to acknowledge each received block
userData: opaqueUserData // include user data that you can retrieve on handshake
encrypt: true, // set to false to disable encryption if you are already piping through a encrypted stream
timeout: 5000, // stream timeout. set to 0 or false to disable.
extensions: [], // names of extensions to use for replication. Must be sorted alphanumerically for handshaking to work
timeout: 20000, // stream timeout. set to 0 or false to disable.
keyPair: { publicKey, secretKey }, // use this keypair for the stream authentication
onauthenticate (remotePublicKey, done) { }, // hook to verify the remotes public key
onhandshake () { }, // function called when the stream handshake has finished
ondiscoverykey (discoveryKey) { } // function called when the remote stream opens a feed you have not
}
```
If you don't specify a peer id a random 32 byte will be used.
You can access the peer id using `p.id` and the remote peer id using `p.remoteId`.
#### `stream.on('discovery-key', discoveryKey)`
#### `var feed = stream.feed(key)`
Emitted when the remote opens a feed you have not opened.
Also calls `stream.handlers.ondiscoverykey(discoveryKey)`
Signal the other end that you want to share a hypercore feed.
#### `stream.on('timeout')`
You can use the same stream to share more than one BUT the first feed shared
should be the same one. The key of the first feed is also used to encrypt the stream using [libsodium](https://github.com/mafintosh/sodium-native#crypto_stream_xorcipher-message-nonce-key).
Emitted when the stream times out.
Per default a timeout triggers a destruction of the stream, unless you disable timeout handling in the constructor.
#### `var bool = stream.has(key)`
#### `stream.setTimeout(ms, ontimeout)`
Returns true if the stream already has open a channel open for the given key and false if not.
Set a stream timeout.
#### `stream.on('handshake')`
#### `stream.setKeepAlive(ms)`
Emitted when a protocol handshake has been received. Afterwards you can check `.remoteId` to get the remote peer id, `.remoteLive` to get its live status, or `.remoteUserData` to get its user data.
Send a keep alive ping every ms, if no other message has been sent.
This is enabled per default every timeout / 2 ms unless you disable timeout handling in the constructor.
#### `stream.on('feed', discoveryKey)`
#### `stream.prefinalize`
Emitted when a remote is sharing a feed. `discoveryKey` is the hypercore discovery key of the feed they want to share.
A [nanoguard](https://github.com/mafintosh/nanoguard) instance that is used to guard the final closing of the stream.
Internally this guard is ready'ed before the stream checks if all channels have been closed and the stream is finalised.
Call wait/continue on this guard if need asynchrously add more channels and don't want to stream to finalise underneath you.
If you are sharing multiple hypercores on the same port you can use this event to wait for the remote peer to indicate which hypercore
they are interested in.
#### `stream.remotePublicKey`
The remotes public key.
#### `stream.publicKey`
Your public key.
#### `const bool = stream.remoteVerified(key)`
Returns true if the remote sent a valid capability for the key when they opened the channel.
Use this in `ondiscoverykey` to check that the remote has the key corresponding to the discovery key.
#### `const bool = Protocol.isProtocolStream(stream)`
Static method to check if an object is a hypercore protocol stream.
#### `const keyPair = Protocol.keyPair()`
Static method to generate an static authentication key pair.
#### `const channel = stream.open(key, handlers)`
Signal the other end that you want to share a hypercore feed.
The feed key will be hashed and sent as the "discovery key" which protects the feed key from being learned by a remote peer who does not already possess it. Also includes a cryptographic proof that the local possesses the feed key, which can be implicitly verified using the above `remoteVerified` api.
[See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L7)
The `handlers` is an object of functions for handling incoming messages and is described below.
#### `stream.destroy([error])`

@@ -94,79 +128,86 @@

Gracefully end the stream. Closes all feeds as well.
This is automatically called after the prefinalise guard and all channels have been closed.
#### `feed.info(message)`
#### `feed.options(message)`
Send an `info` message. See the [schema.proto](schema.proto) file for more information.
Send an `options` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L13)
#### `feed.on('info', message)`
#### `feed.handlers.onoptions(message)`
Emitted when an `info` message has been received.
Called when a options message has been received.
#### `feed.status(message)`
Send an `status` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L20)
#### `feed.handlers.onstatus(message)`
Called when a status message has been received.
#### `feed.have(message)`
Send a `have` message. See the [schema.proto](schema.proto) file for more information.
Send a `have` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L26)
#### `feed.on('have', message)`
#### `feed.handlers.onhave(message)`
Emitted when a `have` message has been received.
Called when a `have` message has been received.
#### `feed.unhave(message)`
Send a `unhave` message. See the [schema.proto](schema.proto) file for more information.
Send a `unhave` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L34)
#### `feed.on('unhave', message)`
Emitted when a `unhave` message has been received.
#### `feed.handlers.onunhave(message)`
Called when a `unhave` message has been received.
#### `feed.want(want)`
Send a `want` message. See the [schema.proto](schema.proto) file for more information.
Send a `want` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L40)
#### `feed.on('want', want)`
#### `feed.handlers.onwant(want)`
Emitted when a `want` message has been received.
Called when a `want` message has been received.
#### `feed.unwant(unwant)`
Send a `unwant` message. See the [schema.proto](schema.proto) file for more information.
Send a `unwant` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L46)
#### `feed.on('unwant', unwant)`
#### `feed.handlers.onunwant(unwant)`
Emitted when a `unwant` message has been received.
Called when a `unwant` message has been received.
#### `feed.request(request)`
Send a `request` message. See the [schema.proto](schema.proto) file for more information.
Send a `request` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L52)
#### `feed.on('request', request)`
Emitted when a `request` message has been received.
#### `feed.handlers.onrequest(request)`
Called when a `request` message has been received.
#### `feed.cancel(cancel)`
Send a `cancel` message. See the [schema.proto](schema.proto) file for more information.
Send a `cancel` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L60)
#### `feed.on('cancel', cancel)`
#### `feed.handlers.oncancel(cancel)`
Emitted when a `cancel` message has been received.
Called when a `cancel` message has been received.
#### `feed.data(data)`
Send a `data` message. See the [schema.proto](schema.proto) file for more information.
Send a `data` message. [See the protobuf schema for more info on this messsage](https://github.com/mafintosh/simple-hypercore-protocol/blob/master/schema.proto#L67)
#### `feed.on('data', data)`
#### `feed.handlers.ondata(data)`
Emitted when a `data` message has been received.
Called when a `data` message has been received.
#### `feed.extension(name, message)`
#### `feed.extension(id, buffer)`
Send an `extension` message. `name` must be in `extensions` list. See the [schema.proto](schema.proto) file for more information.
Send an `extension` message. `id` should be the index an extension name in the `extensions` list sent in a previous `options` message for this channel.
#### `feed.on('extension', name, message)`
#### `feed.handlers.onextension(id, buffer)`
Emitted when an `extension` message has been received.
Called when an `extension` message has been received. `id` is the index of an extension name received in an extension list in a previous `options` message for this channel.
#### `feed.on('close')`
Emitted when this feed has been closed. All feeds are automatically closed when the stream ends or is destroyed.
#### `feed.close()`

@@ -176,2 +217,6 @@

#### `feed.handlers.onclose()`
Called when this feed has been closed. All feeds are automatically closed when the stream ends or is destroyed.
#### `feed.destroy(err)`

@@ -183,4 +228,10 @@

The hypercore protocol uses a basic varint length prefixed format to send messages over the wire.
The hypercore protocol consists of two phases.
A handshake phase and a message exchange phage.
For the handshake Noise is used with the XX pattern. Each Noise message is sent with varint framing.
After the handshake a message exchange phased is started.
This uses a basic varint length prefixed format to send messages over the wire.
All messages contains a header indicating the type and feed id, and a protobuf encoded payload.

@@ -187,0 +238,0 @@

@@ -1,65 +0,38 @@

var tape = require('tape')
var choppa = require('choppa')
var protocol = require('./')
var bufferFrom = require('buffer-from')
const tape = require('tape')
const Protocol = require('./')
var KEY = bufferFrom('01234567890123456789012345678901')
var OTHER_KEY = bufferFrom('12345678901234567890123456789012')
const KEY = Buffer.from('01234567890123456789012345678901')
const OTHER_KEY = Buffer.from('12345678901234567890123456789012')
tape('basic', function (t) {
t.plan(2)
t.plan(4)
var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false)
a.feed(KEY)
b.feed(KEY)
a.once('handshake', function () {
t.pass('a got handshake')
const local = a.open(KEY, {
ondata (data) {
t.same(data.index, 42)
t.same(data.value, Buffer.from('value'))
t.end()
}
})
b.once('handshake', function () {
t.pass('b got handshake')
const remote = b.open(KEY, {
onopen () {
t.pass('opened')
},
onrequest (request) {
t.same(request.index, 42)
remote.data({
index: request.index,
value: Buffer.from('value')
})
}
})
a.pipe(b).pipe(a)
})
tape('basic with handshake options', function (t) {
t.plan(16)
var data = [
'eeaa62fbb11ba521cce58cf3fae42deb15d94a0436fc7fa0cbba8f130e7c0499',
'8c797667bf307d82c51a8308fe477b781a13708e0ec1f2cc7f497392574e2464'
]
var a = protocol({id: bufferFrom('a'), live: true, userData: bufferFrom(data)})
var b = protocol({id: bufferFrom('b'), live: false, ack: true})
a.feed(KEY)
b.feed(KEY)
a.once('handshake', function () {
t.same(a.id, bufferFrom('a'))
t.same(a.live, true)
t.same(a.ack, false)
t.same(a.userData, bufferFrom(data))
t.same(a.remoteId, bufferFrom('b'))
t.same(a.remoteLive, false)
t.same(a.remoteUserData, null)
t.same(a.remoteAck, true)
local.request({
index: 42
})
b.once('handshake', function () {
t.same(b.id, bufferFrom('b'))
t.same(b.live, false)
t.same(b.ack, true)
t.same(b.userData, null)
t.same(b.remoteId, bufferFrom('a'))
t.same(b.remoteLive, true)
t.same(b.remoteUserData, bufferFrom(data))
t.same(b.remoteAck, false)
})
a.pipe(b).pipe(a)

@@ -71,221 +44,68 @@ })

var a = protocol()
var b = protocol()
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
b.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch1.discoveryKey)
const a = new Protocol(true, {
ondiscoverykey (discoveryKey) {
t.same(discoveryKey, other.discoveryKey)
}
})
const b = new Protocol(false)
a.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch2.discoveryKey)
const ch1 = a.open(KEY, {
onopen () {
ch1.data({ index: 42, value: Buffer.from('hi') })
ch1.request({ index: 10 })
ch1.cancel({ index: 100 })
},
onwant (want) {
t.same(want, { start: 10, length: 100 })
},
onstatus (status) {
t.same(status, { uploading: false, downloading: true })
},
onunwant (unwant) {
t.same(unwant, { start: 11, length: 100 })
},
onunhave (unhave) {
t.same(unhave, { start: 18, length: 100 })
},
onhave (have) {
t.same(have, { start: 10, length: 10, bitfield: null, ack: false })
}
})
ch2.on('data', function (data) {
t.same(data, {index: 42, signature: null, value: bufferFrom('hi'), nodes: []})
const ch2 = b.open(KEY, {
onopen () {
ch2.want({ start: 10, length: 100 })
ch2.status({ uploading: false, downloading: true })
ch2.unwant({ start: 11, length: 100 })
ch2.unhave({ start: 18, length: 100 })
ch2.have({ start: 10, length: 10 })
},
onrequest (request) {
t.same(request, { index: 10, hash: false, bytes: 0, nodes: 0 })
},
ondata (data) {
t.same(data, { index: 42, signature: null, value: Buffer.from('hi'), nodes: [] })
},
oncancel (cancel) {
t.same(cancel, { index: 100, hash: false, bytes: 0 })
}
})
ch1.data({index: 42, value: bufferFrom('hi')})
const other = b.open(OTHER_KEY)
ch2.on('request', function (request) {
t.same(request, {index: 10, hash: false, bytes: 0, nodes: 0})
a.on('discovery-key', function (discoveryKey) {
t.same(discoveryKey, other.discoveryKey)
})
ch1.request({index: 10})
ch2.on('cancel', function (cancel) {
t.same(cancel, {index: 100, hash: false, bytes: 0})
})
ch1.cancel({index: 100})
ch1.on('want', function (want) {
t.same(want, {start: 10, length: 100})
})
ch2.want({start: 10, length: 100})
ch1.on('info', function (info) {
t.same(info, {uploading: false, downloading: true})
})
ch2.info({uploading: false, downloading: true})
ch1.on('unwant', function (unwant) {
t.same(unwant, {start: 11, length: 100})
})
ch2.unwant({start: 11, length: 100})
ch1.on('unhave', function (unhave) {
t.same(unhave, {start: 18, length: 100})
})
ch2.unhave({start: 18, length: 100})
ch1.on('have', function (have) {
t.same(have, {start: 10, length: 10, bitfield: null, ack: false})
})
ch2.have({start: 10, length: 10})
a.pipe(b).pipe(a)
})
tape('send messages (chunked)', function (t) {
t.plan(10)
tape('destroy', function (t) {
const a = new Protocol(true)
var a = protocol()
var b = protocol()
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
b.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch1.discoveryKey)
})
a.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch2.discoveryKey)
})
ch2.on('data', function (data) {
t.same(data, {index: 42, signature: null, value: bufferFrom('hi'), nodes: []})
})
ch1.data({index: 42, value: bufferFrom('hi')})
ch2.on('request', function (request) {
t.same(request, {index: 10, hash: false, bytes: 0, nodes: 0})
})
ch1.request({index: 10})
ch2.on('cancel', function (cancel) {
t.same(cancel, {index: 100, hash: false, bytes: 0})
})
ch1.cancel({index: 100})
ch1.on('want', function (want) {
t.same(want, {start: 10, length: 100})
})
ch2.want({start: 10, length: 100})
ch1.on('info', function (info) {
t.same(info, {uploading: false, downloading: true})
})
ch2.info({uploading: false, downloading: true})
ch1.on('unwant', function (unwant) {
t.same(unwant, {start: 11, length: 100})
})
ch2.unwant({start: 11, length: 100})
ch1.on('unhave', function (unhave) {
t.same(unhave, {start: 18, length: 100})
})
ch2.unhave({start: 18, length: 100})
ch1.on('have', function (have) {
t.same(have, {start: 10, length: 10, bitfield: null, ack: false})
})
ch2.have({start: 10, length: 10})
a.pipe(choppa()).pipe(b).pipe(choppa()).pipe(a)
})
tape('send messages (concat)', function (t) {
t.plan(10)
var a = protocol()
var b = protocol()
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
b.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch1.discoveryKey)
})
a.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch2.discoveryKey)
})
ch2.on('data', function (data) {
t.same(data, {index: 42, signature: null, value: bufferFrom('hi'), nodes: []})
})
ch1.data({index: 42, value: bufferFrom('hi')})
ch2.on('request', function (request) {
t.same(request, {index: 10, hash: false, bytes: 0, nodes: 0})
})
ch1.request({index: 10})
ch2.on('cancel', function (cancel) {
t.same(cancel, {index: 100, hash: false, bytes: 0})
})
ch1.cancel({index: 100})
ch1.on('want', function (want) {
t.same(want, {start: 10, length: 100})
})
ch2.want({start: 10, length: 100})
ch1.on('info', function (info) {
t.same(info, {uploading: false, downloading: true})
})
ch2.info({uploading: false, downloading: true})
ch1.on('unwant', function (unwant) {
t.same(unwant, {start: 11, length: 100})
})
ch2.unwant({start: 11, length: 100})
ch1.on('unhave', function (unhave) {
t.same(unhave, {start: 18, length: 100})
})
ch2.unhave({start: 18, length: 100})
ch1.on('have', function (have) {
t.same(have, {start: 10, length: 10, bitfield: null, ack: false})
})
ch2.have({start: 10, length: 10})
b.write(toBuffer(a))
a.write(toBuffer(b))
a.pipe(b).pipe(a)
function toBuffer (stream) {
var bufs = []
while (true) {
var next = stream.read()
if (!next) return Buffer.concat(bufs)
bufs.push(next)
a.open(KEY, {
onclose () {
t.pass('closed')
t.end()
}
}
})
tape('destroy', function (t) {
var a = protocol()
var ch1 = a.feed(KEY)
ch1.on('close', function () {
t.pass('closed')
t.end()
})

@@ -299,105 +119,78 @@

var a = protocol()
var b = protocol()
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
b.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch1.discoveryKey)
const a = new Protocol(true, {
ondiscoverykey (discoveryKey) {
t.same(discoveryKey, other.discoveryKey)
}
})
const b = new Protocol(false)
a.on('feed', function (discoveryKey) {
t.same(discoveryKey, ch2.discoveryKey)
const ch1 = a.open(KEY, {
onopen () {
ch1.data({ index: 42, value: Buffer.from('hi') })
ch1.request({ index: 10 })
ch1.cancel({ index: 100 })
},
onwant (want) {
t.same(want, { start: 10, length: 100 })
},
onstatus (status) {
t.same(status, { uploading: false, downloading: true })
},
onunwant (unwant) {
t.same(unwant, { start: 11, length: 100 })
},
onunhave (unhave) {
t.same(unhave, { start: 18, length: 100 })
},
onhave (have) {
t.same(have, { start: 10, length: 10, bitfield: null, ack: true })
}
})
ch2.on('data', function (data) {
t.same(data, {index: 42, signature: null, value: bufferFrom('hi'), nodes: []})
const ch2 = b.open(KEY, {
onopen () {
ch2.want({ start: 10, length: 100 })
ch2.status({ uploading: false, downloading: true })
ch2.unwant({ start: 11, length: 100 })
ch2.unhave({ start: 18, length: 100 })
ch2.have({ start: 10, length: 10, ack: true })
},
onrequest (request) {
t.same(request, { index: 10, hash: false, bytes: 0, nodes: 0 })
},
ondata (data) {
t.same(data, { index: 42, signature: null, value: Buffer.from('hi'), nodes: [] })
},
oncancel (cancel) {
t.same(cancel, { index: 100, hash: false, bytes: 0 })
}
})
ch1.data({index: 42, value: bufferFrom('hi')})
const other = b.open(OTHER_KEY)
ch2.on('request', function (request) {
t.same(request, {index: 10, hash: false, bytes: 0, nodes: 0})
a.on('discovery-key', function (discoveryKey) {
t.same(discoveryKey, other.discoveryKey)
})
ch1.request({index: 10})
ch2.on('cancel', function (cancel) {
t.same(cancel, {index: 100, hash: false, bytes: 0})
})
ch1.cancel({index: 100})
ch1.on('want', function (want) {
t.same(want, {start: 10, length: 100})
})
ch2.want({start: 10, length: 100})
ch1.on('info', function (info) {
t.same(info, {uploading: false, downloading: true})
})
ch2.info({uploading: false, downloading: true})
ch1.on('unwant', function (unwant) {
t.same(unwant, {start: 11, length: 100})
})
ch2.unwant({start: 11, length: 100})
ch1.on('unhave', function (unhave) {
t.same(unhave, {start: 18, length: 100})
})
ch2.unhave({start: 18, length: 100})
ch1.on('have', function (have) {
t.same(have, {start: 10, length: 10, bitfield: null, ack: true})
})
ch2.have({start: 10, length: 10, ack: true})
a.pipe(b).pipe(a)
})
tape('first feed should be the same', function (t) {
t.plan(2)
var a = protocol()
var b = protocol()
a.feed(KEY)
b.feed(OTHER_KEY)
a.once('error', function () {
t.pass('a should error')
})
b.once('error', function () {
t.pass('b should error')
})
a.pipe(b).pipe(a)
})
tape('multiple feeds', function (t) {
var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false)
a.feed(KEY)
b.feed(KEY)
a.open(KEY)
b.open(KEY)
var ch1 = a.feed(OTHER_KEY)
var ch2 = b.feed(OTHER_KEY)
ch1.have({
start: 10,
length: 100
const ch1 = a.open(OTHER_KEY, {
onopen () {
ch1.have({ start: 10, length: 100 })
}
})
ch2.on('have', function () {
t.pass('got message on second channel')
t.end()
b.open(OTHER_KEY, {
onhave () {
t.pass('got message on second channel')
t.end()
}
})

@@ -409,17 +202,21 @@

tape('async feed', function (t) {
var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false, {
ondiscoverykey () {
setTimeout(function () {
t.ok(b.remoteVerified(KEY))
b.open(KEY, {
onrequest (request) {
t.same(request.index, 42)
t.end()
}
})
}, 100)
}
})
var ch1 = a.feed(KEY)
ch1.request({index: 42})
b.once('feed', function () {
setTimeout(function () {
var ch2 = b.feed(KEY)
ch2.on('request', function (request) {
t.same(request.index, 42)
t.end()
})
}, 100)
const ch1 = a.open(KEY, {
onopen () {
ch1.request({ index: 42 })
}
})

@@ -431,14 +228,22 @@

tape('stream is encrypted', function (t) {
var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false)
let gotData = false
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
const ch1 = a.open(KEY, {
onopen () {
ch1.data({ index: 42, value: Buffer.from('i am secret') })
}
})
ch2.on('data', function (data) {
t.same(data.value, bufferFrom('i am secret'))
t.end()
b.open(KEY, {
ondata (data) {
t.ok(gotData, 'got some data')
t.same(data.value, Buffer.from('i am secret'))
t.end()
}
})
a.on('data', function (data) {
gotData = true
t.ok(data.toString().indexOf('secret') === -1)

@@ -448,21 +253,27 @@ })

a.pipe(b).pipe(a)
ch1.data({index: 42, value: bufferFrom('i am secret')})
})
tape('stream can be unencrypted', function (t) {
var a = protocol({encrypt: false})
var b = protocol({encrypt: false})
const a = new Protocol(true, { encrypted: false })
const b = new Protocol(false, { encrypted: false })
let gotData = false
let sawSecret = false
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
var sawSecret = false
const ch1 = a.open(KEY, {
onopen () {
ch1.data({ index: 42, value: Buffer.from('i am secret') })
}
})
ch2.on('data', function (data) {
t.ok(sawSecret, 'saw secret')
t.same(data.value, bufferFrom('i am secret'))
t.end()
b.open(KEY, {
ondata (data) {
t.ok(sawSecret, 'saw the secret')
t.ok(gotData, 'got some data')
t.same(data.value, Buffer.from('i am secret'))
t.end()
}
})
a.on('data', function (data) {
gotData = true
if (data.toString().indexOf('secret') > -1) {

@@ -474,14 +285,9 @@ sawSecret = true

a.pipe(b).pipe(a)
ch1.data({index: 42, value: bufferFrom('i am secret')})
})
tape('keep alives', function (t) {
var a = protocol({timeout: 100})
var b = protocol({timeout: 100})
const a = new Protocol(true, { timeout: 100 })
const b = new Protocol(false, { timeout: 100 })
a.feed(KEY)
b.feed(KEY)
var timeout = setTimeout(function () {
const timeout = setTimeout(function () {
t.pass('should not time out')

@@ -501,9 +307,11 @@ t.end()

tape('timeouts', function (t) {
var a = protocol({timeout: false})
var b = protocol({timeout: 100})
const a = new Protocol(true, { timeout: false })
const b = new Protocol(false, { timeout: 100 })
var timeout = setTimeout(function () {
const timeout = setTimeout(function () {
t.fail('should time out')
}, 1000)
a.on('error', () => {})
b.on('error', function () {

@@ -518,20 +326,12 @@ clearTimeout(timeout)

tape('expected feeds', function (t) {
var a = protocol({expectedFeeds: 1})
a.resume()
a.on('end', function () {
t.pass('should end')
t.end()
tape('prefinalise hook', function (t) {
const a = new Protocol(true)
const b = new Protocol(false, {
ondiscoverykey (discoveryKey) {
b.close(discoveryKey)
}
})
var ch = a.feed(KEY)
let created = 0
ch.close()
})
tape('2 expected feeds', function (t) {
var a = protocol({expectedFeeds: 2})
var created = 0
a.resume()

@@ -545,3 +345,6 @@ a.on('end', function () {

created++
var ch = a.feed(KEY)
a.prefinalize.wait()
b.prefinalize.wait()
const ch = a.open(KEY)
ch.close()

@@ -551,5 +354,9 @@

created++
var ch = a.feed(OTHER_KEY)
const ch = a.open(OTHER_KEY)
ch.close()
a.prefinalize.continue()
b.prefinalize.continue()
}, 100)
a.pipe(b).pipe(a)
})

@@ -560,15 +367,16 @@

var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false)
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
const ch1 = a.open(KEY)
ch2.on('have', function (have) {
t.pass('got have')
b.open(KEY, {
onhave () {
t.pass('got have')
}
})
ch1.have({start: 1})
ch1.have({ start: 1 })
a.ping()
ch1.have({start: 2})
ch1.have({ start: 2 })

@@ -579,77 +387,52 @@ a.pipe(b).pipe(a)

tape('extension message', function (t) {
t.plan(10)
t.plan(6)
var a = protocol({
extensions: ['a', 'b']
})
const a = new Protocol(true)
const b = new Protocol(false)
var b = protocol({
extensions: ['b', 'c']
const ch1 = a.open(KEY, {
onopen () {
ch1.options({
extensions: ['a', 'b']
})
},
onoptions (options) {
t.same(options.extensions, ['b', 'c'])
ch1.extension(1, Buffer.from('hello ch2'))
},
onextension (type, message) {
t.same(type, 0)
t.same(message, Buffer.from('hello ch1'))
}
})
var ch1 = a.feed(KEY)
var ch2 = b.feed(KEY)
ch2.on('extension', function (type, message) {
t.same(type, 'b')
t.same(message, bufferFrom('hello ch2'))
const ch2 = b.open(KEY, {
onopen () {
ch2.options({
extensions: ['b', 'c']
})
},
onoptions (options) {
t.same(options.extensions, ['a', 'b'])
ch2.extension(0, Buffer.from('hello ch1'))
},
onextension (type, message) {
t.same(type, 1)
t.same(message, Buffer.from('hello ch2'))
}
})
ch1.on('extension', function (type, message) {
t.same(type, 'b')
t.same(message, bufferFrom('hello ch1'))
})
a.once('handshake', function () {
t.same(a.remoteSupports('a'), false)
t.same(a.remoteSupports('b'), true)
t.same(a.remoteSupports('c'), false)
ch1.extension('a', bufferFrom('nooo'))
ch1.extension('b', bufferFrom('hello ch2'))
ch1.extension('c', bufferFrom('nooo'))
})
b.once('handshake', function () {
t.same(b.remoteSupports('a'), false)
t.same(b.remoteSupports('b'), true)
t.same(b.remoteSupports('c'), false)
ch2.extension('a', bufferFrom('nooo'))
ch2.extension('b', bufferFrom('hello ch1'))
ch2.extension('c', bufferFrom('nooo'))
})
a.pipe(b).pipe(a)
})
tape('encrypt: false should ignore the first key', function (t) {
t.plan(1)
var a = protocol({ encrypt: false })
var b = protocol({ encrypt: false })
var ch1 = a.feed(KEY)
b.feed(OTHER_KEY)
var ch2 = b.feed(KEY)
ch1.on('data', function (data) {
t.same(data, {index: 42, signature: null, value: bufferFrom('hi'), nodes: []})
})
ch2.data({index: 42, value: bufferFrom('hi')})
a.pipe(b).pipe(a)
})
tape('feed channel ids are set up correctly', function (t) {
var a = protocol()
var b = protocol()
const a = new Protocol(true)
const b = new Protocol(false)
a.feed(KEY)
a.open(KEY)
a.pipe(b).pipe(a)
b.once('feed', function () {
var ch2 = b.feed(KEY)
t.ok(ch2.id > -1)
b.once('discovery-key', function () {
const ch2 = b.open(KEY)
t.ok(ch2.localId > -1)
t.ok(ch2.remoteId > -1)

@@ -656,0 +439,0 @@ t.end()

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc