New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

protomux

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

protomux - npm Package Compare versions

Comparing version 2.0.0 to 3.0.0

736

index.js

@@ -0,110 +1,232 @@

const b4a = require('b4a')
const c = require('compact-encoding')
const b4a = require('b4a')
const queueTick = require('queue-tick')
const safetyCatch = require('safety-catch')
const { addProtocol } = require('./messages')
const EMPTY = []
const MAX_BUFFERED = 32768
const MAX_BACKLOG = 256
class Protocol {
constructor (mux) {
this.mux = mux
class Session {
constructor (mux, info, context, protocol, id, messages, onopen, onclose, ondestroy) {
this.context = context
this.protocol = protocol
this.id = id
this.messages = []
this.remoteMessages = this.messages
this.name = null
this.version = null
this.messages = EMPTY
this.context = null
this.offset = 0
this.length = 0
this.opened = false
this.corked = false
this.closed = false
this.destroyed = false
this.remoteVersion = null
this.remoteOffset = 0
this.remoteEnd = 0
this.remoteOpened = false
this.remoteClosed = false
this.onopen = onopen
this.onclose = onclose
this.ondestroy = ondestroy
this.onremoteopen = noop
this.onremoteclose = noop
this._mux = mux
this._info = info
this._localId = 0
this._remoteId = 0
this._active = 0
this._extensions = null
this._decBound = this._dec.bind(this)
this._decAndDestroyBound = this._decAndDestroy.bind(this)
for (const m of messages) this.addMessage(m)
}
_attach ({ name, version = { major: 0, minor: 0 }, messages = 0, context = null, onremoteopen = noop, onremoteclose = noop }) {
const opened = this.opened
_open () {
const id = this._mux._free.length > 0
? this._mux._free.pop()
: this._mux._local.push(null) - 1
this.name = name
this.version = version
this.messages = new Array(messages)
this.context = context
this.offset = this.mux.offset
this.length = messages
this._info.opened++
this._localId = id + 1
this._mux._local[id] = this
const state = { buffer: null, start: 2, end: 2 }
c.string.preencode(state, this.protocol)
c.buffer.preencode(state, this.id)
c.uint.preencode(state, this._localId)
state.buffer = this._mux._alloc(state.end)
state.buffer[0] = 0
state.buffer[1] = 1
c.string.encode(state, this.protocol)
c.buffer.encode(state, this.id)
c.uint.encode(state, this._localId)
this._mux._write0(state.buffer)
}
_dec () {
if (--this._active === 0 && this.closed === true) this._destroy()
}
_decAndDestroy (err) {
this._dec()
this._mux._safeDestroy(err)
}
_fullyOpenSoon () {
this._mux._remote[this._remoteId - 1].session = this
queueTick(this._fullyOpen.bind(this))
}
_fullyOpen () {
if (this.opened === true || this.closed === true) return
const remote = this._mux._remote[this._remoteId - 1]
this.opened = true
this.corked = false
this._track(this.onopen(this))
this.onremoteopen = onremoteopen
this.onremoteclose = onremoteclose
remote.session = this
if (remote.pending !== null) this._drain(remote)
}
return !opened
_drain (remote) {
for (let i = 0; i < remote.pending.length; i++) {
const p = remote.pending[i]
this._mux._buffered -= byteSize(p.state)
this._recv(p.type, p.state)
}
remote.pending = null
this._mux._resumeMaybe()
}
_nextMessage () {
for (let i = this.messages.length - 1; i >= 0; i--) {
if (this.messages[i] === undefined && (i === 0 || this.messages[i - 1] !== undefined)) {
return i
}
_track (p) {
if (isPromise(p) === true) {
this._active++
p.then(this._decBound, this._decAndDestroyBound)
}
return -1
}
addMessage ({ type = this._nextMessage(), encoding = c.binary, onmessage = noop } = {}) {
if (type < 0 || type >= this.messages.length) {
throw new Error('Invalid type, must be <= ' + this.messages.length)
_close (isRemote) {
if (this.closed === true) return
this.closed = true
this._info.opened--
if (this._remoteId > 0) {
this._mux._remote[this._remoteId - 1] = null
this._remoteId = 0
// If remote has acked, we can reuse the local id now
// otherwise, we need to wait for the "ack" to arrive
this._mux._free.push(this._localId - 1)
}
const t = this.offset + type
const send = (message) => this.opened && this.mux._push(t, m.encoding, message)
const m = this.messages[type] = { encoding, onmessage, send }
this._mux._local[this._localId - 1] = null
this._localId = 0
return m
this._mux._gc(this._info)
this._track(this.onclose(isRemote, this))
if (this._active === 0) this._destroy()
}
_destroy () {
if (this.destroyed === true) return
this.destroyed = true
this._track(this.ondestroy(this))
}
_recv (type, state) {
if (type < this.remoteMessages.length) {
this.remoteMessages[type].recv(state, this)
}
}
cork () {
if (this.corked) return
this.corked = true
this.mux.cork()
this._mux.cork()
}
uncork () {
if (!this.corked) return
this.corked = false
this.mux.uncork()
this._mux.uncork()
}
close () {
if (this.opened === false) return
this.opened = false
this.mux._unopened++
if (this.closed === true) return
const offset = this.offset
const state = { buffer: null, start: 2, end: 2 }
this.version = null
this.messages = EMPTY
this.offset = 0
this.length = 0
this.onremoteopen = this.onremoteclose = noop
this.mux._push(2, c.uint, offset)
this._gc()
c.uint.preencode(state, this._localId)
if (this.corked) this.uncork()
state.buffer = this._mux._alloc(state.end)
state.buffer[0] = 0
state.buffer[1] = 3
c.uint.encode(state, this._localId)
this._close(false)
this._mux._write0(state.buffer)
}
_gc () {
if (this.opened || this.remoteOpened) return
this.mux._removeProtocol(this)
addMessage (opts) {
if (!opts) return this._skipMessage()
const type = this.messages.length
const encoding = opts.encoding || c.raw
const onmessage = opts.onmessage || noop
const s = this
const typeLen = encodingLength(c.uint, type)
const m = {
type,
encoding,
onmessage,
recv (state, session) {
session._track(m.onmessage(encoding.decode(state), session))
},
send (m, session = s) {
if (session.closed === true) return false
const mux = session._mux
const state = { buffer: null, start: 0, end: typeLen }
if (mux._batch !== null) {
encoding.preencode(state, m)
state.buffer = mux._alloc(state.end)
c.uint.encode(state, type)
encoding.encode(state, m)
mux._pushBatch(session._localId, state.buffer)
return true
}
c.uint.preencode(state, session._localId)
encoding.preencode(state, m)
state.buffer = mux._alloc(state.end)
c.uint.encode(state, session._localId)
c.uint.encode(state, type)
encoding.encode(state, m)
return mux.stream.write(state.buffer)
}
}
this.messages.push(m)
return m
}
_recv (type, state) {
if (type >= this.messages.length) return
_skipMessage () {
const type = this.messages.length
const m = {
type,
encoding: c.raw,
onmessage: noop,
recv (state, session) {},
send (m, session) {}
}
const m = this.messages[type]
if (m !== undefined) this.mux._catch(m.onmessage(m.encoding.decode(state), this.context))
this.messages.push(m)
return m
}

@@ -114,292 +236,372 @@ }

module.exports = class Protomux {
constructor (stream, { backlog = 128, alloc, onacceptprotocol } = {}) {
constructor (stream, { alloc } = {}) {
this.isProtomux = true
this.stream = stream
this.protocols = []
this.remoteProtocols = []
this.offset = 4 // 4 messages reserved
this.corked = 0
this.backlog = backlog
this.onacceptprotocol = onacceptprotocol || (() => this._unopened < this.backlog)
this.isProtomux = true
this._unopened = 0
this._batch = null
this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe)
this._safeDestroyBound = this._safeDestroy.bind(this)
this._remoteBacklog = 0
this._buffered = 0
this._paused = false
this._remote = []
this._local = []
this._free = []
this._batch = null
this._batchState = null
this._infos = new Map()
this._notify = new Map()
this.stream.on('data', this._ondata.bind(this))
this.stream.on('error', noop) // we handle this in "close"
this.stream.on('close', this._shutdown.bind(this))
}
static from (other, opts) {
return other.isProtomux === true ? other : new Protomux(other, opts)
static from (stream, opts) {
if (stream.isProtomux) return stream
return new this(stream, opts)
}
sendKeepAlive () {
this.stream.write(this._alloc(0))
* [Symbol.iterator] () {
for (const session of this._local) {
if (session !== null) yield session
}
}
cork () {
if (++this.corked === 1) this._batch = []
if (++this.corked === 1) {
this._batch = []
this._batchState = { buffer: null, start: 0, end: 1 }
}
}
uncork () {
if (--this.corked !== 0) return
if (--this.corked === 0) {
this._sendBatch(this._batch, this._batchState)
this._batch = null
this._batchState = null
}
}
const batch = this._batch
this._batch = null
pair ({ protocol, id = null }, notify) {
this._notify.set(toKey(protocol, id), notify)
}
const state = { start: 0, end: 1, buffer: null }
const lens = new Array(batch.length)
unpair ({ protocol, id = null }) {
this._notify.delete(toKey(protocol, id))
}
for (let i = 0; i < batch.length; i++) {
const b = batch[i]
const end = state.end
opened ({ protocol, id = null }) {
const key = toKey(protocol, id)
const info = this._infos.get(key)
return info ? info.opened > 0 : false
}
c.uint.preencode(state, b.type)
b.encoding.preencode(state, b.message)
c.uint.preencode(state, lens[i] = (state.end - end))
open ({ context = null, protocol, id = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) {
if (this.stream.destroyed) return null
const info = this._get(protocol, id)
if (unique && info.opened > 0) return null
if (info.incoming.length === 0) {
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy)
session._open()
info.outgoing.push(session._localId)
return session
}
state.buffer = this._alloc(state.end)
state.buffer[state.start++] = 0
this._remoteBacklog--
for (let i = 0; i < batch.length; i++) {
const b = batch[i]
const remoteId = info.incoming.shift()
const r = this._remote[remoteId - 1]
if (r === null) return null
c.uint.encode(state, lens[i])
c.uint.encode(state, b.type)
b.encoding.encode(state, b.message)
}
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy)
this.stream.write(state.buffer)
}
session._remoteId = remoteId
session._open()
session._fullyOpenSoon()
hasProtocol (opts) {
return !!this.getProtocol(opts)
return session
}
getProtocol ({ name, version }) {
return this._getProtocol(name, version, false)
_pushBatch (localId, buffer) {
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) {
this._batchState.end++
c.uint.preencode(this._batchState, localId)
}
c.buffer.preencode(this._batchState, buffer)
this._batch.push({ localId, buffer })
}
addProtocol (opts) {
const p = this._getProtocol(opts.name, (opts.version && opts.version.major) || 0, true)
_sendBatch (batch, state) {
if (batch.length === 0) return
if (opts.cork) p.cork()
if (!p._attach(opts)) return p
let prev = batch[0].localId
this._unopened--
this.offset += p.length
this._push(1, addProtocol, {
name: p.name,
version: p.version,
offset: p.offset,
length: p.length
})
state.buffer = this._alloc(state.end)
state.buffer[state.start++] = 0
state.buffer[state.start++] = 0
return p
}
c.uint.encode(state, prev)
destroy (err) {
this.stream.destroy(err)
}
for (let i = 0; i < batch.length; i++) {
const b = batch[i]
if (prev !== b.localId) {
state.buffer[state.start++] = 0
c.uint.encode(state, (prev = b.localId))
}
c.buffer.encode(state, b.buffer)
}
_shutdown () {
while (this.protocols.length) {
const p = this.protocols.pop()
if (!p.remoteOpened) continue
if (p.remoteClosed) continue
p.remoteOpened = false
p.remoteClosed = true
this._catch(p.onremoteclose())
}
this.stream.write(state.buffer)
}
_safeDestroy (err) {
safetyCatch(err)
this.destroy(err)
_get (protocol, id) {
const key = toKey(protocol, id)
let info = this._infos.get(key)
if (info) return info
info = { key, protocol, id, pairing: 0, opened: 0, incoming: [], outgoing: [] }
this._infos.set(key, info)
return info
}
_catch (p) {
if (isPromise(p)) p.catch(this._safeDestroyBound)
_gc (info) {
if (info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0) {
this._infos.delete(info.key)
}
}
async _acceptMaybe (added) {
let accept = false
_ondata (buffer) {
try {
accept = await this.onacceptprotocol(added)
const state = { buffer, start: 0, end: buffer.byteLength }
this._decode(c.uint.decode(state), state)
} catch (err) {
this._safeDestroy(err)
}
}
_decode (remoteId, state) {
const type = c.uint.decode(state)
if (remoteId === 0) {
this._oncontrolsession(type, state)
return
}
if (!accept) this._rejectProtocol(added)
}
const r = remoteId <= this._remote.length ? this._remote[remoteId - 1] : null
_rejectProtocol (added) {
for (let i = 0; i < this.protocols.length; i++) {
const p = this.protocols[i]
if (p.opened || p.name !== added.name || !p.remoteOpened) continue
if (p.remoteVersion.major !== added.version.major) continue
// if the channel is closed ignore - could just be a pipeline message...
if (r === null) return
this._unopened--
this.protocols.splice(i, 1)
this._push(3, c.uint, added.offset)
if (r.pending !== null) {
this._bufferMessage(r, type, state)
return
}
r.session._recv(type, state)
}
_ondata (buffer) {
if (buffer.byteLength === 0) return // keep alive
_oncontrolsession (type, state) {
switch (type) {
case 0:
this._onbatch(state)
break
const end = buffer.byteLength
const state = { start: 0, end, buffer }
case 1:
this._onopensession(state)
break
try {
const type = c.uint.decode(state)
if (type === 0) this._recvBatch(end, state)
else this._recv(type, state)
} catch (err) {
this._safeDestroy(err)
case 2:
this._onrejectsession(state)
break
case 3:
this._onclosesession(state)
break
}
}
_getProtocol (name, major, upsert) {
for (let i = 0; i < this.protocols.length; i++) {
const p = this.protocols[i]
const v = p.remoteVersion === null ? p.version : p.remoteVersion
if (p.name === name && (v !== null && v.major === major)) return p
}
_bufferMessage (r, type, { buffer, start, end }) {
const state = { buffer, start, end } // copy
r.pending.push({ type, state })
this._buffered += byteSize(state)
this._pauseMaybe()
}
if (!upsert) return null
_pauseMaybe () {
if (this._paused === true || this._buffered <= MAX_BUFFERED) return
this._paused = true
this.stream.pause()
}
const p = new Protocol(this)
this.protocols.push(p)
this._unopened++
return p
_resumeMaybe () {
if (this._paused === false || this._buffered > MAX_BUFFERED) return
this._paused = false
this.stream.resume()
}
_removeProtocol (p) {
const i = this.protocols.indexOf(this)
if (i > -1) this.protocols.splice(i, 1)
if (!p.opened) this._unopened--
_onbatch (state) {
const end = state.end
let remoteId = c.uint.decode(state)
while (state.end > state.start) {
const len = c.uint.decode(state)
if (len === 0) {
remoteId = c.uint.decode(state)
continue
}
state.end = state.start + end
this._decode(remoteId, state)
state.end = end
}
}
_recvAddProtocol (state) {
const add = addProtocol.decode(state)
_onopensession (state) {
const protocol = c.string.decode(state)
const id = c.buffer.decode(state)
const remoteId = c.uint.decode(state)
const p = this._getProtocol(add.name, add.version.major, true)
if (p.remoteOpened) throw new Error('Duplicate protocol received')
// remote tried to open the control session - auto reject for now
// as we can use as an explicit control protocol declaration if we need to
if (remoteId === 0) {
this._rejectSession(0)
return
}
p.name = add.name
p.remoteVersion = add.version
p.remoteOffset = add.offset
p.remoteEnd = add.offset + add.length
p.remoteOpened = true
p.remoteClosed = false
const rid = remoteId - 1
const info = this._get(protocol, id)
if (p.opened) {
this._catch(p.onremoteopen())
} else {
this._acceptMaybe(add)
// allow the remote to grow the ids by one
if (this._remote.length === rid) {
this._remote.push(null)
}
}
_recvRemoveProtocol (state) {
const offset = c.uint.decode(state)
if (rid >= this._remote.length || this._remote[rid] !== null) {
throw new Error('Invalid open message')
}
for (let i = 0; i < this.protocols.length; i++) {
const p = this.protocols[i]
if (info.outgoing.length > 0) {
const localId = info.outgoing.shift()
const session = this._local[localId - 1]
if (p.remoteOffset === offset && p.remoteOpened) {
p.remoteVersion = null
p.remoteOpened = false
p.remoteClosed = true
this._catch(p.onremoteclose())
p._gc()
if (session === null) { // we already closed the channel - ignore
this._free.push(localId - 1)
return
}
this._remote[rid] = { pending: null, session: null }
session._remoteId = remoteId
session._fullyOpen()
return
}
}
_recvRejectedProtocol (state) {
const offset = c.uint.decode(state)
this._remote[rid] = { pending: [], session: null }
for (let i = 0; i < this.protocols.length; i++) {
const p = this.protocols[i]
if (++this._remoteBacklog > MAX_BACKLOG) {
throw new Error('Remote exceeded backlog')
}
if (p.offset === offset && !p.remoteClosed) {
p.remoteClosed = true
this._catch(p.onremoteclose())
p._gc()
}
}
info.pairing++
info.incoming.push(remoteId)
this._requestSession(protocol, id, info).catch(this._safeDestroyBound)
}
_recvBatch (end, state) {
while (state.start < state.end) {
const len = c.uint.decode(state)
const type = c.uint.decode(state)
state.end = state.start + len
this._recv(type, state)
state.end = end
_onrejectsession (state) {
const protocol = c.string.decode(state)
const id = c.buffer.decode(state)
const info = this._get(protocol, id)
if (info.outgoing.length === 0) {
throw new Error('Invalid reject message')
}
const localId = info.outgoing.shift()
const session = this._local[localId - 1]
this._free.push(localId - 1)
if (session !== null) session._close(true)
}
_recv (type, state) {
if (type < 4) {
if (type === 0) {
throw new Error('Invalid nested batch')
}
_onclosesession (state) {
const remoteId = c.uint.decode(state)
if (type === 1) {
this._recvAddProtocol(state)
return
}
if (remoteId === 0) return // ignore
if (type === 2) {
this._recvRemoveProtocol(state)
return
}
const rid = remoteId - 1
const r = rid < this._remote.length ? this._remote[rid] : null
if (type === 3) {
this._recvRejectedProtocol(state)
return
}
if (r === null) return
return
}
if (r.session !== null) r.session._close(true)
}
// TODO: Consider make this array sorted by remoteOffset and use a bisect here.
// For now we use very few protocols in practice, so it might be overkill.
for (let i = 0; i < this.protocols.length; i++) {
const p = this.protocols[i]
async _requestSession (protocol, id, info) {
const notify = this._notify.get(toKey(protocol, id)) || this._notify.get(toKey(protocol, null))
if (p.remoteOffset <= type && type < p.remoteEnd) {
p._recv(type - p.remoteOffset, state)
break
}
if (notify) await notify(id)
if (--info.pairing > 0) return
while (info.incoming.length > 0) {
this._rejectSession(info, info.incoming.pop())
}
this._gc(info)
}
_push (type, enc, message) {
if (this.corked > 0) {
this._batch.push({ type, encoding: enc, message })
return false
_rejectSession (info, remoteId) {
if (remoteId > 0) {
const r = this._remote[remoteId - 1]
if (r.pending !== null) {
for (let i = 0; i < r.pending.length; i++) {
this._buffered -= byteSize(r.pending[i].state)
}
}
this._remote[remoteId - 1] = null
this._resumeMaybe()
}
const state = { start: 0, end: 0, buffer: null }
const state = { buffer: null, start: 2, end: 2 }
c.uint.preencode(state, type)
enc.preencode(state, message)
c.string.preencode(state, info.protocol)
c.buffer.preencode(state, info.id)
state.buffer = this._alloc(state.end)
c.uint.encode(state, type)
enc.encode(state, message)
state.buffer[0] = 0
state.buffer[1] = 2
c.string.encode(state, info.protocol)
c.buffer.encode(state, info.id)
return this.stream.write(state.buffer)
this._write0(state.buffer)
}
_write0 (buffer) {
if (this._batch !== null) {
this._pushBatch(0, buffer.subarray(1))
return
}
this.stream.write(buffer)
}
_safeDestroy (err) {
safetyCatch(err)
this.stream.destroy(err)
}
_shutdown () {
for (const s of this._local) {
if (s !== null) s._close(true)
}
}
}

@@ -409,4 +611,18 @@

function toKey (protocol, id) {
return protocol + '##' + (id ? b4a.toString(id, 'hex') : '')
}
function byteSize (state) {
return 512 + (state.end - state.start)
}
function isPromise (p) {
return typeof p === 'object' && p !== null && !!p.catch
return !!(p && typeof p.then === 'function')
}
function encodingLength (enc, val) {
const state = { buffer: null, start: 0, end: 0 }
enc.preencode(state, val)
return state.end
}
{
"name": "protomux",
"version": "2.0.0",
"version": "3.0.0",
"description": "Multiplex multiple message oriented protocols over a stream",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -20,16 +20,12 @@ # protomux

// Now add some protocols
// Now add some protocol sessions
const cool = mux.addProtocol({
name: 'cool-protocol',
version: {
major: 1,
minor: 0
},
messages: 2, // protocol has 2 messages
onremoteopen () {
const cool = mux.open({
protocol: 'cool-protocol',
id: Buffer.from('optional binary id'),
onopen () {
console.log('the other side opened this protocol!')
},
onremoteclose () {
console.log('the other side closed this protocol!')
onclose () {
console.log('either side closed the protocol')
}

@@ -41,3 +37,2 @@ })

const one = cool.addMessage({
type: 0,
encoding: c.string,

@@ -50,3 +45,2 @@ onmessage (m) {

const two = cool.addMessage({
type: 1,
encoding: c.bool,

@@ -75,8 +69,3 @@ onmessage (m) {

// Called when the muxer wants to allocate a message that is written, defaults to Buffer.allocUnsafe.
alloc (size) {},
// Hook that is called when an unknown protocol is received. Should return true/false.
async onacceptprotocol ({ name, version }) {}
// How many protocols can be remote open, that we haven't opened yet?
// Only used if you don't provide an accept hook.
backlog: 128
alloc (size) {}
}

@@ -89,5 +78,5 @@ ```

#### `const p = mux.addProtocol(opts)`
#### `const session = mux.open(opts)`
Add a new protocol.
Add a new protocol session.

@@ -99,30 +88,30 @@ Options include:

// Used to match the protocol
name: 'name of the protocol',
// You can have multiple versions of the same protocol on the same stream.
// Protocols are matched using the major version only.
version: {
major: 0,
minor: 0
},
// Number of messages types you want to send/receive.
messages: 2,
protocol: 'name of the protocol',
// Optional additional binary id to identify this session
id: buffer,
// Optional array of messages types you want to send/receive.
messages: [],
// Called when the remote side adds this protocol.
// Errors here are caught and forwared to stream.destroy
async onremoteopen () {},
// Called when the remote side closes or rejects this protocol.
async onopen () {},
// Called when the session closes - ie the remote side closes or rejects this protocol or we closed it.
// Errors here are caught and forwared to stream.destroy
async onremoteclose () {}
async onclose () {},
// Called after onclose when all pending promises has resolved.
async ondestroy () {}
}
```
Each of the functions can also be set directly on the instance with the same name.
Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`.
#### `const m = p.addMessage(opts)`
__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one.
Specify a message. Options include:
If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option.
#### `const m = session.addMessage(opts)`
Add a message. Options include:
``` js
{
// Defaults to an incrementing number
type: numericIndex,
// compact-encoding specifying how to encode/decode this message

@@ -148,11 +137,11 @@ encoding: c.binary,

#### `p.close()`
#### `session.close()`
Closes the protocol.
Closes the protocol session.
#### `p.cork()`
#### `sessoin.cork()`
Corking the protocol, makes it buffer messages and send them all in a batch when it uncorks.
Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks.
#### `p.uncork()`
#### `session.uncork()`

@@ -163,7 +152,7 @@ Uncork and send the batch.

Same as `p.cork` but on the muxer instance.
Same as `session.cork` but on the muxer instance.
#### `mux.uncork()`
Same as `p.uncork` but on the muxer instance.
Same as `session.uncork` but on the muxer instance.

@@ -170,0 +159,0 @@ ## License

@@ -12,6 +12,5 @@ const Protomux = require('./')

const p = a.addProtocol({
name: 'foo',
messages: 1,
onremoteopen () {
const p = a.open({
protocol: 'foo',
onopen () {
t.pass('a remote opened')

@@ -28,5 +27,4 @@ }

const bp = b.addProtocol({
name: 'foo',
messages: 1
const bp = b.open({
protocol: 'foo'
})

@@ -45,5 +43,4 @@

const ap = a.addProtocol({
name: 'foo',
messages: 1
const ap = a.open({
protocol: 'foo'
})

@@ -58,11 +55,9 @@

b.addProtocol({
name: 'other',
messages: 2
b.open({
protocol: 'other'
})
const bp = b.addProtocol({
name: 'foo',
messages: 1,
onremoteopen () {
const bp = b.open({
protocol: 'foo',
onopen () {
t.pass('b remote opened')

@@ -87,10 +82,8 @@ }

a.addProtocol({
name: 'other',
messages: 2
a.open({
protocol: 'other'
})
const ap = a.addProtocol({
name: 'multi',
messages: 3
const ap = a.open({
protocol: 'multi'
})

@@ -104,5 +97,4 @@

const bp = b.addProtocol({
name: 'multi',
messages: 2
const bp = b.open({
protocol: 'multi'
})

@@ -140,10 +132,8 @@

a.addProtocol({
name: 'other',
messages: 2
a.open({
protocol: 'other'
})
const ap = a.addProtocol({
name: 'multi',
messages: 2
const ap = a.open({
protocol: 'multi'
})

@@ -156,5 +146,4 @@

const bp = b.addProtocol({
name: 'multi',
messages: 2
const bp = b.open({
protocol: 'multi'
})

@@ -161,0 +150,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc