Comparing version 2.0.0 to 3.0.0
736
index.js
@@ -0,110 +1,232 @@ | ||
const b4a = require('b4a') | ||
const c = require('compact-encoding') | ||
const b4a = require('b4a') | ||
const queueTick = require('queue-tick') | ||
const safetyCatch = require('safety-catch') | ||
const { addProtocol } = require('./messages') | ||
const EMPTY = [] | ||
const MAX_BUFFERED = 32768 | ||
const MAX_BACKLOG = 256 | ||
class Protocol { | ||
constructor (mux) { | ||
this.mux = mux | ||
class Session { | ||
constructor (mux, info, context, protocol, id, messages, onopen, onclose, ondestroy) { | ||
this.context = context | ||
this.protocol = protocol | ||
this.id = id | ||
this.messages = [] | ||
this.remoteMessages = this.messages | ||
this.name = null | ||
this.version = null | ||
this.messages = EMPTY | ||
this.context = null | ||
this.offset = 0 | ||
this.length = 0 | ||
this.opened = false | ||
this.corked = false | ||
this.closed = false | ||
this.destroyed = false | ||
this.remoteVersion = null | ||
this.remoteOffset = 0 | ||
this.remoteEnd = 0 | ||
this.remoteOpened = false | ||
this.remoteClosed = false | ||
this.onopen = onopen | ||
this.onclose = onclose | ||
this.ondestroy = ondestroy | ||
this.onremoteopen = noop | ||
this.onremoteclose = noop | ||
this._mux = mux | ||
this._info = info | ||
this._localId = 0 | ||
this._remoteId = 0 | ||
this._active = 0 | ||
this._extensions = null | ||
this._decBound = this._dec.bind(this) | ||
this._decAndDestroyBound = this._decAndDestroy.bind(this) | ||
for (const m of messages) this.addMessage(m) | ||
} | ||
_attach ({ name, version = { major: 0, minor: 0 }, messages = 0, context = null, onremoteopen = noop, onremoteclose = noop }) { | ||
const opened = this.opened | ||
_open () { | ||
const id = this._mux._free.length > 0 | ||
? this._mux._free.pop() | ||
: this._mux._local.push(null) - 1 | ||
this.name = name | ||
this.version = version | ||
this.messages = new Array(messages) | ||
this.context = context | ||
this.offset = this.mux.offset | ||
this.length = messages | ||
this._info.opened++ | ||
this._localId = id + 1 | ||
this._mux._local[id] = this | ||
const state = { buffer: null, start: 2, end: 2 } | ||
c.string.preencode(state, this.protocol) | ||
c.buffer.preencode(state, this.id) | ||
c.uint.preencode(state, this._localId) | ||
state.buffer = this._mux._alloc(state.end) | ||
state.buffer[0] = 0 | ||
state.buffer[1] = 1 | ||
c.string.encode(state, this.protocol) | ||
c.buffer.encode(state, this.id) | ||
c.uint.encode(state, this._localId) | ||
this._mux._write0(state.buffer) | ||
} | ||
_dec () { | ||
if (--this._active === 0 && this.closed === true) this._destroy() | ||
} | ||
_decAndDestroy (err) { | ||
this._dec() | ||
this._mux._safeDestroy(err) | ||
} | ||
_fullyOpenSoon () { | ||
this._mux._remote[this._remoteId - 1].session = this | ||
queueTick(this._fullyOpen.bind(this)) | ||
} | ||
_fullyOpen () { | ||
if (this.opened === true || this.closed === true) return | ||
const remote = this._mux._remote[this._remoteId - 1] | ||
this.opened = true | ||
this.corked = false | ||
this._track(this.onopen(this)) | ||
this.onremoteopen = onremoteopen | ||
this.onremoteclose = onremoteclose | ||
remote.session = this | ||
if (remote.pending !== null) this._drain(remote) | ||
} | ||
return !opened | ||
_drain (remote) { | ||
for (let i = 0; i < remote.pending.length; i++) { | ||
const p = remote.pending[i] | ||
this._mux._buffered -= byteSize(p.state) | ||
this._recv(p.type, p.state) | ||
} | ||
remote.pending = null | ||
this._mux._resumeMaybe() | ||
} | ||
_nextMessage () { | ||
for (let i = this.messages.length - 1; i >= 0; i--) { | ||
if (this.messages[i] === undefined && (i === 0 || this.messages[i - 1] !== undefined)) { | ||
return i | ||
} | ||
_track (p) { | ||
if (isPromise(p) === true) { | ||
this._active++ | ||
p.then(this._decBound, this._decAndDestroyBound) | ||
} | ||
return -1 | ||
} | ||
addMessage ({ type = this._nextMessage(), encoding = c.binary, onmessage = noop } = {}) { | ||
if (type < 0 || type >= this.messages.length) { | ||
throw new Error('Invalid type, must be <= ' + this.messages.length) | ||
_close (isRemote) { | ||
if (this.closed === true) return | ||
this.closed = true | ||
this._info.opened-- | ||
if (this._remoteId > 0) { | ||
this._mux._remote[this._remoteId - 1] = null | ||
this._remoteId = 0 | ||
// If remote has acked, we can reuse the local id now | ||
// otherwise, we need to wait for the "ack" to arrive | ||
this._mux._free.push(this._localId - 1) | ||
} | ||
const t = this.offset + type | ||
const send = (message) => this.opened && this.mux._push(t, m.encoding, message) | ||
const m = this.messages[type] = { encoding, onmessage, send } | ||
this._mux._local[this._localId - 1] = null | ||
this._localId = 0 | ||
return m | ||
this._mux._gc(this._info) | ||
this._track(this.onclose(isRemote, this)) | ||
if (this._active === 0) this._destroy() | ||
} | ||
_destroy () { | ||
if (this.destroyed === true) return | ||
this.destroyed = true | ||
this._track(this.ondestroy(this)) | ||
} | ||
_recv (type, state) { | ||
if (type < this.remoteMessages.length) { | ||
this.remoteMessages[type].recv(state, this) | ||
} | ||
} | ||
cork () { | ||
if (this.corked) return | ||
this.corked = true | ||
this.mux.cork() | ||
this._mux.cork() | ||
} | ||
uncork () { | ||
if (!this.corked) return | ||
this.corked = false | ||
this.mux.uncork() | ||
this._mux.uncork() | ||
} | ||
close () { | ||
if (this.opened === false) return | ||
this.opened = false | ||
this.mux._unopened++ | ||
if (this.closed === true) return | ||
const offset = this.offset | ||
const state = { buffer: null, start: 2, end: 2 } | ||
this.version = null | ||
this.messages = EMPTY | ||
this.offset = 0 | ||
this.length = 0 | ||
this.onremoteopen = this.onremoteclose = noop | ||
this.mux._push(2, c.uint, offset) | ||
this._gc() | ||
c.uint.preencode(state, this._localId) | ||
if (this.corked) this.uncork() | ||
state.buffer = this._mux._alloc(state.end) | ||
state.buffer[0] = 0 | ||
state.buffer[1] = 3 | ||
c.uint.encode(state, this._localId) | ||
this._close(false) | ||
this._mux._write0(state.buffer) | ||
} | ||
_gc () { | ||
if (this.opened || this.remoteOpened) return | ||
this.mux._removeProtocol(this) | ||
addMessage (opts) { | ||
if (!opts) return this._skipMessage() | ||
const type = this.messages.length | ||
const encoding = opts.encoding || c.raw | ||
const onmessage = opts.onmessage || noop | ||
const s = this | ||
const typeLen = encodingLength(c.uint, type) | ||
const m = { | ||
type, | ||
encoding, | ||
onmessage, | ||
recv (state, session) { | ||
session._track(m.onmessage(encoding.decode(state), session)) | ||
}, | ||
send (m, session = s) { | ||
if (session.closed === true) return false | ||
const mux = session._mux | ||
const state = { buffer: null, start: 0, end: typeLen } | ||
if (mux._batch !== null) { | ||
encoding.preencode(state, m) | ||
state.buffer = mux._alloc(state.end) | ||
c.uint.encode(state, type) | ||
encoding.encode(state, m) | ||
mux._pushBatch(session._localId, state.buffer) | ||
return true | ||
} | ||
c.uint.preencode(state, session._localId) | ||
encoding.preencode(state, m) | ||
state.buffer = mux._alloc(state.end) | ||
c.uint.encode(state, session._localId) | ||
c.uint.encode(state, type) | ||
encoding.encode(state, m) | ||
return mux.stream.write(state.buffer) | ||
} | ||
} | ||
this.messages.push(m) | ||
return m | ||
} | ||
_recv (type, state) { | ||
if (type >= this.messages.length) return | ||
_skipMessage () { | ||
const type = this.messages.length | ||
const m = { | ||
type, | ||
encoding: c.raw, | ||
onmessage: noop, | ||
recv (state, session) {}, | ||
send (m, session) {} | ||
} | ||
const m = this.messages[type] | ||
if (m !== undefined) this.mux._catch(m.onmessage(m.encoding.decode(state), this.context)) | ||
this.messages.push(m) | ||
return m | ||
} | ||
@@ -114,292 +236,372 @@ } | ||
module.exports = class Protomux { | ||
constructor (stream, { backlog = 128, alloc, onacceptprotocol } = {}) { | ||
constructor (stream, { alloc } = {}) { | ||
this.isProtomux = true | ||
this.stream = stream | ||
this.protocols = [] | ||
this.remoteProtocols = [] | ||
this.offset = 4 // 4 messages reserved | ||
this.corked = 0 | ||
this.backlog = backlog | ||
this.onacceptprotocol = onacceptprotocol || (() => this._unopened < this.backlog) | ||
this.isProtomux = true | ||
this._unopened = 0 | ||
this._batch = null | ||
this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe) | ||
this._safeDestroyBound = this._safeDestroy.bind(this) | ||
this._remoteBacklog = 0 | ||
this._buffered = 0 | ||
this._paused = false | ||
this._remote = [] | ||
this._local = [] | ||
this._free = [] | ||
this._batch = null | ||
this._batchState = null | ||
this._infos = new Map() | ||
this._notify = new Map() | ||
this.stream.on('data', this._ondata.bind(this)) | ||
this.stream.on('error', noop) // we handle this in "close" | ||
this.stream.on('close', this._shutdown.bind(this)) | ||
} | ||
static from (other, opts) { | ||
return other.isProtomux === true ? other : new Protomux(other, opts) | ||
static from (stream, opts) { | ||
if (stream.isProtomux) return stream | ||
return new this(stream, opts) | ||
} | ||
sendKeepAlive () { | ||
this.stream.write(this._alloc(0)) | ||
* [Symbol.iterator] () { | ||
for (const session of this._local) { | ||
if (session !== null) yield session | ||
} | ||
} | ||
cork () { | ||
if (++this.corked === 1) this._batch = [] | ||
if (++this.corked === 1) { | ||
this._batch = [] | ||
this._batchState = { buffer: null, start: 0, end: 1 } | ||
} | ||
} | ||
uncork () { | ||
if (--this.corked !== 0) return | ||
if (--this.corked === 0) { | ||
this._sendBatch(this._batch, this._batchState) | ||
this._batch = null | ||
this._batchState = null | ||
} | ||
} | ||
const batch = this._batch | ||
this._batch = null | ||
pair ({ protocol, id = null }, notify) { | ||
this._notify.set(toKey(protocol, id), notify) | ||
} | ||
const state = { start: 0, end: 1, buffer: null } | ||
const lens = new Array(batch.length) | ||
unpair ({ protocol, id = null }) { | ||
this._notify.delete(toKey(protocol, id)) | ||
} | ||
for (let i = 0; i < batch.length; i++) { | ||
const b = batch[i] | ||
const end = state.end | ||
opened ({ protocol, id = null }) { | ||
const key = toKey(protocol, id) | ||
const info = this._infos.get(key) | ||
return info ? info.opened > 0 : false | ||
} | ||
c.uint.preencode(state, b.type) | ||
b.encoding.preencode(state, b.message) | ||
c.uint.preencode(state, lens[i] = (state.end - end)) | ||
open ({ context = null, protocol, id = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { | ||
if (this.stream.destroyed) return null | ||
const info = this._get(protocol, id) | ||
if (unique && info.opened > 0) return null | ||
if (info.incoming.length === 0) { | ||
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) | ||
session._open() | ||
info.outgoing.push(session._localId) | ||
return session | ||
} | ||
state.buffer = this._alloc(state.end) | ||
state.buffer[state.start++] = 0 | ||
this._remoteBacklog-- | ||
for (let i = 0; i < batch.length; i++) { | ||
const b = batch[i] | ||
const remoteId = info.incoming.shift() | ||
const r = this._remote[remoteId - 1] | ||
if (r === null) return null | ||
c.uint.encode(state, lens[i]) | ||
c.uint.encode(state, b.type) | ||
b.encoding.encode(state, b.message) | ||
} | ||
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) | ||
this.stream.write(state.buffer) | ||
} | ||
session._remoteId = remoteId | ||
session._open() | ||
session._fullyOpenSoon() | ||
hasProtocol (opts) { | ||
return !!this.getProtocol(opts) | ||
return session | ||
} | ||
getProtocol ({ name, version }) { | ||
return this._getProtocol(name, version, false) | ||
_pushBatch (localId, buffer) { | ||
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) { | ||
this._batchState.end++ | ||
c.uint.preencode(this._batchState, localId) | ||
} | ||
c.buffer.preencode(this._batchState, buffer) | ||
this._batch.push({ localId, buffer }) | ||
} | ||
addProtocol (opts) { | ||
const p = this._getProtocol(opts.name, (opts.version && opts.version.major) || 0, true) | ||
_sendBatch (batch, state) { | ||
if (batch.length === 0) return | ||
if (opts.cork) p.cork() | ||
if (!p._attach(opts)) return p | ||
let prev = batch[0].localId | ||
this._unopened-- | ||
this.offset += p.length | ||
this._push(1, addProtocol, { | ||
name: p.name, | ||
version: p.version, | ||
offset: p.offset, | ||
length: p.length | ||
}) | ||
state.buffer = this._alloc(state.end) | ||
state.buffer[state.start++] = 0 | ||
state.buffer[state.start++] = 0 | ||
return p | ||
} | ||
c.uint.encode(state, prev) | ||
destroy (err) { | ||
this.stream.destroy(err) | ||
} | ||
for (let i = 0; i < batch.length; i++) { | ||
const b = batch[i] | ||
if (prev !== b.localId) { | ||
state.buffer[state.start++] = 0 | ||
c.uint.encode(state, (prev = b.localId)) | ||
} | ||
c.buffer.encode(state, b.buffer) | ||
} | ||
_shutdown () { | ||
while (this.protocols.length) { | ||
const p = this.protocols.pop() | ||
if (!p.remoteOpened) continue | ||
if (p.remoteClosed) continue | ||
p.remoteOpened = false | ||
p.remoteClosed = true | ||
this._catch(p.onremoteclose()) | ||
} | ||
this.stream.write(state.buffer) | ||
} | ||
_safeDestroy (err) { | ||
safetyCatch(err) | ||
this.destroy(err) | ||
_get (protocol, id) { | ||
const key = toKey(protocol, id) | ||
let info = this._infos.get(key) | ||
if (info) return info | ||
info = { key, protocol, id, pairing: 0, opened: 0, incoming: [], outgoing: [] } | ||
this._infos.set(key, info) | ||
return info | ||
} | ||
_catch (p) { | ||
if (isPromise(p)) p.catch(this._safeDestroyBound) | ||
_gc (info) { | ||
if (info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0) { | ||
this._infos.delete(info.key) | ||
} | ||
} | ||
async _acceptMaybe (added) { | ||
let accept = false | ||
_ondata (buffer) { | ||
try { | ||
accept = await this.onacceptprotocol(added) | ||
const state = { buffer, start: 0, end: buffer.byteLength } | ||
this._decode(c.uint.decode(state), state) | ||
} catch (err) { | ||
this._safeDestroy(err) | ||
} | ||
} | ||
_decode (remoteId, state) { | ||
const type = c.uint.decode(state) | ||
if (remoteId === 0) { | ||
this._oncontrolsession(type, state) | ||
return | ||
} | ||
if (!accept) this._rejectProtocol(added) | ||
} | ||
const r = remoteId <= this._remote.length ? this._remote[remoteId - 1] : null | ||
_rejectProtocol (added) { | ||
for (let i = 0; i < this.protocols.length; i++) { | ||
const p = this.protocols[i] | ||
if (p.opened || p.name !== added.name || !p.remoteOpened) continue | ||
if (p.remoteVersion.major !== added.version.major) continue | ||
// if the channel is closed ignore - could just be a pipeline message... | ||
if (r === null) return | ||
this._unopened-- | ||
this.protocols.splice(i, 1) | ||
this._push(3, c.uint, added.offset) | ||
if (r.pending !== null) { | ||
this._bufferMessage(r, type, state) | ||
return | ||
} | ||
r.session._recv(type, state) | ||
} | ||
_ondata (buffer) { | ||
if (buffer.byteLength === 0) return // keep alive | ||
_oncontrolsession (type, state) { | ||
switch (type) { | ||
case 0: | ||
this._onbatch(state) | ||
break | ||
const end = buffer.byteLength | ||
const state = { start: 0, end, buffer } | ||
case 1: | ||
this._onopensession(state) | ||
break | ||
try { | ||
const type = c.uint.decode(state) | ||
if (type === 0) this._recvBatch(end, state) | ||
else this._recv(type, state) | ||
} catch (err) { | ||
this._safeDestroy(err) | ||
case 2: | ||
this._onrejectsession(state) | ||
break | ||
case 3: | ||
this._onclosesession(state) | ||
break | ||
} | ||
} | ||
_getProtocol (name, major, upsert) { | ||
for (let i = 0; i < this.protocols.length; i++) { | ||
const p = this.protocols[i] | ||
const v = p.remoteVersion === null ? p.version : p.remoteVersion | ||
if (p.name === name && (v !== null && v.major === major)) return p | ||
} | ||
_bufferMessage (r, type, { buffer, start, end }) { | ||
const state = { buffer, start, end } // copy | ||
r.pending.push({ type, state }) | ||
this._buffered += byteSize(state) | ||
this._pauseMaybe() | ||
} | ||
if (!upsert) return null | ||
_pauseMaybe () { | ||
if (this._paused === true || this._buffered <= MAX_BUFFERED) return | ||
this._paused = true | ||
this.stream.pause() | ||
} | ||
const p = new Protocol(this) | ||
this.protocols.push(p) | ||
this._unopened++ | ||
return p | ||
_resumeMaybe () { | ||
if (this._paused === false || this._buffered > MAX_BUFFERED) return | ||
this._paused = false | ||
this.stream.resume() | ||
} | ||
_removeProtocol (p) { | ||
const i = this.protocols.indexOf(this) | ||
if (i > -1) this.protocols.splice(i, 1) | ||
if (!p.opened) this._unopened-- | ||
_onbatch (state) { | ||
const end = state.end | ||
let remoteId = c.uint.decode(state) | ||
while (state.end > state.start) { | ||
const len = c.uint.decode(state) | ||
if (len === 0) { | ||
remoteId = c.uint.decode(state) | ||
continue | ||
} | ||
state.end = state.start + end | ||
this._decode(remoteId, state) | ||
state.end = end | ||
} | ||
} | ||
_recvAddProtocol (state) { | ||
const add = addProtocol.decode(state) | ||
_onopensession (state) { | ||
const protocol = c.string.decode(state) | ||
const id = c.buffer.decode(state) | ||
const remoteId = c.uint.decode(state) | ||
const p = this._getProtocol(add.name, add.version.major, true) | ||
if (p.remoteOpened) throw new Error('Duplicate protocol received') | ||
// remote tried to open the control session - auto reject for now | ||
// as we can use as an explicit control protocol declaration if we need to | ||
if (remoteId === 0) { | ||
this._rejectSession(0) | ||
return | ||
} | ||
p.name = add.name | ||
p.remoteVersion = add.version | ||
p.remoteOffset = add.offset | ||
p.remoteEnd = add.offset + add.length | ||
p.remoteOpened = true | ||
p.remoteClosed = false | ||
const rid = remoteId - 1 | ||
const info = this._get(protocol, id) | ||
if (p.opened) { | ||
this._catch(p.onremoteopen()) | ||
} else { | ||
this._acceptMaybe(add) | ||
// allow the remote to grow the ids by one | ||
if (this._remote.length === rid) { | ||
this._remote.push(null) | ||
} | ||
} | ||
_recvRemoveProtocol (state) { | ||
const offset = c.uint.decode(state) | ||
if (rid >= this._remote.length || this._remote[rid] !== null) { | ||
throw new Error('Invalid open message') | ||
} | ||
for (let i = 0; i < this.protocols.length; i++) { | ||
const p = this.protocols[i] | ||
if (info.outgoing.length > 0) { | ||
const localId = info.outgoing.shift() | ||
const session = this._local[localId - 1] | ||
if (p.remoteOffset === offset && p.remoteOpened) { | ||
p.remoteVersion = null | ||
p.remoteOpened = false | ||
p.remoteClosed = true | ||
this._catch(p.onremoteclose()) | ||
p._gc() | ||
if (session === null) { // we already closed the channel - ignore | ||
this._free.push(localId - 1) | ||
return | ||
} | ||
this._remote[rid] = { pending: null, session: null } | ||
session._remoteId = remoteId | ||
session._fullyOpen() | ||
return | ||
} | ||
} | ||
_recvRejectedProtocol (state) { | ||
const offset = c.uint.decode(state) | ||
this._remote[rid] = { pending: [], session: null } | ||
for (let i = 0; i < this.protocols.length; i++) { | ||
const p = this.protocols[i] | ||
if (++this._remoteBacklog > MAX_BACKLOG) { | ||
throw new Error('Remote exceeded backlog') | ||
} | ||
if (p.offset === offset && !p.remoteClosed) { | ||
p.remoteClosed = true | ||
this._catch(p.onremoteclose()) | ||
p._gc() | ||
} | ||
} | ||
info.pairing++ | ||
info.incoming.push(remoteId) | ||
this._requestSession(protocol, id, info).catch(this._safeDestroyBound) | ||
} | ||
_recvBatch (end, state) { | ||
while (state.start < state.end) { | ||
const len = c.uint.decode(state) | ||
const type = c.uint.decode(state) | ||
state.end = state.start + len | ||
this._recv(type, state) | ||
state.end = end | ||
_onrejectsession (state) { | ||
const protocol = c.string.decode(state) | ||
const id = c.buffer.decode(state) | ||
const info = this._get(protocol, id) | ||
if (info.outgoing.length === 0) { | ||
throw new Error('Invalid reject message') | ||
} | ||
const localId = info.outgoing.shift() | ||
const session = this._local[localId - 1] | ||
this._free.push(localId - 1) | ||
if (session !== null) session._close(true) | ||
} | ||
_recv (type, state) { | ||
if (type < 4) { | ||
if (type === 0) { | ||
throw new Error('Invalid nested batch') | ||
} | ||
_onclosesession (state) { | ||
const remoteId = c.uint.decode(state) | ||
if (type === 1) { | ||
this._recvAddProtocol(state) | ||
return | ||
} | ||
if (remoteId === 0) return // ignore | ||
if (type === 2) { | ||
this._recvRemoveProtocol(state) | ||
return | ||
} | ||
const rid = remoteId - 1 | ||
const r = rid < this._remote.length ? this._remote[rid] : null | ||
if (type === 3) { | ||
this._recvRejectedProtocol(state) | ||
return | ||
} | ||
if (r === null) return | ||
return | ||
} | ||
if (r.session !== null) r.session._close(true) | ||
} | ||
// TODO: Consider make this array sorted by remoteOffset and use a bisect here. | ||
// For now we use very few protocols in practice, so it might be overkill. | ||
for (let i = 0; i < this.protocols.length; i++) { | ||
const p = this.protocols[i] | ||
async _requestSession (protocol, id, info) { | ||
const notify = this._notify.get(toKey(protocol, id)) || this._notify.get(toKey(protocol, null)) | ||
if (p.remoteOffset <= type && type < p.remoteEnd) { | ||
p._recv(type - p.remoteOffset, state) | ||
break | ||
} | ||
if (notify) await notify(id) | ||
if (--info.pairing > 0) return | ||
while (info.incoming.length > 0) { | ||
this._rejectSession(info, info.incoming.pop()) | ||
} | ||
this._gc(info) | ||
} | ||
_push (type, enc, message) { | ||
if (this.corked > 0) { | ||
this._batch.push({ type, encoding: enc, message }) | ||
return false | ||
_rejectSession (info, remoteId) { | ||
if (remoteId > 0) { | ||
const r = this._remote[remoteId - 1] | ||
if (r.pending !== null) { | ||
for (let i = 0; i < r.pending.length; i++) { | ||
this._buffered -= byteSize(r.pending[i].state) | ||
} | ||
} | ||
this._remote[remoteId - 1] = null | ||
this._resumeMaybe() | ||
} | ||
const state = { start: 0, end: 0, buffer: null } | ||
const state = { buffer: null, start: 2, end: 2 } | ||
c.uint.preencode(state, type) | ||
enc.preencode(state, message) | ||
c.string.preencode(state, info.protocol) | ||
c.buffer.preencode(state, info.id) | ||
state.buffer = this._alloc(state.end) | ||
c.uint.encode(state, type) | ||
enc.encode(state, message) | ||
state.buffer[0] = 0 | ||
state.buffer[1] = 2 | ||
c.string.encode(state, info.protocol) | ||
c.buffer.encode(state, info.id) | ||
return this.stream.write(state.buffer) | ||
this._write0(state.buffer) | ||
} | ||
_write0 (buffer) { | ||
if (this._batch !== null) { | ||
this._pushBatch(0, buffer.subarray(1)) | ||
return | ||
} | ||
this.stream.write(buffer) | ||
} | ||
_safeDestroy (err) { | ||
safetyCatch(err) | ||
this.stream.destroy(err) | ||
} | ||
_shutdown () { | ||
for (const s of this._local) { | ||
if (s !== null) s._close(true) | ||
} | ||
} | ||
} | ||
@@ -409,4 +611,18 @@ | ||
function toKey (protocol, id) { | ||
return protocol + '##' + (id ? b4a.toString(id, 'hex') : '') | ||
} | ||
function byteSize (state) { | ||
return 512 + (state.end - state.start) | ||
} | ||
function isPromise (p) { | ||
return typeof p === 'object' && p !== null && !!p.catch | ||
return !!(p && typeof p.then === 'function') | ||
} | ||
function encodingLength (enc, val) { | ||
const state = { buffer: null, start: 0, end: 0 } | ||
enc.preencode(state, val) | ||
return state.end | ||
} |
{ | ||
"name": "protomux", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "Multiplex multiple message oriented protocols over a stream", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -20,16 +20,12 @@ # protomux | ||
// Now add some protocols | ||
// Now add some protocol sessions | ||
const cool = mux.addProtocol({ | ||
name: 'cool-protocol', | ||
version: { | ||
major: 1, | ||
minor: 0 | ||
}, | ||
messages: 2, // protocol has 2 messages | ||
onremoteopen () { | ||
const cool = mux.open({ | ||
protocol: 'cool-protocol', | ||
id: Buffer.from('optional binary id'), | ||
onopen () { | ||
console.log('the other side opened this protocol!') | ||
}, | ||
onremoteclose () { | ||
console.log('the other side closed this protocol!') | ||
onclose () { | ||
console.log('either side closed the protocol') | ||
} | ||
@@ -41,3 +37,2 @@ }) | ||
const one = cool.addMessage({ | ||
type: 0, | ||
encoding: c.string, | ||
@@ -50,3 +45,2 @@ onmessage (m) { | ||
const two = cool.addMessage({ | ||
type: 1, | ||
encoding: c.bool, | ||
@@ -75,8 +69,3 @@ onmessage (m) { | ||
// Called when the muxer wants to allocate a message that is written, defaults to Buffer.allocUnsafe. | ||
alloc (size) {}, | ||
// Hook that is called when an unknown protocol is received. Should return true/false. | ||
async onacceptprotocol ({ name, version }) {} | ||
// How many protocols can be remote open, that we haven't opened yet? | ||
// Only used if you don't provide an accept hook. | ||
backlog: 128 | ||
alloc (size) {} | ||
} | ||
@@ -89,5 +78,5 @@ ``` | ||
#### `const p = mux.addProtocol(opts)` | ||
#### `const session = mux.open(opts)` | ||
Add a new protocol. | ||
Add a new protocol session. | ||
@@ -99,30 +88,30 @@ Options include: | ||
// Used to match the protocol | ||
name: 'name of the protocol', | ||
// You can have multiple versions of the same protocol on the same stream. | ||
// Protocols are matched using the major version only. | ||
version: { | ||
major: 0, | ||
minor: 0 | ||
}, | ||
// Number of messages types you want to send/receive. | ||
messages: 2, | ||
protocol: 'name of the protocol', | ||
// Optional additional binary id to identify this session | ||
id: buffer, | ||
// Optional array of messages types you want to send/receive. | ||
messages: [], | ||
// Called when the remote side adds this protocol. | ||
// Errors here are caught and forwared to stream.destroy | ||
async onremoteopen () {}, | ||
// Called when the remote side closes or rejects this protocol. | ||
async onopen () {}, | ||
// Called when the session closes - ie the remote side closes or rejects this protocol or we closed it. | ||
// Errors here are caught and forwared to stream.destroy | ||
async onremoteclose () {} | ||
async onclose () {}, | ||
// Called after onclose when all pending promises has resolved. | ||
async ondestroy () {} | ||
} | ||
``` | ||
Each of the functions can also be set directly on the instance with the same name. | ||
Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`. | ||
#### `const m = p.addMessage(opts)` | ||
__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one. | ||
Specify a message. Options include: | ||
If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option. | ||
#### `const m = session.addMessage(opts)` | ||
Add a message. Options include: | ||
``` js | ||
{ | ||
// Defaults to an incrementing number | ||
type: numericIndex, | ||
// compact-encoding specifying how to encode/decode this message | ||
@@ -148,11 +137,11 @@ encoding: c.binary, | ||
#### `p.close()` | ||
#### `session.close()` | ||
Closes the protocol. | ||
Closes the protocol session. | ||
#### `p.cork()` | ||
#### `sessoin.cork()` | ||
Corking the protocol, makes it buffer messages and send them all in a batch when it uncorks. | ||
Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks. | ||
#### `p.uncork()` | ||
#### `session.uncork()` | ||
@@ -163,7 +152,7 @@ Uncork and send the batch. | ||
Same as `p.cork` but on the muxer instance. | ||
Same as `session.cork` but on the muxer instance. | ||
#### `mux.uncork()` | ||
Same as `p.uncork` but on the muxer instance. | ||
Same as `session.uncork` but on the muxer instance. | ||
@@ -170,0 +159,0 @@ ## License |
59
test.js
@@ -12,6 +12,5 @@ const Protomux = require('./') | ||
const p = a.addProtocol({ | ||
name: 'foo', | ||
messages: 1, | ||
onremoteopen () { | ||
const p = a.open({ | ||
protocol: 'foo', | ||
onopen () { | ||
t.pass('a remote opened') | ||
@@ -28,5 +27,4 @@ } | ||
const bp = b.addProtocol({ | ||
name: 'foo', | ||
messages: 1 | ||
const bp = b.open({ | ||
protocol: 'foo' | ||
}) | ||
@@ -45,5 +43,4 @@ | ||
const ap = a.addProtocol({ | ||
name: 'foo', | ||
messages: 1 | ||
const ap = a.open({ | ||
protocol: 'foo' | ||
}) | ||
@@ -58,11 +55,9 @@ | ||
b.addProtocol({ | ||
name: 'other', | ||
messages: 2 | ||
b.open({ | ||
protocol: 'other' | ||
}) | ||
const bp = b.addProtocol({ | ||
name: 'foo', | ||
messages: 1, | ||
onremoteopen () { | ||
const bp = b.open({ | ||
protocol: 'foo', | ||
onopen () { | ||
t.pass('b remote opened') | ||
@@ -87,10 +82,8 @@ } | ||
a.addProtocol({ | ||
name: 'other', | ||
messages: 2 | ||
a.open({ | ||
protocol: 'other' | ||
}) | ||
const ap = a.addProtocol({ | ||
name: 'multi', | ||
messages: 3 | ||
const ap = a.open({ | ||
protocol: 'multi' | ||
}) | ||
@@ -104,5 +97,4 @@ | ||
const bp = b.addProtocol({ | ||
name: 'multi', | ||
messages: 2 | ||
const bp = b.open({ | ||
protocol: 'multi' | ||
}) | ||
@@ -140,10 +132,8 @@ | ||
a.addProtocol({ | ||
name: 'other', | ||
messages: 2 | ||
a.open({ | ||
protocol: 'other' | ||
}) | ||
const ap = a.addProtocol({ | ||
name: 'multi', | ||
messages: 2 | ||
const ap = a.open({ | ||
protocol: 'multi' | ||
}) | ||
@@ -156,5 +146,4 @@ | ||
const bp = b.addProtocol({ | ||
name: 'multi', | ||
messages: 2 | ||
const bp = b.open({ | ||
protocol: 'multi' | ||
}) | ||
@@ -161,0 +150,0 @@ |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
23599
612
6
154
1