New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

hypercore-protocol

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hypercore-protocol - npm Package Compare versions

Comparing version 3.0.0 to 4.0.0

649

index.js

@@ -1,57 +0,70 @@

var crypto = require('crypto')
var events = require('events')
var inherits = require('inherits')
var duplexify = require('duplexify')
var inherits = require('inherits')
var varint = require('varint')
var events = require('events')
var messages = require('./messages')
var lpstream = require('length-prefixed-stream')
var stream = require('readable-stream')
var lpstream = require('length-prefixed-stream')
var crypto = require('crypto')
var encryption = require('sodium-encryption')
var increment = require('increment-buffer')
var equals = require('buffer-equals')
var varint = require('varint')
var xtend = require('xtend')
var pe = require('passthrough-encoding')
var messages = require('./messages')
var KEEP_ALIVE = Buffer([0])
var ENCODERS = [
var DEFAULT_TYPES = [
messages.Handshake,
null, // close
null, // pause
null, // resume
messages.Have,
messages.Want,
messages.Request,
messages.Response,
messages.Cancel
messages.Data,
messages.Cancel,
null, // pause
null // resume
]
module.exports = use([])
var DEFAULT_EVENTS = [
'handshake',
'have',
'want',
'request',
'data',
'cancel',
'pause',
'resume'
]
while (DEFAULT_EVENTS.length < 64) { // reserved
DEFAULT_EVENTS.push('unknown')
DEFAULT_TYPES.push(null)
}
module.exports = use({})
function use (extensions) {
if (extensions.length > 64) {
throw new Error('Only 64 extensions are supported')
}
var extensionNames = Object.keys(extensions).sort()
var types = DEFAULT_TYPES.slice(0)
var eventNames = DEFAULT_EVENTS.slice(0)
function Channel (stream, publicId) {
function Channel (protocol) {
events.EventEmitter.call(this)
this.feed = null
this.stream = stream
this.publicId = publicId
this.state = null // set by someone else
this.protocol = protocol
this.opened = false
this.closed = false
this.key = null
this.closed = false
this.discoveryKey = null
this.local = -1
this.remote = -1
this.buffer = []
this.remotePaused = true
this.amPaused = true
this._nonce = crypto.randomBytes(24)
this._remoteNonce = null
// populated by hypercore. defined here so v8 is happy
this.feed = null
this.remoteBitfield = null
this.remoteTree = null
this._firstMessage = null
this._encrypted = stream.encrypted
this._receivedHandshake = false
this._nonce = null
this._remoteNonce = null
this._local = -1
this._remote = -1
this._firstNonce = Buffer(24)
this._nonce.copy(this._firstNonce)
this.on('handshake', this._onhandshake)
}

@@ -61,229 +74,57 @@

Channel.prototype.handshake = function (handshake) {
if (!handshake) handshake = {}
if (!this.stream.remoteId) {
handshake.peerId = this.stream.id
handshake.extensions = extensions
}
this.amPaused = !!handshake.paused
this._send(0, handshake)
Channel.prototype.remoteSupports = function (name) {
return this.protocol.remoteSupports(name)
}
Channel.prototype.close = function (err) {
if (err) return this.stream.destroy(err) // fatal close
var keyHex = this.publicId.toString('hex')
if (this.stream._channels[keyHex] !== this) return
delete this.stream._channels[keyHex]
if (this._remote > -1) this.stream._remote[this._remote] = null
if (this._local > -1) this.stream._local[this._local] = null
this.closed = true
this.remotePaused = true
this.emit('close')
if (this.key && !this.stream.destroyed && this._receivedHandshake) this._send(1, null)
Channel.prototype.handshake = function (message) {
return this.protocol._send(this, 0, message)
}
Channel.prototype.pause = function () {
this._send(2, null)
Channel.prototype.have = function (message) {
return this.protocol._send(this, 1, message)
}
Channel.prototype.resume = function () {
this._send(3, null)
Channel.prototype.want = function (message) {
return this.protocol._send(this, 2, message)
}
Channel.prototype.have = function (have) {
this._send(4, have)
Channel.prototype.request = function (message) {
return this.protocol._send(this, 3, message)
}
Channel.prototype.want = function (want) {
this._send(5, want)
Channel.prototype.data = function (message) {
return this.protocol._send(this, 4, message)
}
Channel.prototype.request = function (request) {
this._send(6, request)
Channel.prototype.cancel = function (message) {
return this.protocol._send(this, 5, message)
}
Channel.prototype.response = function (response) {
this._send(7, response)
Channel.prototype.pause = function () {
return this.protocol._send(this, 6, null)
}
Channel.prototype.cancel = function (cancel) {
this._send(8, cancel)
Channel.prototype.resume = function () {
return this.protocol._send(this, 7, null)
}
Channel.prototype.remoteSupports = function (id) {
return this.stream.remoteSupports(id)
Channel.prototype.end = function () {
this.protocol._close(this)
}
Channel.prototype._open = function (key) {
if (this.key) return
this.key = key
this._local = this.stream._local.indexOf(null)
if (this._local === -1) this._local = this.stream._local.push(null) - 1
this.stream[this._local] = this
this._nonce = crypto.randomBytes(24)
var open = {publicId: this.publicId, nonce: this._nonce}
var len = varint.encodingLength(this._local) + messages.Open.encodingLength(open)
var buf = Buffer(varint.encodingLength(len) + len)
var offset = 0
varint.encode(len, buf, offset)
offset += varint.encode.bytes
varint.encode(this._local, buf, offset)
offset += varint.encode.bytes
messages.Open.encode(open, buf, offset)
offset += messages.Open.encode.bytes
this.stream._encode.push(buf)
this.stream._keepAlive = 0
if (this._firstMessage) {
this._parse(this._firstMessage, 0)
this._firstMessage = null
}
}
Channel.prototype._encrypt = function (buf) {
buf = encryption.encrypt(buf, this._nonce, this.key)
if (this._receivedHandshake) increment(this._nonce)
return buf
}
Channel.prototype._decrypt = function (buf) {
buf = encryption.decrypt(buf, this._remoteNonce, this.key)
if (!buf) return null
if (this._receivedHandshake) increment(this._remoteNonce)
return buf
}
Channel.prototype._parse = function (buf, offset) {
if (!this.key) {
if (this._firstMessage) {
return this.stream.destroy(new Error('Remote send a message before waiting for handshake'))
}
this._firstMessage = buf.slice(offset)
return
}
if (this._encrypted) {
buf = this._decrypt(buf.slice(offset))
offset = 0
if (!buf) return this.stream.destroy(new Error('Could not decrypt message'))
}
var type = buf[offset++]
if (type > 127) return this.stream.destroy(new Error('Only single byte bytes currently supported'))
if (!this._receivedHandshake && type !== 0) {
return this.stream.destroy(new Error('First message received should be a handshake'))
}
if (type >= 64) return this._onextension(type - 64, buf.slice(offset))
var enc = ENCODERS[type]
try {
var message = enc && enc.decode(buf, offset)
} catch (err) {
this.stream.destroy(err)
return
}
switch (type) {
case 0: return this._onhandshake(message)
case 1: return this.close()
case 2: return this._onpause()
case 3: return this._onresume()
case 4: return this.emit('have', message)
case 5: return this.emit('want', message)
case 6: return this.emit('request', message)
case 7: return this.emit('response', message)
case 8: return this.emit('cancel', message)
}
}
Channel.prototype._onextension = function (type, buf) {
var ext = type < this.stream._remoteExtensions.length ? this.stream._remoteExtensions[type] : -1
if (ext > -1) this.emit(extensions[ext], buf)
}
Channel.prototype._onpause = function () {
if (this.remotePaused) return
this.remotePaused = true
this.emit('pause')
}
Channel.prototype._onresume = function () {
if (!this.remotePaused) return
this.remotePaused = false
this.emit('resume')
}
Channel.prototype._onhandshake = function (handshake) {
if (!this.stream.remoteId) this.stream._onhandshake(handshake)
var nonce = this._nonce
var remoteNonce = this._remoteNonce
this._receivedHandshake = true
this._nonce = createNonce(nonce, remoteNonce)
this._remoteNonce = createNonce(remoteNonce, nonce)
if (equals(this._nonce, this._remoteNonce)) return this.stream.destroy(new Error('Remote nonce is invalid'))
this.remotePaused = handshake.paused
this.emit('handshake', handshake)
this.protocol._onhandshake(handshake)
}
Channel.prototype._send = function (type, message) {
if (this.closed && type !== 1) return
if (!this._receivedHandshake && type !== 0) throw new Error('Wait for remote handshake')
extensionNames.forEach(function (name, type) {
if (Channel.prototype[name] || name === 'error') throw new Error('Invalid extension name')
var enc = type < ENCODERS.length ? ENCODERS[type] : null
var tmp = Buffer(1 + (enc ? enc.encodingLength(message) : (message ? message.length : 0)))
var enc = isEncoder(extensions[name]) ? extensions[name] : pe
tmp[0] = type
if (enc) enc.encode(message, tmp, 1)
else if (message) message.copy(tmp, 1)
types.push(enc)
eventNames.push(name)
if (this._encrypted) tmp = this._encrypt(tmp)
var len = varint.encodingLength(this._local) + tmp.length
var buf = Buffer(varint.encodingLength(len) + len)
var offset = 0
varint.encode(len, buf, offset)
offset += varint.encode.bytes
varint.encode(this._local, buf, offset)
offset += varint.encode.bytes
tmp.copy(buf, offset)
this.stream._encode.push(buf)
this.stream._keepAlive = 0
}
Channel.prototype._onopen = function (message, remote) {
this._remote = remote
this.stream._remote[remote] = this
this._remoteNonce = message.nonce
this.emit('open')
}
extensions.forEach(function (name, id) {
if (name === 'error' || !/^[a-z][_a-z0-9]+$/i.test(name) || Channel.prototype[name]) {
throw new Error('Invalid extension name: ' + name)
Channel.prototype[name] = function (message) {
return this.protocol._send(this, DEFAULT_TYPES.length + type, message)
}
Channel.prototype[name] = function (buf) {
this._send(id + 64, buf)
}
})

@@ -293,25 +134,22 @@

if (!(this instanceof Protocol)) return new Protocol(opts, onopen)
if (!opts) opts = {}
if (typeof opts === 'function') {
onopen = opts
opts = {}
opts = null
}
if (!opts) opts = {}
var self = this
duplexify.call(this)
var self = this
this.channels = {}
this.private = opts.private !== false
this.id = opts.id || crypto.randomBytes(32)
this.remoteId = null
this.encrypted = opts.encrypt !== false
this._channels = {}
this._finalized = false
this._paused = 0
this._local = []
this._remote = []
this._onopen = onopen || opts.open || noop
this._keepAlive = 0
this._remoteKeepAlive = 0
this._interval = null
this._remoteExtensions = new Array(extensions.length)

@@ -323,14 +161,33 @@ this._localExtensions = new Array(extensions.length)

this._encode = new stream.Readable()
this._keepAlive = 0
this._remoteKeepAlive = 0
this._interval = null
this._encode = stream.Readable()
this._encode._read = noop
this.setReadable(this._encode)
this._decode = lpstream.decode({allowEmpty: true, limit: 5 * 1024 * 1024})
this._decode.on('data', parse)
this.setReadable(this._encode)
this.setWritable(this._decode)
this.on('end', this.destroy)
this.on('finish', this.destroy)
this.on('close', this._close)
this.on('close', onfinalize)
this.on('end', onfinalize)
this.on('finish', this.finalize)
if (onopen) this.on('open', onopen)
function onfinalize () {
if (self._finalized) return
self._finalized = true
clearInterval(this._interval)
var keys = Object.keys(self.channels)
for (var i = 0; i < keys.length; i++) {
self._close(self.channels[keys[i]])
}
}
function parse (data) {

@@ -343,14 +200,15 @@ self._parse(data)

Protocol.extensions = extensions
Protocol.use = function (ext) {
return use(extensions.concat(ext).sort().filter(noDups))
if (typeof ext === 'string') ext = toObject(ext)
return use(xtend(ext, extensions))
}
Protocol.prototype.remoteSupports = function (id) {
var i = typeof id === 'number' ? id : extensions.indexOf(id)
return this._localExtensions[i] > -1
Protocol.prototype.finalize = function () {
this._encode.push(null)
}
Protocol.prototype.setTimeout = function (ms, ontimeout) {
if (this._finalized) return
if (ontimeout) this.once('timeout', ontimeout)
var self = this

@@ -363,3 +221,3 @@

this._interval = setInterval(kick, (ms / 4) | 0)
if (this._interval) this._interval.unref()
if (this._interval.unref) this._interval.unref()

@@ -383,30 +241,97 @@ function kick () {

Protocol.prototype.channel = function (key, publicId) {
if (!publicId) publicId = crypto.createHmac('sha256', key).update('hypercore').digest()
Protocol.prototype.remoteSupports = function (name) {
var i = typeof name === 'string' ? extensionNames.indexOf(name) : name
return i > -1 && this._localExtensions[i] > -1
}
var keyHex = publicId.toString('hex')
var channel = this._channels[keyHex]
Protocol.prototype.open = function (key, opts) {
if (this._finalized) return null // already finalized
if (!opts) opts = {}
if (!channel) channel = this._channels[keyHex] = new Channel(this, publicId)
if (channel.key) return channel
channel._open(key)
return channel
}
var d = opts.discoveryKey || discoveryKey(key)
var keyHex = d.toString('hex')
var ch = this.channels[keyHex]
Protocol.prototype.keys = function () {
var keys = Object.keys(this._channels)
var list = []
for (var i = 0; i < keys.length; i++) {
var key = this._channels[keys[i]].key
if (key) list.push(key)
if (!ch) {
ch = new Channel(this)
ch.discoveryKey = d
this.channels[keyHex] = ch
}
return list
if (ch.local > -1) return ch
ch.key = key
ch.local = this._local.indexOf(null)
if (ch.local === -1) ch.local = this._local.push(null) - 1
this._local[ch.local] = ch
var open = messages.Open.encode({
feed: ch.discoveryKey,
nonce: ch._nonce
})
this._sendRaw(ch, open)
if (!this.remoteId && opts.handshake !== false) {
ch.handshake({
id: this.id,
extensions: extensionNames
})
}
if (ch.buffer.length) this._parseSoon(ch)
return ch
}
Protocol.prototype._close = function () {
clearInterval(this._interval)
var keys = Object.keys(this._channels)
for (var i = 0; i < keys.length; i++) {
var ch = this._channels[keys[i]]
if (ch.key) ch.close()
Protocol.prototype._send = function (channel, type, message) {
if (channel.closed) return false
var enc = types[type]
var len = enc ? enc.encodingLength(message) : 0
var buf = Buffer(len + 1)
buf[0] = type
if (enc) enc.encode(message, buf, 1)
if (this.private) buf = this._encrypt(channel, buf)
return this._sendRaw(channel, buf)
}
Protocol.prototype._sendRaw = function (channel, buf) {
this._keepAlive = 0
var len = buf.length + varint.encodingLength(channel.local)
var box = Buffer(varint.encodingLength(len) + len)
var offset = 0
varint.encode(len, box, offset)
offset += varint.encode.bytes
varint.encode(channel.local, box, offset)
offset += varint.encode.bytes
buf.copy(box, offset)
return this._encode.push(box)
}
Protocol.prototype._pause = function () {
if (!this._paused++) this._decode.pause()
}
Protocol.prototype._resume = function () {
if (!--this._paused) this._decode.resume()
}
Protocol.prototype._parseSoon = function (channel) {
var self = this
this._pause()
process.nextTick(drain)
function drain () {
if (self._finalized || channel.closed) return
while (channel.buffer.length) self._parse(channel.buffer.shift())
if (!self._finalized) self._resume()
}

@@ -417,28 +342,101 @@ }

this._remoteKeepAlive = 0
if (!data.length) return
var remote = varint.decode(data)
if (!data.length || this._finalized) return
var remote = varint.decode(data, 0)
var offset = varint.decode.bytes
if (remote === this._remote.length) this._remote.push(null)
if (remote > this._remote.length) {
return this.destroy(new Error('Received invalid channel'))
if (remote >= this._remote.length) this._remote.push(null)
if (remote > this._remote.length) return this.destroy(new Error('Unexpected channel number'))
if (!this._remote[remote]) this._onopen(remote, data, offset)
else if (offset !== data.length) this._onmessage(remote, data, offset)
else this._onclose(remote)
}
Protocol.prototype._parseType = function (type) {
if (type > 127) return -1
if (type < 64) return type
if (type - 64 >= this._remoteExtensions.length) return -1
type = this._remoteExtensions[type - 64]
if (type === -1) return -1
return type + 64
}
Protocol.prototype._onopen = function (remote, data, offset) {
try {
var open = messages.Open.decode(data, offset)
} catch (err) {
return this.destroy(err)
}
if (open.feed.length !== 32 || open.nonce.length !== 24) return this.destroy(new Error('Invalid open message'))
var keyHex = open.feed.toString('hex')
var ch = this.channels[keyHex]
if (!ch) {
ch = new Channel(this)
ch.discoveryKey = open.feed
this.channels[keyHex] = ch
}
if (ch.remote > -1) return this.destroy(new Error('Double open for same channel'))
ch.remote = remote
ch._remoteNonce = open.nonce
this._remote[remote] = ch
this._open(ch)
if (!this._finalized && ch.local === -1) this.emit('open', ch.discoveryKey)
}
Protocol.prototype._onmessage = function (remote, data, offset) {
var channel = this._remote[remote]
if (channel) {
return channel._parse(data, offset)
if (!channel.key) {
if (channel.buffer.length === 16) return this.destroy(new Error('Buffer overflow'))
channel.buffer.push(data)
return
}
if (this._remote.indexOf(null) !== remote) {
return this.destroy(new Error('Received invalid channel'))
var box = this._decrypt(channel, data.slice(offset))
if (!box || !box.length) return this.destroy(new Error('Invalid message'))
var type = this._parseType(box[0])
if (type < 0) return
if (type && !this.remoteId) return this.destroy(new Error('Did not receive handshake'))
if (type >= types.length) return
var enc = types[type]
try {
var message = enc ? enc.decode(box, 1) : null
} catch (err) {
return this.destroy(err)
}
this._open(remote, data, offset)
channel.emit(eventNames[type], message)
}
Protocol.prototype._onclose = function (remote) {
var channel = this._remote[remote]
this._remote[remote] = null
channel.remote = -1
if (channel.local > -1) this._close(channel)
var keyHex = channel.discoveryKey.toString('hex')
if (this.channels[keyHex] === channel) delete this.channels[keyHex]
}
Protocol.prototype._onhandshake = function (handshake) {
// called with the first handshake
this.remoteId = handshake.peerId || crypto.randomBytes(32)
if (this.remoteId) return // already handshaked
this.remoteId = handshake.id
var exts = handshake.extensions
// extensions *must* be sorted

@@ -448,4 +446,4 @@ var local = 0

while (local < extensions.length && remote < handshake.extensions.length && remote < 64) {
if (extensions[local] === handshake.extensions[remote]) {
while (local < extensionNames.length && remote < exts.length && remote < 64) {
if (extensionNames[local] === exts[remote]) {
this._localExtensions[local] = remote

@@ -455,3 +453,3 @@ this._remoteExtensions[remote] = local

remote++
} else if (extensions[local] < handshake.extensions[remote]) {
} else if (extensionNames[local] < exts[remote]) {
local++

@@ -466,35 +464,52 @@ } else {

Protocol.prototype._open = function (remote, data, offset) {
try {
var open = messages.Open.decode(data, offset)
} catch (err) {
return
}
Protocol.prototype._open = function (channel) {
if (channel.local === -1 || channel.remote === -1) return
if (equals(channel._remoteNonce, channel._firstNonce)) return this.destroy(new Error('Remote echoed nonce'))
channel.opened = true
channel.emit('open')
}
if (open.publicId.length !== 32 || open.nonce.length !== 24) return
Protocol.prototype._close = function (channel) {
if (channel.closed) return
channel.closed = true
var keyHex = open.publicId.toString('hex')
var channel = this._channels[keyHex]
if (!this._finalized) this._sendRaw(channel, Buffer(0))
if (channel) {
channel._onopen(open, remote)
return
}
this._local[channel.local] = null
channel.local = -1
channel.emit('end')
}
channel = this._channels[keyHex] = new Channel(this, open.publicId)
channel._onopen(open, remote)
this._onopen(open.publicId)
Protocol.prototype._encrypt = function (channel, buf) {
if (!this.private) return buf
var box = encryption.encrypt(buf, channel._nonce, channel.key)
increment(channel._nonce)
return box
}
return Protocol
}
Protocol.prototype._decrypt = function (channel, box) {
if (!this.private) return box
var buf = box.length < 16 ? null : encryption.decrypt(box, channel._remoteNonce, channel.key)
if (!buf) return null
increment(channel._remoteNonce)
return buf
}
function noop () {}
function discoveryKey (key) {
return crypto.createHmac('sha256', key).update('hypercore').digest()
}
function createNonce (a, b) {
return crypto.createHash('sha256').update(a).update(b).digest().slice(0, 24)
}
function isEncoder (val) {
return val && typeof val.encode === 'function'
}
function noDups (name, i, extensions) {
return extensions.indexOf(name) === i
function toObject (name) {
var tmp = {}
tmp[name] = true
return tmp
}
function noop () {}
return Protocol
}
{
"name": "hypercore-protocol",
"version": "3.0.0",
"version": "4.0.0",
"description": "Stream that implements the hypercore protocol",
"main": "index.js",
"dependencies": {
"brfs": "^1.4.3",
"buffer-equals": "^1.0.3",

@@ -13,10 +12,12 @@ "duplexify": "^3.4.3",

"length-prefixed-stream": "^1.5.0",
"passthrough-encoding": "^1.2.0",
"protocol-buffers": "^3.1.6",
"readable-stream": "^2.0.5",
"sodium-encryption": "^1.0.1",
"varint": "^4.0.0"
"readable-stream": "^2.1.4",
"sodium-encryption": "^1.1.0",
"varint": "^4.0.0",
"xtend": "^4.0.1"
},
"devDependencies": {
"standard": "^6.0.7",
"tape": "^4.5.0"
"standard": "^7.1.0",
"tape": "^4.5.1"
},

@@ -23,0 +24,0 @@ "scripts": {

@@ -19,16 +19,9 @@ # hypercore-protocol

// open a channel specified by a 32 byte key
var channel = p.channel(Buffer('deadbeefdeadbeefdeadbeefdeadbeef'))
var channel = p.open(Buffer('deadbeefdeadbeefdeadbeefdeadbeef'))
// send a handshake
channel.handshake()
channel.on('handshake', function () {
// request block 42
channel.request({block: 42})
channel.request({block: 42})
channel.on('data', function (message) {
console.log(message) // contains message.block and message.value
})
channel.on('response', function (message) {
console.log(message) // contains message.block and message.data
})
stream.pipe(anotherStream).pipe(stream)

@@ -45,3 +38,3 @@ ```

If the remote peer joins a channel you haven't opened, hypercore will call an optional `onopen`
method if you specify it with the public id for that channel.
method if you specify it with the discovery key for that channel.

@@ -54,7 +47,7 @@ ``` js

// open with corresponding key to join
var channel = p.channel(Buffer('deadbeefdeadbeefdeadbeefdeadbeef'))
var channel = p.open(Buffer('deadbeefdeadbeefdeadbeefdeadbeef'))
})
```
See below for more information about channels, keys, and public ids.
See below for more information about channels, keys, and discovery keys.
Other options include:

@@ -72,5 +65,5 @@

#### `var channel = p.channel(key, [publicId])`
#### `var channel = p.open(key, [options])`
Open a stream channel. A channel uses the [sodium](https://github.com/mafintosh/sodium-prebuilt) module to encrypt all messages using the key you specify. The public id for the channel is send unencrypted together with a random 24 byte nonce. If you do not specify a public id, an HMAC of the string `hypercore` using the key as the password will be used.
Open a stream channel. A channel uses the [sodium](https://github.com/mafintosh/sodium-prebuilt) module to encrypt all messages using the key you specify. The discovery key for the channel is send unencrypted together with a random 24 byte nonce. If you do not specify a discovery key in the options map, an HMAC of the string `hypercore` using the key as the password will be used.

@@ -81,31 +74,15 @@ #### `p.on('handshake')`

#### `var keys = p.keys()`
Lists the keys of all the channels you have opened
#### `p.setTimeout(ms, [ontimeout])`
Will call the timeout function if the remote peer
hasn't send any messages within `ms`. Will also send a heartbeat
message to the other peer if you've been inactive for `ms / 2`
Will call the timeout function if the remote peer hasn't send any messages within `ms`. Will also send a heartbeat message to the other peer if you've been inactive for `ms / 2`
## Channel API
#### `channel.handshake(message)`
#### `channel.end()`
This should be the first message you send. The `peerId` and `extensions` field
will be automatically populated for you. See the protobuf schema or more information.
Ends a channel
#### `channel.on('handshake', message)`
#### `channel.on('end')`
Emitted when the other peer sends a handshake. You should wait for this event
to be emitted before sending any messages.
#### `channel.close()`
Closes a channel
#### `channel.on('close')`
Emitted when a channel is closed, either by you or the remote peer.
Emitted when a channel is ended, either by you or the remote peer.
No other events will be emitted after this.

@@ -121,9 +98,9 @@

#### `channel.response(message)`
#### `channel.data(message)`
Send a response message. See the protobuf schema or more information
Send a data message. See the protobuf schema or more information
#### `channel.on('response', message)`
#### `channel.on('data', message)`
Emitted when a response message is received
Emitted when a data message is received

@@ -146,2 +123,10 @@ #### `channel.cancel(message)`

#### `channel.want(message)`
Send a want message. See the protobuf schema or more information
#### `channel.on('want', message)`
Emitted when a want message is received
#### `channel.resume()`

@@ -168,8 +153,7 @@

#### `protocol = protocol.use(extension)`
#### `protocol = protocol.use(extensionName)`
Use an extension specified by the string name you pass in. Returns a new prototype
Will create a new method on all your channel objects that has the same name as the extension
and emit an event with the same name when an extension message is received
Will create a new method on all your channel objects that has the same name as the extension and emit an event with the same name when an extension message is received

@@ -180,3 +164,3 @@ ``` js

var p = protocol()
var channel = p.channel(someKey)
var channel = p.open(someKey)

@@ -192,4 +176,12 @@ channel.on('handshake', function () {

#### `var bool = p.remoteSupports(extension)`
Per default all messages are buffers. If you want to encode/decode your messages you can specify an [abstract-encoding](https://github.com/mafintosh/abstract-encoding) compliant encoder as well
``` js
protocol = protocol.use({
ping: someEncoder
})
```
#### `var bool = p.remoteSupports(extensionName)`
After the protocol instance emits `handshake` you can call this method to check

@@ -196,0 +188,0 @@ if the remote peer also supports one of your extensions.

@@ -5,197 +5,183 @@ var tape = require('tape')

var key = Buffer('12345678123456781234567812345678')
var otherKey = Buffer('01234567012345670123456701234567')
var otherKey = Buffer('02345678123456781234567812345678')
tape('join channel', function (t) {
var p = protocol()
var ch = p.channel(key)
tape('open channel', function (t) {
t.plan(3)
t.same(ch.key.toString('hex'), key.toString('hex'), 'same key')
t.end()
})
var stream1 = protocol()
var stream2 = protocol()
tape('join two channels', function (t) {
var p = protocol()
var channel1 = stream1.open(key)
var channel2 = stream2.open(key)
var channel = p.channel(key)
var otherChannel = p.channel(otherKey)
channel1.request({
block: 10
})
t.same(channel.key, key, 'expected channel key')
t.same(otherChannel.key, otherKey, 'expected channel key')
t.same(p.keys(), [key, otherKey], 'joined both channels')
t.end()
})
channel1.once('data', function (message) {
t.same(message.block, 10, 'same block')
t.same(message.value, Buffer('hello world'), 'same value')
})
tape('join and leave', function (t) {
var p = protocol()
channel2.once('request', function (message) {
t.same(message.block, 10, 'same block')
channel2.data({block: 10, value: Buffer('hello world')})
})
t.same(p.keys(), [], 'not in any channel')
var ch = p.channel(key)
t.same(p.keys(), [key], 'joined channel')
ch.close()
t.same(p.keys(), [], 'not in any channel')
ch.close()
t.same(p.keys(), [], 'not in any channel')
t.end()
stream1.pipe(stream2).pipe(stream1)
})
tape('encrypts messages', function (t) {
var p1 = protocol()
var p2 = protocol()
var buf = []
tape('async open', function (t) {
t.plan(3)
p1.on('data', function (data) {
buf.push(data)
var stream1 = protocol()
var stream2 = protocol(function () {
setTimeout(function () {
var channel2 = stream2.open(key)
channel2.once('request', function (message) {
t.same(message.block, 10, 'same block')
channel2.data({block: 10, value: Buffer('hello world')})
})
}, 100)
})
p1.on('finish', function () {
buf = Buffer.concat(buf)
t.ok(buf.length > 32 + 20 + 25, 'sending some data') // ~ hmac + nonce + data
t.same(buf.toString().indexOf('hello i should be encrypted'), -1, 'does not contain plaintext')
t.end()
var channel1 = stream1.open(key)
channel1.request({
block: 10
})
var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
ch1.handshake()
ch2.handshake()
ch1.on('handshake', function () {
t.pass('received handshake')
ch1.response({block: 0, data: Buffer('hello i should be encrypted.')})
p1.end()
channel1.once('data', function (message) {
t.same(message.block, 10, 'same block')
t.same(message.value, Buffer('hello world'), 'same value')
})
p1.pipe(p2).pipe(p1)
stream1.pipe(stream2).pipe(stream1)
})
tape('does not encrypt messages if encrypt === false', function (t) {
var p1 = protocol({encrypt: false})
var p2 = protocol({encrypt: false})
var buf = []
tape('empty messages work', function (t) {
t.plan(2)
p1.on('data', function (data) {
buf.push(data)
})
var stream1 = protocol()
var stream2 = protocol()
p1.on('finish', function () {
buf = Buffer.concat(buf)
t.ok(buf.length > 32 + 20 + 25, 'sending some data') // ~ hmac + nonce + data
t.ok(buf.toString().indexOf('hello i should not be encrypted') > -1, 'does contain plaintext')
t.end()
})
var channel1 = stream1.open(key)
var channel2 = stream2.open(key)
var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
channel1.pause()
ch1.handshake()
ch2.handshake()
channel1.once('resume', function (message) {
t.pass('resumed')
})
ch1.on('handshake', function () {
t.pass('received handshake')
ch1.response({block: 0, data: Buffer('hello i should not be encrypted.')})
p1.end()
channel2.once('pause', function (message) {
t.pass('paused')
channel2.resume()
})
p1.pipe(p2).pipe(p1)
stream1.pipe(stream2).pipe(stream1)
})
tape('one side encryption returns error', function (t) {
var p1 = protocol({encrypt: false})
var p2 = protocol()
tape('is encrypted', function (t) {
var stream1 = protocol()
var stream2 = protocol()
var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
var channel1 = stream1.open(key)
var channel2 = stream2.open(key)
ch1.handshake()
ch2.handshake()
ch1.on('handshake', function () {
t.fail('received handshake')
channel1.on('data', function (message) {
t.same(message.block, 10, 'same block')
t.same(message.value, Buffer('hello world'), 'same value')
t.end()
})
p1.on('error', function () {
// there might be an error here as well
stream2.on('data', function (data) {
t.ok(data.toString().indexOf('hello world') === -1, 'is encrypted')
})
p2.on('error', function (err) {
t.ok(err, 'had error')
t.end()
})
stream1.pipe(stream2).pipe(stream1)
p1.pipe(p2).pipe(p1)
channel2.data({block: 10, value: Buffer('hello world')})
})
tape('remote joins', function (t) {
var p1 = protocol()
var p2 = protocol()
var remoteJoined = 2
tape('can disable encryption', function (t) {
var stream1 = protocol({private: false})
var stream2 = protocol({private: false})
var ch1 = p1.channel(key)
var foundHello = false
var channel1 = stream1.open(key)
var channel2 = stream2.open(key)
ch1.handshake()
ch1.once('open', function () {
t.pass('remote joined')
remoteJoined--
channel1.on('data', function (message) {
t.same(message.block, 10, 'same block')
t.same(message.value, Buffer('hello world'), 'same value')
t.ok(foundHello, 'sent in plain text')
t.end()
})
ch1.on('request', function (request) {
t.same(request.block, 42, 'received request')
ch1.response({block: 42, data: Buffer('some data')})
stream2.on('data', function (data) {
if (!foundHello) foundHello = data.toString().indexOf('hello world') > -1
})
var ch2 = p2.channel(key)
stream1.pipe(stream2).pipe(stream1)
ch2.handshake()
channel2.data({block: 10, value: Buffer('hello world')})
})
p1.on('handshake', function () {
ch2.request({block: 42})
tape('end channel', function (t) {
t.plan(3)
var stream1 = protocol()
var stream2 = protocol()
var c1 = stream1.open(key)
var c2 = stream2.open(key)
c2.on('request', function () {
t.pass('received request')
})
ch2.on('response', function (response) {
t.same(response.block, 42, 'received response')
t.same(response.data, Buffer('some data'), 'expected data')
t.same(remoteJoined, 0, 'both emitted open')
t.end()
c2.on('end', function () {
t.pass('channel ended')
})
ch2.once('open', function () {
t.pass('remote joined')
remoteJoined--
c1.on('end', function () {
t.pass('channel ended')
})
p1.pipe(p2).pipe(p1)
c1.on('open', function () {
c1.request({block: 10})
c1.end()
})
stream1.pipe(stream2).pipe(stream1)
})
tape('remote joins and closes', function (t) {
var localClose = false
var remoteClose = false
tape('destroy ends all channels', function (t) {
t.plan(3)
var p1 = protocol()
var p2 = protocol(function (publicId) {
var channel = p2.channel(key)
var stream1 = protocol()
var stream2 = protocol()
channel.handshake()
channel.on('close', function () {
remoteClose = true
t.ok(localClose, 'local closed')
t.ok(remoteClose, 'remote closed')
t.end()
})
var c1 = stream1.open(key)
var other = stream1.open(otherKey)
var c2 = stream2.open(key)
other.on('end', function () {
t.pass('channel ended')
})
var channel = p1.channel(key)
channel.on('close', function () {
localClose = true
c1.on('end', function () {
t.pass('channel ended')
})
channel.on('handshake', function () {
channel.close()
c2.on('end', function () {
t.pass('channel ended')
})
channel.handshake()
stream1.pipe(stream2).pipe(stream1)
p1.pipe(p2).pipe(p1)
setTimeout(function () {
stream1.finalize()
}, 100)
})

@@ -266,20 +252,2 @@

tape('leave channels on close', function (t) {
t.plan(2)
var p1 = protocol()
var c1 = p1.channel(key)
var c2 = p1.channel(otherKey)
c1.once('close', function () {
t.pass('first channel closed')
})
c2.once('close', function () {
t.pass('second channel closed')
})
p1.destroy()
})
tape('extension', function (t) {

@@ -292,8 +260,5 @@ t.plan(4)

var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
var ch1 = p1.open(key)
var ch2 = p2.open(key)
ch1.handshake()
ch2.handshake()
p1.once('handshake', function () {

@@ -326,13 +291,10 @@ t.ok(p1.remoteSupports('test'), 'protocol supported')

var protocol1 = protocol.use(['test', 'bar'])
var protocol2 = protocol.use(['foo', 'test'])
var protocol1 = protocol.use({test: 1, bar: 1})
var protocol2 = protocol.use({foo: 1, test: 1})
var p1 = protocol1()
var p2 = protocol2()
var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
var ch1 = p1.open(key)
var ch2 = p2.open(key)
ch1.handshake()
ch2.handshake()
p1.once('handshake', function () {

@@ -369,12 +331,9 @@ t.ok(!p1.remoteSupports('foo'), 'protocol not supported')

var protocol2 = protocol.use(['test', 'bar'])
var protocol2 = protocol.use({test: 1, bar: 1})
var p1 = protocol()
var p2 = protocol2()
var ch1 = p1.channel(key)
var ch2 = p2.channel(key)
var ch1 = p1.open(key)
var ch2 = p2.open(key)
ch1.handshake()
ch2.handshake()
p1.once('handshake', function () {

@@ -381,0 +340,0 @@ t.ok(!p1.remoteSupports('foo'), 'protocol not supported')

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc