hypercore-protocol
Advanced tools
Comparing version 3.0.0 to 4.0.0
649
index.js
@@ -1,57 +0,70 @@ | ||
var crypto = require('crypto') | ||
var events = require('events') | ||
var inherits = require('inherits') | ||
var duplexify = require('duplexify') | ||
var inherits = require('inherits') | ||
var varint = require('varint') | ||
var events = require('events') | ||
var messages = require('./messages') | ||
var lpstream = require('length-prefixed-stream') | ||
var stream = require('readable-stream') | ||
var lpstream = require('length-prefixed-stream') | ||
var crypto = require('crypto') | ||
var encryption = require('sodium-encryption') | ||
var increment = require('increment-buffer') | ||
var equals = require('buffer-equals') | ||
var varint = require('varint') | ||
var xtend = require('xtend') | ||
var pe = require('passthrough-encoding') | ||
var messages = require('./messages') | ||
var KEEP_ALIVE = Buffer([0]) | ||
var ENCODERS = [ | ||
var DEFAULT_TYPES = [ | ||
messages.Handshake, | ||
null, // close | ||
null, // pause | ||
null, // resume | ||
messages.Have, | ||
messages.Want, | ||
messages.Request, | ||
messages.Response, | ||
messages.Cancel | ||
messages.Data, | ||
messages.Cancel, | ||
null, // pause | ||
null // resume | ||
] | ||
module.exports = use([]) | ||
var DEFAULT_EVENTS = [ | ||
'handshake', | ||
'have', | ||
'want', | ||
'request', | ||
'data', | ||
'cancel', | ||
'pause', | ||
'resume' | ||
] | ||
while (DEFAULT_EVENTS.length < 64) { // reserved | ||
DEFAULT_EVENTS.push('unknown') | ||
DEFAULT_TYPES.push(null) | ||
} | ||
module.exports = use({}) | ||
function use (extensions) { | ||
if (extensions.length > 64) { | ||
throw new Error('Only 64 extensions are supported') | ||
} | ||
var extensionNames = Object.keys(extensions).sort() | ||
var types = DEFAULT_TYPES.slice(0) | ||
var eventNames = DEFAULT_EVENTS.slice(0) | ||
function Channel (stream, publicId) { | ||
function Channel (protocol) { | ||
events.EventEmitter.call(this) | ||
this.feed = null | ||
this.stream = stream | ||
this.publicId = publicId | ||
this.state = null // set by someone else | ||
this.protocol = protocol | ||
this.opened = false | ||
this.closed = false | ||
this.key = null | ||
this.closed = false | ||
this.discoveryKey = null | ||
this.local = -1 | ||
this.remote = -1 | ||
this.buffer = [] | ||
this.remotePaused = true | ||
this.amPaused = true | ||
this._nonce = crypto.randomBytes(24) | ||
this._remoteNonce = null | ||
// populated by hypercore. defined here so v8 is happy | ||
this.feed = null | ||
this.remoteBitfield = null | ||
this.remoteTree = null | ||
this._firstMessage = null | ||
this._encrypted = stream.encrypted | ||
this._receivedHandshake = false | ||
this._nonce = null | ||
this._remoteNonce = null | ||
this._local = -1 | ||
this._remote = -1 | ||
this._firstNonce = Buffer(24) | ||
this._nonce.copy(this._firstNonce) | ||
this.on('handshake', this._onhandshake) | ||
} | ||
@@ -61,229 +74,57 @@ | ||
Channel.prototype.handshake = function (handshake) { | ||
if (!handshake) handshake = {} | ||
if (!this.stream.remoteId) { | ||
handshake.peerId = this.stream.id | ||
handshake.extensions = extensions | ||
} | ||
this.amPaused = !!handshake.paused | ||
this._send(0, handshake) | ||
Channel.prototype.remoteSupports = function (name) { | ||
return this.protocol.remoteSupports(name) | ||
} | ||
Channel.prototype.close = function (err) { | ||
if (err) return this.stream.destroy(err) // fatal close | ||
var keyHex = this.publicId.toString('hex') | ||
if (this.stream._channels[keyHex] !== this) return | ||
delete this.stream._channels[keyHex] | ||
if (this._remote > -1) this.stream._remote[this._remote] = null | ||
if (this._local > -1) this.stream._local[this._local] = null | ||
this.closed = true | ||
this.remotePaused = true | ||
this.emit('close') | ||
if (this.key && !this.stream.destroyed && this._receivedHandshake) this._send(1, null) | ||
Channel.prototype.handshake = function (message) { | ||
return this.protocol._send(this, 0, message) | ||
} | ||
Channel.prototype.pause = function () { | ||
this._send(2, null) | ||
Channel.prototype.have = function (message) { | ||
return this.protocol._send(this, 1, message) | ||
} | ||
Channel.prototype.resume = function () { | ||
this._send(3, null) | ||
Channel.prototype.want = function (message) { | ||
return this.protocol._send(this, 2, message) | ||
} | ||
Channel.prototype.have = function (have) { | ||
this._send(4, have) | ||
Channel.prototype.request = function (message) { | ||
return this.protocol._send(this, 3, message) | ||
} | ||
Channel.prototype.want = function (want) { | ||
this._send(5, want) | ||
Channel.prototype.data = function (message) { | ||
return this.protocol._send(this, 4, message) | ||
} | ||
Channel.prototype.request = function (request) { | ||
this._send(6, request) | ||
Channel.prototype.cancel = function (message) { | ||
return this.protocol._send(this, 5, message) | ||
} | ||
Channel.prototype.response = function (response) { | ||
this._send(7, response) | ||
Channel.prototype.pause = function () { | ||
return this.protocol._send(this, 6, null) | ||
} | ||
Channel.prototype.cancel = function (cancel) { | ||
this._send(8, cancel) | ||
Channel.prototype.resume = function () { | ||
return this.protocol._send(this, 7, null) | ||
} | ||
Channel.prototype.remoteSupports = function (id) { | ||
return this.stream.remoteSupports(id) | ||
Channel.prototype.end = function () { | ||
this.protocol._close(this) | ||
} | ||
Channel.prototype._open = function (key) { | ||
if (this.key) return | ||
this.key = key | ||
this._local = this.stream._local.indexOf(null) | ||
if (this._local === -1) this._local = this.stream._local.push(null) - 1 | ||
this.stream[this._local] = this | ||
this._nonce = crypto.randomBytes(24) | ||
var open = {publicId: this.publicId, nonce: this._nonce} | ||
var len = varint.encodingLength(this._local) + messages.Open.encodingLength(open) | ||
var buf = Buffer(varint.encodingLength(len) + len) | ||
var offset = 0 | ||
varint.encode(len, buf, offset) | ||
offset += varint.encode.bytes | ||
varint.encode(this._local, buf, offset) | ||
offset += varint.encode.bytes | ||
messages.Open.encode(open, buf, offset) | ||
offset += messages.Open.encode.bytes | ||
this.stream._encode.push(buf) | ||
this.stream._keepAlive = 0 | ||
if (this._firstMessage) { | ||
this._parse(this._firstMessage, 0) | ||
this._firstMessage = null | ||
} | ||
} | ||
Channel.prototype._encrypt = function (buf) { | ||
buf = encryption.encrypt(buf, this._nonce, this.key) | ||
if (this._receivedHandshake) increment(this._nonce) | ||
return buf | ||
} | ||
Channel.prototype._decrypt = function (buf) { | ||
buf = encryption.decrypt(buf, this._remoteNonce, this.key) | ||
if (!buf) return null | ||
if (this._receivedHandshake) increment(this._remoteNonce) | ||
return buf | ||
} | ||
Channel.prototype._parse = function (buf, offset) { | ||
if (!this.key) { | ||
if (this._firstMessage) { | ||
return this.stream.destroy(new Error('Remote send a message before waiting for handshake')) | ||
} | ||
this._firstMessage = buf.slice(offset) | ||
return | ||
} | ||
if (this._encrypted) { | ||
buf = this._decrypt(buf.slice(offset)) | ||
offset = 0 | ||
if (!buf) return this.stream.destroy(new Error('Could not decrypt message')) | ||
} | ||
var type = buf[offset++] | ||
if (type > 127) return this.stream.destroy(new Error('Only single byte bytes currently supported')) | ||
if (!this._receivedHandshake && type !== 0) { | ||
return this.stream.destroy(new Error('First message received should be a handshake')) | ||
} | ||
if (type >= 64) return this._onextension(type - 64, buf.slice(offset)) | ||
var enc = ENCODERS[type] | ||
try { | ||
var message = enc && enc.decode(buf, offset) | ||
} catch (err) { | ||
this.stream.destroy(err) | ||
return | ||
} | ||
switch (type) { | ||
case 0: return this._onhandshake(message) | ||
case 1: return this.close() | ||
case 2: return this._onpause() | ||
case 3: return this._onresume() | ||
case 4: return this.emit('have', message) | ||
case 5: return this.emit('want', message) | ||
case 6: return this.emit('request', message) | ||
case 7: return this.emit('response', message) | ||
case 8: return this.emit('cancel', message) | ||
} | ||
} | ||
Channel.prototype._onextension = function (type, buf) { | ||
var ext = type < this.stream._remoteExtensions.length ? this.stream._remoteExtensions[type] : -1 | ||
if (ext > -1) this.emit(extensions[ext], buf) | ||
} | ||
Channel.prototype._onpause = function () { | ||
if (this.remotePaused) return | ||
this.remotePaused = true | ||
this.emit('pause') | ||
} | ||
Channel.prototype._onresume = function () { | ||
if (!this.remotePaused) return | ||
this.remotePaused = false | ||
this.emit('resume') | ||
} | ||
Channel.prototype._onhandshake = function (handshake) { | ||
if (!this.stream.remoteId) this.stream._onhandshake(handshake) | ||
var nonce = this._nonce | ||
var remoteNonce = this._remoteNonce | ||
this._receivedHandshake = true | ||
this._nonce = createNonce(nonce, remoteNonce) | ||
this._remoteNonce = createNonce(remoteNonce, nonce) | ||
if (equals(this._nonce, this._remoteNonce)) return this.stream.destroy(new Error('Remote nonce is invalid')) | ||
this.remotePaused = handshake.paused | ||
this.emit('handshake', handshake) | ||
this.protocol._onhandshake(handshake) | ||
} | ||
Channel.prototype._send = function (type, message) { | ||
if (this.closed && type !== 1) return | ||
if (!this._receivedHandshake && type !== 0) throw new Error('Wait for remote handshake') | ||
extensionNames.forEach(function (name, type) { | ||
if (Channel.prototype[name] || name === 'error') throw new Error('Invalid extension name') | ||
var enc = type < ENCODERS.length ? ENCODERS[type] : null | ||
var tmp = Buffer(1 + (enc ? enc.encodingLength(message) : (message ? message.length : 0))) | ||
var enc = isEncoder(extensions[name]) ? extensions[name] : pe | ||
tmp[0] = type | ||
if (enc) enc.encode(message, tmp, 1) | ||
else if (message) message.copy(tmp, 1) | ||
types.push(enc) | ||
eventNames.push(name) | ||
if (this._encrypted) tmp = this._encrypt(tmp) | ||
var len = varint.encodingLength(this._local) + tmp.length | ||
var buf = Buffer(varint.encodingLength(len) + len) | ||
var offset = 0 | ||
varint.encode(len, buf, offset) | ||
offset += varint.encode.bytes | ||
varint.encode(this._local, buf, offset) | ||
offset += varint.encode.bytes | ||
tmp.copy(buf, offset) | ||
this.stream._encode.push(buf) | ||
this.stream._keepAlive = 0 | ||
} | ||
Channel.prototype._onopen = function (message, remote) { | ||
this._remote = remote | ||
this.stream._remote[remote] = this | ||
this._remoteNonce = message.nonce | ||
this.emit('open') | ||
} | ||
extensions.forEach(function (name, id) { | ||
if (name === 'error' || !/^[a-z][_a-z0-9]+$/i.test(name) || Channel.prototype[name]) { | ||
throw new Error('Invalid extension name: ' + name) | ||
Channel.prototype[name] = function (message) { | ||
return this.protocol._send(this, DEFAULT_TYPES.length + type, message) | ||
} | ||
Channel.prototype[name] = function (buf) { | ||
this._send(id + 64, buf) | ||
} | ||
}) | ||
@@ -293,25 +134,22 @@ | ||
if (!(this instanceof Protocol)) return new Protocol(opts, onopen) | ||
if (!opts) opts = {} | ||
if (typeof opts === 'function') { | ||
onopen = opts | ||
opts = {} | ||
opts = null | ||
} | ||
if (!opts) opts = {} | ||
var self = this | ||
duplexify.call(this) | ||
var self = this | ||
this.channels = {} | ||
this.private = opts.private !== false | ||
this.id = opts.id || crypto.randomBytes(32) | ||
this.remoteId = null | ||
this.encrypted = opts.encrypt !== false | ||
this._channels = {} | ||
this._finalized = false | ||
this._paused = 0 | ||
this._local = [] | ||
this._remote = [] | ||
this._onopen = onopen || opts.open || noop | ||
this._keepAlive = 0 | ||
this._remoteKeepAlive = 0 | ||
this._interval = null | ||
this._remoteExtensions = new Array(extensions.length) | ||
@@ -323,14 +161,33 @@ this._localExtensions = new Array(extensions.length) | ||
this._encode = new stream.Readable() | ||
this._keepAlive = 0 | ||
this._remoteKeepAlive = 0 | ||
this._interval = null | ||
this._encode = stream.Readable() | ||
this._encode._read = noop | ||
this.setReadable(this._encode) | ||
this._decode = lpstream.decode({allowEmpty: true, limit: 5 * 1024 * 1024}) | ||
this._decode.on('data', parse) | ||
this.setReadable(this._encode) | ||
this.setWritable(this._decode) | ||
this.on('end', this.destroy) | ||
this.on('finish', this.destroy) | ||
this.on('close', this._close) | ||
this.on('close', onfinalize) | ||
this.on('end', onfinalize) | ||
this.on('finish', this.finalize) | ||
if (onopen) this.on('open', onopen) | ||
function onfinalize () { | ||
if (self._finalized) return | ||
self._finalized = true | ||
clearInterval(this._interval) | ||
var keys = Object.keys(self.channels) | ||
for (var i = 0; i < keys.length; i++) { | ||
self._close(self.channels[keys[i]]) | ||
} | ||
} | ||
function parse (data) { | ||
@@ -343,14 +200,15 @@ self._parse(data) | ||
Protocol.extensions = extensions | ||
Protocol.use = function (ext) { | ||
return use(extensions.concat(ext).sort().filter(noDups)) | ||
if (typeof ext === 'string') ext = toObject(ext) | ||
return use(xtend(ext, extensions)) | ||
} | ||
Protocol.prototype.remoteSupports = function (id) { | ||
var i = typeof id === 'number' ? id : extensions.indexOf(id) | ||
return this._localExtensions[i] > -1 | ||
Protocol.prototype.finalize = function () { | ||
this._encode.push(null) | ||
} | ||
Protocol.prototype.setTimeout = function (ms, ontimeout) { | ||
if (this._finalized) return | ||
if (ontimeout) this.once('timeout', ontimeout) | ||
var self = this | ||
@@ -363,3 +221,3 @@ | ||
this._interval = setInterval(kick, (ms / 4) | 0) | ||
if (this._interval) this._interval.unref() | ||
if (this._interval.unref) this._interval.unref() | ||
@@ -383,30 +241,97 @@ function kick () { | ||
Protocol.prototype.channel = function (key, publicId) { | ||
if (!publicId) publicId = crypto.createHmac('sha256', key).update('hypercore').digest() | ||
Protocol.prototype.remoteSupports = function (name) { | ||
var i = typeof name === 'string' ? extensionNames.indexOf(name) : name | ||
return i > -1 && this._localExtensions[i] > -1 | ||
} | ||
var keyHex = publicId.toString('hex') | ||
var channel = this._channels[keyHex] | ||
Protocol.prototype.open = function (key, opts) { | ||
if (this._finalized) return null // already finalized | ||
if (!opts) opts = {} | ||
if (!channel) channel = this._channels[keyHex] = new Channel(this, publicId) | ||
if (channel.key) return channel | ||
channel._open(key) | ||
return channel | ||
} | ||
var d = opts.discoveryKey || discoveryKey(key) | ||
var keyHex = d.toString('hex') | ||
var ch = this.channels[keyHex] | ||
Protocol.prototype.keys = function () { | ||
var keys = Object.keys(this._channels) | ||
var list = [] | ||
for (var i = 0; i < keys.length; i++) { | ||
var key = this._channels[keys[i]].key | ||
if (key) list.push(key) | ||
if (!ch) { | ||
ch = new Channel(this) | ||
ch.discoveryKey = d | ||
this.channels[keyHex] = ch | ||
} | ||
return list | ||
if (ch.local > -1) return ch | ||
ch.key = key | ||
ch.local = this._local.indexOf(null) | ||
if (ch.local === -1) ch.local = this._local.push(null) - 1 | ||
this._local[ch.local] = ch | ||
var open = messages.Open.encode({ | ||
feed: ch.discoveryKey, | ||
nonce: ch._nonce | ||
}) | ||
this._sendRaw(ch, open) | ||
if (!this.remoteId && opts.handshake !== false) { | ||
ch.handshake({ | ||
id: this.id, | ||
extensions: extensionNames | ||
}) | ||
} | ||
if (ch.buffer.length) this._parseSoon(ch) | ||
return ch | ||
} | ||
Protocol.prototype._close = function () { | ||
clearInterval(this._interval) | ||
var keys = Object.keys(this._channels) | ||
for (var i = 0; i < keys.length; i++) { | ||
var ch = this._channels[keys[i]] | ||
if (ch.key) ch.close() | ||
Protocol.prototype._send = function (channel, type, message) { | ||
if (channel.closed) return false | ||
var enc = types[type] | ||
var len = enc ? enc.encodingLength(message) : 0 | ||
var buf = Buffer(len + 1) | ||
buf[0] = type | ||
if (enc) enc.encode(message, buf, 1) | ||
if (this.private) buf = this._encrypt(channel, buf) | ||
return this._sendRaw(channel, buf) | ||
} | ||
Protocol.prototype._sendRaw = function (channel, buf) { | ||
this._keepAlive = 0 | ||
var len = buf.length + varint.encodingLength(channel.local) | ||
var box = Buffer(varint.encodingLength(len) + len) | ||
var offset = 0 | ||
varint.encode(len, box, offset) | ||
offset += varint.encode.bytes | ||
varint.encode(channel.local, box, offset) | ||
offset += varint.encode.bytes | ||
buf.copy(box, offset) | ||
return this._encode.push(box) | ||
} | ||
Protocol.prototype._pause = function () { | ||
if (!this._paused++) this._decode.pause() | ||
} | ||
Protocol.prototype._resume = function () { | ||
if (!--this._paused) this._decode.resume() | ||
} | ||
Protocol.prototype._parseSoon = function (channel) { | ||
var self = this | ||
this._pause() | ||
process.nextTick(drain) | ||
function drain () { | ||
if (self._finalized || channel.closed) return | ||
while (channel.buffer.length) self._parse(channel.buffer.shift()) | ||
if (!self._finalized) self._resume() | ||
} | ||
@@ -417,28 +342,101 @@ } | ||
this._remoteKeepAlive = 0 | ||
if (!data.length) return | ||
var remote = varint.decode(data) | ||
if (!data.length || this._finalized) return | ||
var remote = varint.decode(data, 0) | ||
var offset = varint.decode.bytes | ||
if (remote === this._remote.length) this._remote.push(null) | ||
if (remote > this._remote.length) { | ||
return this.destroy(new Error('Received invalid channel')) | ||
if (remote >= this._remote.length) this._remote.push(null) | ||
if (remote > this._remote.length) return this.destroy(new Error('Unexpected channel number')) | ||
if (!this._remote[remote]) this._onopen(remote, data, offset) | ||
else if (offset !== data.length) this._onmessage(remote, data, offset) | ||
else this._onclose(remote) | ||
} | ||
Protocol.prototype._parseType = function (type) { | ||
if (type > 127) return -1 | ||
if (type < 64) return type | ||
if (type - 64 >= this._remoteExtensions.length) return -1 | ||
type = this._remoteExtensions[type - 64] | ||
if (type === -1) return -1 | ||
return type + 64 | ||
} | ||
Protocol.prototype._onopen = function (remote, data, offset) { | ||
try { | ||
var open = messages.Open.decode(data, offset) | ||
} catch (err) { | ||
return this.destroy(err) | ||
} | ||
if (open.feed.length !== 32 || open.nonce.length !== 24) return this.destroy(new Error('Invalid open message')) | ||
var keyHex = open.feed.toString('hex') | ||
var ch = this.channels[keyHex] | ||
if (!ch) { | ||
ch = new Channel(this) | ||
ch.discoveryKey = open.feed | ||
this.channels[keyHex] = ch | ||
} | ||
if (ch.remote > -1) return this.destroy(new Error('Double open for same channel')) | ||
ch.remote = remote | ||
ch._remoteNonce = open.nonce | ||
this._remote[remote] = ch | ||
this._open(ch) | ||
if (!this._finalized && ch.local === -1) this.emit('open', ch.discoveryKey) | ||
} | ||
Protocol.prototype._onmessage = function (remote, data, offset) { | ||
var channel = this._remote[remote] | ||
if (channel) { | ||
return channel._parse(data, offset) | ||
if (!channel.key) { | ||
if (channel.buffer.length === 16) return this.destroy(new Error('Buffer overflow')) | ||
channel.buffer.push(data) | ||
return | ||
} | ||
if (this._remote.indexOf(null) !== remote) { | ||
return this.destroy(new Error('Received invalid channel')) | ||
var box = this._decrypt(channel, data.slice(offset)) | ||
if (!box || !box.length) return this.destroy(new Error('Invalid message')) | ||
var type = this._parseType(box[0]) | ||
if (type < 0) return | ||
if (type && !this.remoteId) return this.destroy(new Error('Did not receive handshake')) | ||
if (type >= types.length) return | ||
var enc = types[type] | ||
try { | ||
var message = enc ? enc.decode(box, 1) : null | ||
} catch (err) { | ||
return this.destroy(err) | ||
} | ||
this._open(remote, data, offset) | ||
channel.emit(eventNames[type], message) | ||
} | ||
Protocol.prototype._onclose = function (remote) { | ||
var channel = this._remote[remote] | ||
this._remote[remote] = null | ||
channel.remote = -1 | ||
if (channel.local > -1) this._close(channel) | ||
var keyHex = channel.discoveryKey.toString('hex') | ||
if (this.channels[keyHex] === channel) delete this.channels[keyHex] | ||
} | ||
Protocol.prototype._onhandshake = function (handshake) { | ||
// called with the first handshake | ||
this.remoteId = handshake.peerId || crypto.randomBytes(32) | ||
if (this.remoteId) return // already handshaked | ||
this.remoteId = handshake.id | ||
var exts = handshake.extensions | ||
// extensions *must* be sorted | ||
@@ -448,4 +446,4 @@ var local = 0 | ||
while (local < extensions.length && remote < handshake.extensions.length && remote < 64) { | ||
if (extensions[local] === handshake.extensions[remote]) { | ||
while (local < extensionNames.length && remote < exts.length && remote < 64) { | ||
if (extensionNames[local] === exts[remote]) { | ||
this._localExtensions[local] = remote | ||
@@ -455,3 +453,3 @@ this._remoteExtensions[remote] = local | ||
remote++ | ||
} else if (extensions[local] < handshake.extensions[remote]) { | ||
} else if (extensionNames[local] < exts[remote]) { | ||
local++ | ||
@@ -466,35 +464,52 @@ } else { | ||
Protocol.prototype._open = function (remote, data, offset) { | ||
try { | ||
var open = messages.Open.decode(data, offset) | ||
} catch (err) { | ||
return | ||
} | ||
Protocol.prototype._open = function (channel) { | ||
if (channel.local === -1 || channel.remote === -1) return | ||
if (equals(channel._remoteNonce, channel._firstNonce)) return this.destroy(new Error('Remote echoed nonce')) | ||
channel.opened = true | ||
channel.emit('open') | ||
} | ||
if (open.publicId.length !== 32 || open.nonce.length !== 24) return | ||
Protocol.prototype._close = function (channel) { | ||
if (channel.closed) return | ||
channel.closed = true | ||
var keyHex = open.publicId.toString('hex') | ||
var channel = this._channels[keyHex] | ||
if (!this._finalized) this._sendRaw(channel, Buffer(0)) | ||
if (channel) { | ||
channel._onopen(open, remote) | ||
return | ||
} | ||
this._local[channel.local] = null | ||
channel.local = -1 | ||
channel.emit('end') | ||
} | ||
channel = this._channels[keyHex] = new Channel(this, open.publicId) | ||
channel._onopen(open, remote) | ||
this._onopen(open.publicId) | ||
Protocol.prototype._encrypt = function (channel, buf) { | ||
if (!this.private) return buf | ||
var box = encryption.encrypt(buf, channel._nonce, channel.key) | ||
increment(channel._nonce) | ||
return box | ||
} | ||
return Protocol | ||
} | ||
Protocol.prototype._decrypt = function (channel, box) { | ||
if (!this.private) return box | ||
var buf = box.length < 16 ? null : encryption.decrypt(box, channel._remoteNonce, channel.key) | ||
if (!buf) return null | ||
increment(channel._remoteNonce) | ||
return buf | ||
} | ||
function noop () {} | ||
function discoveryKey (key) { | ||
return crypto.createHmac('sha256', key).update('hypercore').digest() | ||
} | ||
function createNonce (a, b) { | ||
return crypto.createHash('sha256').update(a).update(b).digest().slice(0, 24) | ||
} | ||
function isEncoder (val) { | ||
return val && typeof val.encode === 'function' | ||
} | ||
function noDups (name, i, extensions) { | ||
return extensions.indexOf(name) === i | ||
function toObject (name) { | ||
var tmp = {} | ||
tmp[name] = true | ||
return tmp | ||
} | ||
function noop () {} | ||
return Protocol | ||
} |
{ | ||
"name": "hypercore-protocol", | ||
"version": "3.0.0", | ||
"version": "4.0.0", | ||
"description": "Stream that implements the hypercore protocol", | ||
"main": "index.js", | ||
"dependencies": { | ||
"brfs": "^1.4.3", | ||
"buffer-equals": "^1.0.3", | ||
@@ -13,10 +12,12 @@ "duplexify": "^3.4.3", | ||
"length-prefixed-stream": "^1.5.0", | ||
"passthrough-encoding": "^1.2.0", | ||
"protocol-buffers": "^3.1.6", | ||
"readable-stream": "^2.0.5", | ||
"sodium-encryption": "^1.0.1", | ||
"varint": "^4.0.0" | ||
"readable-stream": "^2.1.4", | ||
"sodium-encryption": "^1.1.0", | ||
"varint": "^4.0.0", | ||
"xtend": "^4.0.1" | ||
}, | ||
"devDependencies": { | ||
"standard": "^6.0.7", | ||
"tape": "^4.5.0" | ||
"standard": "^7.1.0", | ||
"tape": "^4.5.1" | ||
}, | ||
@@ -23,0 +24,0 @@ "scripts": { |
@@ -19,16 +19,9 @@ # hypercore-protocol | ||
// open a channel specified by a 32 byte key | ||
var channel = p.channel(Buffer('deadbeefdeadbeefdeadbeefdeadbeef')) | ||
var channel = p.open(Buffer('deadbeefdeadbeefdeadbeefdeadbeef')) | ||
// send a handshake | ||
channel.handshake() | ||
channel.on('handshake', function () { | ||
// request block 42 | ||
channel.request({block: 42}) | ||
channel.request({block: 42}) | ||
channel.on('data', function (message) { | ||
console.log(message) // contains message.block and message.value | ||
}) | ||
channel.on('response', function (message) { | ||
console.log(message) // contains message.block and message.data | ||
}) | ||
stream.pipe(anotherStream).pipe(stream) | ||
@@ -45,3 +38,3 @@ ``` | ||
If the remote peer joins a channel you haven't opened, hypercore will call an optional `onopen` | ||
method if you specify it with the public id for that channel. | ||
method if you specify it with the discovery key for that channel. | ||
@@ -54,7 +47,7 @@ ``` js | ||
// open with corresponding key to join | ||
var channel = p.channel(Buffer('deadbeefdeadbeefdeadbeefdeadbeef')) | ||
var channel = p.open(Buffer('deadbeefdeadbeefdeadbeefdeadbeef')) | ||
}) | ||
``` | ||
See below for more information about channels, keys, and public ids. | ||
See below for more information about channels, keys, and discovery keys. | ||
Other options include: | ||
@@ -72,5 +65,5 @@ | ||
#### `var channel = p.channel(key, [publicId])` | ||
#### `var channel = p.open(key, [options])` | ||
Open a stream channel. A channel uses the [sodium](https://github.com/mafintosh/sodium-prebuilt) module to encrypt all messages using the key you specify. The public id for the channel is send unencrypted together with a random 24 byte nonce. If you do not specify a public id, an HMAC of the string `hypercore` using the key as the password will be used. | ||
Open a stream channel. A channel uses the [sodium](https://github.com/mafintosh/sodium-prebuilt) module to encrypt all messages using the key you specify. The discovery key for the channel is send unencrypted together with a random 24 byte nonce. If you do not specify a discovery key in the options map, an HMAC of the string `hypercore` using the key as the password will be used. | ||
@@ -81,31 +74,15 @@ #### `p.on('handshake')` | ||
#### `var keys = p.keys()` | ||
Lists the keys of all the channels you have opened | ||
#### `p.setTimeout(ms, [ontimeout])` | ||
Will call the timeout function if the remote peer | ||
hasn't send any messages within `ms`. Will also send a heartbeat | ||
message to the other peer if you've been inactive for `ms / 2` | ||
Will call the timeout function if the remote peer hasn't send any messages within `ms`. Will also send a heartbeat message to the other peer if you've been inactive for `ms / 2` | ||
## Channel API | ||
#### `channel.handshake(message)` | ||
#### `channel.end()` | ||
This should be the first message you send. The `peerId` and `extensions` field | ||
will be automatically populated for you. See the protobuf schema or more information. | ||
Ends a channel | ||
#### `channel.on('handshake', message)` | ||
#### `channel.on('end')` | ||
Emitted when the other peer sends a handshake. You should wait for this event | ||
to be emitted before sending any messages. | ||
#### `channel.close()` | ||
Closes a channel | ||
#### `channel.on('close')` | ||
Emitted when a channel is closed, either by you or the remote peer. | ||
Emitted when a channel is ended, either by you or the remote peer. | ||
No other events will be emitted after this. | ||
@@ -121,9 +98,9 @@ | ||
#### `channel.response(message)` | ||
#### `channel.data(message)` | ||
Send a response message. See the protobuf schema or more information | ||
Send a data message. See the protobuf schema or more information | ||
#### `channel.on('response', message)` | ||
#### `channel.on('data', message)` | ||
Emitted when a response message is received | ||
Emitted when a data message is received | ||
@@ -146,2 +123,10 @@ #### `channel.cancel(message)` | ||
#### `channel.want(message)` | ||
Send a want message. See the protobuf schema or more information | ||
#### `channel.on('want', message)` | ||
Emitted when a want message is received | ||
#### `channel.resume()` | ||
@@ -168,8 +153,7 @@ | ||
#### `protocol = protocol.use(extension)` | ||
#### `protocol = protocol.use(extensionName)` | ||
Use an extension specified by the string name you pass in. Returns a new prototype | ||
Will create a new method on all your channel objects that has the same name as the extension | ||
and emit an event with the same name when an extension message is received | ||
Will create a new method on all your channel objects that has the same name as the extension and emit an event with the same name when an extension message is received | ||
@@ -180,3 +164,3 @@ ``` js | ||
var p = protocol() | ||
var channel = p.channel(someKey) | ||
var channel = p.open(someKey) | ||
@@ -192,4 +176,12 @@ channel.on('handshake', function () { | ||
#### `var bool = p.remoteSupports(extension)` | ||
Per default all messages are buffers. If you want to encode/decode your messages you can specify an [abstract-encoding](https://github.com/mafintosh/abstract-encoding) compliant encoder as well | ||
``` js | ||
protocol = protocol.use({ | ||
ping: someEncoder | ||
}) | ||
``` | ||
#### `var bool = p.remoteSupports(extensionName)` | ||
After the protocol instance emits `handshake` you can call this method to check | ||
@@ -196,0 +188,0 @@ if the remote peer also supports one of your extensions. |
299
test.js
@@ -5,197 +5,183 @@ var tape = require('tape') | ||
var key = Buffer('12345678123456781234567812345678') | ||
var otherKey = Buffer('01234567012345670123456701234567') | ||
var otherKey = Buffer('02345678123456781234567812345678') | ||
tape('join channel', function (t) { | ||
var p = protocol() | ||
var ch = p.channel(key) | ||
tape('open channel', function (t) { | ||
t.plan(3) | ||
t.same(ch.key.toString('hex'), key.toString('hex'), 'same key') | ||
t.end() | ||
}) | ||
var stream1 = protocol() | ||
var stream2 = protocol() | ||
tape('join two channels', function (t) { | ||
var p = protocol() | ||
var channel1 = stream1.open(key) | ||
var channel2 = stream2.open(key) | ||
var channel = p.channel(key) | ||
var otherChannel = p.channel(otherKey) | ||
channel1.request({ | ||
block: 10 | ||
}) | ||
t.same(channel.key, key, 'expected channel key') | ||
t.same(otherChannel.key, otherKey, 'expected channel key') | ||
t.same(p.keys(), [key, otherKey], 'joined both channels') | ||
t.end() | ||
}) | ||
channel1.once('data', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
t.same(message.value, Buffer('hello world'), 'same value') | ||
}) | ||
tape('join and leave', function (t) { | ||
var p = protocol() | ||
channel2.once('request', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
channel2.data({block: 10, value: Buffer('hello world')}) | ||
}) | ||
t.same(p.keys(), [], 'not in any channel') | ||
var ch = p.channel(key) | ||
t.same(p.keys(), [key], 'joined channel') | ||
ch.close() | ||
t.same(p.keys(), [], 'not in any channel') | ||
ch.close() | ||
t.same(p.keys(), [], 'not in any channel') | ||
t.end() | ||
stream1.pipe(stream2).pipe(stream1) | ||
}) | ||
tape('encrypts messages', function (t) { | ||
var p1 = protocol() | ||
var p2 = protocol() | ||
var buf = [] | ||
tape('async open', function (t) { | ||
t.plan(3) | ||
p1.on('data', function (data) { | ||
buf.push(data) | ||
var stream1 = protocol() | ||
var stream2 = protocol(function () { | ||
setTimeout(function () { | ||
var channel2 = stream2.open(key) | ||
channel2.once('request', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
channel2.data({block: 10, value: Buffer('hello world')}) | ||
}) | ||
}, 100) | ||
}) | ||
p1.on('finish', function () { | ||
buf = Buffer.concat(buf) | ||
t.ok(buf.length > 32 + 20 + 25, 'sending some data') // ~ hmac + nonce + data | ||
t.same(buf.toString().indexOf('hello i should be encrypted'), -1, 'does not contain plaintext') | ||
t.end() | ||
var channel1 = stream1.open(key) | ||
channel1.request({ | ||
block: 10 | ||
}) | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
ch1.handshake() | ||
ch2.handshake() | ||
ch1.on('handshake', function () { | ||
t.pass('received handshake') | ||
ch1.response({block: 0, data: Buffer('hello i should be encrypted.')}) | ||
p1.end() | ||
channel1.once('data', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
t.same(message.value, Buffer('hello world'), 'same value') | ||
}) | ||
p1.pipe(p2).pipe(p1) | ||
stream1.pipe(stream2).pipe(stream1) | ||
}) | ||
tape('does not encrypt messages if encrypt === false', function (t) { | ||
var p1 = protocol({encrypt: false}) | ||
var p2 = protocol({encrypt: false}) | ||
var buf = [] | ||
tape('empty messages work', function (t) { | ||
t.plan(2) | ||
p1.on('data', function (data) { | ||
buf.push(data) | ||
}) | ||
var stream1 = protocol() | ||
var stream2 = protocol() | ||
p1.on('finish', function () { | ||
buf = Buffer.concat(buf) | ||
t.ok(buf.length > 32 + 20 + 25, 'sending some data') // ~ hmac + nonce + data | ||
t.ok(buf.toString().indexOf('hello i should not be encrypted') > -1, 'does contain plaintext') | ||
t.end() | ||
}) | ||
var channel1 = stream1.open(key) | ||
var channel2 = stream2.open(key) | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
channel1.pause() | ||
ch1.handshake() | ||
ch2.handshake() | ||
channel1.once('resume', function (message) { | ||
t.pass('resumed') | ||
}) | ||
ch1.on('handshake', function () { | ||
t.pass('received handshake') | ||
ch1.response({block: 0, data: Buffer('hello i should not be encrypted.')}) | ||
p1.end() | ||
channel2.once('pause', function (message) { | ||
t.pass('paused') | ||
channel2.resume() | ||
}) | ||
p1.pipe(p2).pipe(p1) | ||
stream1.pipe(stream2).pipe(stream1) | ||
}) | ||
tape('one side encryption returns error', function (t) { | ||
var p1 = protocol({encrypt: false}) | ||
var p2 = protocol() | ||
tape('is encrypted', function (t) { | ||
var stream1 = protocol() | ||
var stream2 = protocol() | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
var channel1 = stream1.open(key) | ||
var channel2 = stream2.open(key) | ||
ch1.handshake() | ||
ch2.handshake() | ||
ch1.on('handshake', function () { | ||
t.fail('received handshake') | ||
channel1.on('data', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
t.same(message.value, Buffer('hello world'), 'same value') | ||
t.end() | ||
}) | ||
p1.on('error', function () { | ||
// there might be an error here as well | ||
stream2.on('data', function (data) { | ||
t.ok(data.toString().indexOf('hello world') === -1, 'is encrypted') | ||
}) | ||
p2.on('error', function (err) { | ||
t.ok(err, 'had error') | ||
t.end() | ||
}) | ||
stream1.pipe(stream2).pipe(stream1) | ||
p1.pipe(p2).pipe(p1) | ||
channel2.data({block: 10, value: Buffer('hello world')}) | ||
}) | ||
tape('remote joins', function (t) { | ||
var p1 = protocol() | ||
var p2 = protocol() | ||
var remoteJoined = 2 | ||
tape('can disable encryption', function (t) { | ||
var stream1 = protocol({private: false}) | ||
var stream2 = protocol({private: false}) | ||
var ch1 = p1.channel(key) | ||
var foundHello = false | ||
var channel1 = stream1.open(key) | ||
var channel2 = stream2.open(key) | ||
ch1.handshake() | ||
ch1.once('open', function () { | ||
t.pass('remote joined') | ||
remoteJoined-- | ||
channel1.on('data', function (message) { | ||
t.same(message.block, 10, 'same block') | ||
t.same(message.value, Buffer('hello world'), 'same value') | ||
t.ok(foundHello, 'sent in plain text') | ||
t.end() | ||
}) | ||
ch1.on('request', function (request) { | ||
t.same(request.block, 42, 'received request') | ||
ch1.response({block: 42, data: Buffer('some data')}) | ||
stream2.on('data', function (data) { | ||
if (!foundHello) foundHello = data.toString().indexOf('hello world') > -1 | ||
}) | ||
var ch2 = p2.channel(key) | ||
stream1.pipe(stream2).pipe(stream1) | ||
ch2.handshake() | ||
channel2.data({block: 10, value: Buffer('hello world')}) | ||
}) | ||
p1.on('handshake', function () { | ||
ch2.request({block: 42}) | ||
tape('end channel', function (t) { | ||
t.plan(3) | ||
var stream1 = protocol() | ||
var stream2 = protocol() | ||
var c1 = stream1.open(key) | ||
var c2 = stream2.open(key) | ||
c2.on('request', function () { | ||
t.pass('received request') | ||
}) | ||
ch2.on('response', function (response) { | ||
t.same(response.block, 42, 'received response') | ||
t.same(response.data, Buffer('some data'), 'expected data') | ||
t.same(remoteJoined, 0, 'both emitted open') | ||
t.end() | ||
c2.on('end', function () { | ||
t.pass('channel ended') | ||
}) | ||
ch2.once('open', function () { | ||
t.pass('remote joined') | ||
remoteJoined-- | ||
c1.on('end', function () { | ||
t.pass('channel ended') | ||
}) | ||
p1.pipe(p2).pipe(p1) | ||
c1.on('open', function () { | ||
c1.request({block: 10}) | ||
c1.end() | ||
}) | ||
stream1.pipe(stream2).pipe(stream1) | ||
}) | ||
tape('remote joins and closes', function (t) { | ||
var localClose = false | ||
var remoteClose = false | ||
tape('destroy ends all channels', function (t) { | ||
t.plan(3) | ||
var p1 = protocol() | ||
var p2 = protocol(function (publicId) { | ||
var channel = p2.channel(key) | ||
var stream1 = protocol() | ||
var stream2 = protocol() | ||
channel.handshake() | ||
channel.on('close', function () { | ||
remoteClose = true | ||
t.ok(localClose, 'local closed') | ||
t.ok(remoteClose, 'remote closed') | ||
t.end() | ||
}) | ||
var c1 = stream1.open(key) | ||
var other = stream1.open(otherKey) | ||
var c2 = stream2.open(key) | ||
other.on('end', function () { | ||
t.pass('channel ended') | ||
}) | ||
var channel = p1.channel(key) | ||
channel.on('close', function () { | ||
localClose = true | ||
c1.on('end', function () { | ||
t.pass('channel ended') | ||
}) | ||
channel.on('handshake', function () { | ||
channel.close() | ||
c2.on('end', function () { | ||
t.pass('channel ended') | ||
}) | ||
channel.handshake() | ||
stream1.pipe(stream2).pipe(stream1) | ||
p1.pipe(p2).pipe(p1) | ||
setTimeout(function () { | ||
stream1.finalize() | ||
}, 100) | ||
}) | ||
@@ -266,20 +252,2 @@ | ||
tape('leave channels on close', function (t) { | ||
t.plan(2) | ||
var p1 = protocol() | ||
var c1 = p1.channel(key) | ||
var c2 = p1.channel(otherKey) | ||
c1.once('close', function () { | ||
t.pass('first channel closed') | ||
}) | ||
c2.once('close', function () { | ||
t.pass('second channel closed') | ||
}) | ||
p1.destroy() | ||
}) | ||
tape('extension', function (t) { | ||
@@ -292,8 +260,5 @@ t.plan(4) | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
var ch1 = p1.open(key) | ||
var ch2 = p2.open(key) | ||
ch1.handshake() | ||
ch2.handshake() | ||
p1.once('handshake', function () { | ||
@@ -326,13 +291,10 @@ t.ok(p1.remoteSupports('test'), 'protocol supported') | ||
var protocol1 = protocol.use(['test', 'bar']) | ||
var protocol2 = protocol.use(['foo', 'test']) | ||
var protocol1 = protocol.use({test: 1, bar: 1}) | ||
var protocol2 = protocol.use({foo: 1, test: 1}) | ||
var p1 = protocol1() | ||
var p2 = protocol2() | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
var ch1 = p1.open(key) | ||
var ch2 = p2.open(key) | ||
ch1.handshake() | ||
ch2.handshake() | ||
p1.once('handshake', function () { | ||
@@ -369,12 +331,9 @@ t.ok(!p1.remoteSupports('foo'), 'protocol not supported') | ||
var protocol2 = protocol.use(['test', 'bar']) | ||
var protocol2 = protocol.use({test: 1, bar: 1}) | ||
var p1 = protocol() | ||
var p2 = protocol2() | ||
var ch1 = p1.channel(key) | ||
var ch2 = p2.channel(key) | ||
var ch1 = p1.open(key) | ||
var ch2 = p2.open(key) | ||
ch1.handshake() | ||
ch2.handshake() | ||
p1.once('handshake', function () { | ||
@@ -381,0 +340,0 @@ t.ok(!p1.remoteSupports('foo'), 'protocol not supported') |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
29788
11
687
183
1
+ Addedpassthrough-encoding@^1.2.0
+ Addedxtend@^4.0.1
+ Addedpassthrough-encoding@1.2.0(transitive)
- Removedbrfs@^1.4.3
Updatedreadable-stream@^2.1.4
Updatedsodium-encryption@^1.1.0