protomux-wakeup
Advanced tools
+138
-56
@@ -6,6 +6,3 @@ const crypto = require('hypercore-crypto') | ||
| const [ | ||
| NS_INITATOR, | ||
| NS_RESPONDER | ||
| ] = crypto.namespace('wakeup', 2) | ||
| const [NS_INITATOR, NS_RESPONDER] = crypto.namespace('wakeup', 2) | ||
@@ -18,6 +15,14 @@ const Handshake = schema.getEncoding('@wakeup/handshake') | ||
| module.exports = class WakeupSwarm { | ||
| constructor (onwakeup = noop) { | ||
| constructor(onwakeup = noop) { | ||
| this.topics = new Map() | ||
| this.topicsGC = new Set() | ||
| this.muxers = new Set() | ||
| this.stats = { | ||
| sessionsOpened: 0, | ||
| sessionsClosed: 0, | ||
| topicsAdded: 0, | ||
| topicsGcd: 0, | ||
| peersAdded: 0, | ||
| peersRemoved: 0 | ||
| } | ||
@@ -30,3 +35,3 @@ this.onwakeup = onwakeup | ||
| session (capability, handlers = {}) { | ||
| session(capability, handlers = {}) { | ||
| const id = handlers.discoveryKey || crypto.discoveryKey(capability) | ||
@@ -51,3 +56,3 @@ const active = handlers.active !== false | ||
| getSessions (capability, handlers = {}) { | ||
| getSessions(capability, handlers = {}) { | ||
| const id = handlers.discoveryKey || crypto.discoveryKey(capability) | ||
@@ -59,3 +64,3 @@ const hex = b4a.toString(id, 'hex') | ||
| hasStream (stream, capability, handlers = {}) { | ||
| hasStream(stream, capability, handlers = {}) { | ||
| if (!capability) { | ||
@@ -73,3 +78,3 @@ const noiseStream = stream.noiseStream || stream | ||
| addStream (stream) { | ||
| addStream(stream) { | ||
| const noiseStream = stream.noiseStream || stream | ||
@@ -83,3 +88,3 @@ | ||
| const muxer = getMuxer(noiseStream) | ||
| muxer.pair({ protocol: 'wakeup' }, id => this._onpair(id, muxer)) | ||
| muxer.pair({ protocol: 'wakeup' }, (id) => this._onpair(id, muxer)) | ||
@@ -95,3 +100,3 @@ this.muxers.add(muxer) | ||
| _onActive (w) { | ||
| _onActive(w) { | ||
| for (const m of this.muxers) { | ||
@@ -102,3 +107,3 @@ w._onopen(m, false) | ||
| _addGC (topic) { | ||
| _addGC(topic) { | ||
| if (topic.destroyed) return | ||
@@ -111,3 +116,3 @@ this.topicsGC.add(topic) | ||
| _removeGC (topic) { | ||
| _removeGC(topic) { | ||
| this.topicsGC.delete(topic) | ||
@@ -120,3 +125,3 @@ if (this.topicsGC.size === 0 && this._gcInterval) { | ||
| _gc () { | ||
| _gc() { | ||
| const destroy = [] | ||
@@ -130,3 +135,3 @@ for (const w of this.topicsGC) { | ||
| destroy () { | ||
| destroy() { | ||
| if (this._gcInterval) clearInterval(this._gcInterval) | ||
@@ -138,3 +143,3 @@ this._gcInterval = null | ||
| async _onpair (id, stream) { | ||
| async _onpair(id, stream) { | ||
| const hex = b4a.toString(id, 'hex') | ||
@@ -145,6 +150,63 @@ const w = this.topics.get(hex) | ||
| } | ||
| registerMetrics(promClient) { | ||
| const self = this | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_sessions_opened', | ||
| help: 'The amount of sessions opened by protomux wakeup', | ||
| collect() { | ||
| this.set(self.stats.sessionsOpened) | ||
| } | ||
| }) | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_sessions_closed', | ||
| help: 'The amount of sessions closed by protomux wakeup', | ||
| collect() { | ||
| this.set(self.stats.sessionsClosed) | ||
| } | ||
| }) | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_topics_added', | ||
| help: 'The amount of topics added to protomux wakeup', | ||
| collect() { | ||
| this.set(self.stats.topicsAdded) | ||
| } | ||
| }) | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_topics_gcd', | ||
| help: 'The amount of topics that got garbage collected by protomux wakeup', | ||
| collect() { | ||
| this.set(self.stats.topicsGcd) | ||
| } | ||
| }) | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_peers_added', | ||
| help: 'The amount of peers added by protomux wakeup, across all topics', | ||
| collect() { | ||
| this.set(self.stats.peersAdded) | ||
| } | ||
| }) | ||
| new promClient.Gauge({ | ||
| // eslint-disable-line no-new | ||
| name: 'protomux_wakeup_peers_removed', | ||
| help: 'The amount of peers removed by protomux wakeup, across all topics', | ||
| collect() { | ||
| this.set(self.stats.peersRemoved) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
| class WakeupPeer { | ||
| constructor (topic) { | ||
| constructor(topic) { | ||
| this.index = 0 | ||
@@ -161,5 +223,9 @@ this.userData = null // for the user | ||
| this.wireInfo = null | ||
| this.active = false | ||
| this.topic.state.stats.peersAdded++ | ||
| } | ||
| unlink (list) { | ||
| unlink(list) { | ||
| this.topic.state.stats.peersRemoved++ | ||
| // note that since we pop here we can iterate in reverse safely in case a peer is removed in the same tick | ||
@@ -174,3 +240,3 @@ const head = list.pop() | ||
| class WakeupSession { | ||
| constructor (topic, handlers) { | ||
| constructor(topic, handlers) { | ||
| this.index = 0 | ||
@@ -183,15 +249,19 @@ this.topic = topic | ||
| get peers () { | ||
| get peers() { | ||
| return this.topic.peers | ||
| } | ||
| addStream (stream) { | ||
| hasStream(stream) { | ||
| return !!this.getPeer(stream) | ||
| } | ||
| addStream(stream) { | ||
| this.topic.addStream(stream) | ||
| } | ||
| getPeer (stream) { | ||
| getPeer(stream) { | ||
| return this.topic.peersByStream.get(stream) || null | ||
| } | ||
| broadcastLookup (req) { | ||
| broadcastLookup(req) { | ||
| for (const peer of this.topic.pendingPeers) { | ||
@@ -205,3 +275,3 @@ this.lookup(peer, req) | ||
| lookupByStream (stream, req) { | ||
| lookupByStream(stream, req) { | ||
| const peer = this.topic.peersByStream.get(stream) | ||
@@ -211,7 +281,7 @@ if (peer) this.lookup(peer, req) | ||
| lookup (peer, req) { | ||
| lookup(peer, req) { | ||
| peer.wireLookup.send(req || { hash: null }) | ||
| } | ||
| announceByStream (stream, wakeup) { | ||
| announceByStream(stream, wakeup) { | ||
| const peer = this.topic.peersByStream.get(stream) | ||
@@ -221,7 +291,7 @@ if (peer && !peer.pending) this.announce(peer, wakeup) | ||
| announce (peer, wakeup) { | ||
| announce(peer, wakeup) { | ||
| peer.wireAnnounce.send(wakeup) | ||
| } | ||
| active () { | ||
| active() { | ||
| this.isActive = true | ||
@@ -231,3 +301,3 @@ this.topic._bumpActivity() | ||
| inactive () { | ||
| inactive() { | ||
| this.isActive = false | ||
@@ -237,3 +307,3 @@ this.topic._bumpActivity() | ||
| destroy () { | ||
| destroy() { | ||
| if (this.destroyed) return | ||
@@ -246,3 +316,3 @@ this.destroyed = true | ||
| class WakeupTopic { | ||
| constructor (state, id, capability, active) { | ||
| constructor(state, id, capability, active) { | ||
| this.state = state | ||
@@ -260,5 +330,8 @@ this.sessions = [] | ||
| this.destroyed = false | ||
| this.state.stats.topicsAdded++ | ||
| } | ||
| addSession (handlers) { | ||
| addSession(handlers) { | ||
| this.state.stats.sessionsOpened++ | ||
| const session = new WakeupSession(this, handlers) | ||
@@ -271,6 +344,8 @@ session.index = this.sessions.length | ||
| removeSession (session) { | ||
| removeSession(session) { | ||
| if (this.sessions.length <= session.index) return | ||
| if (this.sessions[session.index] !== session) return | ||
| this.state.stats.sessionsClosed++ | ||
| // same as with the peer, this allows us to iterate while removing if iterating backwards | ||
@@ -287,3 +362,3 @@ const head = this.sessions.pop() | ||
| _bumpActivity () { | ||
| _bumpActivity() { | ||
| let isActive = false | ||
@@ -302,3 +377,3 @@ | ||
| active () { | ||
| active() { | ||
| if (this.isActive) return | ||
@@ -310,3 +385,3 @@ this.idleTicks = 0 | ||
| inactive () { | ||
| inactive() { | ||
| if (!this.isActive) return | ||
@@ -317,3 +392,3 @@ this.isActive = false | ||
| _updateActive (active) { | ||
| _updateActive(active) { | ||
| const info = { active } | ||
@@ -329,6 +404,8 @@ | ||
| teardown () { | ||
| teardown() { | ||
| if (this.destroyed) return | ||
| this.destroyed = true | ||
| this.state.stats.topicsGcd++ | ||
| for (let i = this.peers.length - 1; i >= 0; i--) { | ||
@@ -349,16 +426,21 @@ this.peers[i].channel.close() | ||
| addStream (stream) { | ||
| addStream(stream) { | ||
| this._onopen(getMuxer(stream), false) | ||
| } | ||
| _proveCapabilityTo (stream) { | ||
| _proveCapabilityTo(stream) { | ||
| return this._makeCapability(stream.isInitiator, stream.handshakeHash) | ||
| } | ||
| _makeCapability (isInitiator, handshakeHash) { | ||
| _makeCapability(isInitiator, handshakeHash) { | ||
| return crypto.hash([isInitiator ? NS_INITATOR : NS_RESPONDER, this.capability, handshakeHash]) | ||
| } | ||
| _addPeer (peer, open) { | ||
| if (!b4a.equals(open.capability, this._makeCapability(!peer.stream.isInitiator, peer.stream.handshakeHash))) { | ||
| _addPeer(peer, open) { | ||
| if ( | ||
| !b4a.equals( | ||
| open.capability, | ||
| this._makeCapability(!peer.stream.isInitiator, peer.stream.handshakeHash) | ||
| ) | ||
| ) { | ||
| peer.channel.close() | ||
@@ -390,3 +472,3 @@ return | ||
| _checkGC () { | ||
| _checkGC() { | ||
| const shouldGC = this.activePeers === 0 && this.sessions.length === 0 | ||
@@ -407,3 +489,3 @@ | ||
| _removePeer (peer) { | ||
| _removePeer(peer) { | ||
| peer.removed = true | ||
@@ -436,3 +518,3 @@ this.peersByStream.delete(peer.stream) | ||
| _onannounce (wakeup, peer) { | ||
| _onannounce(wakeup, peer) { | ||
| for (let i = this.sessions.length - 1; i >= 0; i--) { | ||
@@ -446,3 +528,3 @@ const session = this.sessions[i] | ||
| _onlookup (req, peer) { | ||
| _onlookup(req, peer) { | ||
| for (let i = this.sessions.length - 1; i >= 0; i--) { | ||
@@ -456,3 +538,3 @@ const session = this.sessions[i] | ||
| _oninfo (info, peer) { | ||
| _oninfo(info, peer) { | ||
| if (info.active) { | ||
@@ -487,3 +569,3 @@ if (!peer.active) { | ||
| _onopen (muxer, unique) { | ||
| _onopen(muxer, unique) { | ||
| if (!unique && this.peersByStream.has(muxer.stream)) return | ||
@@ -526,3 +608,3 @@ | ||
| function onchannelopen (open, channel) { | ||
| function onchannelopen(open, channel) { | ||
| const peer = channel.userData | ||
@@ -532,3 +614,3 @@ peer.topic._addPeer(peer, open) | ||
| function onchannelclose (close, channel) { | ||
| function onchannelclose(close, channel) { | ||
| const peer = channel.userData | ||
@@ -538,3 +620,3 @@ peer.topic._removePeer(peer) | ||
| function onlookup (req, channel) { | ||
| function onlookup(req, channel) { | ||
| const peer = channel.userData | ||
@@ -544,3 +626,3 @@ peer.topic._onlookup(req, peer) | ||
| function onannounce (wakeup, channel) { | ||
| function onannounce(wakeup, channel) { | ||
| const peer = channel.userData | ||
@@ -550,3 +632,3 @@ peer.topic._onannounce(wakeup, peer) | ||
| function onchannelinfo (info, channel) { | ||
| function onchannelinfo(info, channel) { | ||
| const peer = channel.userData | ||
@@ -556,3 +638,3 @@ peer.topic._oninfo(info, peer) | ||
| function getMuxer (stream) { | ||
| function getMuxer(stream) { | ||
| if (Protomux.isProtomux(stream)) return stream | ||
@@ -565,2 +647,2 @@ if (stream.noiseStream.userData) return stream.noiseStream.userData | ||
| function noop () {} | ||
| function noop() {} |
+12
-6
| { | ||
| "name": "protomux-wakeup", | ||
| "version": "2.6.1", | ||
| "version": "2.7.0", | ||
| "description": "Wakeup protocol over protomux", | ||
@@ -12,2 +12,10 @@ "main": "index.js", | ||
| }, | ||
| "devDependencies": { | ||
| "@hyperswarm/secret-stream": "^6.8.1", | ||
| "brittle": "^3.17.1", | ||
| "lunte": "^1.0.0", | ||
| "prettier": "^3.6.2", | ||
| "prettier-config-holepunch": "^2.0.0", | ||
| "prom-client": "^15.1.3" | ||
| }, | ||
| "files": [ | ||
@@ -18,3 +26,4 @@ "index.js", | ||
| "scripts": { | ||
| "test": "standard" | ||
| "format": "prettier --write .", | ||
| "test": "prettier --check . && lunte && brittle test/*.js" | ||
| }, | ||
@@ -30,6 +39,3 @@ "repository": { | ||
| }, | ||
| "homepage": "https://github.com/holepunchto/protomux-wakeup", | ||
| "devDependencies": { | ||
| "standard": "^17.1.2" | ||
| } | ||
| "homepage": "https://github.com/holepunchto/protomux-wakeup" | ||
| } |
+100
-3
@@ -11,3 +11,3 @@ # protomux-wakeup | ||
| ``` js | ||
| ```js | ||
| const Wakeup = require('protomux-wakeup') | ||
@@ -50,9 +50,106 @@ | ||
| s.active() | ||
| ``` | ||
| // release the handlers | ||
| s.release() | ||
| ## API | ||
| #### `const w = new Wakeup([onwakeup])` | ||
| Create a new wakeup swarm with a `onwakeup` callback. `onwakeup` is called with the `id` and `stream` whenever a new peer connects. `id` is the wakeup topic (see `w.session()` for more info). `stream` is the Protomux instance. | ||
| #### `w.topics` | ||
| A map of all topics being tracked. | ||
| #### `const sessions = w.getSessions(capability, handlers = {})` | ||
| Return all sessions for the topic that matches the `capability` & `handlers`. | ||
| #### `const s = w.session(capability, handlers = {})` | ||
| Create a session to track the capability. Handlers includes hook callbacks for the life cycle of peers in the session. | ||
| ``` | ||
| const s = w.session(core.key, { | ||
| onpeeradd: (peer, session) => { | ||
| // Called when a peer is added to the session | ||
| }, | ||
| onpeerremove: (peer, session) => { | ||
| // Called when a peer is removed from the session | ||
| }, | ||
| onpeeractive: (peer, session) => { | ||
| // Called when a peer becomes active | ||
| }, | ||
| onpeerinactive: (peer, session) => { | ||
| // Called when a peer becomes inactive | ||
| }, | ||
| onannounce: (wakeup, peer, session) => { | ||
| // Called when a peer announces its available wake ups | ||
| }, | ||
| onlookup: (req, peer, session) => { | ||
| // Called when a peer requests available wake ups | ||
| } | ||
| }) | ||
| ``` | ||
| `handlers` can also includes: | ||
| ``` | ||
| { | ||
| active: true, // Whether the session is currently active / replicating | ||
| discoveryKey: crypto.discoveryKey(capability) // The buffer to use as the `id` for the wake up topic. If not provide a discoveryKey hash of the capability will be used. | ||
| } | ||
| ``` | ||
| #### `s.peers` | ||
| Peers on the session's topic. | ||
| #### `s.topic` | ||
| The session's topic. | ||
| #### `s.isActive` | ||
| Whether the session is active. | ||
| #### `s.addStream(stream)` | ||
| Add the `stream` to the current session. If `stream` doesn't have a Protomux instance, one will added. | ||
| #### `const peer = s.getPeer(stream)` | ||
| Get the peer for the given `stream`. | ||
| #### `s.lookup(peer, req = { hash: null })` | ||
| Send a lookup request to a `peer`. `req` can be of the form `{ hash }`. | ||
| Usually used to prompt the peer for any available wake ups. | ||
| #### `s.broadcastLookup(req)` | ||
| Call `s.lookup()` on all peers. | ||
| #### `s.lookupByStream(stream, req)` | ||
| A shortcut for calling `s.lookup()` on the peer for a given `stream`. | ||
| #### `s.announce(peer, wakeup)` | ||
| Announce to the peer an array of wake up messages. Wake up messages have the form `{ key: Buffer, length: Number }`. The `key` buffer is 32 bytes long. | ||
| #### `s.announceByStream(stream, wakeup)` | ||
| A shortcut for calling `s.announce()` on the peer for a given `stream`. | ||
| #### `s.active()` | ||
| Set the session as active. Sessions are active by default. | ||
| #### `s.inactive()` | ||
| Set the session as inactive. This must be called when the session is done. | ||
| ## License | ||
| Apache-2.0 |
@@ -14,6 +14,6 @@ // This file is autogenerated by the hyperschema compiler | ||
| const encoding0 = { | ||
| preencode (state, m) { | ||
| preencode(state, m) { | ||
| state.end++ // max flag is 1 so always one byte | ||
| }, | ||
| encode (state, m) { | ||
| encode(state, m) { | ||
| const flags = m.active ? 1 : 0 | ||
@@ -23,3 +23,3 @@ | ||
| }, | ||
| decode (state) { | ||
| decode(state) { | ||
| const flags = c.uint.decode(state) | ||
@@ -35,3 +35,3 @@ | ||
| const encoding1 = { | ||
| preencode (state, m) { | ||
| preencode(state, m) { | ||
| c.uint.preencode(state, m.version) | ||
@@ -41,3 +41,3 @@ c.fixed32.preencode(state, m.capability) | ||
| }, | ||
| encode (state, m) { | ||
| encode(state, m) { | ||
| const flags = m.active ? 1 : 0 | ||
@@ -49,3 +49,3 @@ | ||
| }, | ||
| decode (state) { | ||
| decode(state) { | ||
| const r0 = c.uint.decode(state) | ||
@@ -65,11 +65,11 @@ const r1 = c.fixed32.decode(state) | ||
| const encoding2 = { | ||
| preencode (state, m) { | ||
| preencode(state, m) { | ||
| c.fixed32.preencode(state, m.key) | ||
| c.uint.preencode(state, m.length) | ||
| }, | ||
| encode (state, m) { | ||
| encode(state, m) { | ||
| c.fixed32.encode(state, m.key) | ||
| c.uint.encode(state, m.length) | ||
| }, | ||
| decode (state) { | ||
| decode(state) { | ||
| const r0 = c.fixed32.decode(state) | ||
@@ -90,3 +90,3 @@ const r1 = c.uint.decode(state) | ||
| const encoding4 = { | ||
| preencode (state, m) { | ||
| preencode(state, m) { | ||
| state.end++ // max flag is 1 so always one byte | ||
@@ -96,3 +96,3 @@ | ||
| }, | ||
| encode (state, m) { | ||
| encode(state, m) { | ||
| const flags = m.hash ? 1 : 0 | ||
@@ -104,3 +104,3 @@ | ||
| }, | ||
| decode (state) { | ||
| decode(state) { | ||
| const flags = c.uint.decode(state) | ||
@@ -114,7 +114,7 @@ | ||
| function setVersion (v) { | ||
| function setVersion(v) { | ||
| version = v | ||
| } | ||
| function encode (name, value, v = VERSION) { | ||
| function encode(name, value, v = VERSION) { | ||
| version = v | ||
@@ -124,3 +124,3 @@ return c.encode(getEncoding(name), value) | ||
| function decode (name, buffer, v = VERSION) { | ||
| function decode(name, buffer, v = VERSION) { | ||
| version = v | ||
@@ -130,31 +130,38 @@ return c.decode(getEncoding(name), buffer) | ||
| function getEnum (name) { | ||
| function getEnum(name) { | ||
| switch (name) { | ||
| default: throw new Error('Enum not found ' + name) | ||
| default: | ||
| throw new Error('Enum not found ' + name) | ||
| } | ||
| } | ||
| function getEncoding (name) { | ||
| function getEncoding(name) { | ||
| switch (name) { | ||
| case '@wakeup/info': return encoding0 | ||
| case '@wakeup/handshake': return encoding1 | ||
| case '@wakeup/writer': return encoding2 | ||
| case '@wakeup/announce': return encoding3 | ||
| case '@wakeup/lookup': return encoding4 | ||
| default: throw new Error('Encoder not found ' + name) | ||
| case '@wakeup/info': | ||
| return encoding0 | ||
| case '@wakeup/handshake': | ||
| return encoding1 | ||
| case '@wakeup/writer': | ||
| return encoding2 | ||
| case '@wakeup/announce': | ||
| return encoding3 | ||
| case '@wakeup/lookup': | ||
| return encoding4 | ||
| default: | ||
| throw new Error('Encoder not found ' + name) | ||
| } | ||
| } | ||
| function getStruct (name, v = VERSION) { | ||
| function getStruct(name, v = VERSION) { | ||
| const enc = getEncoding(name) | ||
| return { | ||
| preencode (state, m) { | ||
| preencode(state, m) { | ||
| version = v | ||
| enc.preencode(state, m) | ||
| }, | ||
| encode (state, m) { | ||
| encode(state, m) { | ||
| version = v | ||
| enc.encode(state, m) | ||
| }, | ||
| decode (state) { | ||
| decode(state) { | ||
| version = v | ||
@@ -168,2 +175,11 @@ return enc.decode(state) | ||
| module.exports = { resolveStruct, getStruct, getEnum, getEncoding, encode, decode, setVersion, version } | ||
| module.exports = { | ||
| resolveStruct, | ||
| getStruct, | ||
| getEnum, | ||
| getEncoding, | ||
| encode, | ||
| decode, | ||
| setVersion, | ||
| version | ||
| } |
33657
17.45%639
15.76%154
170.18%6
500%